0%

agentscope 源码分析(5):Plan & Trace

这篇文章我们继续学习 AgentScope 的源码,主要聚焦于 AgentScope Plan 与 Trace 两个组件的实现。Plan 组件为 Agent 提供了将复杂任务分解为一系列子任务的能力,而 Trace 组件则用于追踪和记录 Agent 的执行过程,属于 AgentScope 框架的基础设施。

Plan 模块

AgentScope 的 plan 模块是一个任务规划与管理系统,旨在帮助 AI Agent 将复杂任务分解为可执行的子任务,并通过状态机机制追踪任务执行进度。Plan 模块的代码位于 src/agentscope/plan/ 目录下,文件列表如下:

1
2
3
4
5
6
src/agentscope/plan/
├── __init__.py # 模块导出
├── _plan_model.py # 数据模型定义
├── _plan_notebook.py # 核心管理器
├── _storage_base.py # 存储抽象基类
└── _in_memory_storage.py # 内存存储实现

数据模型

_plan_model.py 定义了 Plan 模块的核心数据模型抽象,其使用 SubTask 来表示计划中的每个子任务:

1
2
3
4
5
6
7
8
class SubTask(BaseModel):
name: str # 子任务名称
description: str # 详细描述
expected_outcome: str # 预期产出
outcome: str | None = None # 实际产出
state: Literal["todo", "in_progress", "done", "abandoned"] = "todo"
created_at: str # 创建时间
finished_at: str | None = None # 完成时间
  • 使用 state 属性追踪子任务的执行状态,包括待办(todo)、进行中(in_progress)、完成(done)和废弃(abandoned)

SubTask 类型还提供了一系列方法,用以管理子任务:

  • finish(self, outcome: str) -> None 完成子任务,设置状态为 done,记录实际产出和完成时间
  • to_oneline_markdown(self) -> str:转换为单行 Markdown 格式(如 - [ ] task_name
  • to_markdown(self:转换为 Markdown 格式,detailed=True 时包含详细描述

Plan 用来表示整个计划:

1
2
3
4
5
6
7
8
9
10
class Plan(BaseModel):
id: str # 唯一标识(自动生成)
name: str # 计划名称
description: str # 计划描述
expected_outcome: str # 预期产出
subtasks: list[SubTask] # 子任务列表
created_at: str # 创建时间
state: Literal["todo", "in_progress", "done", "abandoned"]
finished_at: str | None = None
outcome: str | None = None

Plan 类型也提供了一系列方法,用以管理计划:

  • refresh_plan_state(self) -> str:基于子任务状态刷新计划状态
  • finish(self, state: Literal["done", "abandoned"], outcome: str) -> None:完成计划,设置状态和产出,记录完成时间
  • to_markdown(self, detailed: bool = False) -> str:转换为 Markdown 格式,包含计划信息和子任务列表

refreshed_plan_state 方法通过检查子任务状态来更新计划的整体状态:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def refresh_plan_state(self) -> str:
"""基于子任务状态自动刷新计划状态"""
if self.state in ["done", "abandoned"]:
return ""

any_in_progress = any(_.state == "in_progress" for _ in self.subtasks)

if any_in_progress and self.state == "todo":
self.state = "in_progress"
return "The plan state has been updated to 'in_progress'."

elif not any_in_progress and self.state == "in_progress":
self.state = "todo"
return "The plan state has been updated to 'todo'."

return ""

计划存储

PlanStorageBase 定义了计划存储的抽象基类,用来保存所有计划。它继承自 StateModule 类,支持历史计划数据的序列化/反序列化操作。关于 StateModule 的详细用法,我们将在后续的文章中详细介绍。

1
2
3
4
5
6
7
8
9
10
11
12
class PlanStorageBase(StateModule):
@abstractmethod
async def add_plan(self, plan: Plan) -> None: ...

@abstractmethod
async def delete_plan(self, plan_id: str) -> None: ...

@abstractmethod
async def get_plans(self) -> list[Plan]: ...

@abstractmethod
async def get_plan(self, plan_id: str) -> Plan | None: ...
  • 定义存储接口,支持未来扩展(如数据库存储、分布式存储)
  • 继承 StateModule,支持状态序列化与反序列化

InMemoryPlanStorage 实现了 PlanStorageBase,使用内存中的有序字典来存储计划,并通过 plans 这个 state 属性支持 Plan 的序列化与反序列化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class InMemoryPlanStorage(PlanStorageBase):
def __init__(self) -> None:
super().__init__()
# key 为 plan 的 id,value 为 Plan 对象
self.plans = OrderedDict()

# 注册状态序列化/反序列化
self.register_state(
"plans",
lambda plans: {k: v.model_dump() for k, v in plans.items()},
lambda json_data: OrderedDict(
(k, Plan.model_validate(v)) for k, v in json_data.items()
),
)

PlanNotebook

PlanNotebook 是 Plan 模块的核心数据类,为 Agent 提供完整的 计划管理 工具集。其定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class PlanNotebook(StateModule):
def __init__(
self,
max_subtasks: int | None = None,
plan_to_hint: Callable[[Plan | None], str | None] | None = None,
storage: PlanStorageBase | None = None,
):
super().__init__()
self.max_tasks = max_subtasks
self.plan_to_hint = plan_to_hint or DefaultPlanToHint()
self.storage = storage or InMemoryPlanStorage()
self.current_plan: Plan | None = None
self._plan_change_hooks: dict[str, Callable] = OrderedDict()

# 当前计划需要进行状态管理
self.register_state(
"current_plan",
custom_to_json=lambda _: _.model_dump() if _ else None,
custom_from_json=lambda _: Plan.model_validate(_) if _ else None,
)
  • plan_to_hint 是一个可调用对象,用于将 Plan 对象转换为 LLM 提示词
  • storage 用来保存所有的历史 Plan
  • current_plan 指向当前正在处理的 Plan
  • _plan_change_hooks 用来注册计划状态变更时的 hooks 函数
  • PlanNotebook 也继承自 StateModule,以支持对 current_plan 进行状态管理

PlanNotebook 是以 Tool 的形式为 Agent 提供计划管理能力的,其 list_tools 返回了所有可用的工具:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def list_tools(
self,
) -> list[Callable[..., Coroutine[Any, Any, ToolResponse]]]:
return [
# subtask related tools
self.view_subtasks,
self.update_subtask_state,
self.finish_subtask,
# plan related tools
self.create_plan,
self.revise_current_plan,
self.finish_plan,
# historical plan related tools
self.view_historical_plans,
self.recover_historical_plan,
]
工具函数 功能 关键逻辑
create_plan 创建新计划 替换现有计划,触发 hooks
revise_current_plan 修改计划 支持增/改/删子任务
update_subtask_state 更新子任务状态 校验前置任务完成情况
finish_subtask 完成子任务 自动激活下一子任务
view_subtasks 查看子任务详情 Markdown 格式输出
finish_plan 结束计划 归档到历史记录
view_historical_plans 查看历史计划 从存储中检索
recover_historical_plan 恢复历史计划 替换当前计划

我们来看下 create_planfinish_plan 的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
async def create_plan(
self,
name: str,
description: str,
expected_outcome: str,
subtasks: list[SubTask],
) -> ToolResponse:
plan = Plan(
name=name,
description=description,
expected_outcome=expected_outcome,
subtasks=subtasks,
)

# 返回工具响应
if self.current_plan is None:
res = ToolResponse(
content=[
TextBlock(
type="text",
text=f"Plan '{name}' created successfully.",
),
],
)

else:
res = ToolResponse(
content=[
TextBlock(
type="text",
text=(
"The current plan named "
f"'{self.current_plan.name}' is replaced by the "
f"newly created plan named '{name}'."
),
),
],
)

# 更新计划
self.current_plan = plan
await self._trigger_plan_change_hooks()
return res
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
async def finish_plan(
self,
state: Literal["done", "abandoned"],
outcome: str,
) -> ToolResponse:
if self.current_plan is None:
return ToolResponse(
content=[
TextBlock(
type="text",
text="There is no plan to finish.",
),
],
)

# 将当前计划标记为 done
self.current_plan.finish(state, outcome)

# 将当前计划存入 history
await self.storage.add_plan(self.current_plan)

self.current_plan = None
await self._trigger_plan_change_hooks()
return ToolResponse(
content=[
TextBlock(
type="text",
text=f"The current plan is finished successfully as "
f"'{state}'.",
),
],
)

PlanNotebookget_current_hint() 方法则用于根据当前计划的状态来获取提示词:

1
2
3
4
5
6
7
8
9
10
11
async def get_current_hint(self) -> Msg | None:
hint_content = self.plan_to_hint(self.current_plan)
if hint_content:
msg = Msg(
"user",
hint_content,
"user",
)
return msg

return None
  • 它调用 plan_to_hint 将当前计划转换为提示词,如果用户在创建 PlanNotebook 时没有提供 plan_to_hint,则会使用默认的转换逻辑,即 DefaultPlanToHint 的实现
  • DefaultPlanToHint 根据当前计划的状态,生成相应的提示词。例如如下分别展示了 没有计划计划刚开始执行 是所使用的提示词:
1
2
3
4
5
6
7
no_plan: str = (
"If the user's query is complex (e.g. programming a website, game or "
"app), or requires a long chain of steps to complete (e.g. conduct "
"research on a certain topic from different sources), you NEED to "
"create a plan first by calling 'create_plan'. Otherwise, you can "
"directly execute the user's query without planning."
)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
at_the_beginning: str = (
"The current plan:\n"
"```\n"
"{plan}\n"
"```\n"
"Your options include:\n"
"- Mark the first subtask as 'in_progress' by calling "
"'update_subtask_state' with subtask_idx=0 and state='in_progress', "
"and start executing it.\n"
"- If the first subtask is not executable, analyze why and what you "
"can do to advance the plan, e.g. ask user for more information, "
"revise the plan by calling 'revise_current_plan'.\n"
"- If the user asks you to do something unrelated to the plan, "
"prioritize the completion of user's query first, and then return "
"to the plan afterward.\n"
"- If the user no longer wants to perform the current plan, confirm "
"with the user and call the 'finish_plan' function.\n"
)

Plan 小结

以上就介绍了 AgentScope 的 Plan 模块的核心原理,包括其核心数据结构、提供的工具、以及如何将计划转换为提示词。这里只是介绍了 Plan 模块的内部实现原理,我们将在后续文章继续介绍 Agent 是如何使用 Plan 模块的。

Trace 模块

接下来我们再来分析 AgentScope 的 Trace 模块实现,它为 AI Agent 系统提供可观测性(Observability)支持。它追踪 Agent 执行过程中的关键操作,包括 LLM 调用、Agent 回复、工具执行、消息格式化、Embedding 等。通过 Trace,有助于解决如下问题:

问题 Tracing 如何解决
调用链路不透明 记录每个操作的 Span,形成完整调用树
性能瓶颈难定位 记录每个操作的耗时
错误难以追踪 自动捕获异常并记录
Token 消耗不明 记录 input/output tokens
调试困难 可导出到可视化平台(如 Jaeger、Arize)

AgentScope 的 Trace 实现基于 OpenTelemetry 标准,OpenTelemetry 是云原生计算基金会(CNCF)的可观测性标准,提供:

  • 统一 API:一套 API 支持多种后端(Jaeger、Prometheus、Arize 等)
  • 语义约定:标准化的属性命名,确保跨系统兼容
  • GenAI 扩展:专为 AI/LLM 场景定义的属性规范

Trace 模块的代码位于 src/agentscope/tracing/ 目录下,文件列表如下所示:

1
2
3
4
5
6
7
8
src/agentscope/tracing/
├── __init__.py # 导出接口
├── _setup.py # OpenTelemetry 配置
├── _attributes.py # 属性定义(Semantic Conventions)
├── _extractor.py # 属性提取器(核心逻辑)
├── _converter.py # ContentBlock 转换器
├── _utils.py # 序列化工具
└── _trace.py # 装饰器实现

整体调用关系如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
┌────────────────────────────────────────────────────────────────────┐
│ 用户代码 │
│ @trace_llm / @trace_reply / @trace_toolkit / @trace / ... │
├────────────────────────────────────────────────────────────────────┤
│ │ │
│ ▼ │
│ ┌───────────────────────────────────────────────────────────────┐ │
│ │ _trace.py (装饰器层) │ │
│ │ - 包装函数调用 │ │
│ │ - 创建 Span │ │
│ │ - 处理同步/异步/生成器 │ │
│ │ - 错误捕获 │ │
│ └───────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌───────────────────────────────────────────────────────────────┐ │
│ │ _extractor.py (属性提取层) │ │
│ │ - _get_llm_request_attributes │ │
│ │ - _get_agent_request_attributes │ │
│ │ - _get_tool_request_attributes │ │
│ │ - ... │ │
│ └───────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌───────────────────────────────────────────────────────────────┐ │
│ │ _converter.py + _utils.py │ │
│ │ - _convert_block_to_part (ContentBlock → OTel Part) │ │
│ │ - _serialize_to_str (对象 → JSON 字符串) │ │
│ └───────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌───────────────────────────────────────────────────────────────┐ │
│ │ _attributes.py + _setup.py │ │
│ │ - SpanAttributes (OpenTelemetry Semantic Conventions) │ │
│ │ - TracerProvider + OTLPSpanExporter │ │
│ └───────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌───────────────────────────────────────────────────────────────┐ │
│ │ OpenTelemetry SDK │ │
│ └───────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌───────────────────────────────────────────────────────────────┐ │
│ │ 外部后端 (Jaeger / Arize / Datadog / ...) │ │
│ └───────────────────────────────────────────────────────────────┘ │
└────────────────────────────────────────────────────────────────────┘

Trace 全局配置

_setup.py 负责初始化 Trace 系统,setup_tracing 函数负责完成全局 Trace 系统的初始化工作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def setup_tracing(endpoint: str) -> None:
exporter = OTLPSpanExporter(endpoint=endpoint)
span_processor = BatchSpanProcessor(exporter)

tracer_provider: TracerProvider = trace.get_tracer_provider()
if isinstance(tracer_provider, TracerProvider):
# 应用程序可能已经在其他地方初始化了 TracerProvider
# The provider is set outside, just add the span processor
tracer_provider.add_span_processor(span_processor)

else:
tracer_provider = TracerProvider()
tracer_provider.add_span_processor(span_processor)
trace.set_tracer_provider(tracer_provider)

这段函数涉及 OpenTelemetry 的几个关键概念:

  • TracerProvidor:负责整个追踪系统的全局配置和资源管理。通常一个应用只有一个 TracerProvider 实例,所有 所有 Tracer 共享相同的配置。它的主要职责有:

    • 创建和管理 Tracer
    • 管理全局配置,例如配置资源信息
    • 管理 Span 处理器
    • 配置采样策略
  • Tracer:追踪器,负责创建和启动 Span,是实际记录追踪数据的入口。Tracer 负责开启具体的监控任务。它定义了当前这一段监控属于哪个模块或哪个库

    • 多实例:可以为不同模块创建不同的 Tracer
    • 上下文传播:自动处理 Span 之间的父子关系
    • 轻量级:创建成本低,可频繁使用
  • Span:代表一个操作或工作单元,记录操作的详细信息。它代表了一段操作过程,记录了具体发生了什么(操作名)、什么时候开始、什么时候结束、以及在这个过程中发生了哪些小事(Attributes/Events)

    • Span 是有层级关系的(父子关系),通过这种关系,你能看到一个请求在系统内部是如何跳转的
    • 每个 Span 都有 Trace ID、Span ID、Attributes、Events 等信息
  • Exporter:导出器,负责将追踪数据(Span)发送到后端系统

    • 可以将 exporter 理解为"数据快递员"——负责把追踪数据打包并送到指定的目的地
    • 这段代码创建了一个 OTLPSpanExporter,即使用 OTLP 协议(OpenTelemetry Protocol)导出 Span 数据
    • endpoint:指定接收数据的目标地址(如 Jaeger、Zipkin、或其他 APM 系统的 OTLP 接收端点)
  • Span Processor(Span 处理器):管理 Span 的生命周期和导出时机

    • 这里使用 BatchSpanProcessor,即批量处理器,会收集多个 Span 后批量导出,而不是每个 Span 都立即导出,这样减少网络开销,提高性能
    • 可以将 Span Processor 理解为 数据调度员——决定何时将 Span 交给 Exporter 发送
    • 还有其他类型的处理器,如 SimpleSpanProcessor(立即导出每个 Span)

理解上述概念后,就非常容易理解 setup_tracing 函数是如何工作的了。它的核心目的是:

  • 如果当前环境中已经存在 TracerProvider,则直接添加一个新的 Span 处理器
  • 如果当前不存在全局的 TracerProvider,则创建一个新的 TracerProvider,并设置其 Span 处理器

_get_tracer() 函数则用于获取一个 Tracer 实例(使用全局的 TracerProvidor):

1
2
3
4
5
6
7
8
9
10
def _get_tracer() -> Tracer:
"""Get the tracer
Returns:
`Tracer`: The tracer with the name "agentscope" and version.
"""
from opentelemetry import trace
from .._version import __version__

return trace.get_tracer("agentscope", __version__)

_trace.py

_trace.py 中提供了各种装饰器,用来对各种函数调用进行 Trace。

装饰器 目标函数 Span 名称格式 核心属性
trace_llm ChatModelBase.call chat {model} provider, model, tokens, messages
trace_reply AgentBase.reply invoke_agent {agent_name} agent_id, agent_name, messages
trace_toolkit Toolkit.call_tool_function execute_tool {tool_name} tool_call_id, tool_name, arguments
trace_format FormatterBase.format format {provider} format_target, format_count
trace_embedding EmbeddingModelBase embeddings {model} model, dimensions
trace 任意函数 invoke_generic_function {name} function_name, input, output

我们以追踪 LLM 调用的装饰器实现 trace_llm 为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
def trace_llm(func: Callable) -> Callable:
"""追踪 LLM 调用"""

@wraps(func)
async def wrapper(self: ChatModelBase, *args, **kwargs):
if not _check_tracing_enabled():
return await func(*args, **kwargs)

tracer = _get_tracer()

# 提取请求属性
request_attrs = _get_llm_request_attributes(self, args, kwargs)

# 获取 span name
span_name = _get_llm_span_name(request_attributes)

with tracer.start_as_current_span(
name=span_name,
attributes={
**request_attributes,
**_get_common_attributes(),
SpanAttributes.AGENTSCOPE_FUNCTION_NAME: function_name,
},
end_on_exit=False,
) as span:
try:
res = await func(*args, **kwargs)

# 处理流式响应
if isinstance(res, AsyncGenerator):
return _trace_async_generator_wrapper(res, span)

# 记录响应属性
span.set_attributes(_get_llm_response_attributes(res))
span.set_status(StatusCode.OK)
return res
except Exception as e:
span.set_status(StatusCode.ERROR)
span.record_exception(e)
raise
return wrapper
  • 使用 _get_llm_request_attributes() 提取 llm 调用请求的相关属性
  • 根据请求属性的相关字段,初始化 span name
  • 创建 span,并记录请求相关属性
  • 调用原始函数
  • 如果是流式响应,则包装原始生成器(在迭代生成器结束后,负责记录响应属性并设置 span 状态)
  • 否则直接记录 _get_llm_response_attributes() 所提取的响应属性

之后,在所有的 LLM 调用实现上,使用 @trace_llm 装饰器,就可以自动追踪 LLM 的调用情况了。例如

1
2
3
4
5
6
7
8
9
10
11
class OpenAIChatModel(ChatModelBase):
@trace_llm
async def __call__(
self,
messages: list[dict],
tools: list[dict] | None = None,
tool_choice: Literal["auto", "none", "required"] | str | None = None,
structured_model: Type[BaseModel] | None = None,
**kwargs: Any,
) -> ChatResponse | AsyncGenerator[ChatResponse, None]:
......

其他装饰器的实现都是类似的,只不过根据 trace 的目标对象不同,提取的属性、记录的 span name 会有所差异。

_extractor.py

_extractor.py 提供了各种属性提取工具函数,根据 trace 对象的不同,提取出相应的属性。例如对于 llm 调用的请求,由 _get_llm_request_attributes() 负责从请求对象中提取出所希望追踪的属性,并以字典形式返回(属性名和值的键值对):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def _get_llm_request_attributes(instance, args, kwargs) -> Dict:
return {
# 必需属性
GEN_AI_OPERATION_NAME: "chat",
GEN_AI_PROVIDER_NAME: _get_provider_name(instance),
GEN_AI_REQUEST_MODEL: instance.model_name,

# 可选参数
GEN_AI_REQUEST_TEMPERATURE: kwargs.get("temperature"),
GEN_AI_REQUEST_MAX_TOKENS: kwargs.get("max_tokens"),

# 工具定义
GEN_AI_TOOL_DEFINITIONS: _get_tool_definitions(kwargs.get("tools")),

# AgentScope 扩展
AGENTSCOPE_FUNCTION_INPUT: serialize(args, kwargs),
}

其他代码

  • _converter.py_utils.py 提供了一些工具函数,主要用于将应用程序中的属性类型转换为 OpenTelemetry 属性值
  • _attributes.py 定义了所有追踪时使用的属性名常量,AgentScope 遵循 OpenTelemetry GenAI Semantic
    Conventions 所定义的标准属性名,以提高追踪数据的可读性和兼容性

小结

这篇文章我们学习了 AgentScope 中 Plan 模块的实现,Plan 模块为 Agent 提供了任务规划能力,以支持复杂任务的执行。而 Trace 模块则为 AgentScope 本身提供了可观测能力,对于现代应用程序而言,可观测性是其不可或缺的一部分。通过 Trace 模块,我们可以追踪 AgentScope 的运行情况,从而更好地理解其内部行为和性能瓶颈。

Reference