0%

agentscope 源码分析(3):模型交互

在前两篇文章中,我们深入剖析了 AgentScope 的工具系统,这相当于为 Agent 构建了强健的 手脚,使其具备了影响物理世界的能力。然而,要让这些肢体协调运作,还需一个智慧的 大脑 来坐镇指挥。从本篇开始,我们将重点探讨 AgentScope 如何构建一套通用的适配层,以支持来自不同 Provider(供应商)的各类大模型。

总体设计

AgentScope 支持多 LLM 的核心架构由以下四个模块组成:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
┌─────────────────────────────────────────────────────────────────┐
│ Agent Layer │
│ (ReActAgent, A2AAgent, etc.) │
└─────────────────────────────────────────────────────────────────┘

┌───────────┼───────────┐
▼ ▼ ▼
┌───────────┐ ┌───────────┐ ┌───────────┐
│ Model │ │ Formatter │ │ Token │
│ Module │ │ Module │ │ Module │
└───────────┘ └───────────┘ └───────────┘
│ │ │
└───────────┼───────────┘

┌───────────────────────────────────────┐
│ Message Module │
│ (统一的消息抽象层) │
└───────────────────────────────────────┘
  • Message 模块:提供统一的消息抽象
  • Model 模块:专注于 API 调用和响应解析
  • Token 模块:处理令牌计数
  • Formatter 模块:负责消息格式转换,支持按照 Token 限制截断消息的能力

接下来我们将详细解释每个模块的实现原理,以及这些模块之间如何协同工作。

Message 模块

Message 模块的代码主要位于 src/agentscope/message/ 目录下,它是 AgentScope 框架的核心数据类型之一,它让 AgentScope 能够以统一的方式来处理与不同 LLM 的交互。

1
2
3
4
src/agentscope/message/
├── __init__.py # 模块导出接口
├── _message_base.py # Msg 消息类定义
└── _message_block.py # 内容块类型定义

Msg 类

Msg 类是 消息 的核心抽象,它主要包含以下属性:

1
2
3
4
5
6
7
8
class Msg:
name: str # 发送者名称
content: str | Sequence[ContentBlock] # 消息内容
role: Literal["user", "assistant", "system"] # 角色
metadata: dict # 元数据(如结构化输出)
id: str # 唯一标识(shortuuid)
timestamp: str # 创建时间戳
invocation_id: str # API 调用追踪 ID

Msg 类提供了以下方法,用于方便地构建和操作消息:

方法 功能
to_dict() 序列化为字典
from_dict(json_data) 从字典反序列化
get_content_blocks(block_type) 按类型提取内容块
get_text_content() 提取纯文本内容
has_content_blocks(block_type) 检查是否包含特定类型内容块

Msg 类的实现比较简单,稍微复杂一点的是其 get_content_blocks() 方法,使用 @overload 装饰器为 get_content_blocks() 方法提供了多种类型签名,使其能够灵活地从消息中提取不同种类的内容块。

ContentBlock 类

Msg 类使用 ContentBlock 类型来表示不同类型的内容块,而 Msg 的 content 则是一个 ContentBlock 列表(对于 str 类型的内容,则会转换为一个 TextBlock)。因此 ContentBlock 才是 Msg 中真正承载不同类型内容的载体。

ContentBlock 类型本身是一个 Union 类型,它是以下几种类型之一:

1
2
3
4
5
6
7
8
9
ContentBlock = (
ToolUseBlock
| ToolResultBlock
| TextBlock
| ThinkingBlock
| ImageBlock
| AudioBlock
| VideoBlock
)

每种类型的含义如下所示:

类型 用途 关键字段
TextBlock 文本内容 type="text", text
ThinkingBlock 思考过程 type="thinking", thinking
ImageBlock 图像内容 type="image", source
AudioBlock 音频内容 type="audio", source
VideoBlock 视频内容 type="video", source
ToolUseBlock 工具调用请求 type="tool_use", id, name, input
ToolResultBlock 工具调用结果 type="tool_result", id, output, name

每种具体的类型都使用 TypedDict 来定义结构化的内容块类型,以 TextBlock 为例:

1
2
3
4
5
6
7
class TextBlock(TypedDict, total=False):
"""The text block."""

type: Required[Literal["text"]]
"""The type of the block"""
text: str
"""The text content"""

对于 ImageBlockAudioBlockVideoBlock 类型,其中的多模态数据既可以通过 URL 表示,也可以通过 Base64 字符串表示:

1
2
3
4
5
6
7
8
class ImageBlock(TypedDict, total=False):
"""The image block"""

type: Required[Literal["image"]]
"""The type of the block"""

source: Required[Base64Source | URLSource]
"""The src of the image"""

因此,_message_block.py 中定义了两种 Source 类型:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class Base64Source(TypedDict, total=False):
"""The base64 source"""

type: Required[Literal["base64"]]
"""The type of the src, must be `base64`"""

media_type: Required[str]
"""The media type of the data, e.g. `image/jpeg` or `audio/mpeg`"""

data: Required[str]
"""The base64 data, in format of RFC 2397"""

class URLSource(TypedDict, total=False):
"""The URL source"""

type: Required[Literal["url"]]
"""The type of the src, must be `url`"""

url: Required[str]
"""The URL of the image or audio"""

以上就完成了对 AgentScope 的 Message 模块的剖析,只要理解其两个核心数据结构 MsgContentBlock,就能够理解 AgentScope 如何表示和操作消息了。

Model 模块

Model 模块负责与各种 LLM API 交互,并将响应统一为 ChatResponse 格式。 Model 模块的代码位于 src/agentscope/model 目录下,负责:

  • 封装各 LLM API 的调用逻辑
  • 统一响应格式为 ChatResponse
  • 支持流式和非流式输出
  • 处理工具调用(Function Calling)
  • 支持结构化输出(Structured Output)

其目录结构如下所示:

1
2
3
4
5
6
7
8
9
10
11
src/agentscope/model/
├── __init__.py # 模块导出
├── _model_base.py # 抽象基类 ChatModelBase
├── _model_response.py # 响应数据类 ChatResponse
├── _model_usage.py # Token 使用统计 ChatUsage
├── _openai_model.py # OpenAI/Azure 模型实现
├── _anthropic_model.py # Anthropic Claude 模型实现
├── _dashscope_model.py # 阿里云 DashScope 模型实现
├── _gemini_model.py # Google Gemini 模型实现
├── _ollama_model.py # Ollama 本地模型实现
└── _trinity_model.py # Trinity-RFT 训练模型(已弃用)

ChatModelBase 抽象基类

ChatModelBase 是所有 LLM 的抽象基类,定义了与 LLM 交互的基本接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class ChatModelBase:
model_name: str # 模型名称
stream: bool # 是否流式输出

def __init__(self, model_name: str, stream: bool) -> None:
self.model_name = model_name
self.stream = stream

@abstractmethod
async def __call__(
self,
messages: list[dict],
tools: list[dict] | None = None,
tool_choice: Literal["auto", "none", "required"] | str | None = None,
structured_model: Type[BaseModel] | None = None,
**kwargs: Any,
) -> ChatResponse | AsyncGenerator[ChatResponse, None]:
pass

def _validate_tool_choice(self, tool_choice: str, tools: list[dict] | None):
"""验证 tool_choice 参数有效性"""
...
  • 通过实现 __call__ 方法,ChatModelBase 的实例是可调用对象
  • 为所有的模型调用都提供一致的接口,包括:
    • 参数:messagestoolstool_choicestructured_model**kwargs(用于传递额外的参数)
    • 返回值统一为 ChatResponseChatResponse 的异步生成器

ChatResponse 用来表示 LLM 的响应:

1
2
3
4
5
6
7
8
@dataclass
class ChatResponse(DictMixin):
content: Sequence[TextBlock | ToolUseBlock | ThinkingBlock | AudioBlock]
id: str = field(default_factory=lambda: _get_timestamp(True))
created_at: str = field(default_factory=_get_timestamp)
type: Literal["chat"] = field(default_factory=lambda: "chat")
usage: ChatUsage | None = field(default_factory=lambda: None)
metadata: dict[str, JSONSerializableObject] | None = field(default_factory=lambda: None)

其核心字段如下:

字段 类型 说明
content Sequence[ContentBlock] 内容块序列,支持文本、工具调用、思考过程、音频
usage ChatUsage | None Token 使用统计
metadata dict | None 结构化输出结果

contentContentBlock 的序列,用来表示实际的 LLM 输出内容,而 usage 则用来存储 Token 使用信息,其类型为 ChatUsage

1
2
3
4
5
6
7
@dataclass
class ChatUsage(DictMixin):
input_tokens: int # 输入 Token 数
output_tokens: int # 输出 Token 数
time: float # 响应耗时(秒)
type: Literal["chat"] = "chat"
metadata: dict[str, Any] | None = None # 原始 API 返回的信息

OpenAIChatModel

AgentScope 框架支持了多种 provider 的 LLM,这里我们以 OpenAIChatModel 为例来介绍如何封装 OpenAI 的 API 调用,其他 provider 实现也是类似的,只是少量 API 调用细节有所不同。

  • 初始化,创建 OpenAI 异步客户端,支持 OpenAI 和 Azure 两种客户端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class OpenAIChatModel(ChatModelBase):
def __init__(
self,
model_name: str, # 模型名称,比如 "gpt-3.5-turbo"
api_key: str | None = None, # OpenAI API 密钥
stream: bool = True, # 是否开启流式响应
reasoning_effort: Literal["low", "medium", "high"] | None = None, # 推理强度(o3/o4模型用)
organization: str = None, # OpenAI 组织ID
stream_tool_parsing: bool = True, # 流式工具调用时是否实时解析JSON
client_type: Literal["openai", "azure"] = "openai", # 客户端类型(普通OpenAI/微软Azure)
client_kwargs: dict[str, JSONSerializableObject] | None = None, # 客户端额外参数
generate_kwargs: dict[str, JSONSerializableObject] | None = None, # 生成参数(temperature等)
**kwargs: Any,
) -> None:

# 导入并创建 OpenAI 异步客户端
import openai
if client_type == "azure":
self.client = openai.AsyncAzureOpenAI(
api_key=api_key, organization=organization,** (client_kwargs or {})
)
else:
self.client = openai.AsyncClient(
api_key=api_key, organization=organization, **(client_kwargs or {})
)
  • __call__ 方法,调用 OpenAI 的 create_chat_completion API
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
async def __call__(
self,
messages: list[dict], # 对话消息列表([{"role": "user", "content": "你好"}, ...])
tools: list[dict] | None = None, # 工具定义(JSON Schema)
tool_choice: Literal["auto", "none", "required"] | str | None = None, # 工具调用策略
structured_model: Type[BaseModel] | None = None, # 结构化输出的模型
**kwargs: Any, # 额外生成参数(temperature、max_tokens等)
) -> ChatResponse | AsyncGenerator[ChatResponse, None]:

# 1. 校验消息格式:必须是列表,且每个消息有 role 和 content
if not isinstance(messages, list):
raise ValueError("messages 必须是列表")
if not all("role" in msg and "content" in msg for msg in messages):
raise ValueError("每个消息必须包含 role 和 content")

# 2. 适配 Qwen-omni 音频格式
if "omni" in self.model_name.lower():
_format_audio_data_for_qwen_omni(messages)

# 3. 组装 API 调用参数
kwargs = {
"model": self.model_name,
"messages": messages,
"stream": self.stream,
**self.generate_kwargs, # 初始化时的生成参数
**kwargs, # 调用时传入的额外参数
}
if self.reasoning_effort and "reasoning_effort" not in kwargs:
kwargs["reasoning_effort"] = self.reasoning_effort

# 4. 处理工具调用参数
if tools:
kwargs["tools"] = self._format_tools_json_schemas(tools)

if tool_choice:
if tool_choice == "any": # 兼容旧值
warnings.warn("any 已废弃,改用 required")
tool_choice = "required"
self._validate_tool_choice(tool_choice, tools) # 校验工具选择是否合法
kwargs["tool_choice"] = self._format_tool_choice(tool_choice)

# 5. 流式响应额外配置
if self.stream:
kwargs["stream_options"] = {"include_usage": True}

start_datetime = datetime.now() # 记录开始时间(用于计算耗时)

# 6. 处理结构化输出
if structured_model:
# 如果指定了结构化模型,忽略 tools/tool_choice,强制结构化输出
if tools or tool_choice:
logger.warning("structured_model 已指定,tools/tool_choice 会被忽略")
kwargs.pop("stream", None)
kwargs.pop("tools", None)
kwargs.pop("tool_choice", None)
kwargs["response_format"] = structured_model # 设置响应格式为结构化模型

# 调用 OpenAI API(结构化输出的专属方法)
if not self.stream:
response = await self.client.chat.completions.parse(**kwargs)
else:
response = self.client.chat.completions.stream(**kwargs)
return self._parse_openai_stream_response(start_datetime, response, structured_model)
else:
# 普通调用(非结构化输出)
response = await self.client.chat.completions.create(**kwargs)

# 7. 处理返回结果
if self.stream:
return self._parse_openai_stream_response(start_datetime, response, structured_model)
else:
# 非流式:解析完整响应
parsed_response = self._parse_openai_completion_response(start_datetime, response, structured_model)
return parsed_response
  • 需要注意,对于结构化输出,将会忽略工具参数,强制使用指定的 Pydantic 模型格式

  • 代码中比较复杂的逻辑就是解析流式响应,即 _parse_openai_stream_response。而对于非流式响应则直接调用 _parse_openai_completion_response 方法进行解析即可

  • 为支持解析不完整的 json 输出,使用了 json_repair 库来解析 LLM 的输出

接下来我们稍微解释一下流式解析的核心逻辑,即 _parse_openai_stream_response 的实现原理。首先需要注意,针对是否流式、是否包含 structured_modelOpenAIChatModel 会调用不同的 OpenAI SDK 接口:

1
2
3
4
5
6
7
8
# 非流式,但指定了 structured_model
response = await self.client.chat.completions.parse(**kwargs)

# 流式,同时指定了 structured_model
response = self.client.chat.completions.stream(**kwargs)

# 没有指定 structured_model,流式非流式都是调用(通过参数区分是否流式)
await self.client.chat.completions.create(**kwargs)

我们首先通过一个例子理解 client.chat.completions.create 返回结果的信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import os
import asyncio
from openai import AsyncOpenAI


async def main():
client = AsyncOpenAI(api_key=os.environ["OPENAI_API_KEY"])

stream = await client.chat.completions.create(
model="gpt-5.1",
messages=[{"role": "user", "content": "说一句话"}],
stream=True,
stream_options={"include_usage": True},
)

async for chunk in stream:
print(chunk)
print("---")


if __name__ == "__main__":
asyncio.run(main())

1
2
3
4
5
6
ChatCompletionChunk(id='chatcmpl-DMXjiiMAI6VXGTmTVPmagSTIyV3EZ', choices=[Choice(delta=ChoiceDelta(content='', function_call=None, refusal=None, role='assistant', tool_calls=None), finish_reason=None, index=0, logprobs=None)], created=1774265270, model='gpt-5.1-2025-11-13', object='chat.completion.chunk', service_tier='default', system_fingerprint=None, usage=None, obfuscation='5Dd89wBE')
---
ChatCompletionChunk(id='chatcmpl-DMXjiiMAI6VXGTmTVPmagSTIyV3EZ', choices=[Choice(delta=ChoiceDelta(content='你好', function_call=None, refusal=None, role=None, tool_calls=None), finish_reason=None, index=0, logprobs=None)], created=1774265270, model='gpt-5.1-2025-11-13', object='chat.completion.chunk', service_tier='default', system_fingerprint=None, usage=None, obfuscation='5bRg7noG')
---
ChatCompletionChunk(id='chatcmpl-DMXjiiMAI6VXGTmTVPmagSTIyV3EZ', choices=[Choice(delta=ChoiceDelta(content=',', function_call=None, refusal=None, role=None, tool_calls=None), finish_reason=None, index=0, logprobs=None)], created=1774265270, model='gpt-5.1-2025-11-13', object='chat.completion.chunk', service_tier='default', system_fingerprint=None, usage=None, obfuscation='xv3bnd5jF')
---
  • 返回的是 ChatCompletionChunk 对象,其通过 delta 信息来返回本次新返回的内容,新返回的内容可以是 contentfunction_call 等信息,而 _parse_openai_stream_response 核心逻辑就是解析这些 delta 信息,附加到直接已累计的 textthinkingtool_calls 等,从而构成对应的 TextBlock、ThinkingBlock 等对象,并通过 ChatResponse 对象进行返回。因此流式解析,每次看到的 Block 对象的内容,其实是包含截止目前为止的所有累计内容

一个例子

接下来通过一个例子来看下应该如何使用 structured_model 来获取结构化输出信息,以及 ChatResponse 到底包含哪些内容:

  • 非流式调用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import os
import asyncio
from pydantic import BaseModel, Field
from agentscope.model import OpenAIChatModel


class PersonInfo(BaseModel):
name: str = Field(description="姓名")
age: int = Field(description="年龄")
occupation: str = Field(description="职业")


async def main():
model = OpenAIChatModel(
model_name="qwen3-max",
api_key=os.environ["OPENAI_API_KEY"],
stream=False,
)

response = await model(
messages=[{"role": "user", "content": "介绍一下马斯克"}],
structured_model=PersonInfo,
)

print(response)

if __name__ == "__main__":
asyncio.run(main())
1
2
# python structured_output_demo.py
ChatResponse(content=[{'type': 'text', 'text': '{ \n "age": 53, \n "name": "埃隆·马斯克", \n "occupation": "企业家、工程师、 发明家、投资者" \n}'}], id='2026-03-23 17:55:43.740_cc1aa7', created_at='2026-03-23 17:55:43.740', type='chat', usage=ChatUsage(input_tokens=14, output_tokens=42, time=2.457625, type='chat', metadata=CompletionUsage(completion_tokens=42, prompt_tokens=14, total_tokens=56, completion_tokens_details=None, prompt_tokens_details=PromptTokensDetails(audio_tokens=None, cached_tokens=0))), metadata={'name': '埃隆·马斯克', 'age': 53, 'occupation': '企业家、工程师、发明家、投资者'})
  • 流式调用,可以看到,每次输出的 ChatResponse 对象包含的是截止目前为止所有累计的信息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
async def main():
model = OpenAIChatModel(
model_name="qwen3-max",
api_key=os.environ["OPENAI_API_KEY"],
stream=True,
)

response = await model(
messages=[{"role": "user", "content": "介绍一下马斯克"}],
structured_model=PersonInfo,
)

async for chunk in response:
print(chunk)
1
2
3
4
5
6
7
8
9
10
# python structured_output_stream_demo.py
ChatResponse(content=[{'type': 'text', 'text': '{'}], id='2026-03-23 19:41:48.955_a95d91', created_at='2026-03-23 19:41:48.955', type='chat', usage=None, metadata={})
ChatResponse(content=[{'type': 'text', 'text': '{ \n'}], id='2026-03-23 19:41:49.013_57efd1', created_at='2026-03-23 19:41:49.013', type='chat', usage=None, metadata={})
ChatResponse(content=[{'type': 'text', 'text': '{ \n "a'}], id='2026-03-23 19:41:49.182_a60c08', created_at='2026-03-23 19:41:49.182', type='chat', usage=None, metadata={})
ChatResponse(content=[{'type': 'text', 'text': '{ \n "age": 5'}], id='2026-03-23 19:41:49.331_201244', created_at='2026-03-23 19:41:49.331', type='chat', usage=None, metadata={'age': 5})
......

ChatResponse(content=[{'type': 'text', 'text': '{ \n "age": 53, \n "name": "埃隆·马斯克", \n "occupation": "企业家、工程师、 发明家、投资人" \n}'}], id='2026-03-23 19:41:50.138_9b416d', created_at='2026-03-23 19:41:50.138', type='chat', usage=None, metadata={'age': 53, 'name': '埃隆·马斯克', 'occupation': '企业家、工程师、发明家、投资人'})

ChatResponse(content=[{'type': 'text', 'text': '{ \n "age": 53, \n "name": "埃隆·马斯克", \n "occupation": "企业家、工程师、 发明家、投资人" \n}'}], id='2026-03-23 19:41:50.141_554521', created_at='2026-03-23 19:41:50.141', type='chat', usage=ChatUsage(input_tokens=14, output_tokens=42, time=2.206184, type='chat', metadata=CompletionUsage(completion_tokens=42, prompt_tokens=14, total_tokens=56, completion_tokens_details=None, prompt_tokens_details=PromptTokensDetails(audio_tokens=None, cached_tokens=0))), metadata={'age': 53, 'name': '埃隆·马斯克', 'occupation': '企业家、工程师、发明家、投资人'})

以上我们就概要介绍了 AgentScope 的 LLM model 模块实现原理,其为各种 provider 的 LLM 提供了统一的抽象接口,即 ChatModelBaseChatResponse 等。

Token 模块

Token 模块负责 Token 计数功能,用于支持消息截断,它与 Formatter 模块配合,确保消息在 Token 限制范围内。Token 模块代码位于 src/agentscope/token 目录下,文件列表如下:

1
2
3
4
5
6
7
8
src/agentscope/token/
├── __init__.py # 模块导出
├── _token_base.py # 抽象基类 TokenCounterBase
├── _openai_token_counter.py # OpenAI Token 计数器
├── _anthropic_token_counter.py # Anthropic Token 计数器
├── _gemini_token_counter.py # Gemini Token 计数器
├── _huggingface_token_counter.py # HuggingFace Token 计数器
└── _char_token_counter.py # 字符计数器(简化实现)

_token_base.py 定义了抽象基类 TokenCounterBase,代码如下:

1
2
3
4
5
6
7
8
9
10
class TokenCounterBase:
"""Token 计数基类"""

@abstractmethod
async def count(
self,
messages: list[dict],
**kwargs: Any,
) -> int:
"""计算消息的 Token 数量"""

TokenCounterBase 核心就是定义了一个抽象方法 count,用于计算消息的 Token 数量。各个具体的模型 Token 计数器则继承自 TokenCounterBase,按照各自的规则实现自己的 Token 计数。

  • CharTokenCounter 实现了简单的基于字符的计数
  • HuggingFaceTokenCounter 基于 transformers.AutoTokenizer 实现 Token 计算
  • AnthropicTokenCounter、GeminiTokenCounter 都是直接调用各自的 API 来计算 Token 数量
  • OpenAITokenCounter 则是通过 tiktoken 实现 Token 计算,计算逻辑相对复杂

Formatter 模块

AgentScope 内部通过 Msg 类型来统一表述 LLM 消息,而 Model 模块则提供了访问 LLM 的统一接口,而为了将 Msg 类型能够转换为具体 LLM 所要求的格式,AgentScope 又实现了 Formatter 消息转换层,主要负责将 AgentScope 内部统一的 Msg 消息对象转换为各个 LLM API 所需的特定格式,这样各个 Model 的 __call__ 接口在处理 messages 等参数时,就可以认为这些参数已经是自己所需要的格式了,不需要再进行额外的格式转换了

1
2
3
4
5
6
7
8
9
10
11
12
# _openai_model.py

class OpenAIChatModel(ChatModelBase):
# messages 等参数已经是 OpenAI API 所需要的格式,不需要再内部转换了
async def __call__(
self,
messages: list[dict],
tools: list[dict] | None = None,
tool_choice: Literal["auto", "none", "required"] | str | None = None,
structured_model: Type[BaseModel] | None = None,
**kwargs: Any,
) -> ChatResponse | AsyncGenerator[ChatResponse, None]:

因此,Formatter 模块是 AgentScope 支持多种 LLM 的关键组件之一,其代码位于 src/agentscope/formatter 目录下,文件列表如下:

1
2
3
4
5
6
7
8
9
10
11
src/agentscope/formatter/
├── __init__.py # 模块导出
├── _formatter_base.py # 抽象基类 FormatterBase
├── _truncated_formatter_base.py # 带 Token 截断的基类
├── _openai_formatter.py # OpenAI 格式化器
├── _anthropic_formatter.py # Anthropic 格式化器
├── _gemini_formatter.py # Gemini 格式化器
├── _dashscope_formatter.py # DashScope 格式化器
├── _ollama_formatter.py # Ollama 格式化器
├── _deepseek_formatter.py # DeepSeek 格式化器
└── _a2a_formatter.py # A2A 格式化器

FormatterBase 抽象基类

FormattterBase 抽象基类定义了所有 Foramtter 的公共接口,其代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
class FormatterBase:
@abstractmethod
async def format(self, *args, **kwargs) -> list[dict[str, Any]]:
"""将 Msg 对象格式化为 API 所需的字典列表"""

@staticmethod
def assert_list_of_msgs(msgs: list[Msg]) -> None:
"""验证输入是否为 Msg 对象列表"""

@staticmethod
def convert_tool_result_to_string(output) -> tuple[str, list]:
"""将工具结果转换为文本(兼容不支持多模态的 API)"""

TruncatedFormatterBase

TruncatedFormatterBase 则实现了基于 Token 限制进行消息截断的功能,它需要接受一个 TokenCounterBase 对象来计算当前消息的 Token 数量:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class TruncatedFormatterBase(FormatterBase, ABC):
def __init__(
self,
token_counter: TokenCounterBase | None = None,
max_tokens: int | None = None,
):
self.token_counter = token_counter
self.max_tokens = max_tokens

@trace_format
async def format(self, msgs: list[Msg]) -> list[dict[str, Any]]:
while True:
formatted_msgs = await self._format(msgs)
n_tokens = await self._count(formatted_msgs)

if n_tokens is None or self.max_tokens is None or n_tokens <= self.max_tokens:
return formatted_msgs

# 超过限制,截断消息
msgs = await self._truncate(msgs)

格式化流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
┌─────────────────────────────────────────────────────────────┐
│ format(msgs) │
└─────────────────────────────────────────────────────────────┘


┌───────────────────────────────┐
│ 1. assert_list_of_msgs │
│ 2. deepcopy(msgs) │
└───────────────────────────────┘


┌───────────────────────────────┐
│ 3. _format(msgs) │
│ - _format_system_message │
│ - _group_messages │
│ - _format_tool_sequence │
│ - _format_agent_message │
└───────────────────────────────┘


┌───────────────────────────────┐
│ 4. _count(formatted_msgs) │
└───────────────────────────────┘

┌───────────────┴───────────────┐
▼ ▼
n_tokens <= max_tokens n_tokens > max_tokens
│ │
▼ ▼
return _truncate(msgs) → 循环
  • _group_messages 负责将消息分成两种类型:

    • tool_sequence:工具调用序列,包含 tool_usetool_result Block
    • agent_message:不包含上述 Block,是普通的 Agent 消息
  • 之所以要进行分组,是因为多 Agent 场景下,不同类型的消息需要不同的格式化策略:

    • tool_sequence:保持工具调用的完整性,按 API 要求格式化
    • agent_message:可以合并为历史记录,减少消息数量
  • _format_tool_sequence_format_agent_message 都是抽象方法,需要具体的子类实现,以提供实际的消息格式化逻辑

  • _truncate() 负责对格式化后的消息进行截断,它的实现逻辑:

    • 保留 system 消息
    • 保证 tool_use 和 tool_result 总是成对出现,即要么整体保留,要么整体删除
    • 从最早的消息开始删除

OpenAIChatFormatter & OpenAIMultiAgentFormatter

接下来我们再来看具体的、对消息内容真正进行格式化的子类。这里以适配 OpenAI 模型的 OpenAIChatFormatterOpenAIMultiAgentFormatter 为例。在 AgentScope 中,提供了两种类型的格式化器:ChatFormatterMultiAgentFormatter

  • ChatFormatter:专为标准的用户-助手场景的聊天机器人(单 Agent 对话)设计,可以通过 role 来区分用户和助手
  • MultiAgentFormatter:专为多智能体场景设计,此时可以额外通过 name 来识别不同的实体

先来看 OpenAIChatFormatter 的核心逻辑实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
class OpenAIChatFormatter(TruncatedFormatterBase):

async def _format(self, msgs: list[Msg]) -> list[dict[str, Any]]:
messages = []
for msg in msgs:
content_blocks = []
tool_calls = []

for block in msg.get_content_blocks():
if block["type"] == "text":
content_blocks.append(block)

elif block["type"] == "tool_use":
tool_calls.append({
"id": block["id"],
"type": "function",
"function": {
"name": block["name"],
"arguments": json.dumps(block["input"]),
},
})
elif block["type"] == "tool_result":
messages.append({
"role": "tool",
"tool_call_id": block["id"],
"content": textual_output,
})
elif block["type"] == "image":
content_blocks.append({
"type": "image_url",
"image_url": {"url": ...},
})
elif block["type"] == "audio":
content_blocks.append({
"type": "input_audio",
"input_audio": {...},
})

messages.append({
"role": msg.role,
"name": msg.name,
"content": content_blocks or None,
"tool_calls": tool_calls or None,
})
  • OpenAIChatFormatter 虽然继承自 TruncatedFormatterBase,但它重新实现了 _format 方法,因此不需要实现 _format_tool_sequence_format_agent_message
  • OpenAIChatFormatter 将 Msg 列表中的每条消息都转换为 OpenAI 所需的格式,因此其对话历史就是用户和助手的消息

再来看 OpenAIMultiAgentFormatter 的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
class OpenAIMultiAgentFormatter(TruncatedFormatterBase):
# 将 tool_sequence 交给 OpenAIChatFormatter 处理
async def _format_tool_sequence(
self,
msgs: list[Msg],
) -> list[dict[str, Any]]:
"""Given a sequence of tool call/result messages, format them into
the required format for the OpenAI API."""
return await OpenAIChatFormatter(
promote_tool_result_images=self.promote_tool_result_images,
).format(msgs)

async def _format_agent_message(
self,
msgs: list[Msg],
is_first: bool = True,
) -> list[dict[str, Any]]:
# 核心代码抽象
accumulated_text = []
for msg in msgs:
for block in msg.get_content_blocks():
if block["type"] == "text":
accumulated_text.append(f"{msg.name}: {block['text']}")

# 添加历史记录提示词
conversation_blocks_text = conversation_history_prompt + "<history>\n" + "\n".join(accumulated_text) + "\n</history>"

content_list: list[dict[str, Any]] = []
if conversation_blocks_text:
content_list.append(
{
"type": "text",
"text": conversation_blocks_text,
},
)
if images:
content_list.extend(images)
if audios:
content_list.extend(audios)

user_message = {
"role": "user",
"content": content_list,
}

# 添加 content
if content_list:
formatted_msgs.append(user_message)
  • OpenAIMultiAgentFormatter 同样继承自 TruncatedFormatterBase,但它直接复用父类的 _format 方法,因此需要提供 _format_tool_sequence_format_agent_message 方法的实现
  • _format_tool_sequence 直接使用 OpenAIChatFormatter 来格式化工具调用序列
  • _format_agent_message 则是将多 agent 的文本消息合并为一个 <history> 标签包裹的消息

值得说明的是,因为 OpenAI API 本身支持 name 字段,因此 OpenAIFormatter 也可以用于多智能体。而如果使用 OpenAIMultiAgentFormatter,则会将对话历史合并为单个用户消息。

另外,两种 Formatter 都支持 promote_tool_result_images 参数来控制是否将工具调用结果中的图片数据提升为文本消息说明:

1
2
3
4
5
6
7
8
9
10
11
if self.promote_tool_result_images and multimodal_block["type"] == "image":
# 创建新的用户消息,包含图片
msgs.insert(i + 1, Msg(
name="user",
content=[
TextBlock(text="<system-info>The following are images..."),
ImageBlock(source=...),
TextBlock(text="</system-info>"),
],
role="user",
))

一个示例

最后,我们通过两个极简的例子,分别来看下这两种格式化器最后生成的内容是怎样的,以更好理解上述实现逻辑:

  • OpenAIChatFormatter 示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import asyncio
import json
from agentscope.message import Msg
from agentscope.formatter import OpenAIChatFormatter


async def main():
formatter = OpenAIChatFormatter()

msgs = [
Msg(name="user", content="你好", role="user"),
Msg(name="assistant", content="你好!有什么可以帮助你的?", role="assistant"),
Msg(name="user", content="介绍一下自己", role="user"),
]

formatted = await formatter.format(msgs)

print("=== OpenAIChatFormatter 输出 ===")
print(json.dumps(formatted, ensure_ascii=False, indent=2))


if __name__ == "__main__":
asyncio.run(main())

输出结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
[
{
"role": "user",
"name": "user",
"content": [
{
"type": "text",
"text": "你好"
}
]
},
{
"role": "assistant",
"name": "assistant",
"content": [
{
"type": "text",
"text": "你好!有什么可以帮助你的?"
}
]
},
{
"role": "user",
"name": "user",
"content": [
{
"type": "text",
"text": "介绍一下自己"
}
]
}
]
  • OpenAIMultiAgentFormatter 示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import asyncio
import json
from agentscope.message import Msg
from agentscope.formatter import OpenAIMultiAgentFormatter


async def main():
formatter = OpenAIMultiAgentFormatter()

msgs = [
Msg(name="Alice", content="你好,我是 Alice", role="user"),
Msg(name="Bob", content="你好 Alice,我是 Bob", role="assistant"),
Msg(name="Alice", content="很高兴认识你", role="user"),
]

formatted = await formatter.format(msgs)

print("=== OpenAIMultiAgentFormatter 输出 ===")
print(json.dumps(formatted, ensure_ascii=False, indent=2))


if __name__ == "__main__":
asyncio.run(main())

输出结果:

1
2
3
4
5
6
7
8
9
10
11
[
{
"role": "user",
"content": [
{
"type": "text",
"text": "# Conversation History\nThe content between <history></history> tags contains your conversation history\n<history>\nAlice: 你好,我是 Alice\nBob: 你好 Alice,我是 Bob\nAlice: 很高兴认识你\n</history>"
}
]
}
]

小结

本篇文章通过分析 AgentScope 里 MessageModelTokenFormatter 模块的源码实现,深入探讨了该框架兼容多 Provider、多模态大模型(LM)的底层原理。