0%

Kratos 源码分析(八):gRPC Transport(3)

上一篇文章 我们深入 kratos gRPC Server 的中间件体系、拦截器桥接、Transport 上下文等关键实现,这篇文章我们将目光转向客户端:介绍 kratos gRPC Client 的创建流程、客户端拦截器桥接、服务发现与负载均衡、以及整个 RPC 调用的完整链路。

为什么需要 kratos gRPC Client?

一个自然的疑问:gRPC 本身已经提供了完整的客户端(grpc.Dial*grpc.ClientConn),kratos 为什么还要自己封装一层?直接用原生 gRPC 客户端不行吗?答案是:原生 gRPC 客户端只是一个连接层,缺乏微服务框架所需的周边能力。具体来说,在微服务场景下,一个完整的客户端调用需要解决以下问题:

原生 gRPC 客户端的不足

需求 原生 gRPC 问题
中间件(日志/追踪/鉴权) 只提供 UnaryClientInterceptor 与 HTTP 中间件不互通,需要为 gRPC 和 HTTP 各写一套
服务发现 需要自己实现 resolver.Builder 每种注册中心(Consul/Etcd/Nacos)都要写适配代码
负载均衡 内置 round_robinpick_first 不支持按权重动态调整,没有节点过滤能力
超时控制 需要在每次调用时传 context.WithTimeout 没有统一的默认超时,容易遗漏
请求元数据 通过 metadata.AppendToOutgoingContext 需要手动管理 key-value 的拷贝,容易出错
调用结果反馈 无法根据 RPC 结果动态调整节点权重

这些不是 gRPC 的缺陷,因为 gRPC 的设计目标就是做一个高效的 RPC 框架,不负责服务治理。但这些确实是一个微服务框架应该帮开发者解决的问题。

kratos Client 提供的价值

kratos Client 不是重新实现一个 gRPC 客户端,而是通过 gRPC 的三个扩展点(Interceptor、Resolver、Balancer)将 kratos 框架能力注入,让开发者用标准 *grpc.ClientConn 就能获得:

1
2
3
4
5
6
用原生 gRPC 客户端:
grpc.Dial("127.0.0.1:9000") → 手动管理 metadata → 手动超时 → 手动拦截器

用 kratos Client:
grpc.Dial("discovery:///service-name") → 自动服务发现 → 自动负载均衡
→ 统一中间件(HTTP/gRPC 复用) → 自动超时 → 自动 Header 转换

核心价值体现在四个层面:

  1. 统一中间件体系:同一套 middleware.Middleware 代码在 HTTP Client 和 gRPC Client 中复用,不需要为两种协议各写一遍日志、追踪、鉴权逻辑
  2. 开箱即用的服务发现:只需注入 registry.Discovery 实现,kratos 自动完成地址解析、实例监听、子集过滤
  3. 智能负载均衡:WRR 加权轮询,支持 NodeFilter 节点过滤,根据 RPC 调用结果动态调整权重
  4. 统一的超时与元数据管理:默认 2s 超时,Header 到 gRPC metadata 自动转换,Transport 上下文贯穿整个调用链

下面我们详细看这些能力是如何实现的。

整体架构

kratos gRPC Client 的核心职责是:将 kratos 的中间件体系、服务发现、负载均衡能力无缝集成到 gRPC 原生客户端中。整个客户端架构分为三层:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
┌─────────────────────────────────────────────────────────────┐
│ 用户代码层 │
│ grpc.Dial("discovery:///service-name") → *grpc.ClientConn │
│ conn.Invoke(ctx, method, req, reply) │
├─────────────────────────────────────────────────────────────┤
│ kratos 桥接层 │
│ ┌──────────────────┐ ┌──────────────┐ ┌───────────────┐ │
│ │ ClientInterceptor│ │ Resolver │ │ Balancer │ │
│ │ (中间件链桥接) │ │ (服务发现) │ │ (负载均衡) │ │
│ └──────────────────┘ └──────────────┘ └───────────────┘ │
├─────────────────────────────────────────────────────────────┤
│ gRPC 原生层 │
│ grpc.ClientConn / grpc.UnaryInvoker / grpc.SubConn │
└─────────────────────────────────────────────────────────────┘
  • 用户代码层:开发者通过 Dial/DialInsecure 获得标准 *grpc.ClientConn,后续调用完全符合 gRPC 习惯
  • kratos 桥接层:通过拦截器、Resolver、Balancer 三个 gRPC 扩展点,将 kratos 的中间件、服务发现、负载均衡注入到 gRPC 框架中
  • gRPC 原生层:gRPC 框架本身的连接管理、多路复用、流控等能力

Client 创建流程

入口函数:Dial 和 DialInsecure

kratos 提供了两个公开的 dial 函数(transport/grpc/client.go:154-161),分别用于创建安全连接和非安全连接:

1
2
3
4
5
6
7
func Dial(ctx context.Context, opts ...ClientOption) (*grpc.ClientConn, error) {
return dial(ctx, false, opts...)
}

func DialInsecure(ctx context.Context, opts ...ClientOption) (*grpc.ClientConn, error) {
return dial(ctx, true, opts...)
}

两者都委托给内部的 dial 函数,区别仅在于 insecure 参数——控制是否使用 TLS。

clientOptions:配置项详解

gRPC Client 的配置结构体(transport/grpc/client.go:136-151):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
type clientOptions struct {
endpoint string // 目标地址,如 "discovery:///service-name"
subsetSize int // 服务发现子集大小,0 表示禁用
tlsConf *tls.Config // TLS 配置
timeout time.Duration // 每次调用的超时时间
discovery registry.Discovery // 服务发现实现
middleware []middleware.Middleware // kratos Unary 中间件
streamMiddleware []middleware.Middleware // kratos Stream 中间件
ints []grpc.UnaryClientInterceptor // gRPC 原生 Unary 拦截器
streamInts []grpc.StreamClientInterceptor // gRPC 原生 Stream 拦截器
grpcOpts []grpc.DialOption // gRPC 原生 DialOption
balancerName string // 负载均衡器名称,默认 "selector"
filters []selector.NodeFilter // 节点选择过滤器
healthCheckConfig string // 健康检查配置
printDiscoveryDebugLog bool // 是否打印服务发现日志
}

默认值(dial 函数中设置):

配置项 默认值 说明
timeout 2000ms 每次 RPC 调用的超时
balancerName "selector" 使用 kratos 自定义的 selector 均衡器
subsetSize 25 服务发现子集大小
healthCheckConfig ,"healthCheckConfig":{"serviceName":""} 启用 gRPC 健康检查
printDiscoveryDebugLog true 开启服务发现日志

dial 函数:核心创建逻辑

dial 函数(transport/grpc/client.go:163-215)是真正的工厂方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
func dial(ctx context.Context, insecure bool, opts ...ClientOption) (*grpc.ClientConn, error) {
// 1. 初始化 options 并填充默认值
options := clientOptions{
timeout: 2000 * time.Millisecond,
balancerName: balancerName,
subsetSize: 25,
printDiscoveryDebugLog: true,
healthCheckConfig: `,"healthCheckConfig":{"serviceName":""}`,
}
for _, o := range opts {
o(&options)
}

// 2. 构建 kratos 桥接拦截器(始终在最前面)
ints := []grpc.UnaryClientInterceptor{
unaryClientInterceptor(options.middleware, options.timeout, options.filters),
}
sints := []grpc.StreamClientInterceptor{
streamClientInterceptor(options.streamMiddleware, options.filters),
}

// 3. 追加用户自定义的 gRPC 原生拦截器
if len(options.ints) > 0 {
ints = append(ints, options.ints...)
}
if len(options.streamInts) > 0 {
sints = append(sints, options.streamInts...)
}

// 4. 构建 gRPC DialOption
grpcOpts := []grpc.DialOption{
grpc.WithDefaultServiceConfig(fmt.Sprintf(
`{"loadBalancingConfig": [{"%s":{}}]%s}`,
options.balancerName, options.healthCheckConfig)),
grpc.WithChainUnaryInterceptor(ints...),
grpc.WithChainStreamInterceptor(sints...),
}

// 5. 如果配置了服务发现,注入 discovery resolver
if options.discovery != nil {
grpcOpts = append(grpcOpts,
grpc.WithResolvers(discovery.NewBuilder(options.discovery, ...)))
}

// 6. 根据 insecure 和 tlsConf 设置传输凭证
if insecure {
grpcOpts = append(grpcOpts, grpc.WithTransportCredentials(grpcinsecure.NewCredentials()))
}
if options.tlsConf != nil {
grpcOpts = append(grpcOpts, grpc.WithTransportCredentials(credentials.NewTLS(options.tlsConf)))
}

// 7. 追加用户自定义的 gRPC 选项
if len(options.grpcOpts) > 0 {
grpcOpts = append(grpcOpts, options.grpcOpts...)
}

// 8. 调用 gRPC 原生 DialContext 创建连接
return grpc.DialContext(ctx, options.endpoint, grpcOpts...)
}

关键点:

  1. kratos 桥接拦截器始终在最前面:保证 kratos 中间件链在 gRPC 原生拦截器之前执行
  2. 服务发现通过 Resolver 注入:如果配置了 discovery,会通过 grpc.WithResolvers 注入 discovery resolver,由它负责动态解析服务地址
  3. 负载均衡策略通过 Service Config 指定:通过 JSON 配置 "loadBalancingConfig": [{"selector":{}}] 告诉 gRPC 使用 kratos 自定义的 selector 均衡器

客户端拦截器桥接

与 Server 端类似,Client 端也需要桥接层将 kratos 中间件适配为 gRPC 拦截器。客户端桥接需要额外处理几个问题:

  • Transport 上下文注入:在 Context 中注入请求信息(endpoint、operation、header)
  • 超时控制:为每次 RPC 调用设置超时
  • 请求头传播:将 kratos 的 Header 转换到 gRPC outgoing metadata
  • Peer 注入:将负载均衡选中的节点信息注入 Context

unaryClientInterceptor 实现

Unary 客户端的桥接拦截器(transport/grpc/client.go:217-249):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
func unaryClientInterceptor(ms []middleware.Middleware, timeout time.Duration,
filters []selector.NodeFilter) grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply any,
cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {

// 1. 创建 Transport 并注入 Context
ctx = transport.NewClientContext(ctx, &Transport{
endpoint: cc.Target(),
operation: method,
reqHeader: headerCarrier{},
nodeFilters: filters,
})

// 2. 设置超时
if timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, timeout)
defer cancel()
}

// 3. 定义最终的 handler:将 kratos header 拷贝到 gRPC metadata
h := func(ctx context.Context, req any) (any, error) {
if tr, ok := transport.FromClientContext(ctx); ok {
header := tr.RequestHeader()
keys := header.Keys()
keyvals := make([]string, 0, len(keys))
for _, k := range keys {
keyvals = append(keyvals, k, header.Get(k))
}
ctx = grpcmd.AppendToOutgoingContext(ctx, keyvals...)
}
return reply, invoker(ctx, method, req, reply, cc, opts...)
}

// 4. 构建 kratos 中间件链
if len(ms) > 0 {
h = middleware.Chain(ms...)(h)
}

// 5. 注入 Peer 用于接收负载均衡结果
var p selector.Peer
ctx = selector.NewPeerContext(ctx, &p)

_, err := h(ctx, req)
return err
}
}

执行流程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
每次 RPC 调用:
┌─────────────────────────────────────────────────────┐
│ 1. 创建 Transport(endpoint、operation、filters) │
│ 2. 注入 Transport 到 Context │
│ 3. 设置超时(默认 2s) │
│ 4. 注入 Peer 到 Context(空占位,等 selector 填充) │
│ 5. 执行中间件链(洋葱模型) │
│ ├── middleware1 │
│ ├── middleware2 │
│ └── 最终 handler: │
│ ├── 将 kratos Header 拷贝到 gRPC metadata │
│ └── invoker(ctx, method, req, reply, cc) │
│ ├── gRPC 原生 ClientInterceptor │
│ └── selector Balancer.Pick → 选择节点 │
│ └── peer.Node = 选中的节点 │
└─────────────────────────────────────────────────────┘

核心设计之一:kratos Header 到 gRPC Metadata 的转换发生在中间件链的最内层。也就是说,中间件通过 tr.RequestHeader().Set("key", "val") 设置的头,会在最终调用 invoker 之前被拷贝到 gRPC outgoing metadata 中。这样中间件可以像操作 HTTP 头一样操作 gRPC metadata。

与 Server 端拦截器的对比

维度 Server 端 Client 端
Context 合并 ic.Merge(ctx, baseCtx) 无需合并,只注入 Transport
头处理 从 gRPC incoming metadata 读取请求头 将 kratos Header 写入 gRPC outgoing metadata
响应头 中间件设置响应头后,写回 gRPC SetHeader Client 端 replyHeader 在 Transport 中为空,不使用
超时 使用 Server 配置的超时 使用 Client 配置的超时
Peer 不需要 通过 NewPeerContext 注入,由 selector 填充选中节点

streamClientInterceptor 实现

Stream 客户端拦截器(transport/grpc/client.go:299-333):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
func streamClientInterceptor(ms []middleware.Middleware,
filters []selector.NodeFilter) grpc.StreamClientInterceptor {
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn,
method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {

// 1. 注入 Transport 和 Peer
ctx = transport.NewClientContext(ctx, &Transport{
endpoint: cc.Target(),
operation: method,
reqHeader: headerCarrier{},
nodeFilters: filters,
})
var p selector.Peer
ctx = selector.NewPeerContext(ctx, &p)

// 2. 创建原始 Stream
clientStream, err := streamer(ctx, desc, cc, method, opts...)
if err != nil {
return nil, err
}

// 3. 构建中间件 Matcher
m := matcher.New()
if len(ms) > 0 {
m.Use(ms...)
// 预初始化中间件链
h := func(_ context.Context, _ any) (any, error) {
return streamer, nil
}
middleware.Chain(ms...)(h)
}

// 4. 包装为 wrappedClientStream
wrappedStream := &wrappedClientStream{
ClientStream: clientStream,
ctx: ctx,
middleware: m,
}
return wrappedStream, nil
}
}

与 Server 端的 wrappedStream 类似,Client 端有 wrappedClientStream,它为每次 SendMsgRecvMsg 应用中间件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
type wrappedClientStream struct {
grpc.ClientStream
ctx context.Context
middleware matcher.Matcher
}

func (w *wrappedClientStream) SendMsg(m any) error {
h := func(_ context.Context, req any) (any, error) {
return req, w.ClientStream.SendMsg(m)
}
info, ok := transport.FromClientContext(w.ctx)
if !ok {
return fmt.Errorf("transport value stored in ctx returns: %v", ok)
}
if next := w.middleware.Match(info.Operation()); len(next) > 0 {
h = middleware.Chain(next...)(h)
}
_, err := h(w.ctx, m)
return err
}

// RecvMsg 实现同理

服务发现

在 gRPC 中,Resolver(解析器) 是实现服务发现(Service Discovery)的核心组件。它的主要职责是:将用户提供的目标字符串(Target URI)解析为一个或多个实际的后端服务器地址列表(IP:Port),并传递给负载均衡器(Load Balancer)。

gRPC Resolver

在微服务架构中,服务实例是动态缩容、扩容或漂移的。gRPC 引入 Resolver 后,客户端只需要关注一个抽象的服务名(例如 dns:///my-serviceconsul:///[ns]/my-service),由 Resolver 去实时获取底层的真实 IP 列表。

gRPC 的客户端连接生命周期中,Resolver 处于非常靠前的位置。其核心流程如下:

  • 初始化 Channel:客户端创建 Channel 时传入一个 Target URI(如 dns:///service.domain:8080)
  • 匹配 Scheme:gRPC 根据 URI 的 Scheme(如 dns、passthrough 等)从注册表(Registry)中找到对应的 ResolverFactory
  • 创建 Resolver 实例:工厂实例化出一个具体的 Resolver,并把 Target 传给它
  • 解析与通知:
    • Resolver 异步向服务发现中心(如 DNS、Consul、ETCD)查询地址
    • 解析成功后,调用 ClientConn.UpdateState() 方法,将包含 IP 列表(Address List) 和 服务配置(Service Config) 的状态更新通知给 gRPC 框架
  • 移交负载均衡:gRPC 框架收到地址列表后,将其交给负载均衡器(Load Balancer,如 Round Robin),由 LB 决定具体把请求发给哪个 IP

gRPC 对 Resolver 的设计非常抽象,主要由两个核心接口组成:Builder(工厂)和 Resolver(执行器)。

  • 用于判断是否支持某种 Scheme,并构建具体的 Resolver
1
2
3
4
5
6
type Builder interface {
// 构建一个 Resolver 实例
Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)
// 返回该解析器支持的协议名,例如 "dns" 或 "etcd"
Scheme() string
}
  • Resolver:负责实际的解析逻辑
1
2
3
4
5
6
type Resolver interface {
// 当连接遇到错误,或者需要强制刷新时,gRPC 框架会调用此方法让 Resolver 重新解析
ResolveNow(ResolveNowOptions)
// 销毁 Resolver,释放资源(如关闭监听连接、取消 Context 等)
Close()
}

Resolver 解析出结果后,不能直接返回给一个函数,而是要通过回调 ClientConn 接口将数据推回给 gRPC 内部:

1
2
3
4
5
6
type ClientConn interface {
// 更新连接状态,把最新的 IP 列表和配置传给 gRPC
UpdateState(State) error
// 报告解析错误
ReportError(error)
}

kratos 服务发现

kratos 通过 gRPC 的 Resolver 扩展机制接入服务发现。配置了 discovery 后,Client 会使用 discovery:///service-name 格式的 endpoint,gRPC 调用 discovery Resolver 来解析实际的服务地址。

kratos 通过 grpc.WithResolvers 注入 discovery builder 时,并没有显式注册 scheme,但 discovery builder 的 Scheme() 返回 "discovery",gRPC 会根据 endpoint 的 scheme(如 discovery:///xxx)自动匹配。

Discovery Resolver:动态服务发现

Discovery Resolver 是客户端最复杂的组件之一,负责持续监听服务注册中心的变化,将最新的服务实例列表同步给 gRPC

Builder 创建

Discovery Resolver 的 Builder 在 client.godial 中构建:

1
2
3
4
5
6
7
8
9
10
11
if options.discovery != nil {
grpcOpts = append(grpcOpts,
grpc.WithResolvers(
discovery.NewBuilder(
options.discovery,
discovery.WithInsecure(insecure),
discovery.WithTimeout(options.timeout),
discovery.WithSubset(options.subsetSize),
discovery.PrintDebugLog(options.printDiscoveryDebugLog),
)))
}

Builder 的核心配置(resolver/discovery/builder.go:58-64):

1
2
3
4
5
6
7
type builder struct {
discoverer registry.Discovery // 服务发现实现
timeout time.Duration // Watcher 创建超时,默认 10s
insecure bool // 是否使用 grpc(非安全)协议
subsetSize int // 子集大小,默认 25
debugLog bool // 是否打印日志
}

Build 流程

When gRPC 需要解析 discovery:///service-name 时,会调用 builder.Build():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func (b *builder) Build(target resolver.Target, cc resolver.ClientConn,
_ resolver.BuildOptions) (resolver.Resolver, error) {
// 1. 异步创建 Watcher,带超时控制
done := make(chan struct{}, 1)
ctx, cancel := context.WithCancel(context.Background())
go func() {
w, err := b.discoverer.Watch(ctx, strings.TrimPrefix(target.URL.Path, "/"))
watchRes.w = w
watchRes.err = err
close(done)
}()

// 2. 等待 Watcher 创建完成或超时
select {
case <-done:
err = watchRes.err
case <-time.After(b.timeout):
err = ErrWatcherCreateTimeout
}

// 3. 创建 discoveryResolver 并启动 watch 协程
r := &discoveryResolver{
w: watchRes.w,
cc: cc,
ctx: ctx,
cancel: cancel,
insecure: b.insecure,
debugLog: b.debugLog,
subsetSize: b.subsetSize,
selectorKey: uuid.New().String(), // 用于子集选择的一致性 key
}
go r.watch()
return r, nil
}

关键设计:Watcher 创建是带超时的异步操作。这是因为 discoverer.Watch() 可能需要与注册中心通信,默认 10s 超时防止阻塞 gRPC 的连接建立。

Watch 循环与 Update 更新

Resolver 的核心是一个 watch 协程(resolver/discovery/resolver.go:31-49):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func (r *discoveryResolver) watch() {
for {
select {
case <-r.ctx.Done():
return
default:
}
ins, err := r.w.Next()
if err != nil {
if errors.Is(err, context.Canceled) {
return
}
log.Errorf("[resolver] Failed to watch discovery endpoint: %v", err)
time.Sleep(time.Second) // 错误后退避 1 秒
continue
}
r.update(ins)
}
}

watch 是一个无限循环,通过 watcher.Next() 阻塞等待服务实例变化。收到新实例列表后调用 updateupdate 函数的处理流程(resolver/discovery/resolver.go:51-98):

1
2
3
4
5
6
7
8
9
接收 ServiceInstance 列表
├── 1. 遍历实例,提取端点(只取 grpc/grpcs 协议的端点)
│ └── endpoint.ParseEndpoint(in.Endpoints, endpoint.Scheme("grpc", !insecure))
├── 2. 去重(同一 endpoint 地址只保留一个)
├── 3. 子集过滤(如果 subsetSize > 0)
│ └── subset.Subset(selectorKey, filtered, subsetSize)
├── 4. 构建 resolver.Address 列表
│ └── 将原始 ServiceInstance 存入 Attributes,供 Balancer 后续使用
└── 5. 通过 cc.UpdateState 通知 gRPC 更新地址列表

子集过滤(Subset)的作用:在大型集群中,每个客户端不需要连接所有服务实例。通过 subset.Subset 随机选择固定大小的子集,减少每个客户端的连接数,同时通过 selectorKey(UUID)保证同一客户端每次选中相同的子集。

Direct Resolver:静态地址解析

Direct Resolver 支持直连方式,在 init() 中自动注册(resolver/direct/resolver.go:11-13),并通过 client.go 的 blank import 引入(transport/grpc/client.go:24):

1
2
// client.go
_ "github.com/go-kratos/kratos/v2/transport/grpc/resolver/direct"

Direct builder 的 Build 逻辑非常简单(resolver/direct/builder.go:25-37):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func (d *directBuilder) Build(target resolver.Target, cc resolver.ClientConn,
_ resolver.BuildOptions) (resolver.Resolver, error) {
// 解析逗号分隔的地址列表
addrs := make([]resolver.Address, 0)
for _, addr := range strings.Split(
strings.TrimPrefix(target.URL.Path, "/"), ",") {
addrs = append(addrs, resolver.Address{Addr: addr})
}
// 直接更新地址到 gRPC
err := cc.UpdateState(resolver.State{Addresses: addrs})
if err != nil {
return nil, err
}
return newDirectResolver(), nil // 空 resolver,不做后续 watch
}

使用示例:

1
direct://<authority>/127.0.0.1:9000,127.0.0.2:9000

返回的 directResolver 是一个空的 no-op 实现,因为静态地址不需要动态更新。

负载均衡机制

kratos 的负载均衡通过两部分协作完成:

  • kratos selector:负责节点选择策略(如 WRR 加权轮询)
  • gRPC Balancer:将 kratos selector 适配为 gRPC 的 balancer.Picker

全局 Selector 初始化

client.goinit() 中,kratos 确保全局 Selector 已初始化(transport/grpc/client.go:27-31):

1
2
3
4
5
func init() {
if selector.GlobalSelector() == nil {
selector.SetGlobalSelector(wrr.NewBuilder())
}
}

默认使用 WRR(Weighted Round Robin,加权轮询)选择器。WRR 类似 Nginx 的平滑加权轮询算法,根据节点权重分配流量。

selector Balancer:桥接 gRPC 和 kratos Selector

kratos 通过实现 gRPC 的 balancer.Builderbalancer.Picker 接口,将 kratos selector 桥接到 gRPC(transport/grpc/balancer.go):

1
2
3
4
5
6
7
8
9
10
func init() {
b := base.NewBalancerBuilder(
balancerName, // "selector"
&balancerBuilder{
builder: selector.GlobalSelector(), // 全局 Selector 的 Builder
},
base.Config{HealthCheck: true}, // 启用健康检查
)
balancer.Register(b)
}

balancerBuilder.Build 方法在 gRPC 子连接就绪时被调用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func (b *balancerBuilder) Build(info base.PickerBuildInfo) balancer.Picker {
if len(info.ReadySCs) == 0 {
return base.NewErrPicker(balancer.ErrNoSubConnAvailable)
}

// 1. 从 gRPC Ready SubConns 构建 kratos Node 列表
nodes := make([]selector.Node, 0, len(info.ReadySCs))
for conn, info := range info.ReadySCs {
// 从 Address Attributes 中提取 ServiceInstance
ins, _ := info.Address.Attributes.Value("rawServiceInstance").(*registry.ServiceInstance)
nodes = append(nodes, &grpcNode{
Node: selector.NewNode("grpc", info.Address.Addr, ins),
subConn: conn, // 保存 gRPC SubConn 引用
})
}

// 2. 创建 kratos Selector 并初始化节点
p := &balancerPicker{
selector: b.builder.Build(),
}
p.selector.Apply(nodes)
return p
}

这里的 grpcNodebalancer.go:102-105)是关键的数据结构:

1
2
3
4
type grpcNode struct {
selector.Node // kratos 的 Node 接口
subConn balancer.SubConn // gRPC 的子连接
}

它同时持有 kratos Node 和 gRPC SubConn,使得 selector 选出的节点能映射回 gRPC 的连接。

Pick:每次 RPC 的选择逻辑

每次 RPC 调用时,gRPC 会调用 balancerPicker.Pick

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func (p *balancerPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
// 1. 从 Context 中提取 NodeFilters(用户在创建 Client 时设置)
var filters []selector.NodeFilter
if tr, ok := transport.FromClientContext(info.Ctx); ok {
if gtr, ok := tr.(*Transport); ok {
filters = gtr.NodeFilters()
}
}

// 2. 通过 kratos Selector 选择节点
n, done, err := p.selector.Select(info.Ctx, selector.WithNodeFilter(filters...))
if err != nil {
return balancer.PickResult{}, err
}

// 3. 返回 PickResult,包含选中的 SubConn 和完成回调
return balancer.PickResult{
SubConn: n.(*grpcNode).subConn, // 获取 gRPC SubConn
Done: func(di balancer.DoneInfo) {
// 上报 RPC 结果给 selector 用于权重调整
done(info.Ctx, selector.DoneInfo{
Err: di.Err,
BytesSent: di.BytesSent,
BytesReceived: di.BytesReceived,
ReplyMD: Trailer(di.Trailer),
})
},
}, nil
}

Pick 返回的 Done 回调在 RPC 完成后被 gRPC 调用,kratos 将调用结果(错误、收发字节数等)反馈给 WRR selector,用于动态调整节点权重。比如某个节点持续返回错误,WRR 会降低其权重甚至将其剔除。

Selector 选择流程

Default.Select 的完整选择流程(selector/default_selector.go:26-76):

1
2
3
4
5
6
7
8
9
Select(ctx, opts...)
├── 1. 获取所有节点(atomic.Value 快速读取)
├── 2. 如果有 NodeFilters,执行过滤
│ └── 逐层过滤,每一步都可能缩小候选集
├── 3. 通过 Balancer.Pick 选择最优节点
│ └── WRR 加权轮询算法
├── 4. 将选中节点写入 Peer Context
│ └── ctx 中的 Peer.Node = 选中的 Node
└── 5. 返回 (Node, DoneFunc)

NodeFilter 过滤链支持多层过滤,每一层过滤的结果作为下一层的输入。

Peer 上下文与节点信息传播

kratos 设计了一个 Peer 机制(selector/peer.go),让业务代码能获取到实际被调用的节点信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
type Peer struct {
Node Node // 选中的节点
}

// 注入 Peer 到 Context
func NewPeerContext(ctx context.Context, p *Peer) context.Context {
return context.WithValue(ctx, peerKey{}, p)
}

// 从 Context 提取 Peer
func FromPeerContext(ctx context.Context) (p *Peer, ok bool) {
p, ok = ctx.Value(peerKey{}).(*Peer)
return
}

工作流程:

  1. unaryClientInterceptor 创建一个空的 Peer{} 并注入 Context
  2. gRPC 调用链进入 balancerPicker.Pickselector.Select
  3. Default.Select 选中节点后,通过 FromPeerContext 取出 Peer,将选中节点写入 p.Node
  4. unaryClientInterceptor 在调用结束后可以通过 FromPeerContext(ctx) 获取到实际调用的节点

这个设计巧妙地利用指针和 Context 完成了一次 后门通信——selector 将选择结果写回拦截器预先创建的空 Peer 中。

完整的 RPC 调用链路

结合以上所有组件,一次完整的 Unary RPC 调用的完整链路如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
用户代码: conn.Invoke(ctx, method, req, reply)

├── unaryClientInterceptor (kratos 桥接拦截器)
│ ├── 创建 Transport,注入 Context
│ ├── 设置超时 (context.WithTimeout)
│ ├── 注入空 Peer
│ └── 执行 kratos 中间件链
│ ├── 中间件1 (如 recovery)
│ ├── 中间件2 (如 logging)
│ ├── 中间件3 (如 tracing)
│ └── 最终 handler:
│ ├── 将 kratos Header 写入 gRPC outgoing metadata
│ └── invoker(ctx, method, req, reply, cc)
│ │
│ ├── 用户自定义 gRPC 拦截器
│ │
│ └── gRPC 内部处理
│ ├── Resolver: discovery resolver 动态解析地址
│ ├── Balancer: selector.Pick 选择节点
│ │ ├── 提取 NodeFilters
│ │ ├── 过滤候选节点
│ │ ├── Balancer.Pick (WRR)
│ │ └── 将选中节点写入 Peer.Node
│ ├── 建立/复用 gRPC 连接
│ └── 发送请求 → 接收响应
│ │
│ └── RPC 完成后
│ └── Done 回调: 上报结果给 WRR selector
│ └── 调整节点权重

└── 返回 error 给用户代码

小结

kratos gRPC Client 通过三个 gRPC 扩展点将框架能力无缝注入:

扩展点 实现 桥接的 kratos 能力
Interceptor unaryClientInterceptor / streamClientInterceptor 中间件链、超时控制、Transport 上下文、Header 传播、Peer 注入
Resolver Discovery Resolver / Direct Resolver 服务发现、动态地址解析、子集过滤
Balancer selector Balancer → kratos Selector WRR 加权轮询、NodeFilter 过滤、动态权重调整、RPC 结果反馈

通过这些设计,开发者只需标准 gRPC 调用方式即可享受 kratos 的中间件、服务发现和负载均衡能力,无需关心底层适配细节。