0%

Kratos 源码分析(五):App 应用生命周期管理实现详解

上一篇文章我们分析了 kratos 的配置模块,了解了配置的加载、合并和热更新机制。这篇文章我们将分析 kratos 的 App 模块,它是整个微服务应用的生命周期管理器,负责服务的启动、停止、注册和注销。

App 模块概览

App 模块(github.com/go-kratos/kratos/v2)是 kratos 微服务应用的核心入口,它承担了以下职责:

  • 生命周期管理:统一管理 Transport Server(HTTP、gRPC)的启动和停止
  • 服务注册与发现:在服务启动时注册,停止时注销
  • 信号处理:监听系统信号(SIGTERM、SIGQUIT、SIGINT),实现优雅停机
  • 生命周期钩子:提供 BeforeStart/AfterStart/BeforeStop/AfterStop 四个钩子
  • 应用信息传播:通过 Context 传递应用元信息(ID、Name、Version 等)

kratos-layout 中的 App 使用

创建 App

在 kratos-layout 创建的项目中,App 的创建位于 cmd/kratos-demo/main.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import (
"os"

"kratos-demo/internal/conf"

"github.com/go-kratos/kratos/v2"
"github.com/go-kratos/kratos/v2/config"
"github.com/go-kratos/kratos/v2/config/file"
"github.com/go-kratos/kratos/v2/log"
"github.com/go-kratos/kratos/v2/middleware/tracing"
"github.com/go-kratos/kratos/v2/transport/grpc"
"github.com/go-kratos/kratos/v2/transport/http"
)

var (
Name string
Version string
flagconf string
id, _ = os.Hostname()
)

func newApp(logger log.Logger, gs *grpc.Server, hs *http.Server) *kratos.App {
return kratos.New(
kratos.ID(id),
kratos.Name(Name),
kratos.Version(Version),
kratos.Metadata(map[string]string{}),
kratos.Logger(logger),
kratos.Server(gs, hs),
)
}

可以看到,App 的创建使用了选项模式,通过 kratos.IDkratos.Namekratos.Server 等函数配置应用。其中:

  • ID:使用主机名作为实例 ID
  • Name / Version:通过编译时 -ldflags 注入
  • Server:注册 HTTP 和 gRPC 两个 Transport Server

运行 App

main 函数中,配置加载完成后,通过 app.Run() 启动应用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func main() {
......

// 4. 依赖注入创建 App
app, cleanup, err := wireApp(bc.Server, bc.Data, logger)
if err != nil {
panic(err)
}
defer cleanup()

// 5. 启动并阻塞,等待停止信号
if err := app.Run(); err != nil {
panic(err)
}
}

app.Run() 会阻塞当前 goroutine,直到收到停止信号。

Server 的创建

HTTP 和 gRPC Server 在 internal/server/ 下创建。如下展示了 GRPC server 的创建:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// internal/server/grpc.go
func NewGRPCServer(c *conf.Server, greeter *service.GreeterService, logger log.Logger) *grpc.Server {
var opts = []grpc.ServerOption{
grpc.Middleware(recovery.Recovery()),
}
if c.Grpc.Addr != "" {
opts = append(opts, grpc.Address(c.Grpc.Addr))
}
if c.Grpc.Timeout != nil {
opts = append(opts, grpc.Timeout(c.Grpc.Timeout.AsDuration()))
}
srv := grpc.NewServer(opts...)
v1.RegisterGreeterServer(srv, greeter)
return srv
}

这些 Server 实现了 transport.Server 接口,可以被 App 统一管理。

使用流程图

如下简单总结了 App 的使用流程:

1
2
3
4
5
main()
├─ 1. 初始化 Logger
├─ 2. 加载配置(config.Load + config.Scan)
├─ 3. Wire 依赖注入 → 创建 Server + App
└─ 4. app.Run() → 阻塞,等待停止信号

核心接口设计

AppInfo 接口

AppInfo 定义了应用元信息的访问接口,通过 Context 传播给下游:

1
2
3
4
5
6
7
8
// app.go
type AppInfo interface {
ID() string
Name() string
Version() string
Metadata() map[string]string
Endpoint() []string
}

transport.Server 接口

1
2
3
4
5
// transport/transport.go
type Server interface {
Start(context.Context) error
Stop(context.Context) error
}

目前有两种实现:http.Servergrpc.Server

transport.Endpointer 接口

Endpointer 用于获取服务的 Endpoint 地址:

1
2
3
4
// transport/transport.go
type Endpointer interface {
Endpoint() (*url.URL, error)
}

http.Servergrpc.Server 都实现了此接口,App 在构建服务实例时会通过此接口自动收集 Endpoint。

registry.Registrar 接口

Registrar 是服务注册器的抽象:

1
2
3
4
5
// registry/registry.go
type Registrar interface {
Register(ctx context.Context, service *ServiceInstance) error
Deregister(ctx context.Context, service *ServiceInstance) error
}

ServiceInstance 表示一个服务实例:

1
2
3
4
5
6
7
8
// registry/registry.go
type ServiceInstance struct {
ID string `json:"id"`
Name string `json:"name"`
Version string `json:"version"`
Metadata map[string]string `json:"metadata"`
Endpoints []string `json:"endpoints"`
}

接口关系图

1
2
3
4
5
6
7
8
9
10
11
12
13
App (生命周期管理器)
├── 持有 ──→ transport.Server (传输层服务器)
│ ├─ http.Server
│ └─ grpc.Server
│ 两者均实现 Endpointer 接口

├── 持有 ──→ registry.Registrar (服务注册器)
│ └─ Register / Deregister

├── 实现 ──→ AppInfo (应用元信息)
│ └─ ID / Name / Version / Metadata / Endpoint

└── Context 传播 AppInfo 给下游

App 结构体与选项

App 结构体

1
2
3
4
5
6
7
8
// app.go
type App struct {
opts options // 应用选项
ctx context.Context // 可取消的 context
cancel context.CancelFunc // 取消函数
mu sync.Mutex // 保护 instance 的互斥锁
instance *registry.ServiceInstance // 服务实例
}

字段的作用:

  • opts:存储所有配置项,通过选项模式设置
  • ctx / cancel:用于控制所有 Server 的生命周期,调用 cancel() 会触发所有 Server 停止
  • mu:保护 instance 字段的并发访问
  • instance:当前服务实例,用于服务注册和注销

options 结构体

所有选项均采用选项模式,Option 类型定义为:

1
type Option func(o *options)

options 结构体存储所有选项值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// options.go
type options struct {
id string
name string
version string
metadata map[string]string
endpoints []*url.URL

ctx context.Context
sigs []os.Signal

logger log.Logger

registrar registry.Registrar
registrarTimeout time.Duration
stopTimeout time.Duration

servers []transport.Server

beforeStart []func(context.Context) error
beforeStop []func(context.Context) error
afterStart []func(context.Context) error
afterStop []func(context.Context) error
}
选项函数 说明 默认值
ID(id) 服务实例 ID UUID
Name(name) 服务名称
Version(version) 服务版本
Metadata(md) 服务元数据 nil
Endpoint(endpoints...) 服务端点 从 Server 自动获取
Context(ctx) 基础 Context context.Background()
Logger(logger) 日志器 nil
Server(srv...) Transport Server 列表
Signal(sigs...) 监听的系统信号 [SIGTERM, SIGQUIT, SIGINT]
Registrar(r) 服务注册器 nil
RegistrarTimeout(t) 注册超时时间 10s
StopTimeout(t) 停止超时时间 0(无超时)
BeforeStart(fn) 启动前钩子
AfterStart(fn) 启动后钩子
BeforeStop(fn) 停止前钩子
AfterStop(fn) 停止后钩子

创建 App 实例

如下展示了如何创建 App 实例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// app.go
func New(opts ...Option) *App {
o := options{
ctx: context.Background(),
sigs: []os.Signal{syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGINT},
registrarTimeout: 10 * time.Second,
}
// 自动生成 UUID 作为 ID
if id, err := uuid.NewUUID(); err == nil {
o.id = id.String()
}
// 应用用户选项
for _, opt := range opts {
opt(&o)
}
// 设置全局 Logger
if o.logger != nil {
log.SetLogger(o.logger)
}
// 创建可取消的 Context
ctx, cancel := context.WithCancel(o.ctx)
return &App{
ctx: ctx,
cancel: cancel,
opts: o,
}
}

值得注意的是,如果用户通过 kratos.Logger(logger) 传入了自定义 Logger,New 函数会调用 log.SetLogger(o.logger) 将其设置为全局 Logger。这意味着 kratos 的全局日志函数(log.Infolog.Error 等)都会使用这个 Logger。

Run 方法:启动流程

Run 是 App 最核心的方法,负责启动所有 Server 并阻塞等待停止信号。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
// app.go
func (a *App) Run() error {
// 1. 构建服务实例
instance, err := a.buildInstance()
if err != nil {
return err
}
a.mu.Lock()
a.instance = instance
a.mu.Unlock()

// 2. 创建携带 AppInfo 的 Context
sctx := NewContext(a.ctx, a)

// 3. 创建 errgroup
eg, ctx := errgroup.WithContext(sctx)
wg := sync.WaitGroup{}

// 4. 执行 BeforeStart 钩子
for _, fn := range a.opts.beforeStart {
if err = fn(sctx); err != nil {
return err
}
}

// 5. 启动所有 Server
octx := NewContext(a.opts.ctx, a)
for _, srv := range a.opts.servers {
server := srv
// goroutine: 等待 ctx 取消后停止 Server
eg.Go(func() error {
<-ctx.Done()
stopCtx := context.WithoutCancel(octx)
if a.opts.stopTimeout > 0 {
var cancel context.CancelFunc
stopCtx, cancel = context.WithTimeout(stopCtx, a.opts.stopTimeout)
defer cancel()
}
return server.Stop(stopCtx)
})
wg.Add(1)
// goroutine: 启动 Server
eg.Go(func() error {
wg.Done()
return server.Start(octx)
})
}
wg.Wait()

// 6. 服务注册
if a.opts.registrar != nil {
rctx, rcancel := context.WithTimeout(ctx, a.opts.registrarTimeout)
defer rcancel()
if err = a.opts.registrar.Register(rctx, instance); err != nil {
return err
}
}

// 7. 执行 AfterStart 钩子
for _, fn := range a.opts.afterStart {
if err = fn(sctx); err != nil {
return err
}
}

// 8. 监听系统信号
c := make(chan os.Signal, 1)
signal.Notify(c, a.opts.sigs...)
eg.Go(func() error {
select {
case <-ctx.Done():
return nil
case <-c:
return a.Stop()
}
})

// 9. 等待所有 goroutine 完成
if err = eg.Wait(); err != nil && !errors.Is(err, context.Canceled) {
return err
}
err = nil

// 10. 执行 AfterStop 钩子
for _, fn := range a.opts.afterStop {
err = fn(sctx)
}
return err
}

如下总结了 Run 的核心流程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Run()
├─ 1. buildInstance() 构建服务实例
├─ 2. NewContext() 创建携带 AppInfo 的 Context
├─ 3. errgroup.WithContext() 创建并发控制组
├─ 4. BeforeStart 钩子 启动前回调
├─ 5. 启动所有 Server 每个 Server 两个 goroutine
│ ├─ goroutine A: Start(octx) 启动服务
│ └─ goroutine B: <-ctx.Done() → Stop(stopCtx) 等待停止信号后优雅关闭
├─ 6. wg.Wait() 确保所有 Server 已开始启动
├─ 7. Registrar.Register() 服务注册
├─ 8. AfterStart 钩子 启动后回调
├─ 9. 监听系统信号 SIGTERM/SIGQUIT/SIGINT
├─ 10. eg.Wait() 等待所有 goroutine 完成
└─ 11. AfterStop 钩子 停止后回调

关键设计:Server 启动与停止的并发控制

对每个 Server,Run 启动了两个 goroutine:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// goroutine 1: 等待 ctx 取消后停止 Server
eg.Go(func() error {
<-ctx.Done() // 阻塞等待取消信号
stopCtx := context.WithoutCancel(octx) // 即使父 ctx 取消,stopCtx 仍有效
if a.opts.stopTimeout > 0 {
var cancel context.CancelFunc
stopCtx, cancel = context.WithTimeout(stopCtx, a.opts.stopTimeout)
defer cancel()
}
return server.Stop(stopCtx) // 优雅停止
})

// goroutine 2: 启动 Server
wg.Add(1)
eg.Go(func() error {
wg.Done() // 标记已开始启动
return server.Start(octx) // 启动服务(通常阻塞)
})
wg.Wait() // 等待所有 Server 的 wg.Done() 执行

这里有几个精妙的设计:

1. wg.Wait() 确保注册在启动之后

wg.Wait() 会阻塞到所有 Server 的 wg.Done() 执行完毕,即所有 Server 都已进入 Start 方法。这保证了服务注册(步骤 7)一定发生在 Server 启动之后,避免出现 服务已注册但 Server 还没启动 的窗口期。

2. context.WithoutCancel 保证停止上下文有效

Stop() 被调用时,cancel() 会导致 ctx.Done() 触发。但在停止 Server 时,server.Stop(stopCtx) 仍然需要一个有效的 Context 来执行清理逻辑(如等待正在处理的请求完成)。context.WithoutCancel(octx) 创建了一个不会被父 Context 取消影响的 Context,确保停止逻辑可以正常执行。

3. stopTimeout 控制优雅停机时限

如果设置了 stopTimeout,停止操作会在超时后强制终止,避免因某些请求处理时间过长而无限期阻塞。

errgroup 的作用

errgroup.WithContextgolang.org/x/sync/errgroup 提供的并发控制工具,可以看作 sync.WaitGroup 的增强版:

  • 错误传播:任何一个 goroutine 返回错误,eg.Wait() 都会返回该错误
  • 自动取消:当一个 goroutine 出错时,通过 ctx 通知其他 goroutine 退出
  • 简化代码:无需手动管理 Add/Done/Wait 计数器

在 App 中,errgroup 管理了三类 goroutine:

  1. 每个 Server 的启动 goroutine
  2. 每个 Server 的停止 goroutine
  3. 信号监听 goroutine

任何一方出错,都会触发其他 goroutine 退出。

Stop 方法:优雅停机

如下展示了 Stop 方法的核心实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// app.go
func (a *App) Stop() (err error) {
sctx := NewContext(a.ctx, a)

// 1. 执行 BeforeStop 钩子
for _, fn := range a.opts.beforeStop {
err = fn(sctx)
}

// 2. 服务注销
a.mu.Lock()
instance := a.instance
a.mu.Unlock()
if a.opts.registrar != nil && instance != nil {
ctx, cancel := context.WithTimeout(NewContext(a.ctx, a), a.opts.registrarTimeout)
defer cancel()
if err = a.opts.registrar.Deregister(ctx, instance); err != nil {
return err
}
}

// 3. 取消 Context(触发所有 Server 停止)
if a.cancel != nil {
a.cancel()
}

return err
}

整个停机流程图可以简化为:

1
2
3
4
5
6
Stop()
├─ 1. BeforeStop 钩子 停止前回调
├─ 2. Registrar.Deregister() 服务注销(先注销,再停止)
└─ 3. cancel() 取消 Context
└─ 触发所有 Server 的 <-ctx.Done()
└─ 调用 server.Stop(stopCtx) 优雅停止

Stop() 先执行 Deregister,再调用 cancel() 触发 Server 停止。这个顺序很重要:先从注册中心注销,使得新的请求不再路由到本实例,然后再优雅地处理完已有的请求后停止。如果反过来,先停止 Server 再注销,会导致注册中心仍然认为本实例可用,但实际已经无法处理请求。

如下则展示了 App 的完整生命周期时序:

1
2
3
4
5
6
7
8
9
10
11
12
13
                  ┌─────────── Run() ───────────┐
│ │
BeforeStart ──→ Start Servers ──→ Register ──→ AfterStart

等待信号/Stop()

BeforeStop

Deregister

cancel() ──→ Stop Servers

AfterStop

部分实现细节

接下来我们再来深入分析其中一些重点流程的实现细节:

构建服务实例

buildInstance 方法构建用于注册的 ServiceInstance

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// app.go
func (a *App) buildInstance() (*registry.ServiceInstance, error) {
endpoints := make([]string, 0, len(a.opts.endpoints))
// 1. 使用用户指定的 Endpoint
for _, e := range a.opts.endpoints {
endpoints = append(endpoints, e.String())
}
// 2. 如果没有指定,从 Server 自动获取
if len(endpoints) == 0 {
for _, srv := range a.opts.servers {
if r, ok := srv.(transport.Endpointer); ok {
e, err := r.Endpoint()
if err != nil {
return nil, err
}
endpoints = append(endpoints, e.String())
}
}
}
// 3. 创建服务实例
return &registry.ServiceInstance{
ID: a.opts.id,
Name: a.opts.name,
Version: a.opts.version,
Metadata: a.opts.metadata,
Endpoints: endpoints,
}, nil
}

Endpoint 的获取有两个来源:

  1. 用户显式指定:通过 kratos.Endpoint(url) 选项设置
  2. 从 Server 自动获取:如果 Server 实现了 transport.Endpointer 接口,自动调用 Endpoint() 方法

Context 传播 AppInfo

App 通过 Context 将自身信息传播给下游:

1
2
3
4
5
6
7
8
9
10
11
/ app.go
type appKey struct{}

func NewContext(ctx context.Context, s AppInfo) context.Context {
return context.WithValue(ctx, appKey{}, s)
}

func FromContext(ctx context.Context) (s AppInfo, ok bool) {
s, ok = ctx.Value(appKey{}).(AppInfo)
return
}

appKey 是一个空结构体类型,用作 Context 的 key。这是 Go 中标准的"私有键"模式:

  • 类型安全appKey 是包内私有类型,外部包无法创建相同类型的 key,避免 key 冲突
  • 零内存占用struct{} 不占用内存空间
  • 值相等:两个 appKey{} 实例在 == 比较时是相等的,满足 context.Value 的 key 查找要求

使用示例:

1
2
3
4
5
6
// 在中间件或 Handler 中获取应用信息
func someHandler(ctx context.Context) {
if appInfo, ok := kratos.FromContext(ctx); ok {
fmt.Printf("Service: %s, ID: %s\n", appInfo.Name(), appInfo.ID())
}
}

小结

这篇文章我们分析了 Kratos 应用框架的核心组件 App 的实现,包括其生命周期管理、服务注册注销以及优雅停机等关键功能:

  • App 将多个 Transport Server 的启停逻辑统一管理,应用层只需关注 Run()Stop()
  • 通过 Option 函数灵活配置 App,与 config 模块保持一致的设计风格
  • 利用 context.WithValue 传播 AppInfo,下游可按需获取

通过深入理解这些内部机制,我们可以更好地掌握如何构建和管理自己的微服务应用。