上一篇文章 我们分析了 kratos 的 App 模块,了解了应用的生命周期管理机制。App 管理的核心对象之一就是 Transport Server,这篇文章我们将深入分析 gRPC Transport 层的实现原理,理解一个 gRPC 请求从进入到业务处理的完整链路。
Transport 层总体设计
kratos 的 Transport 层是对底层通信协议的抽象,目前支持 HTTP 和 gRPC 两种协议。整个 Transport 层的设计围绕以下几个核心接口展开:
transport.Server 接口
所有 Transport 服务器都必须实现 transport.Server 接口,App 通过它统一管理生命周期:
1 2 3 4 5 type Server interface { Start(context.Context) error Stop(context.Context) error }
transport.Endpointer 接口
transport.Endpointer 接口用于获取服务的 Endpoint 地址,App 在构建服务实例时通过此接口自动收集 Server 的 Endpoint 地址
1 2 3 4 type Endpointer interface { Endpoint() (*url.URL, error ) }
transport.Transporter 接口
Transporter 是 Transport 层的核心上下文接口,它携带了一次请求的传输层信息 ,通过 Context 在中间件和业务逻辑之间传播:
1 2 3 4 5 6 7 type Transporter interface { Kind() Kind Endpoint() string Operation() string RequestHeader() Header ReplyHeader() Header }
middleware.Middleware 接口
kratos 的中间件是 Transport 无关的,HTTP 和 gRPC 共用同一套中间件定义:
1 2 3 4 type Handler func (ctx context.Context, req any) (any, error )type Middleware func (Handler) Handler
接口关系图
下图简单总结了各个接口之间的关系:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 transport.Server 生命周期接口(App 管理) ├─ http.Server HTTP 服务器实现 └─ grpc.Server gRPC 服务器实现 ├─ 实现 transport.Server → Start / Stop ├─ 实现 transport.Endpointer → Endpoint ├─ 内嵌 *grpc.Server → 原生 gRPC 能力 └─ 持有 matcher.Matcher → 中间件选择器 transport.Transporter 请求上下文接口(每次请求创建) ├─ grpc.Transport gRPC 请求的传输信息 └─ http.Transport HTTP 请求的传输信息 middleware.Middleware 中间件定义(Transport 无关) └─ 通过 interceptor 桥接到 gRPC / HTTP
原生 gRPC Server 开发
在分析 kratos gRPC Transport 之前,我们需要先理解:如果不使用 kratos,用原生 gRPC 库开发一个服务需要做哪些事情?这能帮我们看清哪些是 protoc 自动生成的、哪些是框架完成的、哪些是业务需要补全的。
定义 Proto 文件
一切从 Proto 文件开始,这部分无论用不用 kratos 都是一样的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 syntax = "proto3" ; package helloworld.v1;option go_package = "native-grpc-demo/api/helloworld/v1;v1" ;service Greeter { rpc SayHello (HelloRequest) returns (HelloReply) {} } message HelloRequest { string name = 1 ; } message HelloReply { string message = 1 ; }
protoc 生成代码
执行 protoc 命令后,会生成两个文件:
1 2 3 protoc --go_out=. --go_opt=paths=source_relative \ --go-grpc_out=. --go-grpc_opt=paths=source_relative \ api/helloworld/v1/greeter.proto
文件
生成工具
内容
greeter.pb.go
protoc-gen-go
消息类型 HelloRequest、HelloReply 的序列化/反序列化代码
greeter_grpc.pb.go
protoc-gen-go-grpc
服务接口 GreeterServer、客户端 GreeterClient、注册函数 RegisterGreeterServer、处理器 _Greeter_SayHello_Handler
greeter_grpc.pb.go 是重点 ,它包含以下关键内容:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 type GreeterServer interface { SayHello(context.Context, *HelloRequest) (*HelloReply, error ) mustEmbedUnimplementedGreeterServer() } type UnimplementedGreeterServer struct {}func (UnimplementedGreeterServer) SayHello(context.Context, *HelloRequest) (*HelloReply, error ) { return nil , status.Error(codes.Unimplemented, "method SayHello not implemented" ) } func RegisterGreeterServer (s grpc.ServiceRegistrar, srv GreeterServer) { s.RegisterService(&Greeter_ServiceDesc, srv) } func _Greeter_SayHello_Handler (srv interface {}, ctx context.Context, dec func (interface {}) error , interceptor grpc.UnaryServerInterceptor) (interface {}, error ) { in := new (HelloRequest) if err := dec(in); err != nil { return nil , err } if interceptor == nil { return srv.(GreeterServer).SayHello(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, FullMethod: "/helloworld.v1.Greeter/SayHello" , } handler := func (ctx context.Context, req interface {}) (interface {}, error ) { return srv.(GreeterServer).SayHello(ctx, req.(*HelloRequest)) } return interceptor(ctx, in, info, handler) } var Greeter_ServiceDesc = grpc.ServiceDesc{ ServiceName: "helloworld.v1.Greeter" , HandlerType: (*GreeterServer)(nil ), Methods: []grpc.MethodDesc{ {MethodName: "SayHello" , Handler: _Greeter_SayHello_Handler}, }, Streams: []grpc.StreamDesc{}, }
业务开发
接下来开发整个 grpc 服务的实现,包括实现 GreeterServer 接口以提供 grpc 服务接口的实现,创建真实的 gRPC Server 并启动等等。如下是一个参考实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 package mainimport ( "context" "fmt" "log" "net" "os" "os/signal" "syscall" pb "native-grpc-demo/api/helloworld/v1" "google.golang.org/grpc" "google.golang.org/grpc/health" "google.golang.org/grpc/health/grpc_health_v1" "google.golang.org/grpc/reflection" ) type GreeterService struct { pb.UnimplementedGreeterServer } func (s *GreeterService) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error ) { return &pb.HelloReply{Message: "Hello " + in.Name}, nil } func main () { lis, err := net.Listen("tcp" , ":9000" ) if err != nil { log.Fatalf("failed to listen: %v" , err) } server := grpc.NewServer() pb.RegisterGreeterServer(server, &GreeterService{}) healthServer := health.NewServer() grpc_health_v1.RegisterHealthServer(server, healthServer) healthServer.Resume() reflection.Register(server) fmt.Println("gRPC server listening on :9000" ) go func () { if err := server.Serve(lis); err != nil { log.Fatalf("failed to serve: %v" , err) } }() sigCh := make (chan os.Signal, 1 ) signal.Notify(sigCh, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGINT) <-sigCh server.GracefulStop() fmt.Println("Server stopped" ) }
总结一下,开发一个 gRPC Server 需要完成以下核心工作:
定义 Proto 文件 :编写 .proto 文件,定义服务接口和消息类型
生成代码 :执行 protoc 生成消息类型的序列化代码和服务接口代码
实现服务接口 :嵌入 UnimplementedXxxServer,实现业务方法(如 SayHello)
创建 gRPC Server :grpc.NewServer() 创建服务器实例
注册服务 :调用 RegisterXxxServer 将业务实现注册到 Server
注册辅助服务 :健康检查、反射服务等
监听与启动 :创建 TCP Listener,调用 server.Serve(lis) 启动服务
信号处理与优雅停机 :监听系统信号,调用 server.GracefulStop() 优雅停机
如果需要添加中间件功能,需要手动实现拦截器并注册到 gRPC 服务中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 func timeoutInterceptor (timeout time.Duration) grpc.UnaryServerInterceptor { return func (ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error ) { ctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() return handler(ctx, req) } } grpcServer := grpc.NewServer( grpc.UnaryInterceptor(timeoutInterceptor(10 *time.Second)), )
kratos-layout 中的 gRPC 使用
之前文章我们已经详细分析过 kratos-layout 以及 [Kratos-CLI] 的用法。这一小节我们来重点分析下 kratos-layout 项目中是如何使用 gRPC Server 的。
Proto 定义
在 kratos-layout 项目中,服务接口通过 Protobuf 定义,位于 api/helloworld/v1/greeter.proto:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 syntax = "proto3"; package helloworld.v1; import "google/api/annotations.proto"; option go_package = "kratos-demo/api/helloworld/v1;v1"; service Greeter { rpc SayHello (HelloRequest) returns (HelloReply) { option (google.api.http) = { get: "/helloworld/{name}" }; } } message HelloRequest { string name = 1; } message HelloReply { string message = 1; }
生成的 gRPC 注册代码
执行 kratos proto client 后,会生成以下文件:
文件
生成工具
说明
greeter.pb.go
protoc-gen-go
Protobuf 消息类型(HelloRequest、HelloReply)
greeter_grpc.pb.go
protoc-gen-go-grpc
gRPC 服务接口(GreeterServer、RegisterGreeterServer)
greeter_http.pb.go
protoc-gen-go-http
HTTP 服务接口和路由注册(GreeterHTTPServer、RegisterGreeterHTTPServer)
greeter_grpc.pb.go 是由 protoc-gen-go-grpc 自动生成的,其内容和上面 原生 gRPC Server 开发 示例中的代码一致。
业务代码需要补全的部分:
文件
说明
internal/service/greeter.go
实现 GreeterServer 接口,编写业务逻辑
internal/server/grpc.go
创建 gRPC Server,注册服务和中间件
internal/server/http.go
创建 HTTP Server,注册服务和中间件
业务实现
Service 层 (internal/service/greeter.go)——实现 GreeterServer 接口:
1 2 3 4 5 6 7 8 9 10 11 12 type GreeterService struct { v1.UnimplementedGreeterServer uc *biz.GreeterUsecase } func (s *GreeterService) SayHello(ctx context.Context, in *v1.HelloRequest) (*v1.HelloReply, error ) { g, err := s.uc.CreateGreeter(ctx, &biz.Greeter{Hello: in.Name}) if err != nil { return nil , err } return &v1.HelloReply{Message: "Hello " + g.Hello}, nil }
Server 层 (internal/server/grpc.go)——创建 gRPC Server 并注册服务:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 func NewGRPCServer (c *conf.Server, greeter *service.GreeterService, logger log.Logger) *grpc.Server { var opts = []grpc.ServerOption{ grpc.Middleware(recovery.Recovery()), } if c.Grpc.Addr != "" { opts = append (opts, grpc.Address(c.Grpc.Addr)) } if c.Grpc.Timeout != nil { opts = append (opts, grpc.Timeout(c.Grpc.Timeout.AsDuration())) } srv := grpc.NewServer(opts...) v1.RegisterGreeterServer(srv, greeter) return srv }
请求处理流程图
下面总结下 gRPC 请求的处理流程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 gRPC 请求到达 │ ▼ 原生 gRPC Server 接收请求 │ ▼ Greeter_ServiceDesc 定位到 _Greeter_SayHello_Handler │ ▼ _Greeter_SayHello_Handler 调用 kratos 注册的 UnaryInterceptor │ ▼ unaryServerInterceptor(kratos 的桥接拦截器) ├─ 1. 合并 Context(ic.Merge) ├─ 2. 创建 Transport(携带 operation、header 等信息) ├─ 3. 设置超时 ├─ 4. 匹配中间件并执行 middleware.Chain │ └─ recovery.Recovery → ... → 最终 handler └─ 5. 返回响应 │ ▼ GreeterService.SayHello(业务逻辑)
这里需要注意,在 greeter_grpc.pb.go 中的实现的 _Greeter_SayHello_Handler 中, interceptor 是 gRPC 原生的 UnaryServerInterceptor,而 kratos 的中间件是 middleware.Middleware,两者之间需要桥接,这就是 kratos interceptor.go 的工作,后面会介绍。
代码分层总览
通过上面的分析,我们可以清晰地看到一个 gRPC 服务从定义到运行,每一层各完成了哪些工作:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 ┌─────────────────────────────────────────────────────────────┐ │ 开发者手写 │ │ ├── greeter.proto │ │ │ 定义消息类型(HelloRequest、HelloReply)和 RPC 方法 │ │ │ │ │ ├── internal/service/greeter.go │ │ │ 实现 GreeterServer 接口,编写业务逻辑(SayHello) │ │ │ │ │ └── internal/server/grpc.go │ │ 创建 kratos grpc.Server、配置选项、注册服务和中间件 │ ├─────────────────────────────────────────────────────────────┤ │ protoc 自动生成 │ │ ├── greeter.pb.go │ │ │ 消息类型的序列化/反序列化(HelloRequest、HelloReply) │ │ │ │ │ ├── greeter_grpc.pb.go │ │ │ GreeterServer 接口(方法签名) │ │ │ UnimplementedGreeterServer(前向兼容) │ │ │ RegisterGreeterServer(注册函数) │ │ │ _Greeter_SayHello_Handler(方法处理器) │ │ │ Greeter_ServiceDesc(服务描述符) │ │ │ │ │ └── greeter_http.pb.go │ │ GreeterHTTPServer 接口和路由注册 │ ├─────────────────────────────────────────────────────────────┤ │ grpc-go 库提供 │ │ ├── grpc.Server TCP 监听、连接管理、请求分发 │ │ ├── grpc.UnaryServerInterceptor 拦截器机制 │ │ ├── grpc.ServiceRegistrar 服务注册抽象 │ │ ├── grpc.ServiceDesc 服务描述符(由protoc生成后传入) │ │ ├── 编解码(protobuf/json) 消息的序列化与反序列化 │ │ └── 健康检查、反射等标准服务 │ ├─────────────────────────────────────────────────────────────┤ │ kratos 框架完成 │ │ ├── grpc.NewServer() 创建 Server 并注册内部服务 │ │ ├── server.Start() 启动服务(监听 + 健康检查) │ │ ├── server.Stop() 优雅停机(含超时强制停止) │ │ ├── unaryServerInterceptor 拦截器桥接(gRPC → kratos) │ │ ├── middleware.Middleware 统一的中间件定义与执行链 │ │ ├── matcher.Matcher 选择性中间件匹配 │ │ ├── ic.Merge Context 合并 │ │ ├── Transport 注入 注入传输信息到 Context │ │ ├── 超时控制 自动设置请求超时 │ │ ├── 健康检查 / 反射 / Admin 自动注册内部服务 │ │ ├── JSON Codec 默认使用 JSON 编解码 │ │ └── App 生命周期管理 信号处理、服务注册/注销 │ └─────────────────────────────────────────────────────────────┘
gRPC Server 实现
Server 结构体
如下是 kratos 中 gRPC Server 的结构体定义:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 type Server struct { *grpc.Server baseCtx context.Context tlsConf *tls.Config lis net.Listener err error network string address string endpoint *url.URL timeout time.Duration middleware matcher.Matcher streamMiddleware matcher.Matcher unaryInts []grpc.UnaryServerInterceptor streamInts []grpc.StreamServerInterceptor grpcOpts []grpc.ServerOption health *health.Server customHealth bool metadata *apimd.Server adminClean func () disableReflection bool }
Server 嵌入了 *grpc.Server,因此它既是 kratos 的 transport.Server,也是标准的 gRPC 服务器。所有原生 gRPC 功能(服务注册、反射、健康检查等)都直接可用。
创建 Server
NewServer() 负责 gRPC Server 的创建:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 func NewServer (opts ...ServerOption) *Server { srv := &Server{ baseCtx: context.Background(), network: "tcp" , address: ":0" , timeout: 1 * time.Second, health: health.NewServer(), middleware: matcher.New(), streamMiddleware: matcher.New(), } for _, o := range opts { o(srv) } unaryInts := []grpc.UnaryServerInterceptor{ srv.unaryServerInterceptor(), } streamInts := []grpc.StreamServerInterceptor{ srv.streamServerInterceptor(), } if len (srv.unaryInts) > 0 { unaryInts = append (unaryInts, srv.unaryInts...) } if len (srv.streamInts) > 0 { streamInts = append (streamInts, srv.streamInts...) } grpcOpts := []grpc.ServerOption{ grpc.ChainUnaryInterceptor(unaryInts...), grpc.ChainStreamInterceptor(streamInts...), } if srv.tlsConf != nil { grpcOpts = append (grpcOpts, grpc.Creds(credentials.NewTLS(srv.tlsConf))) } if len (srv.grpcOpts) > 0 { grpcOpts = append (grpcOpts, srv.grpcOpts...) } srv.Server = grpc.NewServer(grpcOpts...) srv.metadata = apimd.NewServer(srv.Server) if !srv.customHealth { grpc_health_v1.RegisterHealthServer(srv.Server, srv.health) } apimd.RegisterMetadataServer(srv.Server, srv.metadata) if !srv.disableReflection { reflection.Register(srv.Server) } srv.adminClean, _ = admin.Register(srv.Server) return srv }
关键设计:
拦截器顺序 :kratos 的桥接拦截器始终在链的最前面,确保所有请求都经过 kratos 的中间件处理
内部服务注册 :默认注册了健康检查(grpc_health_v1)、元数据服务和反射服务
中间件选择器 :使用 matcher.New() 创建,支持按 operation 精确匹配和前缀通配
gRPC Server 默认注册的内部服务有:
服务
说明
grpc_health_v1
gRPC 标准健康检查协议
apimd.Metadata
kratos 元数据服务
reflection
gRPC 反射服务(可用 grpcurl 探测)
admin
gRPC admin 服务
Server 选项
``NewServer()` 同样通过选项模式来配置 Server,其支持如下配置选项:
选项函数
说明
默认值
Network(network)
网络类型
"tcp"
Address(addr)
监听地址
":0"
Endpoint(url)
服务端点
自动从 Listener 获取
Timeout(d)
请求超时
1s
Middleware(m...)
注册中间件
空
StreamMiddleware(m...)
Stream 中间件
空
TLSConfig(c)
TLS 配置
nil
Listener(lis)
自定义 Listener
自动创建
UnaryInterceptor(in...)
gRPC Unary 拦截器
空
StreamInterceptor(in...)
gRPC Stream 拦截器
空
CustomHealth()
自定义健康检查
false
DisableReflection()
禁用反射
false
Options(opts...)
原生 gRPC 选项
空
启动与停止
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 func (s *Server) Start(ctx context.Context) error { if err := s.listenAndEndpoint(); err != nil { return s.err } s.baseCtx = ctx log.Infof("[gRPC] server listening on: %s" , s.lis.Addr().String()) s.health.Resume() return s.Serve(s.lis) } func (s *Server) Stop(ctx context.Context) error { if s.adminClean != nil { s.adminClean() } s.health.Shutdown() done := make (chan struct {}) go func () { defer close (done) log.Info("[gRPC] server stopping" ) s.GracefulStop() }() select { case <-done: case <-ctx.Done(): log.Warn("[gRPC] server couldn't stop gracefully in time, doing force stop" ) s.Server.Stop() } return nil }
Stop() 方法先调用 GracefulStop() 优雅停止(等待正在处理的请求完成),但如果 Context 超时(由 App 的 stopTimeout 控制),则调用 Stop 强制停止。这保证了应用不会因为个别长请求而无限期阻塞。
Endpoint 构建
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 func (s *Server) listenAndEndpoint() error { if s.lis == nil { lis, err := net.Listen(s.network, s.address) if err != nil { s.err = err return err } s.lis = lis } if s.endpoint == nil { addr, err := host.Extract(s.address, s.lis) if err != nil { s.err = err return err } s.endpoint = endpoint.NewEndpoint(endpoint.Scheme("grpc" , s.tlsConf != nil ), addr) } return s.err }
Endpoint 的构建分两步:
创建 Listener :如果未提供自定义 Listener,则根据 network 和 address 创建
生成 Endpoint URL :格式如 grpc://192.168.1.100:9000,地址通过 host.Extract 自动提取(处理 0.0.0.0 等特殊情况,能够自动选择真实的的本机 IP 地址、端口返回)
小结
这篇文章我们介绍了 kratos gRPC Server 的关键设计,首先分析了开发一个原生 gRPC Server 的步骤,之后以 kratos-layout 为例,展示了 kratos 框架下开发 gRPC 服务的基本流程以及各个组件的职责,最后介绍了 kratos gRPC Server 结构定义、启动与停止逻辑等细节。
下一篇文章我们将继续深入 kratos gRPC Server 的实现细节,包括拦截器/中间件、编解码等实现原理。