0%

agentscope 源码分析(4):知识库

这篇文章我们继续学习 AgentScope 的源码,主要聚焦于 AgentScope 的知识库实现。知识库就像是 AI Agent 的 离线图书馆,可以为 Agent 提供各种专业/领域知识,让 Agent 能够突破预训练数据的限制,在处理专业任务时 有据可依。因此 AgentScope 提供了内置的 RAG(Retrieval-Augmented Generation)实现,为 Agent 可靠执行复杂任务提供支撑。

AgentScope 的知识库实现主要涉及两个模块:embeddingRAG,下面我们将分别介绍这两个模块的实现原理。

embedding 模块

embeddding 模块负责文本和多模态数据的向量化(Embedding),是 RAG(检索增强生成)和语义搜索的基础组件。简单来说,Embedding(嵌入) 是将现实世界中的碎片信息(如文字、图片、音频)转换成一串高维数字向量的过程,这些信息代表了信息在语义空间的坐标,语义相近的内容,在空间里的距离就近。

embedding 模块的代码位于 src/agentscope/embedding,文件列表如下:

1
2
3
4
5
6
7
8
9
10
11
12
src/agentscope/embedding/
├── __init__.py # 模块导出
├── _embedding_base.py # 抽象基类 EmbeddingModelBase
├── _embedding_response.py # 响应数据类 EmbeddingResponse
├── _embedding_usage.py # 用量统计 EmbeddingUsage
├── _openai_embedding.py # OpenAI 文本嵌入
├── _dashscope_embedding.py # DashScope 文本嵌入
├── _dashscope_multimodal_embedding.py # DashScope 多模态嵌入
├── _gemini_embedding.py # Gemini 文本嵌入
├── _ollama_embedding.py # Ollama 文本嵌入
├── _cache_base.py # 缓存抽象基类
└── _file_cache.py # 文件缓存实现

EmbeddingModelBase

上篇文章介绍 Model 模块实现类似,为了支持不同 Provider 的不同 embedding 模型,AgentScope 定义了 EmbeddingModelBase 抽象基类,为不同 embedding 模型提供统一的抽象。

1
2
3
4
5
6
7
8
9
10
11
12
class EmbeddingModelBase:
model_name: str # 模型名称
supported_modalities: list[str] # 支持的模态:["text"]、["text", "image"]
dimensions: int # 向量维度

def __init__(self, model_name: str, dimensions: int):
self.model_name = model_name
self.dimensions = dimensions

async def __call__(self, *args, **kwargs) -> EmbeddingResponse:
"""调用嵌入 API,子类必须实现"""
raise NotImplementedError()
  • 需要实现 __call__ 方法,使得该类型的对象是可调用对象
  • 通过 supported_modalities 属性,表明该模型支持的模态
  • 每个模型都有其特定的 model_namedimensions(向量维度)

EmbeddingResponse

调用 Embedding 模型后,得到的响应数据使用 EmbeddingResponse 类封装,如下所示:

1
2
3
4
5
6
7
8
9
10
Embedding = List[float]

@dataclass
class EmbeddingResponse(DictMixin):
embeddings: List[Embedding] # 嵌入向量列表
id: str # 响应 ID
created_at: str # 创建时间
type: Literal["embedding"] # 类型标识
usage: EmbeddingUsage | None # 用量统计
source: Literal["cache", "api"] # 数据来源

关于 Embedding 模型的 Token 用量统计,则定义了 EmbeddingUsage 类,如下所示:

1
2
3
4
5
@dataclass
class EmbeddingUsage(DictMixin):
time: float # 耗时(秒)
tokens: int | None # Token 数量
type: Literal["embedding"]

以上就介绍了 Embedding 模块定义的基本数据结构,接下来我们来看具体的 Embedding 子类实现,我们以 OpenAI 的 Embedding 模型调用为例。

OpenAITextEmbedding

AgentScope 针对 OpenAI 模型只提供了文本 Embeeding 的实现,使用 OpenAITextEmbedding 定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
class OpenAITextEmbedding(EmbeddingModelBase):
supported_modalities = ["text"]

def __init__(
self,
api_key: str,
model_name: str,
dimensions: int = 1024,
embedding_cache: EmbeddingCacheBase | None = None,
**kwargs: Any,
) -> None:
import openai

super().__init__(model_name, dimensions)

# 创建 OpenAI client
self.client = openai.AsyncClient(api_key=api_key, **kwargs)
self.embedding_cache = embedding_cache

async def __call__(self, text: List[str | TextBlock]) -> EmbeddingResponse:
# 1. 提取文本
gather_text = [item["text"] if isinstance(item, dict) else item for item in text]

# 计算嵌入的参数
kwargs = {
"input": gather_text,
"model": self.model_name,
"dimensions": self.dimensions,
"encoding_format": "float",
**kwargs,
}

# 2. 检查缓存
if self.embedding_cache:
cached = await self.embedding_cache.retrieve(identifier=kwargs)
if cached:
return EmbeddingResponse(embeddings=cached, source="cache")

# 3. 调用 API
response = await self.client.embeddings.create(
input=gather_text,
model=self.model_name,
dimensions=self.dimensions,
encoding_format="float",
)

# 4. 存入缓存
if self.embedding_cache:
await self.embedding_cache.store(identifier=kwargs, embeddings=embeddings)

return EmbeddingResponse(
embeddings=[_.embedding for _ in response.data],
usage=EmbeddingUsage(tokens=response.usage.total_tokens, time=time),
)
  • 仍然是通过 openai.AsyncClient 进行 Embedding 模型调用
  • 在真实调用 Embedding 模型的 API 之前,会先检查缓存中是否能够直接提供 Embedding 结果
  • 只有当缓存未命中时,才会调用 OpenAI 的 Embedding API,并将结果存入缓存
  • 最后返回 EmbeddingResponse 对象,包含 embedding 向量列表、用量统计等信息

这里以 OpenAI 为例,介绍了如何实现 Embedding 能力,embeeding 模块还提供了其他 Provider 的 Embedding 实现,例如 DashScope(支持多模态嵌入)、Gemini 等,这里就不再一一列举。

Embedding 缓存

Embedding 模块还实现了 Embedding 缓存机制,以减少不必要的模型 API 调用。EmbeddingCacheBase 定义了缓存机制的抽象基类,定义了 Embedding 缓存的检索和存储方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class EmbeddingCacheBase:
@abstractmethod
async def store(self, embeddings: List[Embedding], identifier: JSONSerializableObject) -> None:
"""存储嵌入向量"""

@abstractmethod
async def retrieve(self, identifier: JSONSerializableObject) -> List[Embedding] | None:
"""检索嵌入向量"""

@abstractmethod
async def remove(self, identifier: JSONSerializableObject) -> None:
"""删除嵌入向量"""

@abstractmethod
async def clear(self) -> None:
"""清空缓存"""

FileEmbeddingCache 则实现了基于文件的缓存实现,它将每个 identifier 对应的 Embeddings 存储为文件系统中的一个单独文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
class FileEmbeddingCache(EmbeddingCacheBase):
def __init__(
self,
cache_dir: str = "./.cache/embeddings",
max_file_number: int | None = None,
max_cache_size: int | None = None,
) -> None:
self._cache_dir = os.path.abspath(cache_dir)
# 最大缓存文件数量和缓存大小
self.max_file_number = max_file_number
self.max_cache_size = max_cache_size

# 获取对应的缓存文件
@staticmethod
def _get_filename(identifier: JSONSerializableObject) -> str:
"""Generate a filename based on the identifier."""
json_str = json.dumps(identifier, ensure_ascii=False)
return hashlib.sha256(json_str.encode("utf-8")).hexdigest() + ".npy"

# 存储缓存数据
async def store(
self,
embeddings: List[Embedding],
identifier: JSONSerializableObject,
overwrite: bool = False,
**kwargs: Any,
) -> None:
filename = self._get_filename(identifier)
path_file = os.path.join(self.cache_dir, filename)

np.save(path_file, embeddings)
# 维护缓存目录,删除旧的缓存文件,保持缓存大小和数量限制
await self._maintain_cache_dir()

# 检索缓存
async def retrieve(
self,
identifier: JSONSerializableObject,
) -> List[Embedding] | None:
filename = self._get_filename(identifier)
path_file = os.path.join(self.cache_dir, filename)

return np.load(os.path.join(self.cache_dir, filename)).tolist()
  • 使用 identifier 的 SHA256 哈希作为文件名
  • 使用 numpy 库存储 Embeddings 向量
  • _maintain_cache_dir() 实现了基于 LRU 策略的缓存维护机制,当缓存文件数量或大小超过限制时,会删除最旧的缓存文件

具体的 Embedding 实现可以通过 组合 的方式来使用缓存机制,例如 OpenAITextEmbedding 就将 EmbeddingCacheBase 的对象保存为对象属性:

1
2
3
4
5
6
7
8
9
10
class OpenAITextEmbedding(EmbeddingModelBase):
def __init__(
self,
api_key: str,
model_name: str,
dimensions: int = 1024,
embedding_cache: EmbeddingCacheBase | None = None,
**kwargs: Any,
) -> None:
self.embedding_cache = embedding_cache

以上我们就介绍了 AgentScope 框架的 Embedding 模块的实现,它主要提供了对各种 Embedding 模型调用的封装,同时提供了一个 Embedding 缓存机制,接下来我们继续介绍 RAG 模块的实现原理。

RAG 模块

RAG 模块实现了检索增强生成(Retrieval-Augmented Generation)功能,为 Agent 提供知识检索能力。它包含文档读取、向量存储和知识库管理三个核心子模块。源码位于 src/agentscope/rag 目录下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
src/agentscope/rag/
├── __init__.py # 模块导出
├── _document.py # 文档数据结构
├── _knowledge_base.py # 知识库抽象基类
├── _simple_knowledge.py # 简单知识库实现
├── _reader/ # 文档读取器
│ ├── __init__.py
│ ├── _reader_base.py # 读取器基类
│ ├── _text_reader.py # 文本读取器
│ ├── _pdf_reader.py # PDF 读取器
│ ├── _word_reader.py # Word 读取器
│ ├── _excel_reader.py # Excel 读取器
│ ├── _ppt_reader.py # PPT 读取器
│ ├── _image_reader.py # 图片读取器
│ └── _utils.py
└── _store/ # 向量数据库存储
├── __init__.py
├── _store_base.py # 存储基类
├── _qdrant_store.py # Qdrant 存储
├── _milvuslite_store.py # Milvus Lite 存储
├── _mongodb_store.py # MongoDB 存储
├── _oceanbase_store.py # OceanBase 存储
└── _alibabacloud_mysql_store.py # 阿里云 MySQL 存储

文档读取

解析读取各类文档并从中提取数据内容,是构建知识库的首要环节。RAG 模块中定义了如下数据结构来表示文档分块:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@dataclass
class DocMetadata:
"""文档元数据"""
content: TextBlock | ImageBlock | VideoBlock # 内容块
doc_id: str # 文档 ID
chunk_id: int # 分块 ID
total_chunks: int # 总分块数

@dataclass
class Document:
"""文档分块"""
metadata: DocMetadata # 元数据
id: str # 唯一 ID(自动生成)
embedding: Embedding | None # 嵌入向量(添加/检索时填充)
score: float | None # 相关性分数(检索时填充)
  • Document 表示一个文档分块(chunk),一个传统意义上的文件(例如 pdf 文件、txt 文件等)会包含多个这样的 Document(分档分块)
  • DocMetadata 用来表示文档分块的数据,包括文档 ID、分块 ID 和总分块数,以及真正的内容数据

为了支持从各种格式的文档中读取数据,RAG 模块定义了 ReaderBase 抽象基类,每种具体的文件读取子类都需要继承该抽象基类,通过实现 __call__ 方法,该类型的对象是可调用对象,返回文档块列表

1
2
3
4
5
6
7
8
class ReaderBase:
@abstractmethod
async def __call__(self, *args, **kwargs) -> list[Document]:
"""读取文件,拆分为文档块"""

@abstractmethod
def get_doc_id(self, *args, **kwargs) -> str:
"""生成文档唯一 ID"""

我们以 TextReader 为例,它支持从文本文件(或者文本字符串)中读取数,并将其拆分为多个 Document 文档块 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
lass TextReader(ReaderBase):
def __init__(self, chunk_size: int = 512, split_by: Literal["char", "sentence", "paragraph"] = "sentence"):
self.chunk_size = chunk_size
self.split_by = split_by

async def __call__(self, text: str) -> list[Document]:
# 支持文件路径或文本字符串
if os.path.exists(text):
with open(text, "r", encoding="utf-8") as f:
text = f.read()

# 按指定方式拆分
splits = []
if self.split_by == "char":
for i in range(0, len(text), self.chunk_size):
splits.append(text[i:i + self.chunk_size])
elif self.split_by == "sentence":
import nltk
sentences = nltk.sent_tokenize(text)
splits.extend(sentences)
elif self.split_by == "paragraph":
paragraphs = [p for p in text.split("\n") if p]
splits.extend(paragraphs)

# 创建 Document 列表
doc_id = self.get_doc_id(text)
return [
Document(
id=doc_id,
metadata=DocMetadata(
content=TextBlock(type="text", text=split),
doc_id=doc_id,
chunk_id=idx,
total_chunks=len(splits),
),
)
for idx, split in enumerate(splits)
]

def get_doc_id(self, text: str) -> str:
return hashlib.sha256(text.encode("utf-8")).hexdigest()
  • 支持多种分割方式,包括按字符、句子或段落分割
  • 支持定义每个分割块的最大长度
  • 将分割后的文本块,转换为 Document 对象

除了支持 TextReader 之外,RAG 模块还支持 WordReaderPowerPointReaderExcelReaderImageReaderPDFReader 等,以实现对不同格式文档的读取。

向量存储

读取文档内容并将其向量化之后,还需要将这些向量保存到向量数据库中,这样我们就能直接通过向量检索的方式来获取与指定 query 相关的文档片段。RAG 模块支持多种向量存储数据库,其定义了 VDBStoreBase 抽象基类来表示向量存储应该实现的公共接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class VDBStoreBase:
@abstractmethod
async def add(self, documents: list[Document]) -> None:
"""添加文档到向量数据库"""

@abstractmethod
async def delete(self, *args, **kwargs) -> None:
"""删除文档"""

@abstractmethod
async def search(self, query_embedding: Embedding, limit: int, score_threshold: float | None = None) -> list[Document]:
"""向量搜索"""

def get_client(self) -> Any:
"""获取底层客户端"""
raise NotImplementedError()

RAG 模块目前内置了以下向量存储的实现:

Store 特点 适用场景
QdrantStore 支持内存/本地/远程模式 开发测试、生产部署
MilvusLiteStore Milvus 轻量版 本地开发
MongoDBStore MongoDB Atlas Vector Search 已有 MongoDB 环境
OceanBaseStore OceanBase 向量扩展 企业级部署
AlibabaCloudMySQLStore 阿里云 MySQL 向量搜索 阿里云环境

我们以 QdrantStore 这个实现为例,其核心就是通过 QdrantClient 来操作 Qdrant 向量数据库,实现文档向量的增删查操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
class QdrantStore(VDBStoreBase):
def __init__(self, location: str, collection_name: str, dimensions: int, distance: str = "Cosine"):
from qdrant_client import AsyncQdrantClient
self._client = AsyncQdrantClient(location=location)
self.collection_name = collection_name
self.dimensions = dimensions
self.distance = distance

async def _validate_collection(self) -> None:
"""确保集合存在"""
if not await self._client.collection_exists(self.collection_name):
await self._client.create_collection(
collection_name=self.collection_name,
vectors_config=models.VectorParams(
size=self.dimensions,
distance=getattr(models.Distance, self.distance.upper()),
),
)

async def add(self, documents: list[Document]) -> None:
await self._validate_collection()

await self._client.upsert(
collection_name=self.collection_name,
points=[
PointStruct(
id=_map_text_to_uuid(json.dumps({
"doc_id": _.metadata.doc_id,
"chunk_id": _.metadata.chunk_id,
"content": _.metadata.content,
})),
vector=_.embedding,
payload=_.metadata, # 元数据存储在 payload 中
)
for _ in documents
],
)

async def search(self, query_embedding: Embedding, limit: int, score_threshold: float | None = None) -> list[Document]:
res = await self._client.query_points(
collection_name=self.collection_name,
query=query_embedding,
limit=limit,
score_threshold=score_threshold,
)

return [
Document(
embedding=point.vector,
score=point.score,
metadata=DocMetadata(**point.payload),
)
for point in res.points
]

知识库实现

有了 embedding 模型、文档读取、向量存储这些基本组件后,知识库的构建就相当简单了。RAG 模块使用 KnowledgeBase 类来表示知识库,定义一个知识库应该提供哪些 API:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class KnowledgeBase:
embedding_store: VDBStoreBase # 向量存储
embedding_model: EmbeddingModelBase # 嵌入模型

def __init__(self, embedding_store, embedding_model):
self.embedding_store = embedding_store
self.embedding_model = embedding_model

@abstractmethod
async def retrieve(self, query: str, limit: int = 5, score_threshold: float | None = None) -> list[Document]:
"""检索相关文档"""

@abstractmethod
async def add_documents(self, documents: list[Document]) -> None:
"""添加文档"""

async def retrieve_knowledge(self, query: str, limit: int = 5, score_threshold: float | None = None) -> ToolResponse:
"""便捷方法:返回 ToolResponse,可直接作为工具结果"""
  • 其包含 embedding_storeembedding_model 两个属性,分别表示向量存储和所使用嵌入模型

SimpleKnowledgeBaseKnowledgeBase 的一个具体实现,实现了简单的基于向量检索的知识库实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class SimpleKnowledge(KnowledgeBase):
async def retrieve(self, query: str, limit: int = 5, score_threshold: float | None = None) -> list[Document]:
# 1. 将查询文本转换为向量
res_embedding = await self.embedding_model([TextBlock(type="text", text=query)])

# 2. 向量搜索
res = await self.embedding_store.search(
res_embedding.embeddings[0],
limit=limit,
score_threshold=score_threshold,
)
return res

async def add_documents(self, documents: list[Document]) -> None:
# 1. 验证模态支持
for doc in documents:
if doc.metadata.content["type"] not in self.embedding_model.supported_modalities:
raise ValueError(...)

# 2. 计算嵌入向量
res_embeddings = await self.embedding_model([_.metadata.content for _ in documents])

# 3. 设置嵌入向量
for doc, embedding in zip(documents, res_embeddings.embeddings):
doc.embedding = embedding

# 4. 存入向量数据库
await self.embedding_store.add(documents)

知识库小结

通过 embedding 模块和 rag 模块,AgentScope 提供了最基本的知识库特性,整体使用流程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
┌─────────────────────────────────────────────────────────────────┐
│ 1. 文档读取与分块 │
│ Reader(PDF/Word/Text) → list[Document] │
└─────────────────────────────────────────────────────────────────┘


┌─────────────────────────────────────────────────────────────────┐
│ 2. 向量化与存储 │
│ KnowledgeBase.add_documents() │
│ └── EmbeddingModel([content]) → embeddings │
│ └── VDBStore.add(documents with embeddings) │
└─────────────────────────────────────────────────────────────────┘


┌─────────────────────────────────────────────────────────────────┐
│ 3. 检索 │
│ KnowledgeBase.retrieve(query) │
│ └── EmbeddingModel([query]) → query_embedding │
│ └── VDBStore.search(query_embedding) → list[Document] │
└─────────────────────────────────────────────────────────────────┘


┌─────────────────────────────────────────────────────────────────┐
│ 4. 返回结果 │
│ list[Document] with scores │
└─────────────────────────────────────────────────────────────────┘

小结

这篇文章我们学习了 AgentScope 知识库实现原理,包括 embedding 模块和 rag 模块的代码实现。这里我们只是介绍了知识库内部是如何实现的,在 Agent 中到底应该怎么使用知识库呢?这个我们将在后续介绍 Agent 实现时再详细说明。

Reference