这篇文章我们继续学习 AgentScope 的源码,主要聚焦于 AgentScope 的知识库实现。知识库就像是 AI Agent 的 离线图书馆,可以为 Agent 提供各种专业/领域知识,让 Agent 能够突破预训练数据的限制,在处理专业任务时 有据可依。因此 AgentScope 提供了内置的 RAG(Retrieval-Augmented Generation)实现,为 Agent 可靠执行复杂任务提供支撑。
AgentScope 的知识库实现主要涉及两个模块:embedding 和 RAG,下面我们将分别介绍这两个模块的实现原理。
embedding 模块
embeddding 模块负责文本和多模态数据的向量化(Embedding),是 RAG(检索增强生成)和语义搜索的基础组件。简单来说,Embedding(嵌入) 是将现实世界中的碎片信息(如文字、图片、音频)转换成一串高维数字向量的过程,这些信息代表了信息在语义空间的坐标,语义相近的内容,在空间里的距离就近。
embedding 模块的代码位于 src/agentscope/embedding,文件列表如下:
1 2 3 4 5 6 7 8 9 10 11 12
| src/agentscope/embedding/ ├── __init__.py ├── _embedding_base.py ├── _embedding_response.py ├── _embedding_usage.py ├── _openai_embedding.py ├── _dashscope_embedding.py ├── _dashscope_multimodal_embedding.py ├── _gemini_embedding.py ├── _ollama_embedding.py ├── _cache_base.py └── _file_cache.py
|
EmbeddingModelBase
和上篇文章介绍 Model 模块实现类似,为了支持不同 Provider 的不同 embedding 模型,AgentScope 定义了 EmbeddingModelBase 抽象基类,为不同 embedding 模型提供统一的抽象。
1 2 3 4 5 6 7 8 9 10 11 12
| class EmbeddingModelBase: model_name: str supported_modalities: list[str] dimensions: int
def __init__(self, model_name: str, dimensions: int): self.model_name = model_name self.dimensions = dimensions
async def __call__(self, *args, **kwargs) -> EmbeddingResponse: """调用嵌入 API,子类必须实现""" raise NotImplementedError()
|
- 需要实现
__call__ 方法,使得该类型的对象是可调用对象
- 通过
supported_modalities 属性,表明该模型支持的模态
- 每个模型都有其特定的
model_name 和 dimensions(向量维度)
EmbeddingResponse
调用 Embedding 模型后,得到的响应数据使用 EmbeddingResponse 类封装,如下所示:
1 2 3 4 5 6 7 8 9 10
| Embedding = List[float]
@dataclass class EmbeddingResponse(DictMixin): embeddings: List[Embedding] id: str created_at: str type: Literal["embedding"] usage: EmbeddingUsage | None source: Literal["cache", "api"]
|
关于 Embedding 模型的 Token 用量统计,则定义了 EmbeddingUsage 类,如下所示:
1 2 3 4 5
| @dataclass class EmbeddingUsage(DictMixin): time: float tokens: int | None type: Literal["embedding"]
|
以上就介绍了 Embedding 模块定义的基本数据结构,接下来我们来看具体的 Embedding 子类实现,我们以 OpenAI 的 Embedding 模型调用为例。
OpenAITextEmbedding
AgentScope 针对 OpenAI 模型只提供了文本 Embeeding 的实现,使用 OpenAITextEmbedding 定义:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
| class OpenAITextEmbedding(EmbeddingModelBase): supported_modalities = ["text"]
def __init__( self, api_key: str, model_name: str, dimensions: int = 1024, embedding_cache: EmbeddingCacheBase | None = None, **kwargs: Any, ) -> None: import openai
super().__init__(model_name, dimensions)
self.client = openai.AsyncClient(api_key=api_key, **kwargs) self.embedding_cache = embedding_cache
async def __call__(self, text: List[str | TextBlock]) -> EmbeddingResponse: gather_text = [item["text"] if isinstance(item, dict) else item for item in text]
kwargs = { "input": gather_text, "model": self.model_name, "dimensions": self.dimensions, "encoding_format": "float", **kwargs, }
if self.embedding_cache: cached = await self.embedding_cache.retrieve(identifier=kwargs) if cached: return EmbeddingResponse(embeddings=cached, source="cache")
response = await self.client.embeddings.create( input=gather_text, model=self.model_name, dimensions=self.dimensions, encoding_format="float", )
if self.embedding_cache: await self.embedding_cache.store(identifier=kwargs, embeddings=embeddings)
return EmbeddingResponse( embeddings=[_.embedding for _ in response.data], usage=EmbeddingUsage(tokens=response.usage.total_tokens, time=time), )
|
- 仍然是通过
openai.AsyncClient 进行 Embedding 模型调用
- 在真实调用 Embedding 模型的 API 之前,会先检查缓存中是否能够直接提供 Embedding 结果
- 只有当缓存未命中时,才会调用 OpenAI 的 Embedding API,并将结果存入缓存
- 最后返回
EmbeddingResponse 对象,包含 embedding 向量列表、用量统计等信息
这里以 OpenAI 为例,介绍了如何实现 Embedding 能力,embeeding 模块还提供了其他 Provider 的 Embedding 实现,例如 DashScope(支持多模态嵌入)、Gemini 等,这里就不再一一列举。
Embedding 缓存
Embedding 模块还实现了 Embedding 缓存机制,以减少不必要的模型 API 调用。EmbeddingCacheBase 定义了缓存机制的抽象基类,定义了 Embedding 缓存的检索和存储方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| class EmbeddingCacheBase: @abstractmethod async def store(self, embeddings: List[Embedding], identifier: JSONSerializableObject) -> None: """存储嵌入向量"""
@abstractmethod async def retrieve(self, identifier: JSONSerializableObject) -> List[Embedding] | None: """检索嵌入向量"""
@abstractmethod async def remove(self, identifier: JSONSerializableObject) -> None: """删除嵌入向量"""
@abstractmethod async def clear(self) -> None: """清空缓存"""
|
FileEmbeddingCache 则实现了基于文件的缓存实现,它将每个 identifier 对应的 Embeddings 存储为文件系统中的一个单独文件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| class FileEmbeddingCache(EmbeddingCacheBase): def __init__( self, cache_dir: str = "./.cache/embeddings", max_file_number: int | None = None, max_cache_size: int | None = None, ) -> None: self._cache_dir = os.path.abspath(cache_dir) self.max_file_number = max_file_number self.max_cache_size = max_cache_size
@staticmethod def _get_filename(identifier: JSONSerializableObject) -> str: """Generate a filename based on the identifier.""" json_str = json.dumps(identifier, ensure_ascii=False) return hashlib.sha256(json_str.encode("utf-8")).hexdigest() + ".npy"
async def store( self, embeddings: List[Embedding], identifier: JSONSerializableObject, overwrite: bool = False, **kwargs: Any, ) -> None: filename = self._get_filename(identifier) path_file = os.path.join(self.cache_dir, filename)
np.save(path_file, embeddings) await self._maintain_cache_dir()
async def retrieve( self, identifier: JSONSerializableObject, ) -> List[Embedding] | None: filename = self._get_filename(identifier) path_file = os.path.join(self.cache_dir, filename)
return np.load(os.path.join(self.cache_dir, filename)).tolist()
|
- 使用
identifier 的 SHA256 哈希作为文件名
- 使用 numpy 库存储 Embeddings 向量
_maintain_cache_dir() 实现了基于 LRU 策略的缓存维护机制,当缓存文件数量或大小超过限制时,会删除最旧的缓存文件
具体的 Embedding 实现可以通过 组合 的方式来使用缓存机制,例如 OpenAITextEmbedding 就将 EmbeddingCacheBase 的对象保存为对象属性:
1 2 3 4 5 6 7 8 9 10
| class OpenAITextEmbedding(EmbeddingModelBase): def __init__( self, api_key: str, model_name: str, dimensions: int = 1024, embedding_cache: EmbeddingCacheBase | None = None, **kwargs: Any, ) -> None: self.embedding_cache = embedding_cache
|
以上我们就介绍了 AgentScope 框架的 Embedding 模块的实现,它主要提供了对各种 Embedding 模型调用的封装,同时提供了一个 Embedding 缓存机制,接下来我们继续介绍 RAG 模块的实现原理。
RAG 模块
RAG 模块实现了检索增强生成(Retrieval-Augmented Generation)功能,为 Agent 提供知识检索能力。它包含文档读取、向量存储和知识库管理三个核心子模块。源码位于 src/agentscope/rag 目录下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| src/agentscope/rag/ ├── __init__.py ├── _document.py ├── _knowledge_base.py ├── _simple_knowledge.py ├── _reader/ │ ├── __init__.py │ ├── _reader_base.py │ ├── _text_reader.py │ ├── _pdf_reader.py │ ├── _word_reader.py │ ├── _excel_reader.py │ ├── _ppt_reader.py │ ├── _image_reader.py │ └── _utils.py └── _store/ ├── __init__.py ├── _store_base.py ├── _qdrant_store.py ├── _milvuslite_store.py ├── _mongodb_store.py ├── _oceanbase_store.py └── _alibabacloud_mysql_store.py
|
文档读取
解析读取各类文档并从中提取数据内容,是构建知识库的首要环节。RAG 模块中定义了如下数据结构来表示文档分块:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| @dataclass class DocMetadata: """文档元数据""" content: TextBlock | ImageBlock | VideoBlock doc_id: str chunk_id: int total_chunks: int
@dataclass class Document: """文档分块""" metadata: DocMetadata id: str embedding: Embedding | None score: float | None
|
Document 表示一个文档分块(chunk),一个传统意义上的文件(例如 pdf 文件、txt 文件等)会包含多个这样的 Document(分档分块)
DocMetadata 用来表示文档分块的数据,包括文档 ID、分块 ID 和总分块数,以及真正的内容数据
为了支持从各种格式的文档中读取数据,RAG 模块定义了 ReaderBase 抽象基类,每种具体的文件读取子类都需要继承该抽象基类,通过实现 __call__ 方法,该类型的对象是可调用对象,返回文档块列表
1 2 3 4 5 6 7 8
| class ReaderBase: @abstractmethod async def __call__(self, *args, **kwargs) -> list[Document]: """读取文件,拆分为文档块"""
@abstractmethod def get_doc_id(self, *args, **kwargs) -> str: """生成文档唯一 ID"""
|
我们以 TextReader 为例,它支持从文本文件(或者文本字符串)中读取数,并将其拆分为多个 Document 文档块 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| lass TextReader(ReaderBase): def __init__(self, chunk_size: int = 512, split_by: Literal["char", "sentence", "paragraph"] = "sentence"): self.chunk_size = chunk_size self.split_by = split_by
async def __call__(self, text: str) -> list[Document]: if os.path.exists(text): with open(text, "r", encoding="utf-8") as f: text = f.read()
splits = [] if self.split_by == "char": for i in range(0, len(text), self.chunk_size): splits.append(text[i:i + self.chunk_size]) elif self.split_by == "sentence": import nltk sentences = nltk.sent_tokenize(text) splits.extend(sentences) elif self.split_by == "paragraph": paragraphs = [p for p in text.split("\n") if p] splits.extend(paragraphs)
doc_id = self.get_doc_id(text) return [ Document( id=doc_id, metadata=DocMetadata( content=TextBlock(type="text", text=split), doc_id=doc_id, chunk_id=idx, total_chunks=len(splits), ), ) for idx, split in enumerate(splits) ]
def get_doc_id(self, text: str) -> str: return hashlib.sha256(text.encode("utf-8")).hexdigest()
|
- 支持多种分割方式,包括按字符、句子或段落分割
- 支持定义每个分割块的最大长度
- 将分割后的文本块,转换为 Document 对象
除了支持 TextReader 之外,RAG 模块还支持 WordReader、PowerPointReader、ExcelReader、ImageReader、PDFReader 等,以实现对不同格式文档的读取。
向量存储
读取文档内容并将其向量化之后,还需要将这些向量保存到向量数据库中,这样我们就能直接通过向量检索的方式来获取与指定 query 相关的文档片段。RAG 模块支持多种向量存储数据库,其定义了 VDBStoreBase 抽象基类来表示向量存储应该实现的公共接口:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| class VDBStoreBase: @abstractmethod async def add(self, documents: list[Document]) -> None: """添加文档到向量数据库"""
@abstractmethod async def delete(self, *args, **kwargs) -> None: """删除文档"""
@abstractmethod async def search(self, query_embedding: Embedding, limit: int, score_threshold: float | None = None) -> list[Document]: """向量搜索"""
def get_client(self) -> Any: """获取底层客户端""" raise NotImplementedError()
|
RAG 模块目前内置了以下向量存储的实现:
| Store |
特点 |
适用场景 |
QdrantStore |
支持内存/本地/远程模式 |
开发测试、生产部署 |
MilvusLiteStore |
Milvus 轻量版 |
本地开发 |
MongoDBStore |
MongoDB Atlas Vector Search |
已有 MongoDB 环境 |
OceanBaseStore |
OceanBase 向量扩展 |
企业级部署 |
AlibabaCloudMySQLStore |
阿里云 MySQL 向量搜索 |
阿里云环境 |
我们以 QdrantStore 这个实现为例,其核心就是通过 QdrantClient 来操作 Qdrant 向量数据库,实现文档向量的增删查操作:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
| class QdrantStore(VDBStoreBase): def __init__(self, location: str, collection_name: str, dimensions: int, distance: str = "Cosine"): from qdrant_client import AsyncQdrantClient self._client = AsyncQdrantClient(location=location) self.collection_name = collection_name self.dimensions = dimensions self.distance = distance
async def _validate_collection(self) -> None: """确保集合存在""" if not await self._client.collection_exists(self.collection_name): await self._client.create_collection( collection_name=self.collection_name, vectors_config=models.VectorParams( size=self.dimensions, distance=getattr(models.Distance, self.distance.upper()), ), )
async def add(self, documents: list[Document]) -> None: await self._validate_collection()
await self._client.upsert( collection_name=self.collection_name, points=[ PointStruct( id=_map_text_to_uuid(json.dumps({ "doc_id": _.metadata.doc_id, "chunk_id": _.metadata.chunk_id, "content": _.metadata.content, })), vector=_.embedding, payload=_.metadata, ) for _ in documents ], )
async def search(self, query_embedding: Embedding, limit: int, score_threshold: float | None = None) -> list[Document]: res = await self._client.query_points( collection_name=self.collection_name, query=query_embedding, limit=limit, score_threshold=score_threshold, )
return [ Document( embedding=point.vector, score=point.score, metadata=DocMetadata(**point.payload), ) for point in res.points ]
|
知识库实现
有了 embedding 模型、文档读取、向量存储这些基本组件后,知识库的构建就相当简单了。RAG 模块使用 KnowledgeBase 类来表示知识库,定义一个知识库应该提供哪些 API:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| class KnowledgeBase: embedding_store: VDBStoreBase embedding_model: EmbeddingModelBase
def __init__(self, embedding_store, embedding_model): self.embedding_store = embedding_store self.embedding_model = embedding_model
@abstractmethod async def retrieve(self, query: str, limit: int = 5, score_threshold: float | None = None) -> list[Document]: """检索相关文档"""
@abstractmethod async def add_documents(self, documents: list[Document]) -> None: """添加文档"""
async def retrieve_knowledge(self, query: str, limit: int = 5, score_threshold: float | None = None) -> ToolResponse: """便捷方法:返回 ToolResponse,可直接作为工具结果"""
|
- 其包含
embedding_store 和 embedding_model 两个属性,分别表示向量存储和所使用嵌入模型
SimpleKnowledgeBase 是 KnowledgeBase 的一个具体实现,实现了简单的基于向量检索的知识库实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| class SimpleKnowledge(KnowledgeBase): async def retrieve(self, query: str, limit: int = 5, score_threshold: float | None = None) -> list[Document]: res_embedding = await self.embedding_model([TextBlock(type="text", text=query)])
res = await self.embedding_store.search( res_embedding.embeddings[0], limit=limit, score_threshold=score_threshold, ) return res
async def add_documents(self, documents: list[Document]) -> None: for doc in documents: if doc.metadata.content["type"] not in self.embedding_model.supported_modalities: raise ValueError(...)
res_embeddings = await self.embedding_model([_.metadata.content for _ in documents])
for doc, embedding in zip(documents, res_embeddings.embeddings): doc.embedding = embedding
await self.embedding_store.add(documents)
|
知识库小结
通过 embedding 模块和 rag 模块,AgentScope 提供了最基本的知识库特性,整体使用流程如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| ┌─────────────────────────────────────────────────────────────────┐ │ 1. 文档读取与分块 │ │ Reader(PDF/Word/Text) → list[Document] │ └─────────────────────────────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────────┐ │ 2. 向量化与存储 │ │ KnowledgeBase.add_documents() │ │ └── EmbeddingModel([content]) → embeddings │ │ └── VDBStore.add(documents with embeddings) │ └─────────────────────────────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────────┐ │ 3. 检索 │ │ KnowledgeBase.retrieve(query) │ │ └── EmbeddingModel([query]) → query_embedding │ │ └── VDBStore.search(query_embedding) → list[Document] │ └─────────────────────────────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────────┐ │ 4. 返回结果 │ │ list[Document] with scores │ └─────────────────────────────────────────────────────────────────┘
|
小结
这篇文章我们学习了 AgentScope 知识库实现原理,包括 embedding 模块和 rag 模块的代码实现。这里我们只是介绍了知识库内部是如何实现的,在 Agent 中到底应该怎么使用知识库呢?这个我们将在后续介绍 Agent 实现时再详细说明。
Reference