上一篇文章 我们分析了 kratos 的配置模块,了解了配置的加载、合并和热更新机制。这篇文章我们将分析 kratos 的 App 模块,它是整个微服务应用的生命周期管理器,负责服务的启动、停止、注册和注销。
App 模块概览
App 模块(github.com/go-kratos/kratos/v2)是 kratos 微服务应用的核心入口,它承担了以下职责:
生命周期管理 :统一管理 Transport Server(HTTP、gRPC)的启动和停止
服务注册与发现 :在服务启动时注册,停止时注销
信号处理 :监听系统信号(SIGTERM、SIGQUIT、SIGINT),实现优雅停机
生命周期钩子 :提供 BeforeStart/AfterStart/BeforeStop/AfterStop 四个钩子
应用信息传播 :通过 Context 传递应用元信息(ID、Name、Version 等)
kratos-layout 中的 App 使用
创建 App
在 kratos-layout 创建的项目中,App 的创建位于 cmd/kratos-demo/main.go:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 import ( "os" "kratos-demo/internal/conf" "github.com/go-kratos/kratos/v2" "github.com/go-kratos/kratos/v2/config" "github.com/go-kratos/kratos/v2/config/file" "github.com/go-kratos/kratos/v2/log" "github.com/go-kratos/kratos/v2/middleware/tracing" "github.com/go-kratos/kratos/v2/transport/grpc" "github.com/go-kratos/kratos/v2/transport/http" ) var ( Name string Version string flagconf string id, _ = os.Hostname() ) func newApp (logger log.Logger, gs *grpc.Server, hs *http.Server) *kratos.App { return kratos.New( kratos.ID(id), kratos.Name(Name), kratos.Version(Version), kratos.Metadata(map [string ]string {}), kratos.Logger(logger), kratos.Server(gs, hs), ) }
可以看到,App 的创建使用了选项模式,通过 kratos.ID、kratos.Name、kratos.Server 等函数配置应用。其中:
ID:使用主机名作为实例 ID
Name / Version:通过编译时 -ldflags 注入
Server:注册 HTTP 和 gRPC 两个 Transport Server
运行 App
在 main 函数中,配置加载完成后,通过 app.Run() 启动应用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 func main () { ...... app, cleanup, err := wireApp(bc.Server, bc.Data, logger) if err != nil { panic (err) } defer cleanup() if err := app.Run(); err != nil { panic (err) } }
app.Run() 会阻塞当前 goroutine,直到收到停止信号。
Server 的创建
HTTP 和 gRPC Server 在 internal/server/ 下创建。如下展示了 GRPC server 的创建:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 func NewGRPCServer (c *conf.Server, greeter *service.GreeterService, logger log.Logger) *grpc.Server { var opts = []grpc.ServerOption{ grpc.Middleware(recovery.Recovery()), } if c.Grpc.Addr != "" { opts = append (opts, grpc.Address(c.Grpc.Addr)) } if c.Grpc.Timeout != nil { opts = append (opts, grpc.Timeout(c.Grpc.Timeout.AsDuration())) } srv := grpc.NewServer(opts...) v1.RegisterGreeterServer(srv, greeter) return srv }
这些 Server 实现了 transport.Server 接口,可以被 App 统一管理。
使用流程图
如下简单总结了 App 的使用流程:
1 2 3 4 5 main() ├─ 1. 初始化 Logger ├─ 2. 加载配置(config.Load + config.Scan) ├─ 3. Wire 依赖注入 → 创建 Server + App └─ 4. app.Run() → 阻塞,等待停止信号
核心接口设计
AppInfo 接口
AppInfo 定义了应用元信息的访问接口,通过 Context 传播给下游:
1 2 3 4 5 6 7 8 type AppInfo interface { ID() string Name() string Version() string Metadata() map [string ]string Endpoint() []string }
transport.Server 接口
1 2 3 4 5 type Server interface { Start(context.Context) error Stop(context.Context) error }
目前有两种实现:http.Server 和 grpc.Server。
transport.Endpointer 接口
Endpointer 用于获取服务的 Endpoint 地址:
1 2 3 4 type Endpointer interface { Endpoint() (*url.URL, error ) }
http.Server 和 grpc.Server 都实现了此接口,App 在构建服务实例时会通过此接口自动收集 Endpoint。
registry.Registrar 接口
Registrar 是服务注册器的抽象:
1 2 3 4 5 type Registrar interface { Register(ctx context.Context, service *ServiceInstance) error Deregister(ctx context.Context, service *ServiceInstance) error }
ServiceInstance 表示一个服务实例:
1 2 3 4 5 6 7 8 type ServiceInstance struct { ID string `json:"id"` Name string `json:"name"` Version string `json:"version"` Metadata map [string ]string `json:"metadata"` Endpoints []string `json:"endpoints"` }
接口关系图
1 2 3 4 5 6 7 8 9 10 11 12 13 App (生命周期管理器) ├── 持有 ──→ transport.Server (传输层服务器) │ ├─ http.Server │ └─ grpc.Server │ 两者均实现 Endpointer 接口 │ ├── 持有 ──→ registry.Registrar (服务注册器) │ └─ Register / Deregister │ ├── 实现 ──→ AppInfo (应用元信息) │ └─ ID / Name / Version / Metadata / Endpoint │ └── Context 传播 AppInfo 给下游
App 结构体与选项
App 结构体
1 2 3 4 5 6 7 8 type App struct { opts options ctx context.Context cancel context.CancelFunc mu sync.Mutex instance *registry.ServiceInstance }
字段的作用:
opts:存储所有配置项,通过选项模式设置
ctx / cancel:用于控制所有 Server 的生命周期,调用 cancel() 会触发所有 Server 停止
mu:保护 instance 字段的并发访问
instance:当前服务实例,用于服务注册和注销
options 结构体
所有选项均采用选项模式,Option 类型定义为:
1 type Option func (o *options)
options 结构体存储所有选项值:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 type options struct { id string name string version string metadata map [string ]string endpoints []*url.URL ctx context.Context sigs []os.Signal logger log.Logger registrar registry.Registrar registrarTimeout time.Duration stopTimeout time.Duration servers []transport.Server beforeStart []func (context.Context) error beforeStop []func (context.Context) error afterStart []func (context.Context) error afterStop []func (context.Context) error }
选项函数
说明
默认值
ID(id)
服务实例 ID
UUID
Name(name)
服务名称
空
Version(version)
服务版本
空
Metadata(md)
服务元数据
nil
Endpoint(endpoints...)
服务端点
从 Server 自动获取
Context(ctx)
基础 Context
context.Background()
Logger(logger)
日志器
nil
Server(srv...)
Transport Server 列表
空
Signal(sigs...)
监听的系统信号
[SIGTERM, SIGQUIT, SIGINT]
Registrar(r)
服务注册器
nil
RegistrarTimeout(t)
注册超时时间
10s
StopTimeout(t)
停止超时时间
0(无超时)
BeforeStart(fn)
启动前钩子
空
AfterStart(fn)
启动后钩子
空
BeforeStop(fn)
停止前钩子
空
AfterStop(fn)
停止后钩子
空
创建 App 实例
如下展示了如何创建 App 实例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 func New (opts ...Option) *App { o := options{ ctx: context.Background(), sigs: []os.Signal{syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGINT}, registrarTimeout: 10 * time.Second, } if id, err := uuid.NewUUID(); err == nil { o.id = id.String() } for _, opt := range opts { opt(&o) } if o.logger != nil { log.SetLogger(o.logger) } ctx, cancel := context.WithCancel(o.ctx) return &App{ ctx: ctx, cancel: cancel, opts: o, } }
值得注意的是,如果用户通过 kratos.Logger(logger) 传入了自定义 Logger,New 函数会调用 log.SetLogger(o.logger) 将其设置为全局 Logger。这意味着 kratos 的全局日志函数(log.Info、log.Error 等)都会使用这个 Logger。
Run 方法:启动流程
Run 是 App 最核心的方法,负责启动所有 Server 并阻塞等待停止信号。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 func (a *App) Run() error { instance, err := a.buildInstance() if err != nil { return err } a.mu.Lock() a.instance = instance a.mu.Unlock() sctx := NewContext(a.ctx, a) eg, ctx := errgroup.WithContext(sctx) wg := sync.WaitGroup{} for _, fn := range a.opts.beforeStart { if err = fn(sctx); err != nil { return err } } octx := NewContext(a.opts.ctx, a) for _, srv := range a.opts.servers { server := srv eg.Go(func () error { <-ctx.Done() stopCtx := context.WithoutCancel(octx) if a.opts.stopTimeout > 0 { var cancel context.CancelFunc stopCtx, cancel = context.WithTimeout(stopCtx, a.opts.stopTimeout) defer cancel() } return server.Stop(stopCtx) }) wg.Add(1 ) eg.Go(func () error { wg.Done() return server.Start(octx) }) } wg.Wait() if a.opts.registrar != nil { rctx, rcancel := context.WithTimeout(ctx, a.opts.registrarTimeout) defer rcancel() if err = a.opts.registrar.Register(rctx, instance); err != nil { return err } } for _, fn := range a.opts.afterStart { if err = fn(sctx); err != nil { return err } } c := make (chan os.Signal, 1 ) signal.Notify(c, a.opts.sigs...) eg.Go(func () error { select { case <-ctx.Done(): return nil case <-c: return a.Stop() } }) if err = eg.Wait(); err != nil && !errors.Is(err, context.Canceled) { return err } err = nil for _, fn := range a.opts.afterStop { err = fn(sctx) } return err }
如下总结了 Run 的核心流程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 Run() ├─ 1. buildInstance() 构建服务实例 ├─ 2. NewContext() 创建携带 AppInfo 的 Context ├─ 3. errgroup.WithContext() 创建并发控制组 ├─ 4. BeforeStart 钩子 启动前回调 ├─ 5. 启动所有 Server 每个 Server 两个 goroutine │ ├─ goroutine A: Start(octx) 启动服务 │ └─ goroutine B: <-ctx.Done() → Stop(stopCtx) 等待停止信号后优雅关闭 ├─ 6. wg.Wait() 确保所有 Server 已开始启动 ├─ 7. Registrar.Register() 服务注册 ├─ 8. AfterStart 钩子 启动后回调 ├─ 9. 监听系统信号 SIGTERM/SIGQUIT/SIGINT ├─ 10. eg.Wait() 等待所有 goroutine 完成 └─ 11. AfterStop 钩子 停止后回调
关键设计:Server 启动与停止的并发控制
对每个 Server,Run 启动了两个 goroutine:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 eg.Go(func () error { <-ctx.Done() stopCtx := context.WithoutCancel(octx) if a.opts.stopTimeout > 0 { var cancel context.CancelFunc stopCtx, cancel = context.WithTimeout(stopCtx, a.opts.stopTimeout) defer cancel() } return server.Stop(stopCtx) }) wg.Add(1 ) eg.Go(func () error { wg.Done() return server.Start(octx) }) wg.Wait()
这里有几个精妙的设计:
1. wg.Wait() 确保注册在启动之后
wg.Wait() 会阻塞到所有 Server 的 wg.Done() 执行完毕,即所有 Server 都已进入 Start 方法。这保证了服务注册(步骤 7)一定发生在 Server 启动之后,避免出现 服务已注册但 Server 还没启动 的窗口期。
2. context.WithoutCancel 保证停止上下文有效
当 Stop() 被调用时,cancel() 会导致 ctx.Done() 触发。但在停止 Server 时,server.Stop(stopCtx) 仍然需要一个有效的 Context 来执行清理逻辑(如等待正在处理的请求完成)。context.WithoutCancel(octx) 创建了一个不会被父 Context 取消影响的 Context,确保停止逻辑可以正常执行。
3. stopTimeout 控制优雅停机时限
如果设置了 stopTimeout,停止操作会在超时后强制终止,避免因某些请求处理时间过长而无限期阻塞。
errgroup 的作用
errgroup.WithContext 是 golang.org/x/sync/errgroup 提供的并发控制工具,可以看作 sync.WaitGroup 的增强版:
错误传播 :任何一个 goroutine 返回错误,eg.Wait() 都会返回该错误
自动取消 :当一个 goroutine 出错时,通过 ctx 通知其他 goroutine 退出
简化代码 :无需手动管理 Add/Done/Wait 计数器
在 App 中,errgroup 管理了三类 goroutine:
每个 Server 的启动 goroutine
每个 Server 的停止 goroutine
信号监听 goroutine
任何一方出错,都会触发其他 goroutine 退出。
Stop 方法:优雅停机
如下展示了 Stop 方法的核心实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 func (a *App) Stop() (err error ) { sctx := NewContext(a.ctx, a) for _, fn := range a.opts.beforeStop { err = fn(sctx) } a.mu.Lock() instance := a.instance a.mu.Unlock() if a.opts.registrar != nil && instance != nil { ctx, cancel := context.WithTimeout(NewContext(a.ctx, a), a.opts.registrarTimeout) defer cancel() if err = a.opts.registrar.Deregister(ctx, instance); err != nil { return err } } if a.cancel != nil { a.cancel() } return err }
整个停机流程图可以简化为:
1 2 3 4 5 6 Stop() ├─ 1. BeforeStop 钩子 停止前回调 ├─ 2. Registrar.Deregister() 服务注销(先注销,再停止) └─ 3. cancel() 取消 Context └─ 触发所有 Server 的 <-ctx.Done() └─ 调用 server.Stop(stopCtx) 优雅停止
Stop() 先执行 Deregister,再调用 cancel() 触发 Server 停止。这个顺序很重要:先从注册中心注销,使得新的请求不再路由到本实例,然后再优雅地处理完已有的请求后停止。如果反过来,先停止 Server 再注销,会导致注册中心仍然认为本实例可用,但实际已经无法处理请求。
如下则展示了 App 的完整生命周期时序:
1 2 3 4 5 6 7 8 9 10 11 12 13 ┌─────────── Run() ───────────┐ │ │ BeforeStart ──→ Start Servers ──→ Register ──→ AfterStart │ 等待信号/Stop() │ BeforeStop │ Deregister │ cancel() ──→ Stop Servers │ AfterStop
部分实现细节
接下来我们再来深入分析其中一些重点流程的实现细节:
构建服务实例
buildInstance 方法构建用于注册的 ServiceInstance:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 func (a *App) buildInstance() (*registry.ServiceInstance, error ) { endpoints := make ([]string , 0 , len (a.opts.endpoints)) for _, e := range a.opts.endpoints { endpoints = append (endpoints, e.String()) } if len (endpoints) == 0 { for _, srv := range a.opts.servers { if r, ok := srv.(transport.Endpointer); ok { e, err := r.Endpoint() if err != nil { return nil , err } endpoints = append (endpoints, e.String()) } } } return ®istry.ServiceInstance{ ID: a.opts.id, Name: a.opts.name, Version: a.opts.version, Metadata: a.opts.metadata, Endpoints: endpoints, }, nil }
Endpoint 的获取有两个来源:
用户显式指定 :通过 kratos.Endpoint(url) 选项设置
从 Server 自动获取 :如果 Server 实现了 transport.Endpointer 接口,自动调用 Endpoint() 方法
Context 传播 AppInfo
App 通过 Context 将自身信息传播给下游:
1 2 3 4 5 6 7 8 9 10 11 / app.go type appKey struct {}func NewContext (ctx context.Context, s AppInfo) context.Context { return context.WithValue(ctx, appKey{}, s) } func FromContext (ctx context.Context) (s AppInfo, ok bool ) { s, ok = ctx.Value(appKey{}).(AppInfo) return }
appKey 是一个空结构体类型,用作 Context 的 key。这是 Go 中标准的"私有键"模式:
类型安全 :appKey 是包内私有类型,外部包无法创建相同类型的 key,避免 key 冲突
零内存占用 :struct{} 不占用内存空间
值相等 :两个 appKey{} 实例在 == 比较时是相等的,满足 context.Value 的 key 查找要求
使用示例:
1 2 3 4 5 6 func someHandler (ctx context.Context) { if appInfo, ok := kratos.FromContext(ctx); ok { fmt.Printf("Service: %s, ID: %s\n" , appInfo.Name(), appInfo.ID()) } }
小结
这篇文章我们分析了 Kratos 应用框架的核心组件 App 的实现,包括其生命周期管理、服务注册注销以及优雅停机等关键功能:
App 将多个 Transport Server 的启停逻辑统一管理,应用层只需关注 Run() 和 Stop()
通过 Option 函数灵活配置 App,与 config 模块保持一致的设计风格
利用 context.WithValue 传播 AppInfo,下游可按需获取
通过深入理解这些内部机制,我们可以更好地掌握如何构建和管理自己的微服务应用。