前两篇文章我们已经分析了 kratos 创建微服务项目所使用的模版 kratos-layout ,以及 kratos 微服务开发的脚手架命令 kratos CLI 命令。接下来的文章我们将分析 kratos 框架的内部实现,首先我们从其配置模块开始。
配置模块概览
Kratos 的配置模块(github.com/go-kratos/kratos/v2/config)是一个灵活、可扩展的配置管理系统,它为微服务应用提供了统一的配置加载、读取、监听和热更新能力。该模块的主要特性包括:
多源配置加载 :支持从文件、环境变量等多种数据源加载配置,并可自由扩展
配置合并 :将来自不同数据源的配置按优先级合并为统一视图
占位符解析 :支持 ${key:default} 格式的配置引用和默认值
类型自动转换 :提供 Bool/Int/Float/String/Duration 等强类型访问接口
配置热更新 :通过 Watch 机制监听配置变更,实时通知观察者
类型安全的值访问 :提供多种类型转换方法,避免手动类型断言
kratos-layout 中的配置使用
让我们从 kratos-layout 模板项目(即创建示例项目所依赖的模版项目)开始,看看它是如何使用 config 模块的。
配置文件结构
在 kratos-layout 创建的项目中,配置文件通常位于 configs/ 目录下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 server: http: addr: 0.0 .0 .0 :8000 timeout: 1s grpc: addr: 0.0 .0 .0 :9000 timeout: 1s data: database: driver: mysql source: root:root@tcp(127.0.0.1:3306)/test?parseTime=True&loc=Local redis: addr: 127.0 .0 .1 :6379 read_timeout: 0. 2s write_timeout: 0. 2s
配置定义(Proto)
使用 Protobuf 定义配置结构,位于 internal/conf/conf.proto:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 syntax = "proto3" ; package kratos.api;option go_package = "kratos-demo/internal/conf;conf" ;import "google/protobuf/duration.proto" ;message Bootstrap { Server server = 1 ; Data data = 2 ; } message Server { message HTTP { string network = 1 ; string addr = 2 ; google.protobuf.Duration timeout = 3 ; } message GRPC { string network = 1 ; string addr = 2 ; google.protobuf.Duration timeout = 3 ; } HTTP http = 1 ; GRPC grpc = 2 ; } message Data { message Database { string driver = 1 ; string source = 2 ; } message Redis { string network = 1 ; string addr = 2 ; google.protobuf.Duration read_timeout = 3 ; google.protobuf.Duration write_timeout = 4 ; } Database database = 1 ; Redis redis = 2 ; }
通过 Protobuf 定义配置结构的好处是:
类型安全:编译时检查
文档化:proto 文件本身就是文档
跨语言:支持多种语言的代码生成
配置加载流程
在 cmd/kratos-demo/main.go 中,配置加载的核心代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 import ( "kratos-demo/internal/conf" "github.com/go-kratos/kratos/v2/config" "github.com/go-kratos/kratos/v2/config/file" ) func main () { flag.Parse() c := config.New( config.WithSource( file.NewSource(flagconf), ), ) defer c.Close() if err := c.Load(); err != nil { panic (err) } var bc conf.Bootstrap if err := c.Scan(&bc); err != nil { panic (err) } app, cleanup, err := wireApp(bc.Server, bc.Data, logger) if err != nil { panic (err) } defer cleanup() if err := app.Run(); err != nil { panic (err) } }
整个流程可以概括为:
1 创建 Config 实例 → 加载配置 → Scan 到结构体 → 依赖注入 → 运行应用
Config 模块核心接口设计
配置模块的设计采用了分层接口的方式,每个接口都有明确的职责:
核心接口
Config 接口
Config 是配置模块的顶层接口,定义了配置管理的核心功能:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 type Config interface { Load() error Scan(v any) error Value(key string ) Value Watch(key string , o Observer) error Close() error }
Source 接口
Source 接口抽象了配置数据源,定义了如何从不同的来源加载配置:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 type Source interface { Load() ([]*KeyValue, error ) Watch() (Watcher, error ) } type KeyValue struct { Key string Value []byte Format string }
KeyValue 的含义因 Source 而异
需要注意的是,KeyValue 并不是简单地保存"一个配置项的 key-value",而是 Source 与 Reader 之间的数据传输单元 。不同 Source 中 Key 的含义完全不同:
Source
Key 的含义
Value 的含义
Format
File (单文件)
文件名,如 config.yaml
文件全部内容
文件扩展名,如 yaml
File (目录)
每个文件的文件名
每个文件的全部内容
各自的扩展名
Env
环境变量名(去掉前缀后),如 SERVER_ADDR
环境变量的值
空(由 Decoder 按 . 展开为嵌套 map)
Apollo
命名空间名,如 application、app.yaml
非原始模式:将命名空间内所有 KV 重新序列化后的字节流;原始模式:命名空间的原始内容
命名空间后缀决定(yaml/yml→对应格式,application/properties→json)
Consul
KV 路径(去掉 path 前缀),如 key、a.bird.json
Consul KV 的值
Key 的扩展名(无扩展名则为空)
etcd
etcd Key 完整路径,如 /kratos/test/config、/kratos/test/ext/a.bird.json
etcd Value
Key 的扩展名(无扩展名则为空)
Nacos
Data ID,如 test.yaml
配置内容
Data ID 的扩展名
Kubernetes
namespace/configmap-name/data-key,如 default/test/test_config.json
ConfigMap 中该 data key 的值
data key 的扩展名(无扩展名则为空)
可以看到,KeyValue 本质上是 Source 对原始配置数据的一次封装 ,它将"数据从哪来"(Key)、“数据是什么”(Value)、“数据什么格式”(Format)这三个信息打包交给 Reader。Reader 再根据 Format 选择对应的解码器,将 Value 反序列化为 map[string]any,最终合并为统一的配置视图。
以 File Source 加载目录为例,目录下有 server.yaml 和 data.yaml 两个文件,则 Load() 返回两个 KeyValue:
1 2 KeyValue{Key: "server.yaml", Value: <server.yaml 的内容>, Format: "yaml"} KeyValue{Key: "data.yaml", Value: <data.yaml 的内容>, Format: "yaml"}
Reader 收到后,会分别用 yaml 解码器将它们解码为 map[string]any,然后合并到一起。
Reader 接口
Reader 接口负责配置的合并、存储和读取:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 type Reader interface { Merge(...*KeyValue) error Value(string ) (Value, bool ) Source() ([]byte , error ) Resolve() error }
Value 接口
Value 接口表示配置值,提供类型安全的访问方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 type Value interface { Bool() (bool , error ) Int() (int64 , error ) Float() (float64 , error ) String() (string , error ) Duration() (time.Duration, error ) Slice() ([]Value, error ) Map() (map [string ]Value, error ) Scan(any) error Load() any Store(any) }
Watcher 接口
Watcher 接口用于监听配置变化:
1 2 3 4 5 6 7 8 type Watcher interface { Next() ([]*KeyValue, error ) Stop() error }
接口关系图
1 2 3 4 5 Config (顶层接口) ├── 使用 ──→ Source (配置源) │ └── 提供 ──→ Watcher (监听器) └── 使用 ──→ Reader (配置读取器) └── 管理 ──→ Value (配置值)
设计模式
配置模块采用了多种设计模式:
选项模式(Options Pattern) :通过 Option 函数灵活配置 Config
策略模式(Strategy Pattern) :不同的 Source 实现代表不同的加载策略
观察者模式(Observer Pattern) :通过 Watch 机制监听配置变化
Config 实现原理
Config 结构
config 结构体是 Config 接口的实现:
1 2 3 4 5 6 7 8 type config struct { opts options reader Reader cached sync.Map observers sync.Map watchers []Watcher }
各字段的作用:
opts:存储配置选项,如 Source、Decoder、Resolver、Merge 函数等
reader:负责配置的合并、存储和读取
cached:缓存已经读取过的配置值,避免重复解析
observers:存储配置观察者,用于配置变化时通知
watchers:存储所有的监听器,用于停止监听
选项模式
什么是选项模式
选项模式(Options Pattern)是 Go 语言中一种惯用的设计模式,主要用于解决结构体参数的可选配置 问题。当我们创建一个对象时,往往只需要设置部分参数,其余使用默认值。Go 语言没有函数重载,也不支持可选参数,如果用传统的方式:
1 2 func NewConfig (source Source, decoder Decoder, resolver Resolver, merge Merge) Config
选项模式的核心思路是:定义一个 Option 函数类型,每个配置项对应一个返回 Option 的 WithXxx 函数,在构造时通过可变参数按需传入。 这样既保持了 API 的简洁性,又具备良好的扩展性——新增配置项只需要添加一个 WithXxx 函数,无需修改函数签名。
这种模式在 Go 开源社区中被广泛使用,例如 gRPC 的 grpc.DialOption、Uber 的 zap.Logger 等。kratos 框架本身也大量采用了选项模式,除了这里的 config.Option,还包括:
kratos.Option:应用级别的选项,如 kratos.ID()、kratos.Name()、kratos.Server() 等
grpc.ServerOption:gRPC 服务器选项,如 grpc.Address()、grpc.Timeout() 等
http.ServerOption:HTTP 服务器选项,如 http.Address()、http.Timeout() 等
配置模块的选项模式实现
配置模块的选项模式由三个部分组成:
1. 选项结构体 options
options 是一个非导出的结构体,集中存放所有可配置项:
1 2 3 4 5 6 7 type options struct { sources []Source decoder Decoder resolver Resolver merge Merge }
2. 选项函数类型 Option
Option 是一个函数类型,接受 *options 指针作为参数,通过修改指针指向的结构体来设置配置:
1 2 type Option func (*options)
3. 选项设置函数 WithXxx
每个可配置项都对应一个 WithXxx 函数,返回一个闭包,闭包内修改 options 的对应字段:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 func WithSource (s ...Source) Option { return func (o *options) { o.sources = s } } func WithDecoder (d Decoder) Option { return func (o *options) { o.decoder = d } } func WithResolver (r Resolver) Option { return func (o *options) { o.resolver = r } } func WithResolveActualTypes (enableConvertToType bool ) Option { return func (o *options) { o.resolver = newActualTypesResolver(enableConvertToType) } } func WithMergeFunc (m Merge) Option { return func (o *options) { o.merge = m } }
创建 Config 实例
New 函数将选项模式的三部分串联起来:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 func New (opts ...Option) Config { o := options{ decoder: defaultDecoder, resolver: defaultResolver, merge: func (dst, src any) error { return mergo.Map(dst, src, mergo.WithOverride) }, } for _, opt := range opts { opt(&o) } return &config{ opts: o, reader: newReader(o), } }
整个流程分为三步:
初始化默认值 :创建一个带有默认配置的 options 实例,确保即使用户不传入任何选项,Config 也能正常工作
应用用户选项 :遍历用户传入的 Option 函数列表,逐个调用,修改 options 的对应字段,覆盖默认值
构造实例 :用最终的 options 创建 config 实例和对应的 Reader
调用示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 c := config.New( config.WithSource( file.NewSource("configs" ), ), ) c := config.New( config.WithSource(file.NewSource("configs" )), config.WithDecoder(myDecoder), config.WithResolver(myResolver), )
可以看到,选项模式让 API 具有很好的可读性和灵活性:用户只需要关注自己想设置的选项,其余自动使用默认值;新增配置项也不会破坏已有的调用代码。
加载配置(Load)
Load 方法从所有配置源加载配置:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 func (c *config) Load() error { for _, src := range c.opts.sources { kvs, err := src.Load() if err != nil { return err } for _, v := range kvs { log.Debugf("config loaded: %s format: %s" , v.Key, v.Format) } if err = c.reader.Merge(kvs...); err != nil { log.Errorf("failed to merge config source: %v" , err) return err } w, err := src.Watch() if err != nil { log.Errorf("failed to watch config source: %v" , err) return err } c.watchers = append (c.watchers, w) go c.watch(w) } if err := c.reader.Resolve(); err != nil { log.Errorf("failed to resolve config source: %v" , err) return err } return nil }
Load 流程图:
1 2 3 4 5 6 遍历所有 Source ├─ 1. Source.Load() 获取键值对 ├─ 2. Reader.Merge() 合并配置 ├─ 3. Source.Watch() 获取监听器 └─ 4. 启动协程监听变化 最后:Reader.Resolve() 解析占位符
配置合并(Merge)
Reader.Merge 方法实现配置合并:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 func (r *reader) Merge(kvs ...*KeyValue) error { merged, err := r.cloneMap() if err != nil { return err } for _, kv := range kvs { next := make (map [string ]any) if err := r.opts.decoder(kv, next); err != nil { log.Errorf("Failed to config decode error: %v" , err) return err } if err := r.opts.merge(&merged, convertMap(next)); err != nil { log.Errorf("Failed to config merge error: %v" , err) return err } } r.lock.Lock() r.values = merged r.lock.Unlock() return nil }
关键点:
使用 gob 实现深拷贝,避免修改影响原数据
使用 mergo 库实现递归合并
后加载的配置会覆盖前面的配置
深拷贝实现
cloneMap 函数使用 gob 实现深拷贝:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 func cloneMap (src map [string ]any) (map [string ]any, error ) { var buf bytes.Buffer gob.Register(map [string ]any{}) gob.Register([]any{}) enc := gob.NewEncoder(&buf) dec := gob.NewDecoder(&buf) if err := enc.Encode(src); err != nil { return nil , err } var clone map [string ]any if err := dec.Decode(&clone); err != nil { return nil , err } return clone, nil }
为什么需要注册类型?
gob 要求对藏在 interface{} 后面的具体类型进行"注册",这样解码器才知道如何还原。因为 any 可以装任何东西,如果不注册,gob 就不知道应该还原成什么类型。
解码器(Decoder)
默认解码器 defaultDecoder 实现配置的反序列化:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 func defaultDecoder (src *KeyValue, target map [string ]any) error { if src.Format == "" { keys := strings.Split(src.Key, "." ) for i, k := range keys { if i == len (keys)-1 { target[k] = src.Value } else { sub := make (map [string ]any) target[k] = sub target = sub } } return nil } if codec := encoding.GetCodec(src.Format); codec != nil { return codec.Unmarshal(src.Value, &target) } return fmt.Errorf("unsupported key: %s format: %s" , src.Key, src.Format) }
两种模式:
无格式 :按 . 分隔键名,展开为嵌套 map
有格式 (yaml/json/xml):使用对应的编码器解码
占位符解析(Resolve)
Reader.Resolve 方法解析配置中的占位符:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 func defaultResolver (input map [string ]any) error { mapper := mapper(input) return resolver(input, mapper, false ) } func mapper (input map [string ]any) func (name string ) string { return func (name string ) string { args := strings.SplitN(strings.TrimSpace(name), ":" , 2 ) if v, has := readValue(input, args[0 ]); has { s, _ := v.String() return s } else if len (args) > 1 { return args[1 ] } return "" } } func resolver (input map [string ]any, mapper func (string ) string , toType bool ) error { var resolve func (map [string ]any) error resolve = func (sub map [string ]any) error { for k, v := range sub { switch vt := v.(type ) { case string : sub[k] = expand(vt, mapper, toType) case map [string ]any: if err := resolve(vt); err != nil { return err } case []any: for i, iface := range vt { switch it := iface.(type ) { case string : vt[i] = expand(it, mapper, toType) case map [string ]any: if err := resolve(it); err != nil { return err } } } sub[k] = vt } } return nil } return resolve(input) } func expand (s string , mapping func (string ) string , toType bool ) any { re := placeholderRegexp.FindAllStringSubmatch(s, -1 ) for _, i := range re { if len (i) == 2 { m := mapping(i[1 ]) if toType { return convertToType(m) } s = strings.ReplaceAll(s, i[0 ], m) } } return s }
占位符示例:
1 2 3 4 5 6 7 8 9 app: name: kratos-demo region: ${app.name:default} port: ${server.port:8080} database: host: ${app.name} port: ${db.port:3306}
注意 :占位符 ${key:default} 解析的是配置自身 中的 key,而非环境变量。mapper 函数通过 readValue(input, args[0]) 从已加载的配置 map 中查找值。如果需要引用环境变量,需配合 Env Source 使用——先将环境变量加载为配置,再通过占位符引用。
获取配置值(Value)
config.Value 方法获取配置值:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 func (c *config) Value(key string ) Value { if v, ok := c.cached.Load(key); ok { return v.(Value) } if v, ok := c.reader.Value(key); ok { c.cached.Store(key, v) return v } return &errValue{err: ErrNotFound} }
Reader.Value 实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 func readValue (values map [string ]any, path string ) (Value, bool ) { keys := strings.Split(path, "." ) next := values last := len (keys) - 1 for idx, key := range keys { value, ok := next[key] if !ok { return nil , false } if idx == last { av := &atomicValue{} av.Store(value) return av, true } switch vm := value.(type ) { case map [string ]any: next = vm default : return nil , false } } return nil , false }
路径示例:
1 2 c.Value("server.http.addr" ) c.Value("data.database.driver" )
Scan 方法
Scan 方法将配置反序列化到目标对象:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 func (c *config) Scan(v any) error { data, err := c.reader.Source() if err != nil { return err } return unmarshalJSON(data, v) } func unmarshalJSON (data []byte , v any) error { if m, ok := v.(proto.Message); ok { return protojson.UnmarshalOptions{DiscardUnknown: true }.Unmarshal(data, m) } return json.Unmarshal(data, v) }
支持的目标类型:
Go struct(使用标准库 json)
Protobuf Message(使用 protojson)
map[string]any
Value 的类型转换
atomicValue 实现了各种类型转换:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 func (v *atomicValue) Bool() (bool , error ) { switch val := v.Load().(type ) { case bool : return val, nil case int , int8 , int16 , int32 , int64 , uint , uint8 , uint16 , uint32 , uint64 , float32 , float64 : return strconv.ParseBool(fmt.Sprint(val)) case string : return strconv.ParseBool(val) } return false , v.typeAssertError() } func (v *atomicValue) Int() (int64 , error ) { switch val := v.Load().(type ) { case int : return int64 (val), nil case int64 : return val, nil case string : return strconv.ParseInt(val, 10 , 64 ) } return 0 , v.typeAssertError() } func (v *atomicValue) String() (string , error ) { switch val := v.Load().(type ) { case string : return val, nil case bool , int , int8 , int16 , int32 , int64 , uint , uint8 , uint16 , uint32 , uint64 , float32 , float64 : return fmt.Sprint(val), nil } return "" , v.typeAssertError() }
Scan 方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 func (v *atomicValue) Scan(obj any) error { data, err := json.Marshal(v.Load()) if err != nil { return err } if pb, ok := obj.(proto.Message); ok { return kratosjson.UnmarshalOptions.Unmarshal(data, pb) } return json.Unmarshal(data, obj) }
配置源实现
File Source
File Source 从文件或目录加载配置:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 type file struct { path string } func NewSource (path string ) config.Source { return &file{path: path} } func (f *file) Load() (kvs []*config.KeyValue, err error ) { fi, err := os.Stat(f.path) if err != nil { return nil , err } if fi.IsDir() { return f.loadDir(f.path) } kv, err := f.loadFile(f.path) if err != nil { return nil , err } return []*config.KeyValue{kv}, nil } func (f *file) loadDir(path string ) (kvs []*config.KeyValue, err error ) { files, err := os.ReadDir(path) if err != nil { return nil , err } for _, file := range files { if file.IsDir() || strings.HasPrefix(file.Name(), "." ) { continue } kv, err := f.loadFile(filepath.Join(path, file.Name())) if err != nil { return nil , err } kvs = append (kvs, kv) } return } func (f *file) loadFile(path string ) (*config.KeyValue, error ) { file, err := os.Open(path) if err != nil { return nil , err } defer file.Close() data, err := io.ReadAll(file) if err != nil { return nil , err } info, err := file.Stat() if err != nil { return nil , err } return &config.KeyValue{ Key: info.Name(), Format: format(info.Name()), Value: data, }, nil }
format 函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 func format (filename string ) string { ext := filepath.Ext(filename) switch ext { case ".json" : return "json" case ".yaml" , ".yml" : return "yaml" case ".xml" : return "xml" default : return "" } }
Env Source
Env Source 从环境变量加载配置:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 type env struct { prefixes []string } func NewSource (prefixes ...string ) config.Source { return &env{prefixes: prefixes} } func (e *env) Load() (kvs []*config.KeyValue, err error ) { return e.load(os.Environ()), nil } func (e *env) load(envs []string ) []*config.KeyValue { var kvs []*config.KeyValue for _, env := range envs { k, v, _ := strings.Cut(env, "=" ) if k == "" { continue } if len (e.prefixes) > 0 { prefix, ok := matchPrefix(e.prefixes, k) if !ok || k == prefix { continue } k = strings.TrimPrefix(k, prefix) k = strings.TrimPrefix(k, "_" ) } if k != "" { kvs = append (kvs, &config.KeyValue{ Key: k, Value: []byte (v), }) } } return kvs }
使用示例:
1 2 3 4 5 6 7 8 9 10 11 12 c := config.New( config.WithSource( env.NewSource("KRATOS_" ), ), )
配置热更新
Watch 机制
Config 通过 Watch 机制实现配置热更新:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 func (c *config) watch(w Watcher) { for { kvs, err := w.Next() if err != nil { if errors.Is(err, context.Canceled) { log.Infof("watcher's ctx cancel : %v" , err) return } time.Sleep(time.Second) log.Errorf("failed to watch next config: %v" , err) continue } if err := c.reader.Merge(kvs...); err != nil { log.Errorf("failed to merge next config: %v" , err) continue } if err := c.reader.Resolve(); err != nil { log.Errorf("failed to resolve next config: %v" , err) continue } c.cached.Range(func (key, value any) bool { k := key.(string ) v := value.(Value) if n, ok := c.reader.Value(k); ok && reflect.TypeOf(n.Load()) == reflect.TypeOf(v.Load()) && !reflect.DeepEqual(n.Load(), v.Load()) { v.Store(n.Load()) if o, ok := c.observers.Load(k); ok { o.(Observer)(k, v) } } return true }) } }
File Watcher 实现
File Watcher 使用 fsnotify 监听文件变化:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 type watcher struct { f *file fw *fsnotify.Watcher ctx context.Context cancel context.CancelFunc } func newWatcher (f *file) (config.Watcher, error ) { fw, err := fsnotify.NewWatcher() if err != nil { return nil , err } if err := fw.Add(f.path); err != nil { return nil , err } ctx, cancel := context.WithCancel(context.Background()) return &watcher{ f: f, fw: fw, ctx: ctx, cancel: cancel, }, nil } func (w *watcher) Next() ([]*config.KeyValue, error ) { select { case <-w.ctx.Done(): return nil , w.ctx.Err() case event := <-w.fw.Events: if event.Op == fsnotify.Rename { if _, err := os.Stat(event.Name); err == nil || os.IsExist(err) { if err := w.fw.Add(event.Name); err != nil { return nil , err } } } fi, err := os.Stat(w.f.path) if err != nil { return nil , err } path := w.f.path if fi.IsDir() { path = filepath.Join(w.f.path, filepath.Base(event.Name)) } kv, err := w.f.loadFile(path) if err != nil { return nil , err } return []*config.KeyValue{kv}, nil case err := <-w.fw.Errors: return nil , err } } func (w *watcher) Stop() error { w.cancel() return w.fw.Close() }
监听配置变化
1 2 3 4 5 6 7 8 9 10 11 12 13 14 func (c *config) Watch(key string , o Observer) error { if v := c.Value(key); v.Load() == nil { return ErrNotFound } c.observers.Store(key, o) return nil } type Observer func (key string , value Value)
使用示例:
1 2 3 4 c.Watch("server.http.addr" , func (key string , value config.Value) { fmt.Printf("配置变化: %s = %v\n" , key, value.Load()) })
热更新流程图
1 2 3 4 5 6 7 8 9 10 11 12 13 文件变化 ↓ fsnotify 检测到事件 ↓ Watcher.Next() 返回新的键值对 ↓ Reader.Merge() 合并新配置 ↓ Reader.Resolve() 解析占位符 ↓ 更新缓存 ↓ 通知观察者
Demo 示例
让我们通过几个完整的示例来演示配置模块的使用。
基本用法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 package mainimport ( "fmt" "log" "github.com/go-kratos/kratos/v2/config" "github.com/go-kratos/kratos/v2/config/file" ) func main () { c := config.New( config.WithSource( file.NewSource("configs/server.yaml" ), ), ) defer c.Close() if err := c.Load(); err != nil { log.Fatalf("加载配置失败: %v" , err) } addr := c.Value("server.http.addr" ) fmt.Printf("HTTP 地址: %s\n" , addr.String()) timeout := c.Value("server.http.timeout" ) fmt.Printf("超时时间: %v\n" , timeout.Load()) }
配置合并
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 c := config.New( config.WithSource( file.NewSource("configs/base.yaml" ), file.NewSource("configs/dev.yaml" ), ), ) defer c.Close()if err := c.Load(); err != nil { log.Fatalf("加载配置失败: %v" , err) } dbSource := c.Value("data.database.source" ) fmt.Printf("数据库连接: %s\n" , dbSource.String())
Scan 到结构体
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 c := config.New( config.WithSource( file.NewSource("configs/server.yaml" ), ), ) defer c.Close()if err := c.Load(); err != nil { log.Fatalf("加载配置失败: %v" , err) } var serverConf map [string ]anyif err := c.Scan(&serverConf); err != nil { log.Fatalf("Scan 失败: %v" , err) } fmt.Printf("完整配置: %v\n" , serverConf) type HTTPConfig struct { Addr string `json:"addr"` Timeout string `json:"timeout"` } var httpConf HTTPConfigif err := c.Value("server.http" ).Scan(&httpConf); err != nil { log.Printf("Scan 失败: %v" , err) } else { fmt.Printf("HTTP 配置: %+v\n" , httpConf) }
配置热更新
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 c := config.New( config.WithSource( file.NewSource("configs/server.yaml" ), ), ) defer c.Close()if err := c.Load(); err != nil { log.Fatalf("加载配置失败: %v" , err) } err := c.Watch("server.http.addr" , func (key string , value config.Value) { fmt.Printf("配置变化: %s = %v\n" , key, value.Load()) }) if err != nil { log.Printf("Watch 失败: %v" , err) } fmt.Println("正在监听配置变化..." )
多源配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 import ( "github.com/go-kratos/kratos/v2/config" "github.com/go-kratos/kratos/v2/config/file" "github.com/go-kratos/kratos/v2/config/env" ) c := config.New( config.WithSource( file.NewSource("configs/server.yaml" ), env.NewSource("KRATOS_" ), ), ) defer c.Close()if err := c.Load(); err != nil { log.Fatalf("加载配置失败: %v" , err) } addr := c.Value("server.http.addr" ) fmt.Printf("HTTP 地址: %s\n" , addr.String())
扩展配置源
Kratos 配置模块支持扩展自定义配置源。只需要实现 Source 接口即可。Kratos 社区已经提供了多个配置源的实现:
contrib/config/apollo :Apollo 配置中心
contrib/config/consul :Consul 配置中心
contrib/config/etcd :etcd 配置中心
contrib/config/nacos :Nacos 配置中心
contrib/config/kubernetes :Kubernetes ConfigMap
小结
Kratos 配置模块是一个分层抽象、灵活可扩展的配置管理系统,支持配置合并、多种配置源、源热更新、占位符解析等功能。通过研究该模块的代码,我们可以学习到一个简易、通用的配置管理系统应该如何实现。