上一篇文章 我们介绍了 kratos gRPC Server 的整体设计、创建流程、启动与停止逻辑等。这篇文章我们将深入 gRPC Transport Server 剩余的实现细节,包括拦截器桥接机制、中间件选择器、编解码器、Transport 上下文等实现。
kratos 中间件体系
在微服务开发中,每个请求在到达业务逻辑之前,往往需要经过一系列通用处理:记录日志、鉴权认证、限流熔断、panic 恢复、请求追踪等等。这些逻辑与具体业务无关,但几乎所有请求都需要。如果每个业务方法都手动写一遍这些逻辑,代码会大量重复且难以维护。中间件就是用来解决这个问题的:它把这些通用逻辑抽象出来,以 层层包裹 的方式套在业务 Handler 外面,业务代码只需关注自己的逻辑即可。
kratos 同时支持 HTTP 和 gRPC 两种协议,两种协议各自有自己的拦截器机制,如果每种协议都用各自的拦截器来写这些逻辑,同一功能就要实现两遍。kratos 的做法是:定义一套协议无关的中间件接口,让同一套中间件代码既能用于 HTTP,也能用于 gRPC,再通过各自的桥接层适配到具体协议。
kratos 的中间件定义在 middleware/middleware.go 中,非常简洁:
1 | // middleware/middleware.go |
有两个核心类型:
Handler:一个函数签名func(ctx context.Context, req any) (any, error),接收请求上下文和请求体,返回响应和错误Middleware:一个函数签名func(Handler) Handler,接收一个 Handler,返回一个新的 Handler
这是一个经典的洋葱模型设计。每个中间件包装一个 Handler,形成层层嵌套的调用链。例如,一个 recovery 中间件可能这样实现:
1 | func Recovery() middleware.Middleware { |
这种设计使得中间件与传输协议无关:同一个 middleware.Middleware 可以同时用于 HTTP 和 gRPC。
middleware.Chain:链式组合
多个中间件需要串联执行,middleware.Chain 负责完成这个组合:
1 | // middleware/middleware.go |
Chain 从后向前依次包装 handler,确保中间件的执行顺序与注册顺序一致。例如 Chain(A, B, C) 生成的链为 A(B(C(handler))),执行顺序是 A → B → C → handler → C返回 → B返回 → A返回。
中间件选择器:matcher.Matcher
kratos 不仅支持全局中间件,还支持按方法名选择性注册中间件。这是通过 matcher.Matcher 实现的:
1 | // internal/matcher/middleware.go |
三个方法的含义:
Use(ms...):设置默认中间件,所有请求都会执行Add(selector, ms...):添加选择性中间件,selector 支持精确匹配和前缀通配(*结尾)Match(operation):根据 operation 匹配并返回中间件列表
Add 方法在添加时,会将 * 结尾的 selector 去掉尾部的 *,存入 prefix 列表:
1 | func (m *matcher) Add(selector string, ms ...middleware.Middleware) { |
Match 方法的匹配逻辑是:先加入默认中间件,再尝试精确匹配,最后尝试前缀匹配:
1 | func (m *matcher) Match(operation string) []middleware.Middleware { |
grpc.Server 对 Matcher 的使用
kratos 的 grpc.Server 结构体持有两个 matcher.Matcher 实例:
1 | type Server struct { |
创建 Server 时通过 Middleware() 选项注册的中间件,是通过调用 matcher.Use 实现的:
1 | func Middleware(m ...middleware.Middleware) ServerOption { |
Server.Use 的内部则直接调用了 matcher.Add
1 | func (s *Server) Use(selector string, m ...middleware.Middleware) { |
即 Matcher 是 Server Middleware 的底层存储和匹配引擎:
| Server 层 API | 底层调用 | 对应 Matcher 方法 | 作用 |
|---|---|---|---|
Middleware(m...) |
s.middleware.Use(m...) |
Matcher.Use |
注册默认中间件,所有请求都执行 |
Use(selector, m...) |
s.middleware.Add(selector, m...) |
Matcher.Add |
注册选择性中间件,按 selector 匹配 |
举个例子,假设按照如下方式配置了 Server:
| 调用 | 说明 |
|---|---|
srv.Middleware(recovery.Recovery()) |
通过选项注册,设置默认中间件 |
srv.Use("/*", validate.Validator()) |
前缀通配,匹配所有 operation |
srv.Use("/helloworld.v1.Greeter/*", auth.Check()) |
前缀通配,匹配 Greeter 服务的所有方法 |
srv.Use("/helloworld.v1.Greeter/SayHello", rate.Limit()) |
精确匹配,只匹配 SayHello 方法 |
当请求 /helloworld.v1.Greeter/SayHello 时:
1 | defaults: [recovery] ← Use 注册的默认中间件 |
当请求 /helloworld.v1.Greeter/AnotherMethod 时:
1 | defaults: [recovery] ← Use 注册的默认中间件 |
拦截器桥接
上一篇文章我们提到,gRPC 原生的拦截器类型是 grpc.UnaryServerInterceptor,而 kratos 的中间件类型是 middleware.Middleware,两者之间需要桥接。这正是 interceptor.go 的工作。
gRPC 拦截器简介
gRPC 拦截器就是 gRPC 框架提供的请求拦截机制,类似 HTTP 中间件,在请求到达业务 handler 之前/之后执行通用逻辑。拦截器d的典型用途:日志、认证、限流、超时控制等。
gRPC 拦截器分为两类:
- Unary 拦截器(普通请求):
1 | func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) |
- Stream 拦截器(流式请求):
1 | func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error |
通过如下方式注册 gRPC 拦截器:
1 | // 单个 |
执行顺序类似洋葱模型:
1 | interceptor1 → interceptor2 → interceptor3 → handler → interceptor3返回 → interceptor2返回 → interceptor1返回 |
为什么需要桥接
现在我们看到了问题的本质:kratos 的中间件和 gRPC 的拦截器是两套完全不同的签名:
1 | // gRPC UnaryServerInterceptor —— gRPC 框架的拦截器格式 |
gRPC 框架只认自己的 UnaryServerInterceptor,它会在请求到达时调用这个拦截器,传入方法信息和最终业务 handler;而 kratos 的中间件是洋葱模型,只认 Handler → Handler 的包装模式。两者接口不兼容,不能直接互相替代。
因此需要一个桥接层——它本身实现 gRPC 的 UnaryServerInterceptor 接口,在内部将请求转换为 kratos 中间件能理解的 Handler 链来执行。
Unary 桥接拦截器
Unary gRPC(一元 gRPC) 是 gRPC 中最简单、最常见的一种通信模式。简单来说,它就像我们平时最熟悉的 传统 HTTP 请求-响应(Request-Response) 模式:客户端发送一个请求,服务器返回一个响应。它的工作流程可以简化为:
- 客户端发起请求: 客户端调用一个 gRPC 方法,发送单个请求消息给服务器,然后等待。
- 服务器处理: 服务器接收到这个请求,执行相应的业务逻辑(比如查询数据库、处理数据)。
- 服务器返回响应: 服务器发送单个响应消息回客户端,并附带状态码(如成功、报错)和可选的元数据。
当一个 Unary RPC 请求到达 kratos gRPC Server 时,执行链如下:
1 | gRPC 请求 → grpc.Server → _Greeter_SayHello_Handler → unaryServerInterceptor → kratos middleware chain → 业务 handler |
unaryServerInterceptor 是整条链的关键枢纽。在 kratos 的 NewServer() 中,它是通过拦截器链构建并传给 grpc.NewServer 的:
1 | // transport/grpc/server.go |
这样每个 gRPC 请求到达时,都会先经过 kratos 的桥接拦截器,再经过用户自定义的拦截器,最终到达业务 handler。
unaryServerInterceptor 实现
让我们看看 kratos 的 unaryServerInterceptor 的实现,它的核心就是返回一个原生的
gRPC UnaryServerInterceptor,并在内部完成 kratos 的 Middleware 逻辑:
1 | // transport/grpc/interceptor.go |
从代码来看,unaryServerInterceptor 作为 gRPC 拦截器被调用时,完成了以下工作:
- Context 合并 —
ic.Merge(ctx, s.baseCtx):将请求 Context 和应用级 Context 合并,让请求能访问应用级值 - Transport 注入 — 创建
Transport对象(携带 operation、endpoint、header),通过transport.NewServerContext写入 Context - 超时控制 —
context.WithTimeout(ctx, s.timeout):为请求设置默认超时 - 中间件匹配 —
s.middleware.Match(operation):根据方法名匹配应该执行的中间件 - 中间件链执行 —
middleware.Chain(matched...)(h):将匹配到的中间件串联执行 - 响应头写回 — 将中间件通过
tr.ReplyHeader().Set(...)设置的头写回 gRPC
核心是第 4 和第 5 步:桥接拦截器把 gRPC 传入的 handler 包装成 kratos 的 Handler,然后通过 middleware.Chain 让 kratos 中间件链包裹这个 Handler,从而让 kratos 中间件能在 gRPC 的请求处理链中正常工作。
Context 合并:ic.Merge
ic.Merge 的作用是将两个 Context 合并为一个。为什么要合并?因为 baseCtx 是在 Server 启动时由 App 创建的,携带了应用级别的值(如日志、服务实例信息等),而请求的 ctx 来自 gRPC 框架,只携带了请求级别的值。合并后,业务代码可以从一个 Context 中同时访问两种值。
1 | // internal/context/context.go |
mergeCtx 实现了 context.Context 接口,其核心逻辑是:
| 方法 | 行为 |
|---|---|
Done() |
返回合并后的 done channel,任一父 Context 取消则关闭 |
Err() |
返回先完成的父 Context 的错误 |
Deadline() |
返回两个父 Context 中更早的截止时间 |
Value(key) |
先查 parent1,不存在则查 parent2,实现了值的合并查找 |
这意味着 Value 查找是有优先级的:请求 Context 中的值优先,baseCtx 的值作为后备。
Stream 桥接拦截器
相比于 unary gRPC 这种普通一元调用(单次请求 → 单次响应),stream gRPC 允许客户端与服务端进行双向流式通信(多次请求 → 多次响应),具体来说,stream gRPC 有以下几种类型:
- 服务端流(Server Stream):客户端发1 次请求,服务端持续返回多条数据流。适用于消息推送、日志实时输出、大数据分批返回、进度条实时上报等场景
- 客户端流(Client Stream):客户端持续发送多条数据流,服务端最后返回1 次响应。适用于大文件分片上传、批量数据入库、批量日志上报等场景
- 双向流(Bidirectional Stream):双方同时持续互发数据流,互不阻塞。适用于实时聊天、IM 对话等场景
这三种通信方式统一叫做 gRPC Stream / 流式 gRPC。Stream RPC 的拦截器桥接比 Unary 更复杂,因为 Stream 是长连接,涉及多次 SendMsg 和 RecvMsg 调用。
Stream 桥接拦截器的实现位于 streamServerInterceptor 方法中:
1 | // transport/grpc/interceptor.go |
wrappedStream
对于 Stream RPC,中间件需要在每一次 SendMsg 和 RecvMsg 时执行,而不仅仅是在流开始时。kratos 通过 wrappedStream 实现这一点:
1 | type wrappedStream struct { |
wrappedStream 包装了原生的 grpc.ServerStream,重写了三个关键方法:
| 方法 | 作用 |
|---|---|
Context() |
返回注入了 Transport 信息的自定义 Context |
SendMsg(m) |
在每次发送消息时执行中间件链 |
RecvMsg(m) |
在每次接收消息时执行中间件链 |
这样,Stream RPC 的中间件可以对每一个消息进行拦截处理(如日志、限流等),而不是只在整个流的开始和结束时执行。
Transport 上下文
Transport 结构体
Transport 是 gRPC Transport 层的核心数据载体,每个请求都会创建一个:
1 | // transport/grpc/transport.go |
Transport 实现了 transport.Transporter 接口的所有方法:
| 方法 | 返回值 | 说明 |
|---|---|---|
Kind() |
transport.KindGRPC |
协议类型标识 |
Endpoint() |
string |
服务端点 |
Operation() |
string |
方法全名 |
RequestHeader() |
transport.Header |
请求头 |
ReplyHeader() |
transport.Header |
响应头(仅服务端有效) |
NodeFilters() |
[]selector.NodeFilter |
客户端节点过滤器 |
headerCarrier:gRPC Metadata 适配器
headerCarrier 是 metadata.MD 的类型别名,实现了 transport.Header 接口,将 gRPC 的 metadata 适配为 kratos 统一的 Header 接口:
1 | type headerCarrier metadata.MD |
这里有一个关键设计:在 unaryServerInterceptor 中,请求头和响应头使用不同的 metadata.MD 实例:
1 | md, _ := grpcmd.FromIncomingContext(ctx) // 请求头:从 gRPC incoming metadata 读取 |
这样中间件就可以通过 tr.ReplyHeader().Set("key", "value") 设置响应头,这些头最终会被写回 gRPC 响应。
Context 传播
Transport 通过 Context 在中间件和业务逻辑之间传播:
1 | // 注入(服务端) |
NewServerContext 和 FromServerContext 的实现非常简单,使用了 Go 的 context.WithValue:
1 | // transport/transport.go |
业务代码中的典型用法:
1 | func (s *GreeterService) SayHello(ctx context.Context, in *v1.HelloRequest) (*v1.HelloReply, error) { |
编解码器:JSON Codec
gRPC 默认使用 protobuf 编解码,但 kratos 选择使用 JSON 作为默认编解码格式。这是因为 kratos 同时支持 HTTP 和 gRPC 协议,HTTP 协议通常使用 JSON 格式,为了保持一致性,gRPC 也默认使用 JSON。
所以不要认为 gRPC 只能使用 Protobuf 作为序列化协议(默认是 Protobuf),gRPC 设计目标就是可替换编码:只要实现统一编解码接口,就能换成 JSON、FlatBuffers、自定义二进制等。在 google.golang.org/grpc/encoding 包中,定义了Codec 接口,所有编解码器都要实现它。
Codec 实现
1 | // transport/grpc/codec.go |
关键设计点:
- 注册时机:通过
init()函数自动注册,确保在 gRPC Server 启动前可用 - 类型断言:编解码前先断言
proto.Message,确保只处理 protobuf 消息 - 委托模式:实际的编解码委托给 kratos 的
jsoncodec(enc.GetCodec(json.Name)),而不是直接使用标准库的encoding/json - 当 gRPC 收到
Content-Type: application/grpc+json的请求时,会使用这个 JSON codec 来解码请求体。服务端根据靠 json 这个 subtype 来查找 codec 实现
委托给 enc.GetCodec(json.Name) 而非直接使用 encoding/json 的原因是,kratos 的 json codec 支持protobuf 的规范化 JSON 编码(如 Struct、Any 等 Well-Known Types):
1 | kratos json codec |
这意味着 kratos gRPC 服务可以正确处理 protobuf 的 Well-Known Types(如 google.protobuf.Struct、google.protobuf.Any 等),而不仅仅是简单的 protobuf 消息。
小结
这篇文章我们详细介绍了 kratos 的 gRPC Server 的中间件体系、桥接拦截器实现、Transport 上下文、编解码器等关键设计。通过这些设计,可以在 kratos 中快速实现实现一个可扩展、高效的 gRPC 服务。