在前两篇文章中,我们深入剖析了 AgentScope 的工具系统,这相当于为 Agent 构建了强健的 手脚,使其具备了影响物理世界的能力。然而,要让这些肢体协调运作,还需一个智慧的 大脑 来坐镇指挥。从本篇开始,我们将重点探讨 AgentScope 如何构建一套通用的适配层,以支持来自不同 Provider(供应商)的各类大模型。
总体设计
AgentScope 支持多 LLM 的核心架构由以下四个模块组成:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 ┌─────────────────────────────────────────────────────────────────┐ │ Agent Layer │ │ (ReActAgent, A2AAgent, etc.) │ └─────────────────────────────────────────────────────────────────┘ │ ┌───────────┼───────────┐ ▼ ▼ ▼ ┌───────────┐ ┌───────────┐ ┌───────────┐ │ Model │ │ Formatter │ │ Token │ │ Module │ │ Module │ │ Module │ └───────────┘ └───────────┘ └───────────┘ │ │ │ └───────────┼───────────┘ ▼ ┌───────────────────────────────────────┐ │ Message Module │ │ (统一的消息抽象层) │ └───────────────────────────────────────┘
Message 模块 :提供统一的消息抽象
Model 模块 :专注于 API 调用和响应解析
Token 模块 :处理令牌计数
Formatter 模块 :负责消息格式转换,支持按照 Token 限制截断消息的能力
接下来我们将详细解释每个模块的实现原理,以及这些模块之间如何协同工作。
Message 模块
Message 模块的代码主要位于 src/agentscope/message/ 目录下,它是 AgentScope 框架的核心数据类型之一,它让 AgentScope 能够以统一的方式来处理与不同 LLM 的交互。
1 2 3 4 src/agentscope/message/ ├── __init__.py ├── _message_base.py └── _message_block.py
Msg 类
Msg 类是 消息 的核心抽象,它主要包含以下属性:
1 2 3 4 5 6 7 8 class Msg : name: str content: str | Sequence [ContentBlock] role: Literal ["user" , "assistant" , "system" ] metadata: dict id : str timestamp: str invocation_id: str
Msg 类提供了以下方法,用于方便地构建和操作消息:
方法
功能
to_dict()
序列化为字典
from_dict(json_data)
从字典反序列化
get_content_blocks(block_type)
按类型提取内容块
get_text_content()
提取纯文本内容
has_content_blocks(block_type)
检查是否包含特定类型内容块
Msg 类的实现比较简单,稍微复杂一点的是其 get_content_blocks() 方法,使用 @overload 装饰器为 get_content_blocks() 方法提供了多种类型签名,使其能够灵活地从消息中提取不同种类的内容块。
ContentBlock 类
Msg 类使用 ContentBlock 类型来表示不同类型的内容块,而 Msg 的 content 则是一个 ContentBlock 列表(对于 str 类型的内容,则会转换为一个 TextBlock)。因此 ContentBlock 才是 Msg 中真正承载不同类型内容的载体。
ContentBlock 类型本身是一个 Union 类型,它是以下几种类型之一:
1 2 3 4 5 6 7 8 9 ContentBlock = ( ToolUseBlock | ToolResultBlock | TextBlock | ThinkingBlock | ImageBlock | AudioBlock | VideoBlock )
每种类型的含义如下所示:
类型
用途
关键字段
TextBlock
文本内容
type="text", text
ThinkingBlock
思考过程
type="thinking", thinking
ImageBlock
图像内容
type="image", source
AudioBlock
音频内容
type="audio", source
VideoBlock
视频内容
type="video", source
ToolUseBlock
工具调用请求
type="tool_use", id, name, input
ToolResultBlock
工具调用结果
type="tool_result", id, output, name
每种具体的类型都使用 TypedDict 来定义结构化的内容块类型,以 TextBlock 为例:
1 2 3 4 5 6 7 class TextBlock (TypedDict, total=False ): """The text block.""" type : Required[Literal ["text" ]] """The type of the block""" text: str """The text content"""
对于 ImageBlock、AudioBlock、VideoBlock 类型,其中的多模态数据既可以通过 URL 表示,也可以通过 Base64 字符串表示:
1 2 3 4 5 6 7 8 class ImageBlock (TypedDict, total=False ): """The image block""" type : Required[Literal ["image" ]] """The type of the block""" source: Required[Base64Source | URLSource] """The src of the image"""
因此,_message_block.py 中定义了两种 Source 类型:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 class Base64Source (TypedDict, total=False ): """The base64 source""" type : Required[Literal ["base64" ]] """The type of the src, must be `base64`""" media_type: Required[str ] """The media type of the data, e.g. `image/jpeg` or `audio/mpeg`""" data: Required[str ] """The base64 data, in format of RFC 2397""" class URLSource (TypedDict, total=False ): """The URL source""" type : Required[Literal ["url" ]] """The type of the src, must be `url`""" url: Required[str ] """The URL of the image or audio"""
以上就完成了对 AgentScope 的 Message 模块的剖析,只要理解其两个核心数据结构 Msg 和 ContentBlock,就能够理解 AgentScope 如何表示和操作消息了。
Model 模块
Model 模块负责与各种 LLM API 交互,并将响应统一为 ChatResponse 格式。 Model 模块的代码位于 src/agentscope/model 目录下,负责:
封装各 LLM API 的调用逻辑
统一响应格式为 ChatResponse
支持流式和非流式输出
处理工具调用(Function Calling)
支持结构化输出(Structured Output)
其目录结构如下所示:
1 2 3 4 5 6 7 8 9 10 11 src/agentscope/model/ ├── __init__.py ├── _model_base.py ├── _model_response.py ├── _model_usage.py ├── _openai_model.py ├── _anthropic_model.py ├── _dashscope_model.py ├── _gemini_model.py ├── _ollama_model.py └── _trinity_model.py
ChatModelBase 抽象基类
ChatModelBase 是所有 LLM 的抽象基类,定义了与 LLM 交互的基本接口:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 class ChatModelBase : model_name: str stream: bool def __init__ (self, model_name: str , stream: bool ) -> None : self.model_name = model_name self.stream = stream @abstractmethod async def __call__ ( self, messages: list [dict ], tools: list [dict ] | None = None , tool_choice: Literal ["auto" , "none" , "required" ] | str | None = None , structured_model: Type [BaseModel] | None = None , **kwargs: Any , ) -> ChatResponse | AsyncGenerator[ChatResponse, None ]: pass def _validate_tool_choice (self, tool_choice: str , tools: list [dict ] | None ): """验证 tool_choice 参数有效性""" ...
通过实现 __call__ 方法,ChatModelBase 的实例是可调用对象
为所有的模型调用都提供一致的接口,包括:
参数:messages、tools、tool_choice、structured_model、**kwargs(用于传递额外的参数)
返回值统一为 ChatResponse 或 ChatResponse 的异步生成器
ChatResponse 用来表示 LLM 的响应:
1 2 3 4 5 6 7 8 @dataclass class ChatResponse (DictMixin ): content: Sequence [TextBlock | ToolUseBlock | ThinkingBlock | AudioBlock] id : str = field(default_factory=lambda : _get_timestamp(True )) created_at: str = field(default_factory=_get_timestamp) type : Literal ["chat" ] = field(default_factory=lambda : "chat" ) usage: ChatUsage | None = field(default_factory=lambda : None ) metadata: dict [str , JSONSerializableObject] | None = field(default_factory=lambda : None )
其核心字段如下:
字段
类型
说明
content
Sequence[ContentBlock]
内容块序列,支持文本、工具调用、思考过程、音频
usage
ChatUsage | None
Token 使用统计
metadata
dict | None
结构化输出结果
content 是 ContentBlock 的序列,用来表示实际的 LLM 输出内容,而 usage 则用来存储 Token 使用信息,其类型为 ChatUsage:
1 2 3 4 5 6 7 @dataclass class ChatUsage(DictMixin): input_tokens: int # 输入 Token 数 output_tokens: int # 输出 Token 数 time: float # 响应耗时(秒) type: Literal["chat"] = "chat" metadata: dict[str, Any] | None = None # 原始 API 返回的信息
OpenAIChatModel
AgentScope 框架支持了多种 provider 的 LLM,这里我们以 OpenAIChatModel 为例来介绍如何封装 OpenAI 的 API 调用,其他 provider 实现也是类似的,只是少量 API 调用细节有所不同。
初始化,创建 OpenAI 异步客户端,支持 OpenAI 和 Azure 两种客户端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 class OpenAIChatModel (ChatModelBase ): def __init__ ( self, model_name: str , api_key: str | None = None , stream: bool = True , reasoning_effort: Literal ["low" , "medium" , "high" ] | None = None , organization: str = None , stream_tool_parsing: bool = True , client_type: Literal ["openai" , "azure" ] = "openai" , client_kwargs: dict [str , JSONSerializableObject] | None = None , generate_kwargs: dict [str , JSONSerializableObject] | None = None , **kwargs: Any , ) -> None : import openai if client_type == "azure" : self.client = openai.AsyncAzureOpenAI( api_key=api_key, organization=organization,** (client_kwargs or {}) ) else : self.client = openai.AsyncClient( api_key=api_key, organization=organization, **(client_kwargs or {}) )
__call__ 方法,调用 OpenAI 的 create_chat_completion API
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 async def __call__ ( self, messages: list [dict ], tools: list [dict ] | None = None , tool_choice: Literal ["auto" , "none" , "required" ] | str | None = None , structured_model: Type [BaseModel] | None = None , **kwargs: Any , ) -> ChatResponse | AsyncGenerator[ChatResponse, None ]: if not isinstance (messages, list ): raise ValueError("messages 必须是列表" ) if not all ("role" in msg and "content" in msg for msg in messages): raise ValueError("每个消息必须包含 role 和 content" ) if "omni" in self.model_name.lower(): _format_audio_data_for_qwen_omni(messages) kwargs = { "model" : self.model_name, "messages" : messages, "stream" : self.stream, **self.generate_kwargs, **kwargs, } if self.reasoning_effort and "reasoning_effort" not in kwargs: kwargs["reasoning_effort" ] = self.reasoning_effort if tools: kwargs["tools" ] = self._format_tools_json_schemas(tools) if tool_choice: if tool_choice == "any" : warnings.warn("any 已废弃,改用 required" ) tool_choice = "required" self._validate_tool_choice(tool_choice, tools) kwargs["tool_choice" ] = self._format_tool_choice(tool_choice) if self.stream: kwargs["stream_options" ] = {"include_usage" : True } start_datetime = datetime.now() if structured_model: if tools or tool_choice: logger.warning("structured_model 已指定,tools/tool_choice 会被忽略" ) kwargs.pop("stream" , None ) kwargs.pop("tools" , None ) kwargs.pop("tool_choice" , None ) kwargs["response_format" ] = structured_model if not self.stream: response = await self.client.chat.completions.parse(**kwargs) else : response = self.client.chat.completions.stream(**kwargs) return self._parse_openai_stream_response(start_datetime, response, structured_model) else : response = await self.client.chat.completions.create(**kwargs) if self.stream: return self._parse_openai_stream_response(start_datetime, response, structured_model) else : parsed_response = self._parse_openai_completion_response(start_datetime, response, structured_model) return parsed_response
需要注意,对于结构化输出 ,将会忽略工具参数,强制使用指定的 Pydantic 模型格式
代码中比较复杂的逻辑就是解析流式响应,即 _parse_openai_stream_response。而对于非流式响应则直接调用 _parse_openai_completion_response 方法进行解析即可
为支持解析不完整的 json 输出,使用了 json_repair 库来解析 LLM 的输出
接下来我们稍微解释一下流式解析的核心逻辑,即 _parse_openai_stream_response 的实现原理。首先需要注意,针对是否流式、是否包含 structured_model,OpenAIChatModel 会调用不同的 OpenAI SDK 接口:
1 2 3 4 5 6 7 8 response = await self.client.chat.completions.parse(**kwargs) response = self.client.chat.completions.stream(**kwargs) await self.client.chat.completions.create(**kwargs)
我们首先通过一个例子理解 client.chat.completions.create 返回结果的信息:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 import osimport asynciofrom openai import AsyncOpenAIasync def main (): client = AsyncOpenAI(api_key=os.environ["OPENAI_API_KEY" ]) stream = await client.chat.completions.create( model="gpt-5.1" , messages=[{"role" : "user" , "content" : "说一句话" }], stream=True , stream_options={"include_usage" : True }, ) async for chunk in stream: print (chunk) print ("---" ) if __name__ == "__main__" : asyncio.run(main())
1 2 3 4 5 6 ChatCompletionChunk(id ='chatcmpl-DMXjiiMAI6VXGTmTVPmagSTIyV3EZ' , choices=[Choice(delta=ChoiceDelta(content='' , function_call=None , refusal=None , role='assistant' , tool_calls=None ), finish_reason=None , index=0 , logprobs=None )], created=1774265270 , model='gpt-5.1-2025-11-13' , object ='chat.completion.chunk' , service_tier='default' , system_fingerprint=None , usage=None , obfuscation='5Dd89wBE' ) --- ChatCompletionChunk(id ='chatcmpl-DMXjiiMAI6VXGTmTVPmagSTIyV3EZ' , choices=[Choice(delta=ChoiceDelta(content='你好' , function_call=None , refusal=None , role=None , tool_calls=None ), finish_reason=None , index=0 , logprobs=None )], created=1774265270 , model='gpt-5.1-2025-11-13' , object ='chat.completion.chunk' , service_tier='default' , system_fingerprint=None , usage=None , obfuscation='5bRg7noG' ) --- ChatCompletionChunk(id ='chatcmpl-DMXjiiMAI6VXGTmTVPmagSTIyV3EZ' , choices=[Choice(delta=ChoiceDelta(content=',' , function_call=None , refusal=None , role=None , tool_calls=None ), finish_reason=None , index=0 , logprobs=None )], created=1774265270 , model='gpt-5.1-2025-11-13' , object ='chat.completion.chunk' , service_tier='default' , system_fingerprint=None , usage=None , obfuscation='xv3bnd5jF' ) ---
返回的是 ChatCompletionChunk 对象,其通过 delta 信息来返回本次新返回的内容,新返回的内容可以是 content、function_call 等信息,而 _parse_openai_stream_response 核心逻辑就是解析这些 delta 信息,附加到直接已累计的 text、thinking、tool_calls 等,从而构成对应的 TextBlock、ThinkingBlock 等对象,并通过 ChatResponse 对象进行返回。因此流式解析,每次看到的 Block 对象的内容,其实是包含截止目前为止的所有累计内容 。
一个例子
接下来通过一个例子来看下应该如何使用 structured_model 来获取结构化输出信息,以及 ChatResponse 到底包含哪些内容:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 import osimport asynciofrom pydantic import BaseModel, Fieldfrom agentscope.model import OpenAIChatModelclass PersonInfo (BaseModel ): name: str = Field(description="姓名" ) age: int = Field(description="年龄" ) occupation: str = Field(description="职业" ) async def main (): model = OpenAIChatModel( model_name="qwen3-max" , api_key=os.environ["OPENAI_API_KEY" ], stream=False , ) response = await model( messages=[{"role" : "user" , "content" : "介绍一下马斯克" }], structured_model=PersonInfo, ) print (response) if __name__ == "__main__" : asyncio.run(main())
1 2 ChatResponse(content=[{'type' : 'text' , 'text' : '{ \n "age": 53, \n "name": "埃隆·马斯克", \n "occupation": "企业家、工程师、 发明家、投资者" \n}' }], id ='2026-03-23 17:55:43.740_cc1aa7' , created_at='2026-03-23 17:55:43.740' , type ='chat' , usage=ChatUsage(input_tokens=14 , output_tokens=42 , time=2.457625 , type ='chat' , metadata=CompletionUsage(completion_tokens=42 , prompt_tokens=14 , total_tokens=56 , completion_tokens_details=None , prompt_tokens_details=PromptTokensDetails(audio_tokens=None , cached_tokens=0 ))), metadata={'name' : '埃隆·马斯克' , 'age' : 53 , 'occupation' : '企业家、工程师、发明家、投资者' })
流式调用,可以看到,每次输出的 ChatResponse 对象包含的是截止目前为止所有累计的信息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 async def main (): model = OpenAIChatModel( model_name="qwen3-max" , api_key=os.environ["OPENAI_API_KEY" ], stream=True , ) response = await model( messages=[{"role" : "user" , "content" : "介绍一下马斯克" }], structured_model=PersonInfo, ) async for chunk in response: print (chunk)
1 2 3 4 5 6 7 8 9 10 ChatResponse(content=[{'type' : 'text' , 'text' : '{' }], id ='2026-03-23 19:41:48.955_a95d91' , created_at='2026-03-23 19:41:48.955' , type ='chat' , usage=None , metadata={}) ChatResponse(content=[{'type' : 'text' , 'text' : '{ \n' }], id ='2026-03-23 19:41:49.013_57efd1' , created_at='2026-03-23 19:41:49.013' , type ='chat' , usage=None , metadata={}) ChatResponse(content=[{'type' : 'text' , 'text' : '{ \n "a' }], id ='2026-03-23 19:41:49.182_a60c08' , created_at='2026-03-23 19:41:49.182' , type ='chat' , usage=None , metadata={}) ChatResponse(content=[{'type' : 'text' , 'text' : '{ \n "age": 5' }], id ='2026-03-23 19:41:49.331_201244' , created_at='2026-03-23 19:41:49.331' , type ='chat' , usage=None , metadata={'age' : 5 }) ...... ChatResponse(content=[{'type' : 'text' , 'text' : '{ \n "age": 53, \n "name": "埃隆·马斯克", \n "occupation": "企业家、工程师、 发明家、投资人" \n}' }], id ='2026-03-23 19:41:50.138_9b416d' , created_at='2026-03-23 19:41:50.138' , type ='chat' , usage=None , metadata={'age' : 53 , 'name' : '埃隆·马斯克' , 'occupation' : '企业家、工程师、发明家、投资人' }) ChatResponse(content=[{'type' : 'text' , 'text' : '{ \n "age": 53, \n "name": "埃隆·马斯克", \n "occupation": "企业家、工程师、 发明家、投资人" \n}' }], id ='2026-03-23 19:41:50.141_554521' , created_at='2026-03-23 19:41:50.141' , type ='chat' , usage=ChatUsage(input_tokens=14 , output_tokens=42 , time=2.206184 , type ='chat' , metadata=CompletionUsage(completion_tokens=42 , prompt_tokens=14 , total_tokens=56 , completion_tokens_details=None , prompt_tokens_details=PromptTokensDetails(audio_tokens=None , cached_tokens=0 ))), metadata={'age' : 53 , 'name' : '埃隆·马斯克' , 'occupation' : '企业家、工程师、发明家、投资人' })
以上我们就概要介绍了 AgentScope 的 LLM model 模块实现原理,其为各种 provider 的 LLM 提供了统一的抽象接口,即 ChatModelBase、ChatResponse 等。
Token 模块
Token 模块负责 Token 计数功能,用于支持消息截断,它与 Formatter 模块配合,确保消息在 Token 限制范围内。Token 模块代码位于 src/agentscope/token 目录下,文件列表如下:
1 2 3 4 5 6 7 8 src/agentscope/token/ ├── __init__.py ├── _token_base.py ├── _openai_token_counter.py ├── _anthropic_token_counter.py ├── _gemini_token_counter.py ├── _huggingface_token_counter.py └── _char_token_counter.py
_token_base.py 定义了抽象基类 TokenCounterBase,代码如下:
1 2 3 4 5 6 7 8 9 10 class TokenCounterBase : """Token 计数基类""" @abstractmethod async def count ( self, messages: list [dict ], **kwargs: Any , ) -> int : """计算消息的 Token 数量"""
TokenCounterBase 核心就是定义了一个抽象方法 count,用于计算消息的 Token 数量。各个具体的模型 Token 计数器则继承自 TokenCounterBase,按照各自的规则实现自己的 Token 计数。
CharTokenCounter 实现了简单的基于字符的计数
HuggingFaceTokenCounter 基于 transformers.AutoTokenizer 实现 Token 计算
AnthropicTokenCounter、GeminiTokenCounter 都是直接调用各自的 API 来计算 Token 数量
OpenAITokenCounter 则是通过 tiktoken 实现 Token 计算,计算逻辑相对复杂
AgentScope 内部通过 Msg 类型来统一表述 LLM 消息,而 Model 模块则提供了访问 LLM 的统一接口,而为了将 Msg 类型能够转换为具体 LLM 所要求的格式,AgentScope 又实现了 Formatter 消息转换层,主要负责将 AgentScope 内部统一的 Msg 消息对象转换为各个 LLM API 所需的特定格式,这样各个 Model 的 __call__ 接口在处理 messages 等参数时,就可以认为这些参数已经是自己所需要的格式了,不需要再进行额外的格式转换了 。
1 2 3 4 5 6 7 8 9 10 11 12 class OpenAIChatModel (ChatModelBase ): async def __call__ ( self, messages: list [dict ], tools: list [dict ] | None = None , tool_choice: Literal ["auto" , "none" , "required" ] | str | None = None , structured_model: Type [BaseModel] | None = None , **kwargs: Any , ) -> ChatResponse | AsyncGenerator[ChatResponse, None ]:
因此,Formatter 模块是 AgentScope 支持多种 LLM 的关键组件之一,其代码位于 src/agentscope/formatter 目录下,文件列表如下:
1 2 3 4 5 6 7 8 9 10 11 src/agentscope/formatter/ ├── __init__.py ├── _formatter_base.py ├── _truncated_formatter_base.py ├── _openai_formatter.py ├── _anthropic_formatter.py ├── _gemini_formatter.py ├── _dashscope_formatter.py ├── _ollama_formatter.py ├── _deepseek_formatter.py └── _a2a_formatter.py
FormattterBase 抽象基类定义了所有 Foramtter 的公共接口,其代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 class FormatterBase : @abstractmethod async def format (self, *args, **kwargs ) -> list [dict [str , Any ]]: """将 Msg 对象格式化为 API 所需的字典列表""" @staticmethod def assert_list_of_msgs (msgs: list [Msg] ) -> None : """验证输入是否为 Msg 对象列表""" @staticmethod def convert_tool_result_to_string (output ) -> tuple [str , list ]: """将工具结果转换为文本(兼容不支持多模态的 API)"""
TruncatedFormatterBase 则实现了基于 Token 限制进行消息截断的功能,它需要接受一个 TokenCounterBase 对象来计算当前消息的 Token 数量:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 class TruncatedFormatterBase (FormatterBase, ABC): def __init__ ( self, token_counter: TokenCounterBase | None = None , max_tokens: int | None = None , ): self.token_counter = token_counter self.max_tokens = max_tokens @trace_format async def format (self, msgs: list [Msg] ) -> list [dict [str , Any ]]: while True : formatted_msgs = await self._format (msgs) n_tokens = await self._count(formatted_msgs) if n_tokens is None or self.max_tokens is None or n_tokens <= self.max_tokens: return formatted_msgs msgs = await self._truncate(msgs)
格式化流程 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 ┌─────────────────────────────────────────────────────────────┐ │ format(msgs) │ └─────────────────────────────────────────────────────────────┘ │ ▼ ┌───────────────────────────────┐ │ 1. assert_list_of_msgs │ │ 2. deepcopy(msgs) │ └───────────────────────────────┘ │ ▼ ┌───────────────────────────────┐ │ 3. _format(msgs) │ │ - _format_system_message │ │ - _group_messages │ │ - _format_tool_sequence │ │ - _format_agent_message │ └───────────────────────────────┘ │ ▼ ┌───────────────────────────────┐ │ 4. _count(formatted_msgs) │ └───────────────────────────────┘ │ ┌───────────────┴───────────────┐ ▼ ▼ n_tokens <= max_tokens n_tokens > max_tokens │ │ ▼ ▼ return _truncate(msgs) → 循环
_group_messages 负责将消息分成两种类型:
tool_sequence:工具调用序列,包含 tool_use 或 tool_result Block
agent_message:不包含上述 Block,是普通的 Agent 消息
之所以要进行分组,是因为多 Agent 场景下,不同类型的消息需要不同的格式化策略:
tool_sequence:保持工具调用的完整性,按 API 要求格式化
agent_message:可以合并为历史记录,减少消息数量
_format_tool_sequence 和 _format_agent_message 都是抽象方法,需要具体的子类实现,以提供实际的消息格式化逻辑
_truncate() 负责对格式化后的消息进行截断,它的实现逻辑:
保留 system 消息
保证 tool_use 和 tool_result 总是成对出现,即要么整体保留,要么整体删除
从最早的消息开始删除
接下来我们再来看具体的、对消息内容真正进行格式化的子类。这里以适配 OpenAI 模型的 OpenAIChatFormatter 和 OpenAIMultiAgentFormatter 为例。在 AgentScope 中,提供了两种类型的格式化器:ChatFormatter 和 MultiAgentFormatter:
ChatFormatter:专为标准的用户-助手场景的聊天机器人(单 Agent 对话)设计,可以通过 role 来区分用户和助手
MultiAgentFormatter:专为多智能体场景设计,此时可以额外通过 name 来识别不同的实体
先来看 OpenAIChatFormatter 的核心逻辑实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 class OpenAIChatFormatter (TruncatedFormatterBase ): async def _format (self, msgs: list [Msg] ) -> list [dict [str , Any ]]: messages = [] for msg in msgs: content_blocks = [] tool_calls = [] for block in msg.get_content_blocks(): if block["type" ] == "text" : content_blocks.append(block) elif block["type" ] == "tool_use" : tool_calls.append({ "id" : block["id" ], "type" : "function" , "function" : { "name" : block["name" ], "arguments" : json.dumps(block["input" ]), }, }) elif block["type" ] == "tool_result" : messages.append({ "role" : "tool" , "tool_call_id" : block["id" ], "content" : textual_output, }) elif block["type" ] == "image" : content_blocks.append({ "type" : "image_url" , "image_url" : {"url" : ...}, }) elif block["type" ] == "audio" : content_blocks.append({ "type" : "input_audio" , "input_audio" : {...}, }) messages.append({ "role" : msg.role, "name" : msg.name, "content" : content_blocks or None , "tool_calls" : tool_calls or None , })
OpenAIChatFormatter 虽然继承自 TruncatedFormatterBase,但它重新实现了 _format 方法,因此不需要实现 _format_tool_sequence 和 _format_agent_message
OpenAIChatFormatter 将 Msg 列表中的每条消息都转换为 OpenAI 所需的格式,因此其对话历史就是用户和助手的消息
再来看 OpenAIMultiAgentFormatter 的实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 class OpenAIMultiAgentFormatter (TruncatedFormatterBase ): async def _format_tool_sequence ( self, msgs: list [Msg], ) -> list [dict [str , Any ]]: """Given a sequence of tool call/result messages, format them into the required format for the OpenAI API.""" return await OpenAIChatFormatter( promote_tool_result_images=self.promote_tool_result_images, ).format (msgs) async def _format_agent_message ( self, msgs: list [Msg], is_first: bool = True , ) -> list [dict [str , Any ]]: accumulated_text = [] for msg in msgs: for block in msg.get_content_blocks(): if block["type" ] == "text" : accumulated_text.append(f"{msg.name} : {block['text' ]} " ) conversation_blocks_text = conversation_history_prompt + "<history>\n" + "\n" .join(accumulated_text) + "\n</history>" content_list: list [dict [str , Any ]] = [] if conversation_blocks_text: content_list.append( { "type" : "text" , "text" : conversation_blocks_text, }, ) if images: content_list.extend(images) if audios: content_list.extend(audios) user_message = { "role" : "user" , "content" : content_list, } if content_list: formatted_msgs.append(user_message)
OpenAIMultiAgentFormatter 同样继承自 TruncatedFormatterBase,但它直接复用父类的 _format 方法,因此需要提供 _format_tool_sequence 和 _format_agent_message 方法的实现
_format_tool_sequence 直接使用 OpenAIChatFormatter 来格式化工具调用序列
_format_agent_message 则是将多 agent 的文本消息合并为一个 <history> 标签包裹的消息
值得说明的是,因为 OpenAI API 本身支持 name 字段,因此 OpenAIFormatter 也可以用于多智能体。而如果使用 OpenAIMultiAgentFormatter,则会将对话历史合并为单个用户消息。
另外,两种 Formatter 都支持 promote_tool_result_images 参数来控制是否将工具调用结果中的图片数据提升为文本消息说明:
1 2 3 4 5 6 7 8 9 10 11 if self.promote_tool_result_images and multimodal_block["type" ] == "image" : msgs.insert(i + 1 , Msg( name="user" , content=[ TextBlock(text="<system-info>The following are images..." ), ImageBlock(source=...), TextBlock(text="</system-info>" ), ], role="user" , ))
一个示例
最后,我们通过两个极简的例子,分别来看下这两种格式化器最后生成的内容是怎样的,以更好理解上述实现逻辑:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 import asyncioimport jsonfrom agentscope.message import Msgfrom agentscope.formatter import OpenAIChatFormatterasync def main (): formatter = OpenAIChatFormatter() msgs = [ Msg(name="user" , content="你好" , role="user" ), Msg(name="assistant" , content="你好!有什么可以帮助你的?" , role="assistant" ), Msg(name="user" , content="介绍一下自己" , role="user" ), ] formatted = await formatter.format (msgs) print ("=== OpenAIChatFormatter 输出 ===" ) print (json.dumps(formatted, ensure_ascii=False , indent=2 )) if __name__ == "__main__" : asyncio.run(main())
输出结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 [ { "role" : "user" , "name" : "user" , "content" : [ { "type" : "text" , "text" : "你好" } ] }, { "role" : "assistant" , "name" : "assistant" , "content" : [ { "type" : "text" , "text" : "你好!有什么可以帮助你的?" } ] }, { "role" : "user" , "name" : "user" , "content" : [ { "type" : "text" , "text" : "介绍一下自己" } ] } ]
OpenAIMultiAgentFormatter 示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 import asyncioimport jsonfrom agentscope.message import Msgfrom agentscope.formatter import OpenAIMultiAgentFormatterasync def main (): formatter = OpenAIMultiAgentFormatter() msgs = [ Msg(name="Alice" , content="你好,我是 Alice" , role="user" ), Msg(name="Bob" , content="你好 Alice,我是 Bob" , role="assistant" ), Msg(name="Alice" , content="很高兴认识你" , role="user" ), ] formatted = await formatter.format (msgs) print ("=== OpenAIMultiAgentFormatter 输出 ===" ) print (json.dumps(formatted, ensure_ascii=False , indent=2 )) if __name__ == "__main__" : asyncio.run(main())
输出结果:
1 2 3 4 5 6 7 8 9 10 11 [ { "role" : "user" , "content" : [ { "type" : "text" , "text" : "# Conversation History\nThe content between <history></history> tags contains your conversation history\n<history>\nAlice: 你好,我是 Alice\nBob: 你好 Alice,我是 Bob\nAlice: 很高兴认识你\n</history>" } ] } ]
小结
本篇文章通过分析 AgentScope 里 Message、Model、Token、Formatter 模块的源码实现,深入探讨了该框架兼容多 Provider、多模态大模型(LM)的底层原理。