上一篇文章 我们深入 kratos gRPC Server 的中间件体系、拦截器桥接、Transport 上下文等关键实现,这篇文章我们将目光转向客户端:介绍 kratos gRPC Client 的创建流程、客户端拦截器桥接、服务发现与负载均衡、以及整个 RPC 调用的完整链路。
为什么需要 kratos gRPC Client?
一个自然的疑问:gRPC 本身已经提供了完整的客户端(grpc.Dial → *grpc.ClientConn),kratos 为什么还要自己封装一层?直接用原生 gRPC 客户端不行吗?答案是:原生 gRPC 客户端只是一个连接层,缺乏微服务框架所需的周边能力。具体来说,在微服务场景下,一个完整的客户端调用需要解决以下问题:
原生 gRPC 客户端的不足
| 需求 |
原生 gRPC |
问题 |
| 中间件(日志/追踪/鉴权) |
只提供 UnaryClientInterceptor |
与 HTTP 中间件不互通,需要为 gRPC 和 HTTP 各写一套 |
| 服务发现 |
需要自己实现 resolver.Builder |
每种注册中心(Consul/Etcd/Nacos)都要写适配代码 |
| 负载均衡 |
内置 round_robin、pick_first |
不支持按权重动态调整,没有节点过滤能力 |
| 超时控制 |
需要在每次调用时传 context.WithTimeout |
没有统一的默认超时,容易遗漏 |
| 请求元数据 |
通过 metadata.AppendToOutgoingContext |
需要手动管理 key-value 的拷贝,容易出错 |
| 调用结果反馈 |
无 |
无法根据 RPC 结果动态调整节点权重 |
这些不是 gRPC 的缺陷,因为 gRPC 的设计目标就是做一个高效的 RPC 框架,不负责服务治理。但这些确实是一个微服务框架应该帮开发者解决的问题。
kratos Client 提供的价值
kratos Client 不是重新实现一个 gRPC 客户端,而是通过 gRPC 的三个扩展点(Interceptor、Resolver、Balancer)将 kratos 框架能力注入,让开发者用标准 *grpc.ClientConn 就能获得:
1 2 3 4 5 6
| 用原生 gRPC 客户端: grpc.Dial("127.0.0.1:9000") → 手动管理 metadata → 手动超时 → 手动拦截器
用 kratos Client: grpc.Dial("discovery:///service-name") → 自动服务发现 → 自动负载均衡 → 统一中间件(HTTP/gRPC 复用) → 自动超时 → 自动 Header 转换
|
核心价值体现在四个层面:
- 统一中间件体系:同一套
middleware.Middleware 代码在 HTTP Client 和 gRPC Client 中复用,不需要为两种协议各写一遍日志、追踪、鉴权逻辑
- 开箱即用的服务发现:只需注入
registry.Discovery 实现,kratos 自动完成地址解析、实例监听、子集过滤
- 智能负载均衡:WRR 加权轮询,支持 NodeFilter 节点过滤,根据 RPC 调用结果动态调整权重
- 统一的超时与元数据管理:默认 2s 超时,Header 到 gRPC metadata 自动转换,Transport 上下文贯穿整个调用链
下面我们详细看这些能力是如何实现的。
整体架构
kratos gRPC Client 的核心职责是:将 kratos 的中间件体系、服务发现、负载均衡能力无缝集成到 gRPC 原生客户端中。整个客户端架构分为三层:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| ┌─────────────────────────────────────────────────────────────┐ │ 用户代码层 │ │ grpc.Dial("discovery:///service-name") → *grpc.ClientConn │ │ conn.Invoke(ctx, method, req, reply) │ ├─────────────────────────────────────────────────────────────┤ │ kratos 桥接层 │ │ ┌──────────────────┐ ┌──────────────┐ ┌───────────────┐ │ │ │ ClientInterceptor│ │ Resolver │ │ Balancer │ │ │ │ (中间件链桥接) │ │ (服务发现) │ │ (负载均衡) │ │ │ └──────────────────┘ └──────────────┘ └───────────────┘ │ ├─────────────────────────────────────────────────────────────┤ │ gRPC 原生层 │ │ grpc.ClientConn / grpc.UnaryInvoker / grpc.SubConn │ └─────────────────────────────────────────────────────────────┘
|
- 用户代码层:开发者通过
Dial/DialInsecure 获得标准 *grpc.ClientConn,后续调用完全符合 gRPC 习惯
- kratos 桥接层:通过拦截器、Resolver、Balancer 三个 gRPC 扩展点,将 kratos 的中间件、服务发现、负载均衡注入到 gRPC 框架中
- gRPC 原生层:gRPC 框架本身的连接管理、多路复用、流控等能力
Client 创建流程
入口函数:Dial 和 DialInsecure
kratos 提供了两个公开的 dial 函数(transport/grpc/client.go:154-161),分别用于创建安全连接和非安全连接:
1 2 3 4 5 6 7
| func Dial(ctx context.Context, opts ...ClientOption) (*grpc.ClientConn, error) { return dial(ctx, false, opts...) }
func DialInsecure(ctx context.Context, opts ...ClientOption) (*grpc.ClientConn, error) { return dial(ctx, true, opts...) }
|
两者都委托给内部的 dial 函数,区别仅在于 insecure 参数——控制是否使用 TLS。
clientOptions:配置项详解
gRPC Client 的配置结构体(transport/grpc/client.go:136-151):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| type clientOptions struct { endpoint string subsetSize int tlsConf *tls.Config timeout time.Duration discovery registry.Discovery middleware []middleware.Middleware streamMiddleware []middleware.Middleware ints []grpc.UnaryClientInterceptor streamInts []grpc.StreamClientInterceptor grpcOpts []grpc.DialOption balancerName string filters []selector.NodeFilter healthCheckConfig string printDiscoveryDebugLog bool }
|
默认值(dial 函数中设置):
| 配置项 |
默认值 |
说明 |
timeout |
2000ms |
每次 RPC 调用的超时 |
balancerName |
"selector" |
使用 kratos 自定义的 selector 均衡器 |
subsetSize |
25 |
服务发现子集大小 |
healthCheckConfig |
,"healthCheckConfig":{"serviceName":""} |
启用 gRPC 健康检查 |
printDiscoveryDebugLog |
true |
开启服务发现日志 |
dial 函数:核心创建逻辑
dial 函数(transport/grpc/client.go:163-215)是真正的工厂方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
| func dial(ctx context.Context, insecure bool, opts ...ClientOption) (*grpc.ClientConn, error) { options := clientOptions{ timeout: 2000 * time.Millisecond, balancerName: balancerName, subsetSize: 25, printDiscoveryDebugLog: true, healthCheckConfig: `,"healthCheckConfig":{"serviceName":""}`, } for _, o := range opts { o(&options) }
ints := []grpc.UnaryClientInterceptor{ unaryClientInterceptor(options.middleware, options.timeout, options.filters), } sints := []grpc.StreamClientInterceptor{ streamClientInterceptor(options.streamMiddleware, options.filters), }
if len(options.ints) > 0 { ints = append(ints, options.ints...) } if len(options.streamInts) > 0 { sints = append(sints, options.streamInts...) }
grpcOpts := []grpc.DialOption{ grpc.WithDefaultServiceConfig(fmt.Sprintf( `{"loadBalancingConfig": [{"%s":{}}]%s}`, options.balancerName, options.healthCheckConfig)), grpc.WithChainUnaryInterceptor(ints...), grpc.WithChainStreamInterceptor(sints...), }
if options.discovery != nil { grpcOpts = append(grpcOpts, grpc.WithResolvers(discovery.NewBuilder(options.discovery, ...))) }
if insecure { grpcOpts = append(grpcOpts, grpc.WithTransportCredentials(grpcinsecure.NewCredentials())) } if options.tlsConf != nil { grpcOpts = append(grpcOpts, grpc.WithTransportCredentials(credentials.NewTLS(options.tlsConf))) }
if len(options.grpcOpts) > 0 { grpcOpts = append(grpcOpts, options.grpcOpts...) }
return grpc.DialContext(ctx, options.endpoint, grpcOpts...) }
|
关键点:
- kratos 桥接拦截器始终在最前面:保证 kratos 中间件链在 gRPC 原生拦截器之前执行
- 服务发现通过 Resolver 注入:如果配置了
discovery,会通过 grpc.WithResolvers 注入 discovery resolver,由它负责动态解析服务地址
- 负载均衡策略通过 Service Config 指定:通过 JSON 配置
"loadBalancingConfig": [{"selector":{}}] 告诉 gRPC 使用 kratos 自定义的 selector 均衡器
客户端拦截器桥接
与 Server 端类似,Client 端也需要桥接层将 kratos 中间件适配为 gRPC 拦截器。客户端桥接需要额外处理几个问题:
- Transport 上下文注入:在 Context 中注入请求信息(endpoint、operation、header)
- 超时控制:为每次 RPC 调用设置超时
- 请求头传播:将 kratos 的 Header 转换到 gRPC outgoing metadata
- Peer 注入:将负载均衡选中的节点信息注入 Context
unaryClientInterceptor 实现
Unary 客户端的桥接拦截器(transport/grpc/client.go:217-249):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| func unaryClientInterceptor(ms []middleware.Middleware, timeout time.Duration, filters []selector.NodeFilter) grpc.UnaryClientInterceptor { return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
ctx = transport.NewClientContext(ctx, &Transport{ endpoint: cc.Target(), operation: method, reqHeader: headerCarrier{}, nodeFilters: filters, })
if timeout > 0 { var cancel context.CancelFunc ctx, cancel = context.WithTimeout(ctx, timeout) defer cancel() }
h := func(ctx context.Context, req any) (any, error) { if tr, ok := transport.FromClientContext(ctx); ok { header := tr.RequestHeader() keys := header.Keys() keyvals := make([]string, 0, len(keys)) for _, k := range keys { keyvals = append(keyvals, k, header.Get(k)) } ctx = grpcmd.AppendToOutgoingContext(ctx, keyvals...) } return reply, invoker(ctx, method, req, reply, cc, opts...) }
if len(ms) > 0 { h = middleware.Chain(ms...)(h) }
var p selector.Peer ctx = selector.NewPeerContext(ctx, &p)
_, err := h(ctx, req) return err } }
|
执行流程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| 每次 RPC 调用: ┌─────────────────────────────────────────────────────┐ │ 1. 创建 Transport(endpoint、operation、filters) │ │ 2. 注入 Transport 到 Context │ │ 3. 设置超时(默认 2s) │ │ 4. 注入 Peer 到 Context(空占位,等 selector 填充) │ │ 5. 执行中间件链(洋葱模型) │ │ ├── middleware1 │ │ ├── middleware2 │ │ └── 最终 handler: │ │ ├── 将 kratos Header 拷贝到 gRPC metadata │ │ └── invoker(ctx, method, req, reply, cc) │ │ ├── gRPC 原生 ClientInterceptor │ │ └── selector Balancer.Pick → 选择节点 │ │ └── peer.Node = 选中的节点 │ └─────────────────────────────────────────────────────┘
|
核心设计之一:kratos Header 到 gRPC Metadata 的转换发生在中间件链的最内层。也就是说,中间件通过 tr.RequestHeader().Set("key", "val") 设置的头,会在最终调用 invoker 之前被拷贝到 gRPC outgoing metadata 中。这样中间件可以像操作 HTTP 头一样操作 gRPC metadata。
与 Server 端拦截器的对比
| 维度 |
Server 端 |
Client 端 |
| Context 合并 |
ic.Merge(ctx, baseCtx) |
无需合并,只注入 Transport |
| 头处理 |
从 gRPC incoming metadata 读取请求头 |
将 kratos Header 写入 gRPC outgoing metadata |
| 响应头 |
中间件设置响应头后,写回 gRPC SetHeader |
Client 端 replyHeader 在 Transport 中为空,不使用 |
| 超时 |
使用 Server 配置的超时 |
使用 Client 配置的超时 |
| Peer |
不需要 |
通过 NewPeerContext 注入,由 selector 填充选中节点 |
streamClientInterceptor 实现
Stream 客户端拦截器(transport/grpc/client.go:299-333):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| func streamClientInterceptor(ms []middleware.Middleware, filters []selector.NodeFilter) grpc.StreamClientInterceptor { return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
ctx = transport.NewClientContext(ctx, &Transport{ endpoint: cc.Target(), operation: method, reqHeader: headerCarrier{}, nodeFilters: filters, }) var p selector.Peer ctx = selector.NewPeerContext(ctx, &p)
clientStream, err := streamer(ctx, desc, cc, method, opts...) if err != nil { return nil, err }
m := matcher.New() if len(ms) > 0 { m.Use(ms...) h := func(_ context.Context, _ any) (any, error) { return streamer, nil } middleware.Chain(ms...)(h) }
wrappedStream := &wrappedClientStream{ ClientStream: clientStream, ctx: ctx, middleware: m, } return wrappedStream, nil } }
|
与 Server 端的 wrappedStream 类似,Client 端有 wrappedClientStream,它为每次 SendMsg 和 RecvMsg 应用中间件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| type wrappedClientStream struct { grpc.ClientStream ctx context.Context middleware matcher.Matcher }
func (w *wrappedClientStream) SendMsg(m any) error { h := func(_ context.Context, req any) (any, error) { return req, w.ClientStream.SendMsg(m) } info, ok := transport.FromClientContext(w.ctx) if !ok { return fmt.Errorf("transport value stored in ctx returns: %v", ok) } if next := w.middleware.Match(info.Operation()); len(next) > 0 { h = middleware.Chain(next...)(h) } _, err := h(w.ctx, m) return err }
|
服务发现
在 gRPC 中,Resolver(解析器) 是实现服务发现(Service Discovery)的核心组件。它的主要职责是:将用户提供的目标字符串(Target URI)解析为一个或多个实际的后端服务器地址列表(IP:Port),并传递给负载均衡器(Load Balancer)。
gRPC Resolver
在微服务架构中,服务实例是动态缩容、扩容或漂移的。gRPC 引入 Resolver 后,客户端只需要关注一个抽象的服务名(例如 dns:///my-service 或 consul:///[ns]/my-service),由 Resolver 去实时获取底层的真实 IP 列表。
gRPC 的客户端连接生命周期中,Resolver 处于非常靠前的位置。其核心流程如下:
- 初始化 Channel:客户端创建 Channel 时传入一个 Target URI(如 dns:///service.domain:8080)
- 匹配 Scheme:gRPC 根据 URI 的 Scheme(如 dns、passthrough 等)从注册表(Registry)中找到对应的 ResolverFactory
- 创建 Resolver 实例:工厂实例化出一个具体的 Resolver,并把 Target 传给它
- 解析与通知:
- Resolver 异步向服务发现中心(如 DNS、Consul、ETCD)查询地址
- 解析成功后,调用 ClientConn.UpdateState() 方法,将包含 IP 列表(Address List) 和 服务配置(Service Config) 的状态更新通知给 gRPC 框架
- 移交负载均衡:gRPC 框架收到地址列表后,将其交给负载均衡器(Load Balancer,如 Round Robin),由 LB 决定具体把请求发给哪个 IP
gRPC 对 Resolver 的设计非常抽象,主要由两个核心接口组成:Builder(工厂)和 Resolver(执行器)。
- 用于判断是否支持某种 Scheme,并构建具体的 Resolver
1 2 3 4 5 6
| type Builder interface { Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error) Scheme() string }
|
1 2 3 4 5 6
| type Resolver interface { ResolveNow(ResolveNowOptions) Close() }
|
Resolver 解析出结果后,不能直接返回给一个函数,而是要通过回调 ClientConn 接口将数据推回给 gRPC 内部:
1 2 3 4 5 6
| type ClientConn interface { UpdateState(State) error ReportError(error) }
|
kratos 服务发现
kratos 通过 gRPC 的 Resolver 扩展机制接入服务发现。配置了 discovery 后,Client 会使用 discovery:///service-name 格式的 endpoint,gRPC 调用 discovery Resolver 来解析实际的服务地址。
kratos 通过 grpc.WithResolvers 注入 discovery builder 时,并没有显式注册 scheme,但 discovery builder 的 Scheme() 返回 "discovery",gRPC 会根据 endpoint 的 scheme(如 discovery:///xxx)自动匹配。
Discovery Resolver:动态服务发现
Discovery Resolver 是客户端最复杂的组件之一,负责持续监听服务注册中心的变化,将最新的服务实例列表同步给 gRPC。
Builder 创建
Discovery Resolver 的 Builder 在 client.go 的 dial 中构建:
1 2 3 4 5 6 7 8 9 10 11
| if options.discovery != nil { grpcOpts = append(grpcOpts, grpc.WithResolvers( discovery.NewBuilder( options.discovery, discovery.WithInsecure(insecure), discovery.WithTimeout(options.timeout), discovery.WithSubset(options.subsetSize), discovery.PrintDebugLog(options.printDiscoveryDebugLog), ))) }
|
Builder 的核心配置(resolver/discovery/builder.go:58-64):
1 2 3 4 5 6 7
| type builder struct { discoverer registry.Discovery timeout time.Duration insecure bool subsetSize int debugLog bool }
|
Build 流程
When gRPC 需要解析 discovery:///service-name 时,会调用 builder.Build():
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| func (b *builder) Build(target resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) (resolver.Resolver, error) { done := make(chan struct{}, 1) ctx, cancel := context.WithCancel(context.Background()) go func() { w, err := b.discoverer.Watch(ctx, strings.TrimPrefix(target.URL.Path, "/")) watchRes.w = w watchRes.err = err close(done) }()
select { case <-done: err = watchRes.err case <-time.After(b.timeout): err = ErrWatcherCreateTimeout }
r := &discoveryResolver{ w: watchRes.w, cc: cc, ctx: ctx, cancel: cancel, insecure: b.insecure, debugLog: b.debugLog, subsetSize: b.subsetSize, selectorKey: uuid.New().String(), } go r.watch() return r, nil }
|
关键设计:Watcher 创建是带超时的异步操作。这是因为 discoverer.Watch() 可能需要与注册中心通信,默认 10s 超时防止阻塞 gRPC 的连接建立。
Watch 循环与 Update 更新
Resolver 的核心是一个 watch 协程(resolver/discovery/resolver.go:31-49):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| func (r *discoveryResolver) watch() { for { select { case <-r.ctx.Done(): return default: } ins, err := r.w.Next() if err != nil { if errors.Is(err, context.Canceled) { return } log.Errorf("[resolver] Failed to watch discovery endpoint: %v", err) time.Sleep(time.Second) continue } r.update(ins) } }
|
watch 是一个无限循环,通过 watcher.Next() 阻塞等待服务实例变化。收到新实例列表后调用 update。update 函数的处理流程(resolver/discovery/resolver.go:51-98):
1 2 3 4 5 6 7 8 9
| 接收 ServiceInstance 列表 ├── 1. 遍历实例,提取端点(只取 grpc/grpcs 协议的端点) │ └── endpoint.ParseEndpoint(in.Endpoints, endpoint.Scheme("grpc", !insecure)) ├── 2. 去重(同一 endpoint 地址只保留一个) ├── 3. 子集过滤(如果 subsetSize > 0) │ └── subset.Subset(selectorKey, filtered, subsetSize) ├── 4. 构建 resolver.Address 列表 │ └── 将原始 ServiceInstance 存入 Attributes,供 Balancer 后续使用 └── 5. 通过 cc.UpdateState 通知 gRPC 更新地址列表
|
子集过滤(Subset)的作用:在大型集群中,每个客户端不需要连接所有服务实例。通过 subset.Subset 随机选择固定大小的子集,减少每个客户端的连接数,同时通过 selectorKey(UUID)保证同一客户端每次选中相同的子集。
Direct Resolver:静态地址解析
Direct Resolver 支持直连方式,在 init() 中自动注册(resolver/direct/resolver.go:11-13),并通过 client.go 的 blank import 引入(transport/grpc/client.go:24):
1 2
| _ "github.com/go-kratos/kratos/v2/transport/grpc/resolver/direct"
|
Direct builder 的 Build 逻辑非常简单(resolver/direct/builder.go:25-37):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| func (d *directBuilder) Build(target resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) (resolver.Resolver, error) { addrs := make([]resolver.Address, 0) for _, addr := range strings.Split( strings.TrimPrefix(target.URL.Path, "/"), ",") { addrs = append(addrs, resolver.Address{Addr: addr}) } err := cc.UpdateState(resolver.State{Addresses: addrs}) if err != nil { return nil, err } return newDirectResolver(), nil }
|
使用示例:
1
| direct://<authority>/127.0.0.1:9000,127.0.0.2:9000
|
返回的 directResolver 是一个空的 no-op 实现,因为静态地址不需要动态更新。
负载均衡机制
kratos 的负载均衡通过两部分协作完成:
- kratos selector:负责节点选择策略(如 WRR 加权轮询)
- gRPC Balancer:将 kratos selector 适配为 gRPC 的
balancer.Picker
全局 Selector 初始化
在 client.go 的 init() 中,kratos 确保全局 Selector 已初始化(transport/grpc/client.go:27-31):
1 2 3 4 5
| func init() { if selector.GlobalSelector() == nil { selector.SetGlobalSelector(wrr.NewBuilder()) } }
|
默认使用 WRR(Weighted Round Robin,加权轮询)选择器。WRR 类似 Nginx 的平滑加权轮询算法,根据节点权重分配流量。
selector Balancer:桥接 gRPC 和 kratos Selector
kratos 通过实现 gRPC 的 balancer.Builder 和 balancer.Picker 接口,将 kratos selector 桥接到 gRPC(transport/grpc/balancer.go):
1 2 3 4 5 6 7 8 9 10
| func init() { b := base.NewBalancerBuilder( balancerName, &balancerBuilder{ builder: selector.GlobalSelector(), }, base.Config{HealthCheck: true}, ) balancer.Register(b) }
|
balancerBuilder.Build 方法在 gRPC 子连接就绪时被调用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| func (b *balancerBuilder) Build(info base.PickerBuildInfo) balancer.Picker { if len(info.ReadySCs) == 0 { return base.NewErrPicker(balancer.ErrNoSubConnAvailable) }
nodes := make([]selector.Node, 0, len(info.ReadySCs)) for conn, info := range info.ReadySCs { ins, _ := info.Address.Attributes.Value("rawServiceInstance").(*registry.ServiceInstance) nodes = append(nodes, &grpcNode{ Node: selector.NewNode("grpc", info.Address.Addr, ins), subConn: conn, }) }
p := &balancerPicker{ selector: b.builder.Build(), } p.selector.Apply(nodes) return p }
|
这里的 grpcNode(balancer.go:102-105)是关键的数据结构:
1 2 3 4
| type grpcNode struct { selector.Node subConn balancer.SubConn }
|
它同时持有 kratos Node 和 gRPC SubConn,使得 selector 选出的节点能映射回 gRPC 的连接。
Pick:每次 RPC 的选择逻辑
每次 RPC 调用时,gRPC 会调用 balancerPicker.Pick:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| func (p *balancerPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { var filters []selector.NodeFilter if tr, ok := transport.FromClientContext(info.Ctx); ok { if gtr, ok := tr.(*Transport); ok { filters = gtr.NodeFilters() } }
n, done, err := p.selector.Select(info.Ctx, selector.WithNodeFilter(filters...)) if err != nil { return balancer.PickResult{}, err }
return balancer.PickResult{ SubConn: n.(*grpcNode).subConn, Done: func(di balancer.DoneInfo) { done(info.Ctx, selector.DoneInfo{ Err: di.Err, BytesSent: di.BytesSent, BytesReceived: di.BytesReceived, ReplyMD: Trailer(di.Trailer), }) }, }, nil }
|
Pick 返回的 Done 回调在 RPC 完成后被 gRPC 调用,kratos 将调用结果(错误、收发字节数等)反馈给 WRR selector,用于动态调整节点权重。比如某个节点持续返回错误,WRR 会降低其权重甚至将其剔除。
Selector 选择流程
Default.Select 的完整选择流程(selector/default_selector.go:26-76):
1 2 3 4 5 6 7 8 9
| Select(ctx, opts...) ├── 1. 获取所有节点(atomic.Value 快速读取) ├── 2. 如果有 NodeFilters,执行过滤 │ └── 逐层过滤,每一步都可能缩小候选集 ├── 3. 通过 Balancer.Pick 选择最优节点 │ └── WRR 加权轮询算法 ├── 4. 将选中节点写入 Peer Context │ └── ctx 中的 Peer.Node = 选中的 Node └── 5. 返回 (Node, DoneFunc)
|
NodeFilter 过滤链支持多层过滤,每一层过滤的结果作为下一层的输入。
Peer 上下文与节点信息传播
kratos 设计了一个 Peer 机制(selector/peer.go),让业务代码能获取到实际被调用的节点信息:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| type Peer struct { Node Node }
func NewPeerContext(ctx context.Context, p *Peer) context.Context { return context.WithValue(ctx, peerKey{}, p) }
func FromPeerContext(ctx context.Context) (p *Peer, ok bool) { p, ok = ctx.Value(peerKey{}).(*Peer) return }
|
工作流程:
unaryClientInterceptor 创建一个空的 Peer{} 并注入 Context
- gRPC 调用链进入
balancerPicker.Pick → selector.Select
Default.Select 选中节点后,通过 FromPeerContext 取出 Peer,将选中节点写入 p.Node
unaryClientInterceptor 在调用结束后可以通过 FromPeerContext(ctx) 获取到实际调用的节点
这个设计巧妙地利用指针和 Context 完成了一次 后门通信——selector 将选择结果写回拦截器预先创建的空 Peer 中。
完整的 RPC 调用链路
结合以上所有组件,一次完整的 Unary RPC 调用的完整链路如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| 用户代码: conn.Invoke(ctx, method, req, reply) │ ├── unaryClientInterceptor (kratos 桥接拦截器) │ ├── 创建 Transport,注入 Context │ ├── 设置超时 (context.WithTimeout) │ ├── 注入空 Peer │ └── 执行 kratos 中间件链 │ ├── 中间件1 (如 recovery) │ ├── 中间件2 (如 logging) │ ├── 中间件3 (如 tracing) │ └── 最终 handler: │ ├── 将 kratos Header 写入 gRPC outgoing metadata │ └── invoker(ctx, method, req, reply, cc) │ │ │ ├── 用户自定义 gRPC 拦截器 │ │ │ └── gRPC 内部处理 │ ├── Resolver: discovery resolver 动态解析地址 │ ├── Balancer: selector.Pick 选择节点 │ │ ├── 提取 NodeFilters │ │ ├── 过滤候选节点 │ │ ├── Balancer.Pick (WRR) │ │ └── 将选中节点写入 Peer.Node │ ├── 建立/复用 gRPC 连接 │ └── 发送请求 → 接收响应 │ │ │ └── RPC 完成后 │ └── Done 回调: 上报结果给 WRR selector │ └── 调整节点权重 │ └── 返回 error 给用户代码
|
小结
kratos gRPC Client 通过三个 gRPC 扩展点将框架能力无缝注入:
| 扩展点 |
实现 |
桥接的 kratos 能力 |
| Interceptor |
unaryClientInterceptor / streamClientInterceptor |
中间件链、超时控制、Transport 上下文、Header 传播、Peer 注入 |
| Resolver |
Discovery Resolver / Direct Resolver |
服务发现、动态地址解析、子集过滤 |
| Balancer |
selector Balancer → kratos Selector |
WRR 加权轮询、NodeFilter 过滤、动态权重调整、RPC 结果反馈 |
通过这些设计,开发者只需标准 gRPC 调用方式即可享受 kratos 的中间件、服务发现和负载均衡能力,无需关心底层适配细节。