这篇文章我们继续学习 AgentScope 的源码,主要聚焦于 AgentScope Plan 与 Trace 两个组件的实现。Plan 组件为 Agent 提供了将复杂任务分解为一系列子任务的能力,而 Trace 组件则用于追踪和记录 Agent 的执行过程,属于 AgentScope 框架的基础设施。
Plan 模块
AgentScope 的 plan 模块是一个任务规划与管理系统,旨在帮助 AI Agent 将复杂任务分解为可执行的子任务,并通过状态机机制追踪任务执行进度。Plan 模块的代码位于 src/agentscope/plan/ 目录下,文件列表如下:
1 2 3 4 5 6 src/agentscope/plan/ ├── __init__.py ├── _plan_model.py ├── _plan_notebook.py ├── _storage_base.py └── _in_memory_storage.py
数据模型
_plan_model.py 定义了 Plan 模块的核心数据模型抽象,其使用 SubTask 来表示计划中的每个子任务:
1 2 3 4 5 6 7 8 class SubTask (BaseModel ): name: str description: str expected_outcome: str outcome: str | None = None state: Literal ["todo" , "in_progress" , "done" , "abandoned" ] = "todo" created_at: str finished_at: str | None = None
使用 state 属性追踪子任务的执行状态,包括待办(todo)、进行中(in_progress)、完成(done)和废弃(abandoned)
SubTask 类型还提供了一系列方法,用以管理子任务:
finish(self, outcome: str) -> None 完成子任务,设置状态为 done,记录实际产出和完成时间
to_oneline_markdown(self) -> str:转换为单行 Markdown 格式(如 - [ ] task_name)
to_markdown(self:转换为 Markdown 格式,detailed=True 时包含详细描述
Plan 用来表示整个计划:
1 2 3 4 5 6 7 8 9 10 class Plan (BaseModel ): id : str name: str description: str expected_outcome: str subtasks: list [SubTask] created_at: str state: Literal ["todo" , "in_progress" , "done" , "abandoned" ] finished_at: str | None = None outcome: str | None = None
Plan 类型也提供了一系列方法,用以管理计划:
refresh_plan_state(self) -> str:基于子任务状态刷新计划状态
finish(self, state: Literal["done", "abandoned"], outcome: str) -> None:完成计划,设置状态和产出,记录完成时间
to_markdown(self, detailed: bool = False) -> str:转换为 Markdown 格式,包含计划信息和子任务列表
refreshed_plan_state 方法通过检查子任务状态来更新计划的整体状态:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 def refresh_plan_state (self ) -> str : """基于子任务状态自动刷新计划状态""" if self.state in ["done" , "abandoned" ]: return "" any_in_progress = any (_.state == "in_progress" for _ in self.subtasks) if any_in_progress and self.state == "todo" : self.state = "in_progress" return "The plan state has been updated to 'in_progress'." elif not any_in_progress and self.state == "in_progress" : self.state = "todo" return "The plan state has been updated to 'todo'." return ""
计划存储
PlanStorageBase 定义了计划存储的抽象基类,用来保存所有计划。它继承自 StateModule 类,支持历史计划数据的序列化/反序列化操作。关于 StateModule 的详细用法,我们将在后续的文章中详细介绍。
1 2 3 4 5 6 7 8 9 10 11 12 class PlanStorageBase (StateModule ): @abstractmethod async def add_plan (self, plan: Plan ) -> None : ... @abstractmethod async def delete_plan (self, plan_id: str ) -> None : ... @abstractmethod async def get_plans (self ) -> list [Plan]: ... @abstractmethod async def get_plan (self, plan_id: str ) -> Plan | None : ...
定义存储接口,支持未来扩展(如数据库存储、分布式存储)
继承 StateModule,支持状态序列化与反序列化
InMemoryPlanStorage 实现了 PlanStorageBase,使用内存中的有序字典来存储计划,并通过 plans 这个 state 属性支持 Plan 的序列化与反序列化。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 class InMemoryPlanStorage (PlanStorageBase ): def __init__ (self ) -> None : super ().__init__() self.plans = OrderedDict() self.register_state( "plans" , lambda plans: {k: v.model_dump() for k, v in plans.items()}, lambda json_data: OrderedDict( (k, Plan.model_validate(v)) for k, v in json_data.items() ), )
PlanNotebook
PlanNotebook 是 Plan 模块的核心数据类,为 Agent 提供完整的 计划管理 工具集。其定义如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 class PlanNotebook (StateModule ): def __init__ ( self, max_subtasks: int | None = None , plan_to_hint: Callable [[Plan | None ], str | None ] | None = None , storage: PlanStorageBase | None = None , ): super ().__init__() self.max_tasks = max_subtasks self.plan_to_hint = plan_to_hint or DefaultPlanToHint() self.storage = storage or InMemoryPlanStorage() self.current_plan: Plan | None = None self._plan_change_hooks: dict [str , Callable ] = OrderedDict() self.register_state( "current_plan" , custom_to_json=lambda _: _.model_dump() if _ else None , custom_from_json=lambda _: Plan.model_validate(_) if _ else None , )
plan_to_hint 是一个可调用对象,用于将 Plan 对象转换为 LLM 提示词
storage 用来保存所有的历史 Plan
current_plan 指向当前正在处理的 Plan
_plan_change_hooks 用来注册计划状态变更时的 hooks 函数
PlanNotebook 也继承自 StateModule,以支持对 current_plan 进行状态管理
PlanNotebook 是以 Tool 的形式为 Agent 提供计划管理能力的,其 list_tools 返回了所有可用的工具:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 def list_tools ( self, ) -> list [Callable [..., Coroutine [Any , Any , ToolResponse]]]: return [ self.view_subtasks, self.update_subtask_state, self.finish_subtask, self.create_plan, self.revise_current_plan, self.finish_plan, self.view_historical_plans, self.recover_historical_plan, ]
工具函数
功能
关键逻辑
create_plan
创建新计划
替换现有计划,触发 hooks
revise_current_plan
修改计划
支持增/改/删子任务
update_subtask_state
更新子任务状态
校验前置任务完成情况
finish_subtask
完成子任务
自动激活下一子任务
view_subtasks
查看子任务详情
Markdown 格式输出
finish_plan
结束计划
归档到历史记录
view_historical_plans
查看历史计划
从存储中检索
recover_historical_plan
恢复历史计划
替换当前计划
我们来看下 create_plan 和 finish_plan 的实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 async def create_plan ( self, name: str , description: str , expected_outcome: str , subtasks: list [SubTask], ) -> ToolResponse: plan = Plan( name=name, description=description, expected_outcome=expected_outcome, subtasks=subtasks, ) if self.current_plan is None : res = ToolResponse( content=[ TextBlock( type ="text" , text=f"Plan '{name} ' created successfully." , ), ], ) else : res = ToolResponse( content=[ TextBlock( type ="text" , text=( "The current plan named " f"'{self.current_plan.name} ' is replaced by the " f"newly created plan named '{name} '." ), ), ], ) self.current_plan = plan await self._trigger_plan_change_hooks() return res
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 async def finish_plan ( self, state: Literal ["done" , "abandoned" ], outcome: str , ) -> ToolResponse: if self.current_plan is None : return ToolResponse( content=[ TextBlock( type ="text" , text="There is no plan to finish." , ), ], ) self.current_plan.finish(state, outcome) await self.storage.add_plan(self.current_plan) self.current_plan = None await self._trigger_plan_change_hooks() return ToolResponse( content=[ TextBlock( type ="text" , text=f"The current plan is finished successfully as " f"'{state} '." , ), ], )
PlanNotebook 的 get_current_hint() 方法则用于根据当前计划的状态来获取提示词:
1 2 3 4 5 6 7 8 9 10 11 async def get_current_hint (self ) -> Msg | None : hint_content = self.plan_to_hint(self.current_plan) if hint_content: msg = Msg( "user" , hint_content, "user" , ) return msg return None
它调用 plan_to_hint 将当前计划转换为提示词,如果用户在创建 PlanNotebook 时没有提供 plan_to_hint,则会使用默认的转换逻辑,即 DefaultPlanToHint 的实现
DefaultPlanToHint 根据当前计划的状态,生成相应的提示词。例如如下分别展示了 没有计划、计划刚开始执行 是所使用的提示词:
1 2 3 4 5 6 7 no_plan: str = ( "If the user's query is complex (e.g. programming a website, game or " "app), or requires a long chain of steps to complete (e.g. conduct " "research on a certain topic from different sources), you NEED to " "create a plan first by calling 'create_plan'. Otherwise, you can " "directly execute the user's query without planning." )
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 at_the_beginning: str = ( "The current plan:\n" "```\n" "{plan}\n" "```\n" "Your options include:\n" "- Mark the first subtask as 'in_progress' by calling " "'update_subtask_state' with subtask_idx=0 and state='in_progress', " "and start executing it.\n" "- If the first subtask is not executable, analyze why and what you " "can do to advance the plan, e.g. ask user for more information, " "revise the plan by calling 'revise_current_plan'.\n" "- If the user asks you to do something unrelated to the plan, " "prioritize the completion of user's query first, and then return " "to the plan afterward.\n" "- If the user no longer wants to perform the current plan, confirm " "with the user and call the 'finish_plan' function.\n" )
Plan 小结
以上就介绍了 AgentScope 的 Plan 模块的核心原理,包括其核心数据结构、提供的工具、以及如何将计划转换为提示词。这里只是介绍了 Plan 模块的内部实现原理,我们将在后续文章继续介绍 Agent 是如何使用 Plan 模块的。
Trace 模块
接下来我们再来分析 AgentScope 的 Trace 模块实现,它为 AI Agent 系统提供可观测性(Observability)支持。它追踪 Agent 执行过程中的关键操作,包括 LLM 调用、Agent 回复、工具执行、消息格式化、Embedding 等。通过 Trace,有助于解决如下问题:
问题
Tracing 如何解决
调用链路不透明
记录每个操作的 Span,形成完整调用树
性能瓶颈难定位
记录每个操作的耗时
错误难以追踪
自动捕获异常并记录
Token 消耗不明
记录 input/output tokens
调试困难
可导出到可视化平台(如 Jaeger、Arize)
AgentScope 的 Trace 实现基于 OpenTelemetry 标准,OpenTelemetry 是云原生计算基金会(CNCF)的可观测性标准,提供:
统一 API :一套 API 支持多种后端(Jaeger、Prometheus、Arize 等)
语义约定 :标准化的属性命名,确保跨系统兼容
GenAI 扩展 :专为 AI/LLM 场景定义的属性规范
Trace 模块的代码位于 src/agentscope/tracing/ 目录下,文件列表如下所示:
1 2 3 4 5 6 7 8 src/agentscope/tracing/ ├── __init__.py ├── _setup.py ├── _attributes.py ├── _extractor.py ├── _converter.py ├── _utils.py └── _trace.py
整体调用关系如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 ┌────────────────────────────────────────────────────────────────────┐ │ 用户代码 │ │ @trace_llm / @trace_reply / @trace_toolkit / @trace / ... │ ├────────────────────────────────────────────────────────────────────┤ │ │ │ │ ▼ │ │ ┌───────────────────────────────────────────────────────────────┐ │ │ │ _trace.py (装饰器层) │ │ │ │ - 包装函数调用 │ │ │ │ - 创建 Span │ │ │ │ - 处理同步/异步/生成器 │ │ │ │ - 错误捕获 │ │ │ └───────────────────────────────────────────────────────────────┘ │ │ │ │ │ ▼ │ │ ┌───────────────────────────────────────────────────────────────┐ │ │ │ _extractor.py (属性提取层) │ │ │ │ - _get_llm_request_attributes │ │ │ │ - _get_agent_request_attributes │ │ │ │ - _get_tool_request_attributes │ │ │ │ - ... │ │ │ └───────────────────────────────────────────────────────────────┘ │ │ │ │ │ ▼ │ │ ┌───────────────────────────────────────────────────────────────┐ │ │ │ _converter.py + _utils.py │ │ │ │ - _convert_block_to_part (ContentBlock → OTel Part) │ │ │ │ - _serialize_to_str (对象 → JSON 字符串) │ │ │ └───────────────────────────────────────────────────────────────┘ │ │ │ │ │ ▼ │ │ ┌───────────────────────────────────────────────────────────────┐ │ │ │ _attributes.py + _setup.py │ │ │ │ - SpanAttributes (OpenTelemetry Semantic Conventions) │ │ │ │ - TracerProvider + OTLPSpanExporter │ │ │ └───────────────────────────────────────────────────────────────┘ │ │ │ │ │ ▼ │ │ ┌───────────────────────────────────────────────────────────────┐ │ │ │ OpenTelemetry SDK │ │ │ └───────────────────────────────────────────────────────────────┘ │ │ │ │ │ ▼ │ │ ┌───────────────────────────────────────────────────────────────┐ │ │ │ 外部后端 (Jaeger / Arize / Datadog / ...) │ │ │ └───────────────────────────────────────────────────────────────┘ │ └────────────────────────────────────────────────────────────────────┘
Trace 全局配置
_setup.py 负责初始化 Trace 系统,setup_tracing 函数负责完成全局 Trace 系统的初始化工作:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 def setup_tracing (endpoint: str ) -> None : exporter = OTLPSpanExporter(endpoint=endpoint) span_processor = BatchSpanProcessor(exporter) tracer_provider: TracerProvider = trace.get_tracer_provider() if isinstance (tracer_provider, TracerProvider): tracer_provider.add_span_processor(span_processor) else : tracer_provider = TracerProvider() tracer_provider.add_span_processor(span_processor) trace.set_tracer_provider(tracer_provider)
这段函数涉及 OpenTelemetry 的几个关键概念:
TracerProvidor:负责整个追踪系统的全局配置和资源管理。通常一个应用只有一个 TracerProvider 实例,所有 所有 Tracer 共享相同的配置。它的主要职责有:
创建和管理 Tracer
管理全局配置,例如配置资源信息
管理 Span 处理器
配置采样策略
Tracer:追踪器,负责创建和启动 Span,是实际记录追踪数据的入口。Tracer 负责开启具体的监控任务。它定义了当前这一段监控属于哪个模块或哪个库
多实例:可以为不同模块创建不同的 Tracer
上下文传播:自动处理 Span 之间的父子关系
轻量级:创建成本低,可频繁使用
Span:代表一个操作或工作单元,记录操作的详细信息。它代表了一段操作过程,记录了具体发生了什么(操作名)、什么时候开始、什么时候结束、以及在这个过程中发生了哪些小事(Attributes/Events)
Span 是有层级关系的(父子关系),通过这种关系,你能看到一个请求在系统内部是如何跳转的
每个 Span 都有 Trace ID、Span ID、Attributes、Events 等信息
Exporter:导出器,负责将追踪数据(Span)发送到后端系统
可以将 exporter 理解为"数据快递员"——负责把追踪数据打包并送到指定的目的地
这段代码创建了一个 OTLPSpanExporter,即使用 OTLP 协议(OpenTelemetry Protocol)导出 Span 数据
endpoint:指定接收数据的目标地址(如 Jaeger、Zipkin、或其他 APM 系统的 OTLP 接收端点)
Span Processor(Span 处理器):管理 Span 的生命周期和导出时机
这里使用 BatchSpanProcessor,即批量处理器,会收集多个 Span 后批量导出,而不是每个 Span 都立即导出,这样减少网络开销,提高性能
可以将 Span Processor 理解为 数据调度员——决定何时将 Span 交给 Exporter 发送
还有其他类型的处理器,如 SimpleSpanProcessor(立即导出每个 Span)
理解上述概念后,就非常容易理解 setup_tracing 函数是如何工作的了。它的核心目的是:
如果当前环境中已经存在 TracerProvider,则直接添加一个新的 Span 处理器
如果当前不存在全局的 TracerProvider,则创建一个新的 TracerProvider,并设置其 Span 处理器
_get_tracer() 函数则用于获取一个 Tracer 实例(使用全局的 TracerProvidor):
1 2 3 4 5 6 7 8 9 10 def _get_tracer () -> Tracer: """Get the tracer Returns: `Tracer`: The tracer with the name "agentscope" and version. """ from opentelemetry import trace from .._version import __version__ return trace.get_tracer("agentscope" , __version__)
_trace.py
_trace.py 中提供了各种装饰器,用来对各种函数调用进行 Trace。
装饰器
目标函数
Span 名称格式
核心属性
trace_llm
ChatModelBase.call
chat {model}
provider, model, tokens, messages
trace_reply
AgentBase.reply
invoke_agent {agent_name}
agent_id, agent_name, messages
trace_toolkit
Toolkit.call_tool_function
execute_tool {tool_name}
tool_call_id, tool_name, arguments
trace_format
FormatterBase.format
format {provider}
format_target, format_count
trace_embedding
EmbeddingModelBase
embeddings {model}
model, dimensions
trace
任意函数
invoke_generic_function {name}
function_name, input, output
我们以追踪 LLM 调用的装饰器实现 trace_llm 为例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 def trace_llm (func: Callable ) -> Callable : """追踪 LLM 调用""" @wraps(func ) async def wrapper (self: ChatModelBase, *args, **kwargs ): if not _check_tracing_enabled(): return await func(*args, **kwargs) tracer = _get_tracer() request_attrs = _get_llm_request_attributes(self, args, kwargs) span_name = _get_llm_span_name(request_attributes) with tracer.start_as_current_span( name=span_name, attributes={ **request_attributes, **_get_common_attributes(), SpanAttributes.AGENTSCOPE_FUNCTION_NAME: function_name, }, end_on_exit=False , ) as span: try : res = await func(*args, **kwargs) if isinstance (res, AsyncGenerator): return _trace_async_generator_wrapper(res, span) span.set_attributes(_get_llm_response_attributes(res)) span.set_status(StatusCode.OK) return res except Exception as e: span.set_status(StatusCode.ERROR) span.record_exception(e) raise return wrapper
使用 _get_llm_request_attributes() 提取 llm 调用请求的相关属性
根据请求属性的相关字段,初始化 span name
创建 span,并记录请求相关属性
调用原始函数
如果是流式响应,则包装原始生成器(在迭代生成器结束后,负责记录响应属性并设置 span 状态)
否则直接记录 _get_llm_response_attributes() 所提取的响应属性
之后,在所有的 LLM 调用实现上,使用 @trace_llm 装饰器,就可以自动追踪 LLM 的调用情况了。例如
1 2 3 4 5 6 7 8 9 10 11 class OpenAIChatModel (ChatModelBase ): @trace_llm async def __call__ ( self, messages: list [dict ], tools: list [dict ] | None = None , tool_choice: Literal ["auto" , "none" , "required" ] | str | None = None , structured_model: Type [BaseModel] | None = None , **kwargs: Any , ) -> ChatResponse | AsyncGenerator[ChatResponse, None ]: ......
其他装饰器的实现都是类似的,只不过根据 trace 的目标对象不同,提取的属性、记录的 span name 会有所差异。
_extractor.py 提供了各种属性提取工具函数,根据 trace 对象的不同,提取出相应的属性。例如对于 llm 调用的请求,由 _get_llm_request_attributes() 负责从请求对象中提取出所希望追踪的属性,并以字典形式返回(属性名和值的键值对):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 def _get_llm_request_attributes (instance, args, kwargs ) -> Dict : return { GEN_AI_OPERATION_NAME: "chat" , GEN_AI_PROVIDER_NAME: _get_provider_name(instance), GEN_AI_REQUEST_MODEL: instance.model_name, GEN_AI_REQUEST_TEMPERATURE: kwargs.get("temperature" ), GEN_AI_REQUEST_MAX_TOKENS: kwargs.get("max_tokens" ), GEN_AI_TOOL_DEFINITIONS: _get_tool_definitions(kwargs.get("tools" )), AGENTSCOPE_FUNCTION_INPUT: serialize(args, kwargs), }
其他代码
_converter.py 和 _utils.py 提供了一些工具函数,主要用于将应用程序中的属性类型转换为 OpenTelemetry 属性值
_attributes.py 定义了所有追踪时使用的属性名常量,AgentScope 遵循 OpenTelemetry GenAI Semantic
Conventions 所定义的标准属性名,以提高追踪数据的可读性和兼容性
小结
这篇文章我们学习了 AgentScope 中 Plan 模块的实现,Plan 模块为 Agent 提供了任务规划能力,以支持复杂任务的执行。而 Trace 模块则为 AgentScope 本身提供了可观测能力,对于现代应用程序而言,可观测性是其不可或缺的一部分。通过 Trace 模块,我们可以追踪 AgentScope 的运行情况,从而更好地理解其内部行为和性能瓶颈。
Reference