0%

Kratos 源码分析(七):gRPC Transport(2)

上一篇文章 我们介绍了 kratos gRPC Server 的整体设计、创建流程、启动与停止逻辑等。这篇文章我们将深入 gRPC Transport Server 剩余的实现细节,包括拦截器桥接机制、中间件选择器、编解码器、Transport 上下文等实现。

kratos 中间件体系

在微服务开发中,每个请求在到达业务逻辑之前,往往需要经过一系列通用处理:记录日志、鉴权认证、限流熔断、panic 恢复、请求追踪等等。这些逻辑与具体业务无关,但几乎所有请求都需要。如果每个业务方法都手动写一遍这些逻辑,代码会大量重复且难以维护。中间件就是用来解决这个问题的:它把这些通用逻辑抽象出来,以 层层包裹 的方式套在业务 Handler 外面,业务代码只需关注自己的逻辑即可

kratos 同时支持 HTTP 和 gRPC 两种协议,两种协议各自有自己的拦截器机制,如果每种协议都用各自的拦截器来写这些逻辑,同一功能就要实现两遍。kratos 的做法是:定义一套协议无关的中间件接口,让同一套中间件代码既能用于 HTTP,也能用于 gRPC,再通过各自的桥接层适配到具体协议

kratos 的中间件定义在 middleware/middleware.go 中,非常简洁:

1
2
3
4
5
6
7
// middleware/middleware.go

// Handler defines the handler invoked by Middleware.
type Handler func(ctx context.Context, req any) (any, error)

// Middleware is HTTP/gRPC transport middleware.
type Middleware func(Handler) Handler

有两个核心类型:

  • Handler:一个函数签名 func(ctx context.Context, req any) (any, error),接收请求上下文和请求体,返回响应和错误
  • Middleware:一个函数签名 func(Handler) Handler,接收一个 Handler,返回一个新的 Handler

这是一个经典的洋葱模型设计。每个中间件包装一个 Handler,形成层层嵌套的调用链。例如,一个 recovery 中间件可能这样实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func Recovery() middleware.Middleware {
return func(handler middleware.Handler) middleware.Handler {
return func(ctx context.Context, req any) (any, error) {
// 外层:在调用前添加 recover
defer func() {
if r := recover(); r != nil {
// 处理 panic
}
}()
// 调用下一个 handler
return handler(ctx, req)
}
}
}

这种设计使得中间件与传输协议无关:同一个 middleware.Middleware 可以同时用于 HTTP 和 gRPC。

middleware.Chain:链式组合

多个中间件需要串联执行,middleware.Chain 负责完成这个组合:

1
2
3
4
5
6
7
8
9
// middleware/middleware.go
func Chain(m ...Middleware) Middleware {
return func(next Handler) Handler {
for i := len(m) - 1; i >= 0; i-- {
next = m[i](next)
}
return next
}
}

Chain 从后向前依次包装 handler,确保中间件的执行顺序与注册顺序一致。例如 Chain(A, B, C) 生成的链为 A(B(C(handler))),执行顺序是 A → B → C → handler → C返回 → B返回 → A返回

中间件选择器:matcher.Matcher

kratos 不仅支持全局中间件,还支持按方法名选择性注册中间件。这是通过 matcher.Matcher 实现的:

1
2
3
4
5
6
7
8
9
10
11
12
// internal/matcher/middleware.go
type Matcher interface {
Use(ms ...middleware.Middleware)
Add(selector string, ms ...middleware.Middleware)
Match(operation string) []middleware.Middleware
}

type matcher struct {
prefix []string // 前缀通配符列表
defaults []middleware.Middleware // 默认中间件(Use 注册的)
matches map[string][]middleware.Middleware // 精确或前缀匹配的中间件
}

三个方法的含义:

  • Use(ms...):设置默认中间件,所有请求都会执行
  • Add(selector, ms...):添加选择性中间件,selector 支持精确匹配和前缀通配(* 结尾)
  • Match(operation):根据 operation 匹配并返回中间件列表

Add 方法在添加时,会将 * 结尾的 selector 去掉尾部的 *,存入 prefix 列表:

1
2
3
4
5
6
7
8
9
10
11
func (m *matcher) Add(selector string, ms ...middleware.Middleware) {
if strings.HasSuffix(selector, "*") {
selector = strings.TrimSuffix(selector, "*")
m.prefix = append(m.prefix, selector)
// 按长度从长到短排序,确保更具体的前缀优先匹配
sort.Slice(m.prefix, func(i, j int) bool {
return m.prefix[i] > m.prefix[j]
})
}
m.matches[selector] = ms
}

Match 方法的匹配逻辑是:先加入默认中间件,再尝试精确匹配,最后尝试前缀匹配:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func (m *matcher) Match(operation string) []middleware.Middleware {
ms := make([]middleware.Middleware, 0, len(m.defaults))
// 1. 总是包含默认中间件
if len(m.defaults) > 0 {
ms = append(ms, m.defaults...)
}
// 2. 精确匹配
if next, ok := m.matches[operation]; ok {
return append(ms, next...)
}
// 3. 前缀匹配(按前缀长度从长到短)
for _, prefix := range m.prefix {
if strings.HasPrefix(operation, prefix) {
return append(ms, m.matches[prefix]...)
}
}
return ms
}

grpc.Server 对 Matcher 的使用

kratos 的 grpc.Server 结构体持有两个 matcher.Matcher 实例:

1
2
3
4
5
6
type Server struct {
*grpc.Server
middleware matcher.Matcher // Unary 中间件选择器
streamMiddleware matcher.Matcher // Stream 中间件选择器
// ...
}

创建 Server 时通过 Middleware() 选项注册的中间件,是通过调用 matcher.Use 实现的:

1
2
3
4
5
func Middleware(m ...middleware.Middleware) ServerOption {
return func(s *Server) {
s.middleware.Use(m...)
}
}

Server.Use 的内部则直接调用了 matcher.Add

1
2
3
func (s *Server) Use(selector string, m ...middleware.Middleware) {
s.middleware.Add(selector, m...)
}

Matcher 是 Server Middleware 的底层存储和匹配引擎

Server 层 API 底层调用 对应 Matcher 方法 作用
Middleware(m...) s.middleware.Use(m...) Matcher.Use 注册默认中间件,所有请求都执行
Use(selector, m...) s.middleware.Add(selector, m...) Matcher.Add 注册选择性中间件,按 selector 匹配

举个例子,假设按照如下方式配置了 Server:

调用 说明
srv.Middleware(recovery.Recovery()) 通过选项注册,设置默认中间件
srv.Use("/*", validate.Validator()) 前缀通配,匹配所有 operation
srv.Use("/helloworld.v1.Greeter/*", auth.Check()) 前缀通配,匹配 Greeter 服务的所有方法
srv.Use("/helloworld.v1.Greeter/SayHello", rate.Limit()) 精确匹配,只匹配 SayHello 方法

当请求 /helloworld.v1.Greeter/SayHello 时:

1
2
3
defaults: [recovery]                          ← Use 注册的默认中间件
精确匹配: /helloworld.v1.Greeter/SayHello → [rate] ← 命中精确匹配
最终结果: [recovery, rate]

当请求 /helloworld.v1.Greeter/AnotherMethod 时:

1
2
3
4
defaults: [recovery]                          ← Use 注册的默认中间件
精确匹配: 未命中
前缀匹配: /helloworld.v1.Greeter/ → [auth] ← 命中前缀匹配
最终结果: [recovery, auth]

拦截器桥接

上一篇文章我们提到,gRPC 原生的拦截器类型是 grpc.UnaryServerInterceptor,而 kratos 的中间件类型是 middleware.Middleware,两者之间需要桥接。这正是 interceptor.go 的工作。

gRPC 拦截器简介

gRPC 拦截器就是 gRPC 框架提供的请求拦截机制,类似 HTTP 中间件,在请求到达业务 handler 之前/之后执行通用逻辑。拦截器d的典型用途:日志、认证、限流、超时控制等。

gRPC 拦截器分为两类:

  • Unary 拦截器(普通请求):
1
func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error)
  • Stream 拦截器(流式请求):
1
func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error

通过如下方式注册 gRPC 拦截器:

1
2
3
4
5
// 单个
grpc.NewServer(grpc.UnaryInterceptor(myInterceptor))

// 多个链式
grpc.NewServer(grpc.ChainUnaryInterceptor(interceptor1, interceptor2, interceptor3))

执行顺序类似洋葱模型:

1
interceptor1 → interceptor2 → interceptor3 → handler → interceptor3返回 → interceptor2返回 → interceptor1返回

为什么需要桥接

现在我们看到了问题的本质:kratos 的中间件和 gRPC 的拦截器是两套完全不同的签名:

1
2
3
4
5
6
// gRPC UnaryServerInterceptor —— gRPC 框架的拦截器格式
func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error)

// middleware.Middleware —— kratos 的中间件格式
func(Handler) Handler
// 其中 Handler = func(ctx context.Context, req any) (any, error)

gRPC 框架只认自己的 UnaryServerInterceptor,它会在请求到达时调用这个拦截器,传入方法信息和最终业务 handler;而 kratos 的中间件是洋葱模型,只认 Handler → Handler 的包装模式。两者接口不兼容,不能直接互相替代。

因此需要一个桥接层——它本身实现 gRPC 的 UnaryServerInterceptor 接口,在内部将请求转换为 kratos 中间件能理解的 Handler 链来执行

Unary 桥接拦截器

Unary gRPC(一元 gRPC) 是 gRPC 中最简单、最常见的一种通信模式。简单来说,它就像我们平时最熟悉的 传统 HTTP 请求-响应(Request-Response) 模式:客户端发送一个请求,服务器返回一个响应。它的工作流程可以简化为:

  • 客户端发起请求: 客户端调用一个 gRPC 方法,发送单个请求消息给服务器,然后等待。
  • 服务器处理: 服务器接收到这个请求,执行相应的业务逻辑(比如查询数据库、处理数据)。
  • 服务器返回响应: 服务器发送单个响应消息回客户端,并附带状态码(如成功、报错)和可选的元数据。

当一个 Unary RPC 请求到达 kratos gRPC Server 时,执行链如下:

1
gRPC 请求 → grpc.Server → _Greeter_SayHello_Handler → unaryServerInterceptor → kratos middleware chain → 业务 handler

unaryServerInterceptor 是整条链的关键枢纽。在 kratos 的 NewServer() 中,它是通过拦截器链构建并传给 grpc.NewServer 的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// transport/grpc/server.go
func NewServer(opts ...ServerOption) *Server {
srv := &Server{...}
for _, o := range opts {
o(srv)
}

// 1. kratos 桥接拦截器始终在最前面
unaryInts := []grpc.UnaryServerInterceptor{
srv.unaryServerInterceptor(), // ← 这里
}
// 2. 用户自定义拦截器追加在后面
if len(srv.unaryInts) > 0 {
unaryInts = append(unaryInts, srv.unaryInts...)
}

// 3. 通过 grpc.ChainUnaryInterceptor 传入 grpc.NewServer
grpcOpts := []grpc.ServerOption{
grpc.ChainUnaryInterceptor(unaryInts...),
grpc.ChainStreamInterceptor(streamInts...),
}
// ...
srv.Server = grpc.NewServer(grpcOpts...)
}

这样每个 gRPC 请求到达时,都会先经过 kratos 的桥接拦截器,再经过用户自定义的拦截器,最终到达业务 handler

unaryServerInterceptor 实现

让我们看看 kratos 的 unaryServerInterceptor 的实现,它的核心就是返回一个原生的
gRPC UnaryServerInterceptor,并在内部完成 kratos 的 Middleware 逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
// transport/grpc/interceptor.go
func (s *Server) unaryServerInterceptor() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
// 1. 合并 Context:将 baseCtx 的值合并到请求 ctx 中
ctx, cancel := ic.Merge(ctx, s.baseCtx)
defer cancel()

// 2. 提取 gRPC metadata 作为请求头
md, _ := grpcmd.FromIncomingContext(ctx)
replyHeader := grpcmd.MD{}

// 3. 创建 Transport 对象,携带本次请求的传输层信息
tr := &Transport{
operation: info.FullMethod,
reqHeader: headerCarrier(md),
replyHeader: headerCarrier(replyHeader),
}
if s.endpoint != nil {
tr.endpoint = s.endpoint.String()
}

// 4. 将 Transport 注入到 Context 中
ctx = transport.NewServerContext(ctx, tr)

// 5. 设置超时
if s.timeout > 0 {
ctx, cancel = context.WithTimeout(ctx, s.timeout)
defer cancel()
}

// 6. 构建最终的业务 handler
h := func(ctx context.Context, req any) (any, error) {
return handler(ctx, req)
}

// 7. 匹配中间件并构建中间件链
if next := s.middleware.Match(tr.Operation()); len(next) > 0 {
h = middleware.Chain(next...)(h)
}

// 8. 执行中间件链
reply, err := h(ctx, req)

// 9. 将响应头写回 gRPC metadata
if len(replyHeader) > 0 {
_ = grpc.SetHeader(ctx, replyHeader)
}
return reply, err
}
}

从代码来看,unaryServerInterceptor 作为 gRPC 拦截器被调用时,完成了以下工作:

  1. Context 合并ic.Merge(ctx, s.baseCtx):将请求 Context 和应用级 Context 合并,让请求能访问应用级值
  2. Transport 注入 — 创建 Transport 对象(携带 operation、endpoint、header),通过 transport.NewServerContext 写入 Context
  3. 超时控制context.WithTimeout(ctx, s.timeout):为请求设置默认超时
  4. 中间件匹配s.middleware.Match(operation):根据方法名匹配应该执行的中间件
  5. 中间件链执行middleware.Chain(matched...)(h):将匹配到的中间件串联执行
  6. 响应头写回 — 将中间件通过 tr.ReplyHeader().Set(...) 设置的头写回 gRPC

核心是第 4 和第 5 步:桥接拦截器把 gRPC 传入的 handler 包装成 kratos 的 Handler,然后通过 middleware.Chain 让 kratos 中间件链包裹这个 Handler,从而让 kratos 中间件能在 gRPC 的请求处理链中正常工作。

Context 合并:ic.Merge

ic.Merge 的作用是将两个 Context 合并为一个。为什么要合并?因为 baseCtx 是在 Server 启动时由 App 创建的,携带了应用级别的值(如日志、服务实例信息等),而请求的 ctx 来自 gRPC 框架,只携带了请求级别的值。合并后,业务代码可以从一个 Context 中同时访问两种值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// internal/context/context.go
func Merge(parent1, parent2 context.Context) (context.Context, context.CancelFunc) {
mc := &mergeCtx{
parent1: parent1, // 请求 ctx
parent2: parent2, // baseCtx
done: make(chan struct{}),
cancelCh: make(chan struct{}),
}
// 如果任一父 Context 已取消,直接完成
select {
case <-parent1.Done():
_ = mc.finish(parent1.Err())
case <-parent2.Done():
_ = mc.finish(parent2.Err())
default:
go mc.wait() // 否则异步等待
}
return mc, mc.cancel
}

mergeCtx 实现了 context.Context 接口,其核心逻辑是:

方法 行为
Done() 返回合并后的 done channel,任一父 Context 取消则关闭
Err() 返回先完成的父 Context 的错误
Deadline() 返回两个父 Context 中更早的截止时间
Value(key) 先查 parent1,不存在则查 parent2,实现了值的合并查找

这意味着 Value 查找是有优先级的:请求 Context 中的值优先,baseCtx 的值作为后备。

Stream 桥接拦截器

相比于 unary gRPC 这种普通一元调用(单次请求 → 单次响应),stream gRPC 允许客户端与服务端进行双向流式通信(多次请求 → 多次响应),具体来说,stream gRPC 有以下几种类型:

  • 服务端流(Server Stream):客户端发1 次请求,服务端持续返回多条数据流。适用于消息推送、日志实时输出、大数据分批返回、进度条实时上报等场景
  • 客户端流(Client Stream):客户端持续发送多条数据流,服务端最后返回1 次响应。适用于大文件分片上传、批量数据入库、批量日志上报等场景
  • 双向流(Bidirectional Stream):双方同时持续互发数据流,互不阻塞。适用于实时聊天、IM 对话等场景

这三种通信方式统一叫做 gRPC Stream / 流式 gRPC。Stream RPC 的拦截器桥接比 Unary 更复杂,因为 Stream 是长连接,涉及多次 SendMsgRecvMsg 调用。

Stream 桥接拦截器的实现位于 streamServerInterceptor 方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// transport/grpc/interceptor.go
func (s *Server) streamServerInterceptor() grpc.StreamServerInterceptor {
return func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
// 1~3:与 unary 相同的 Context 合并、Transport 注入
ctx, cancel := ic.Merge(ss.Context(), s.baseCtx)
defer cancel()
md, _ := grpcmd.FromIncomingContext(ctx)
replyHeader := grpcmd.MD{}
ctx = transport.NewServerContext(ctx, &Transport{
endpoint: s.endpoint.String(),
operation: info.FullMethod,
reqHeader: headerCarrier(md),
replyHeader: headerCarrier(replyHeader),
})

// 4:构建空的中间件链用于初始化匹配
h := func(_ context.Context, _ any) (any, error) {
return handler(srv, ss), nil
}
if next := s.streamMiddleware.Match(info.FullMethod); len(next) > 0 {
middleware.Chain(next...)(h)
}

// 5:将原始 Stream 替换为 wrappedStream,注入自定义 Context
ctx = context.WithValue(ctx, stream{
ServerStream: ss,
streamMiddleware: s.streamMiddleware,
}, ss)
ws := NewWrappedStream(ctx, ss, s.streamMiddleware)

// 6:使用 wrappedStream 执行 Stream handler
err := handler(srv, ws)
if len(replyHeader) > 0 {
_ = grpc.SetHeader(ctx, replyHeader)
}
return err
}
}

wrappedStream

对于 Stream RPC,中间件需要在每一次 SendMsgRecvMsg 时执行,而不仅仅是在流开始时。kratos 通过 wrappedStream 实现这一点:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
type wrappedStream struct {
grpc.ServerStream
ctx context.Context
middleware matcher.Matcher
}

func (w *wrappedStream) Context() context.Context {
return w.ctx // 返回注入了 Transport 的自定义 Context
}

func (w *wrappedStream) SendMsg(m any) error {
h := func(_ context.Context, req any) (any, error) {
return req, w.ServerStream.SendMsg(m)
}
info, ok := transport.FromServerContext(w.ctx)
if !ok {
return fmt.Errorf("transport value stored in ctx returns: %v", ok)
}
if next := w.middleware.Match(info.Operation()); len(next) > 0 {
h = middleware.Chain(next...)(h)
}
_, err := h(w.ctx, m)
return err
}

func (w *wrappedStream) RecvMsg(m any) error {
h := func(_ context.Context, req any) (any, error) {
return req, w.ServerStream.RecvMsg(m)
}
info, ok := transport.FromServerContext(w.ctx)
if !ok {
return fmt.Errorf("transport value stored in ctx returns: %v", ok)
}
if next := w.middleware.Match(info.Operation()); len(next) > 0 {
h = middleware.Chain(next...)(h)
}
_, err := h(w.ctx, m)
return err
}

wrappedStream 包装了原生的 grpc.ServerStream,重写了三个关键方法:

方法 作用
Context() 返回注入了 Transport 信息的自定义 Context
SendMsg(m) 在每次发送消息时执行中间件链
RecvMsg(m) 在每次接收消息时执行中间件链

这样,Stream RPC 的中间件可以对每一个消息进行拦截处理(如日志、限流等),而不是只在整个流的开始和结束时执行。

Transport 上下文

Transport 结构体

Transport 是 gRPC Transport 层的核心数据载体,每个请求都会创建一个:

1
2
3
4
5
6
7
8
// transport/grpc/transport.go
type Transport struct {
endpoint string // 服务端点,如 "grpc://127.0.0.1:9000"
operation string // 方法全名,如 "/helloworld.v1.Greeter/SayHello"
reqHeader headerCarrier // 请求头(读取 gRPC metadata)
replyHeader headerCarrier // 响应头(写入 gRPC metadata)
nodeFilters []selector.NodeFilter // 客户端节点过滤器
}

Transport 实现了 transport.Transporter 接口的所有方法:

方法 返回值 说明
Kind() transport.KindGRPC 协议类型标识
Endpoint() string 服务端点
Operation() string 方法全名
RequestHeader() transport.Header 请求头
ReplyHeader() transport.Header 响应头(仅服务端有效)
NodeFilters() []selector.NodeFilter 客户端节点过滤器

headerCarrier:gRPC Metadata 适配器

headerCarriermetadata.MD 的类型别名,实现了 transport.Header 接口,将 gRPC 的 metadata 适配为 kratos 统一的 Header 接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
type headerCarrier metadata.MD

func (mc headerCarrier) Get(key string) string {
vals := metadata.MD(mc).Get(key)
if len(vals) > 0 {
return vals[0]
}
return ""
}

func (mc headerCarrier) Set(key string, value string) {
metadata.MD(mc).Set(key, value)
}

func (mc headerCarrier) Add(key string, value string) {
metadata.MD(mc).Append(key, value)
}

func (mc headerCarrier) Keys() []string {
keys := make([]string, 0, len(mc))
for k := range metadata.MD(mc) {
keys = append(keys, k)
}
return keys
}

func (mc headerCarrier) Values(key string) []string {
return metadata.MD(mc).Get(key)
}

这里有一个关键设计:在 unaryServerInterceptor 中,请求头和响应头使用不同的 metadata.MD 实例:

1
2
3
4
5
6
7
8
9
10
md, _ := grpcmd.FromIncomingContext(ctx)         // 请求头:从 gRPC incoming metadata 读取
replyHeader := grpcmd.MD{} // 响应头:新建空的 metadata.MD
tr := &Transport{
reqHeader: headerCarrier(md), // 只读,读取客户端发送的 metadata
replyHeader: headerCarrier(replyHeader), // 可写,中间件可以设置响应头
}
// ... 执行中间件链后 ...
if len(replyHeader) > 0 {
_ = grpc.SetHeader(ctx, replyHeader) // 将响应头写回 gRPC
}

这样中间件就可以通过 tr.ReplyHeader().Set("key", "value") 设置响应头,这些头最终会被写回 gRPC 响应。

Context 传播

Transport 通过 Context 在中间件和业务逻辑之间传播:

1
2
3
4
5
// 注入(服务端)
ctx = transport.NewServerContext(ctx, tr)

// 提取(业务代码中)
tr, ok := transport.FromServerContext(ctx)

NewServerContextFromServerContext 的实现非常简单,使用了 Go 的 context.WithValue

1
2
3
4
5
6
7
8
9
10
11
// transport/transport.go
type serverTransportKey struct{}

func NewServerContext(ctx context.Context, tr Transporter) context.Context {
return context.WithValue(ctx, serverTransportKey{}, tr)
}

func FromServerContext(ctx context.Context) (tr Transporter, ok bool) {
tr, ok = ctx.Value(serverTransportKey{}).(Transporter)
return
}

业务代码中的典型用法:

1
2
3
4
5
6
7
8
9
10
func (s *GreeterService) SayHello(ctx context.Context, in *v1.HelloRequest) (*v1.HelloReply, error) {
// 获取 Transport 信息
if tr, ok := transport.FromServerContext(ctx); ok {
// 读取请求头
reqID := tr.RequestHeader().Get("x-request-id")
// 设置响应头
tr.ReplyHeader().Set("x-response-id", "xxx")
}
// ...
}

编解码器:JSON Codec

gRPC 默认使用 protobuf 编解码,但 kratos 选择使用 JSON 作为默认编解码格式。这是因为 kratos 同时支持 HTTP 和 gRPC 协议,HTTP 协议通常使用 JSON 格式,为了保持一致性,gRPC 也默认使用 JSON。

所以不要认为 gRPC 只能使用 Protobuf 作为序列化协议(默认是 Protobuf),gRPC 设计目标就是可替换编码:只要实现统一编解码接口,就能换成 JSON、FlatBuffers、自定义二进制等。在 google.golang.org/grpc/encoding 包中,定义了Codec 接口,所有编解码器都要实现它。

Codec 实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// transport/grpc/codec.go
func init() {
encoding.RegisterCodec(codec{})
}

type codec struct{}

func (codec) Marshal(v any) ([]byte, error) {
vv, ok := v.(proto.Message)
if !ok {
return nil, fmt.Errorf("failed to marshal, message is %T, want proto.Message", v)
}
return enc.GetCodec(json.Name).Marshal(vv)
}

func (codec) Unmarshal(data []byte, v any) error {
vv, ok := v.(proto.Message)
if !ok {
return nil, fmt.Errorf("failed to unmarshal, message is %T, want proto.Message", v)
}
return enc.GetCodec(json.Name).Unmarshal(data, vv)
}

func (codec) Name() string {
return json.Name // "json"
}

关键设计点:

  1. 注册时机:通过 init() 函数自动注册,确保在 gRPC Server 启动前可用
  2. 类型断言:编解码前先断言 proto.Message,确保只处理 protobuf 消息
  3. 委托模式:实际的编解码委托给 kratos 的 json codec(enc.GetCodec(json.Name)),而不是直接使用标准库的 encoding/json
  4. 当 gRPC 收到 Content-Type: application/grpc+json 的请求时,会使用这个 JSON codec 来解码请求体。服务端根据靠 json 这个 subtype 来查找 codec 实现

委托给 enc.GetCodec(json.Name) 而非直接使用 encoding/json 的原因是,kratos 的 json codec 支持protobuf 的规范化 JSON 编码(如 StructAny 等 Well-Known Types):

1
2
3
4
5
kratos json codec
└─ enc.GetCodec("json")
└─ github.com/go-kratos/kratos/v2/encoding/json
└─ protojson.MarshalOptions{Multiline: false, EmitUnpopulated: true}
└─ google.golang.org/protobuf/encoding/protojson

这意味着 kratos gRPC 服务可以正确处理 protobuf 的 Well-Known Types(如 google.protobuf.Structgoogle.protobuf.Any 等),而不仅仅是简单的 protobuf 消息。

小结

这篇文章我们详细介绍了 kratos 的 gRPC Server 的中间件体系、桥接拦截器实现、Transport 上下文、编解码器等关键设计。通过这些设计,可以在 kratos 中快速实现实现一个可扩展、高效的 gRPC 服务。