0%

Kratos 源码分析(四):配置模块 Config 实现详解

前两篇文章我们已经分析了 kratos 创建微服务项目所使用的模版 kratos-layout,以及 kratos 微服务开发的脚手架命令 kratos CLI 命令。接下来的文章我们将分析 kratos 框架的内部实现,首先我们从其配置模块开始。

配置模块概览

Kratos 的配置模块(github.com/go-kratos/kratos/v2/config)是一个灵活、可扩展的配置管理系统,它为微服务应用提供了统一的配置加载、读取、监听和热更新能力。该模块的主要特性包括:

  • 多源配置加载:支持从文件、环境变量等多种数据源加载配置,并可自由扩展
  • 配置合并:将来自不同数据源的配置按优先级合并为统一视图
  • 占位符解析:支持 ${key:default} 格式的配置引用和默认值
  • 类型自动转换:提供 Bool/Int/Float/String/Duration 等强类型访问接口
  • 配置热更新:通过 Watch 机制监听配置变更,实时通知观察者
  • 类型安全的值访问:提供多种类型转换方法,避免手动类型断言

kratos-layout 中的配置使用

让我们从 kratos-layout 模板项目(即创建示例项目所依赖的模版项目)开始,看看它是如何使用 config 模块的。

配置文件结构

在 kratos-layout 创建的项目中,配置文件通常位于 configs/ 目录下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# configs/config.yaml
server:
http:
addr: 0.0.0.0:8000
timeout: 1s
grpc:
addr: 0.0.0.0:9000
timeout: 1s
data:
database:
driver: mysql
source: root:root@tcp(127.0.0.1:3306)/test?parseTime=True&loc=Local
redis:
addr: 127.0.0.1:6379
read_timeout: 0.2s
write_timeout: 0.2s

配置定义(Proto)

使用 Protobuf 定义配置结构,位于 internal/conf/conf.proto

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
syntax = "proto3";
package kratos.api;

option go_package = "kratos-demo/internal/conf;conf";

import "google/protobuf/duration.proto";

message Bootstrap {
Server server = 1;
Data data = 2;
}

message Server {
message HTTP {
string network = 1;
string addr = 2;
google.protobuf.Duration timeout = 3;
}
message GRPC {
string network = 1;
string addr = 2;
google.protobuf.Duration timeout = 3;
}
HTTP http = 1;
GRPC grpc = 2;
}

message Data {
message Database {
string driver = 1;
string source = 2;
}
message Redis {
string network = 1;
string addr = 2;
google.protobuf.Duration read_timeout = 3;
google.protobuf.Duration write_timeout = 4;
}
Database database = 1;
Redis redis = 2;
}

通过 Protobuf 定义配置结构的好处是:

  • 类型安全:编译时检查
  • 文档化:proto 文件本身就是文档
  • 跨语言:支持多种语言的代码生成

配置加载流程

cmd/kratos-demo/main.go 中,配置加载的核心代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import (
"kratos-demo/internal/conf"

"github.com/go-kratos/kratos/v2/config"
"github.com/go-kratos/kratos/v2/config/file"
)

func main() {
flag.Parse()

// 1. 创建配置实例
c := config.New(
config.WithSource(
file.NewSource(flagconf), // 指定配置文件路径
),
)
defer c.Close()

// 2. 加载配置
if err := c.Load(); err != nil {
panic(err)
}

// 3. Scan 到结构体
var bc conf.Bootstrap
if err := c.Scan(&bc); err != nil {
panic(err)
}

// 4. 使用配置(通过依赖注入)
app, cleanup, err := wireApp(bc.Server, bc.Data, logger)
if err != nil {
panic(err)
}
defer cleanup()

if err := app.Run(); err != nil {
panic(err)
}
}

整个流程可以概括为:

1
创建 Config 实例 → 加载配置 → Scan 到结构体 → 依赖注入 → 运行应用

Config 模块核心接口设计

配置模块的设计采用了分层接口的方式,每个接口都有明确的职责:

核心接口

Config 接口

Config 是配置模块的顶层接口,定义了配置管理的核心功能:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// config/config.go
type Config interface {
// Load 从所有配置源加载配置
Load() error

// Scan 将配置反序列化到目标对象
Scan(v any) error

// Value 根据路径获取配置值
Value(key string) Value

// Watch 监听配置变化
Watch(key string, o Observer) error

// Close 关闭配置,停止监听
Close() error
}

Source 接口

Source 接口抽象了配置数据源,定义了如何从不同的来源加载配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// config/source.go
type Source interface {
// Load 加载配置数据,返回键值对列表
Load() ([]*KeyValue, error)

// Watch 返回一个监听器,用于监听配置变化
Watch() (Watcher, error)
}

// KeyValue 是配置数据的传输单元
type KeyValue struct {
Key string // 数据标识(含义因 Source 而异)
Value []byte // 数据内容(原始字节)
Format string // 数据格式(如 "yaml"、"json",为空时由 Decoder 按路径展开)
}

KeyValue 的含义因 Source 而异

需要注意的是,KeyValue 并不是简单地保存"一个配置项的 key-value",而是 Source 与 Reader 之间的数据传输单元。不同 Source 中 Key 的含义完全不同:

Source Key 的含义 Value 的含义 Format
File(单文件) 文件名,如 config.yaml 文件全部内容 文件扩展名,如 yaml
File(目录) 每个文件的文件名 每个文件的全部内容 各自的扩展名
Env 环境变量名(去掉前缀后),如 SERVER_ADDR 环境变量的值 空(由 Decoder 按 . 展开为嵌套 map)
Apollo 命名空间名,如 applicationapp.yaml 非原始模式:将命名空间内所有 KV 重新序列化后的字节流;原始模式:命名空间的原始内容 命名空间后缀决定(yaml/yml→对应格式,application/propertiesjson
Consul KV 路径(去掉 path 前缀),如 keya.bird.json Consul KV 的值 Key 的扩展名(无扩展名则为空)
etcd etcd Key 完整路径,如 /kratos/test/config/kratos/test/ext/a.bird.json etcd Value Key 的扩展名(无扩展名则为空)
Nacos Data ID,如 test.yaml 配置内容 Data ID 的扩展名
Kubernetes namespace/configmap-name/data-key,如 default/test/test_config.json ConfigMap 中该 data key 的值 data key 的扩展名(无扩展名则为空)

可以看到,KeyValue 本质上是 Source 对原始配置数据的一次封装,它将"数据从哪来"(Key)、“数据是什么”(Value)、“数据什么格式”(Format)这三个信息打包交给 Reader。Reader 再根据 Format 选择对应的解码器,将 Value 反序列化为 map[string]any,最终合并为统一的配置视图。

以 File Source 加载目录为例,目录下有 server.yamldata.yaml 两个文件,则 Load() 返回两个 KeyValue:

1
2
KeyValue{Key: "server.yaml", Value: <server.yaml 的内容>, Format: "yaml"}
KeyValue{Key: "data.yaml", Value: <data.yaml 的内容>, Format: "yaml"}

Reader 收到后,会分别用 yaml 解码器将它们解码为 map[string]any,然后合并到一起。

Reader 接口

Reader 接口负责配置的合并、存储和读取:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// config/reader.go
type Reader interface {
// Merge 将多个 KeyValue 解码后合并到当前配置中(后合并的覆盖先合并的)
Merge(...*KeyValue) error

// Value 根据路径获取配置值
Value(string) (Value, bool)

// Source 返回所有配置的 JSON 编码
Source() ([]byte, error)

// Resolve 解析配置中的占位符
Resolve() error
}

Value 接口

Value 接口表示配置值,提供类型安全的访问方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// config/value.go
type Value interface {
// Bool 转换为 bool
Bool() (bool, error)

// Int 转换为 int64
Int() (int64, error)

// Float 转换为 float64
Float() (float64, error)

// String 转换为 string
String() (string, error)

// Duration 转换为 time.Duration
Duration() (time.Duration, error)

// Slice 转换为切片
Slice() ([]Value, error)

// Map 转换为 map
Map() (map[string]Value, error)

// Scan 扫描到目标对象
Scan(any) error

// Load 获取原始值
Load() any

// Store 存储新值
Store(any)
}

Watcher 接口

Watcher 接口用于监听配置变化:

1
2
3
4
5
6
7
8
// config/source.go
type Watcher interface {
// Next 阻塞等待配置变更,返回变更后的完整 KeyValue
Next() ([]*KeyValue, error)

// Stop 停止监听
Stop() error
}

接口关系图

1
2
3
4
5
Config (顶层接口)
├── 使用 ──→ Source (配置源)
│ └── 提供 ──→ Watcher (监听器)
└── 使用 ──→ Reader (配置读取器)
└── 管理 ──→ Value (配置值)

设计模式

配置模块采用了多种设计模式:

  1. 选项模式(Options Pattern):通过 Option 函数灵活配置 Config
  2. 策略模式(Strategy Pattern):不同的 Source 实现代表不同的加载策略
  3. 观察者模式(Observer Pattern):通过 Watch 机制监听配置变化

Config 实现原理

Config 结构

config 结构体是 Config 接口的实现:

1
2
3
4
5
6
7
8
// config/config.go
type config struct {
opts options // 配置选项
reader Reader // 配置读取器
cached sync.Map // 配置缓存
observers sync.Map // 配置观察者
watchers []Watcher // 监听器列表
}

各字段的作用:

  • opts:存储配置选项,如 Source、Decoder、Resolver、Merge 函数等
  • reader:负责配置的合并、存储和读取
  • cached:缓存已经读取过的配置值,避免重复解析
  • observers:存储配置观察者,用于配置变化时通知
  • watchers:存储所有的监听器,用于停止监听

选项模式

什么是选项模式

选项模式(Options Pattern)是 Go 语言中一种惯用的设计模式,主要用于解决结构体参数的可选配置问题。当我们创建一个对象时,往往只需要设置部分参数,其余使用默认值。Go 语言没有函数重载,也不支持可选参数,如果用传统的方式:

1
2
// 不好:参数列表会随需求增长而膨胀,且无法跳过中间参数
func NewConfig(source Source, decoder Decoder, resolver Resolver, merge Merge) Config

选项模式的核心思路是:定义一个 Option 函数类型,每个配置项对应一个返回 OptionWithXxx 函数,在构造时通过可变参数按需传入。 这样既保持了 API 的简洁性,又具备良好的扩展性——新增配置项只需要添加一个 WithXxx 函数,无需修改函数签名。

这种模式在 Go 开源社区中被广泛使用,例如 gRPC 的 grpc.DialOption、Uber 的 zap.Logger 等。kratos 框架本身也大量采用了选项模式,除了这里的 config.Option,还包括:

  • kratos.Option:应用级别的选项,如 kratos.ID()kratos.Name()kratos.Server()
  • grpc.ServerOption:gRPC 服务器选项,如 grpc.Address()grpc.Timeout()
  • http.ServerOption:HTTP 服务器选项,如 http.Address()http.Timeout()

配置模块的选项模式实现

配置模块的选项模式由三个部分组成:

1. 选项结构体 options

options 是一个非导出的结构体,集中存放所有可配置项:

1
2
3
4
5
6
7
// config/options.go
type options struct {
sources []Source // 配置源列表
decoder Decoder // 解码器
resolver Resolver // 占位符解析器
merge Merge // 合并函数
}

2. 选项函数类型 Option

Option 是一个函数类型,接受 *options 指针作为参数,通过修改指针指向的结构体来设置配置:

1
2
// config/options.go
type Option func(*options)

3. 选项设置函数 WithXxx

每个可配置项都对应一个 WithXxx 函数,返回一个闭包,闭包内修改 options 的对应字段:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// config/options.go

// WithSource 设置配置源
func WithSource(s ...Source) Option {
return func(o *options) {
o.sources = s
}
}

// WithDecoder 设置解码器
func WithDecoder(d Decoder) Option {
return func(o *options) {
o.decoder = d
}
}

// WithResolver 设置占位符解析器
func WithResolver(r Resolver) Option {
return func(o *options) {
o.resolver = r
}
}

// WithResolveActualTypes 设置是否自动解析配置值的实际类型
func WithResolveActualTypes(enableConvertToType bool) Option {
return func(o *options) {
o.resolver = newActualTypesResolver(enableConvertToType)
}
}

// WithMergeFunc 设置配置合并函数
func WithMergeFunc(m Merge) Option {
return func(o *options) {
o.merge = m
}
}

创建 Config 实例

New 函数将选项模式的三部分串联起来:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// config/config.go
func New(opts ...Option) Config {
// 1. 创建默认配置选项
o := options{
decoder: defaultDecoder,
resolver: defaultResolver,
merge: func(dst, src any) error {
return mergo.Map(dst, src, mergo.WithOverride)
},
}

// 2. 应用用户提供的选项(覆盖默认值)
for _, opt := range opts {
opt(&o)
}

// 3. 创建并返回 Config 实例
return &config{
opts: o,
reader: newReader(o),
}
}

整个流程分为三步:

  1. 初始化默认值:创建一个带有默认配置的 options 实例,确保即使用户不传入任何选项,Config 也能正常工作
  2. 应用用户选项:遍历用户传入的 Option 函数列表,逐个调用,修改 options 的对应字段,覆盖默认值
  3. 构造实例:用最终的 options 创建 config 实例和对应的 Reader

调用示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
// 只设置配置源,其余使用默认值
c := config.New(
config.WithSource(
file.NewSource("configs"),
),
)

// 设置多个选项
c := config.New(
config.WithSource(file.NewSource("configs")),
config.WithDecoder(myDecoder),
config.WithResolver(myResolver),
)

可以看到,选项模式让 API 具有很好的可读性和灵活性:用户只需要关注自己想设置的选项,其余自动使用默认值;新增配置项也不会破坏已有的调用代码。

加载配置(Load)

Load 方法从所有配置源加载配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// config/config.go
func (c *config) Load() error {
// 遍历所有配置源
for _, src := range c.opts.sources {
// 1. 从配置源加载键值对
kvs, err := src.Load()
if err != nil {
return err
}

// 2. 记录加载日志
for _, v := range kvs {
log.Debugf("config loaded: %s format: %s", v.Key, v.Format)
}

// 3. 合并配置到 Reader
if err = c.reader.Merge(kvs...); err != nil {
log.Errorf("failed to merge config source: %v", err)
return err
}

// 4. 获取配置源的监听器
w, err := src.Watch()
if err != nil {
log.Errorf("failed to watch config source: %v", err)
return err
}

// 5. 保存监听器
c.watchers = append(c.watchers, w)

// 6. 启动协程监听配置变化
go c.watch(w)
}

// 7. 解析配置中的占位符
if err := c.reader.Resolve(); err != nil {
log.Errorf("failed to resolve config source: %v", err)
return err
}

return nil
}

Load 流程图:

1
2
3
4
5
6
遍历所有 Source
├─ 1. Source.Load() 获取键值对
├─ 2. Reader.Merge() 合并配置
├─ 3. Source.Watch() 获取监听器
└─ 4. 启动协程监听变化
最后:Reader.Resolve() 解析占位符

配置合并(Merge)

Reader.Merge 方法实现配置合并:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// config/reader.go
func (r *reader) Merge(kvs ...*KeyValue) error {
// 1. 克隆当前配置(深拷贝)
merged, err := r.cloneMap()
if err != nil {
return err
}

// 2. 遍历所有键值对
for _, kv := range kvs {
next := make(map[string]any)

// 3. 解码键值对
if err := r.opts.decoder(kv, next); err != nil {
log.Errorf("Failed to config decode error: %v", err)
return err
}

// 4. 合并到主配置
if err := r.opts.merge(&merged, convertMap(next)); err != nil {
log.Errorf("Failed to config merge error: %v", err)
return err
}
}

// 5. 更新配置(加锁保护)
r.lock.Lock()
r.values = merged
r.lock.Unlock()

return nil
}

关键点:

  • 使用 gob 实现深拷贝,避免修改影响原数据
  • 使用 mergo 库实现递归合并
  • 后加载的配置会覆盖前面的配置

深拷贝实现

cloneMap 函数使用 gob 实现深拷贝:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// config/reader.go
func cloneMap(src map[string]any) (map[string]any, error) {
var buf bytes.Buffer

// 注册可能出现的类型
gob.Register(map[string]any{})
gob.Register([]any{})

enc := gob.NewEncoder(&buf)
dec := gob.NewDecoder(&buf)

// 序列化
if err := enc.Encode(src); err != nil {
return nil, err
}

// 反序列化
var clone map[string]any
if err := dec.Decode(&clone); err != nil {
return nil, err
}

return clone, nil
}

为什么需要注册类型?

gob 要求对藏在 interface{} 后面的具体类型进行"注册",这样解码器才知道如何还原。因为 any 可以装任何东西,如果不注册,gob 就不知道应该还原成什么类型。

解码器(Decoder)

默认解码器 defaultDecoder 实现配置的反序列化:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// config/options.go
func defaultDecoder(src *KeyValue, target map[string]any) error {
// 如果没有指定格式,按键的层级展开
if src.Format == "" {
// 例如:key="aaa.bbb", value="xxx"
// 展开为:map[aaa]map[bbb]xxx
keys := strings.Split(src.Key, ".")
for i, k := range keys {
if i == len(keys)-1 {
target[k] = src.Value
} else {
sub := make(map[string]any)
target[k] = sub
target = sub
}
}
return nil
}

// 根据 format 获取解码器
if codec := encoding.GetCodec(src.Format); codec != nil {
return codec.Unmarshal(src.Value, &target)
}

return fmt.Errorf("unsupported key: %s format: %s", src.Key, src.Format)
}

两种模式:

  1. 无格式:按 . 分隔键名,展开为嵌套 map
  2. 有格式(yaml/json/xml):使用对应的编码器解码

占位符解析(Resolve)

Reader.Resolve 方法解析配置中的占位符:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
// config/options.go
func defaultResolver(input map[string]any) error {
// 创建值查找函数
mapper := mapper(input)
return resolver(input, mapper, false)
}

// mapper 返回一个函数,用于从配置中查找值
func mapper(input map[string]any) func(name string) string {
return func(name string) string {
// 支持格式:${key:default}
args := strings.SplitN(strings.TrimSpace(name), ":", 2)

// 查找键对应的值
if v, has := readValue(input, args[0]); has {
s, _ := v.String()
return s
} else if len(args) > 1 {
// 返回默认值
return args[1]
}
return ""
}
}

// resolver 递归解析配置中的占位符
func resolver(input map[string]any, mapper func(string) string, toType bool) error {
var resolve func(map[string]any) error

resolve = func(sub map[string]any) error {
for k, v := range sub {
switch vt := v.(type) {
case string:
// 替换占位符
sub[k] = expand(vt, mapper, toType)
case map[string]any:
// 递归处理嵌套 map
if err := resolve(vt); err != nil {
return err
}
case []any:
// 处理数组
for i, iface := range vt {
switch it := iface.(type) {
case string:
vt[i] = expand(it, mapper, toType)
case map[string]any:
if err := resolve(it); err != nil {
return err
}
}
}
sub[k] = vt
}
}
return nil
}

return resolve(input)
}

// expand 替换字符串中的占位符
func expand(s string, mapping func(string) string, toType bool) any {
re := placeholderRegexp.FindAllStringSubmatch(s, -1)

for _, i := range re {
// 匹配到的每一项 i 是一个字符串切片 []string
// 第 0 位:整个正则匹配到的完整字符串
// 第 1 位以后:正则里捕获组(括号)匹配到的内容
if len(i) == 2 {
m := mapping(i[1])
if toType {
// 转换类型
return convertToType(m)
}
// 字符串替换
s = strings.ReplaceAll(s, i[0], m)
}
}

return s
}

占位符示例:

1
2
3
4
5
6
7
8
9
# 配置文件
app:
name: kratos-demo
region: ${app.name:default} # 引用配置中 app.name 的值,找不到则用 "default"
port: ${server.port:8080} # 引用配置中 server.port 的值,找不到则用 8080

database:
host: ${app.name} # 引用配置中 app.name 的值,无默认值
port: ${db.port:3306} # 引用配置中 db.port 的值,找不到则用 3306

注意:占位符 ${key:default} 解析的是配置自身中的 key,而非环境变量。mapper 函数通过 readValue(input, args[0]) 从已加载的配置 map 中查找值。如果需要引用环境变量,需配合 Env Source 使用——先将环境变量加载为配置,再通过占位符引用。

获取配置值(Value)

config.Value 方法获取配置值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// config/config.go
func (c *config) Value(key string) Value {
// 1. 尝试从缓存获取
if v, ok := c.cached.Load(key); ok {
return v.(Value)
}

// 2. 从 Reader 读取
if v, ok := c.reader.Value(key); ok {
// 3. 缓存结果
c.cached.Store(key, v)
return v
}

// 4. 返回错误值
return &errValue{err: ErrNotFound}
}

Reader.Value 实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// config/reader.go
func readValue(values map[string]any, path string) (Value, bool) {
// 1. 按点分隔路径
keys := strings.Split(path, ".")
next := values
last := len(keys) - 1

// 2. 逐层查找
for idx, key := range keys {
value, ok := next[key]
if !ok {
return nil, false
}

// 3. 到达最后一层
if idx == last {
av := &atomicValue{}
av.Store(value)
return av, true
}

// 4. 继续下一层
switch vm := value.(type) {
case map[string]any:
next = vm
default:
return nil, false
}
}

return nil, false
}

路径示例:

1
2
c.Value("server.http.addr")        // 获取 server.http.addr
c.Value("data.database.driver") // 获取 data.database.driver

Scan 方法

Scan 方法将配置反序列化到目标对象:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// config/config.go
func (c *config) Scan(v any) error {
// 1. 获取所有配置的 JSON 编码
data, err := c.reader.Source()
if err != nil {
return err
}

// 2. 反序列化到目标对象
return unmarshalJSON(data, v)
}

// unmarshalJSON 支持普通对象和 Protobuf 对象
func unmarshalJSON(data []byte, v any) error {
// 如果是 Protobuf 对象
if m, ok := v.(proto.Message); ok {
return protojson.UnmarshalOptions{DiscardUnknown: true}.Unmarshal(data, m)
}
// 普通对象
return json.Unmarshal(data, v)
}

支持的目标类型:

  • Go struct(使用标准库 json)
  • Protobuf Message(使用 protojson)
  • map[string]any

Value 的类型转换

atomicValue 实现了各种类型转换:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// config/value.go
func (v *atomicValue) Bool() (bool, error) {
switch val := v.Load().(type) {
case bool:
return val, nil
case int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64, float32, float64:
return strconv.ParseBool(fmt.Sprint(val))
case string:
return strconv.ParseBool(val)
}
return false, v.typeAssertError()
}

func (v *atomicValue) Int() (int64, error) {
switch val := v.Load().(type) {
case int:
return int64(val), nil
case int64:
return val, nil
// ... 其他整数类型
case string:
return strconv.ParseInt(val, 10, 64)
}
return 0, v.typeAssertError()
}

func (v *atomicValue) String() (string, error) {
switch val := v.Load().(type) {
case string:
return val, nil
case bool, int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64, float32, float64:
return fmt.Sprint(val), nil
}
return "", v.typeAssertError()
}

Scan 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// config/value.go
func (v *atomicValue) Scan(obj any) error {
// 1. 序列化为 JSON
data, err := json.Marshal(v.Load())
if err != nil {
return err
}

// 2. 如果是 Protobuf 对象
if pb, ok := obj.(proto.Message); ok {
return kratosjson.UnmarshalOptions.Unmarshal(data, pb)
}

// 3. 普通对象
return json.Unmarshal(data, obj)
}

配置源实现

File Source

File Source 从文件或目录加载配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
// config/file/file.go
type file struct {
path string
}

func NewSource(path string) config.Source {
return &file{path: path}
}

func (f *file) Load() (kvs []*config.KeyValue, err error) {
fi, err := os.Stat(f.path)
if err != nil {
return nil, err
}

// 如果是目录,加载目录下所有文件
if fi.IsDir() {
return f.loadDir(f.path)
}

// 如果是文件,加载单个文件
kv, err := f.loadFile(f.path)
if err != nil {
return nil, err
}
return []*config.KeyValue{kv}, nil
}

func (f *file) loadDir(path string) (kvs []*config.KeyValue, err error) {
files, err := os.ReadDir(path)
if err != nil {
return nil, err
}

for _, file := range files {
// 忽略隐藏文件和目录
if file.IsDir() || strings.HasPrefix(file.Name(), ".") {
continue
}

kv, err := f.loadFile(filepath.Join(path, file.Name()))
if err != nil {
return nil, err
}
kvs = append(kvs, kv)
}
return
}

func (f *file) loadFile(path string) (*config.KeyValue, error) {
file, err := os.Open(path)
if err != nil {
return nil, err
}
defer file.Close()

data, err := io.ReadAll(file)
if err != nil {
return nil, err
}

info, err := file.Stat()
if err != nil {
return nil, err
}

return &config.KeyValue{
Key: info.Name(),
Format: format(info.Name()), // 根据扩展名判断格式
Value: data,
}, nil
}

format 函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// config/file/format.go
func format(filename string) string {
ext := filepath.Ext(filename)
switch ext {
case ".json":
return "json"
case ".yaml", ".yml":
return "yaml"
case ".xml":
return "xml"
default:
return ""
}
}

Env Source

Env Source 从环境变量加载配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// config/env/env.go
type env struct {
prefixes []string // 支持前缀过滤
}

func NewSource(prefixes ...string) config.Source {
return &env{prefixes: prefixes}
}

func (e *env) Load() (kvs []*config.KeyValue, err error) {
return e.load(os.Environ()), nil
}

func (e *env) load(envs []string) []*config.KeyValue {
var kvs []*config.KeyValue

for _, env := range envs {
// 解析环境变量(格式:KEY=VALUE)
k, v, _ := strings.Cut(env, "=")
if k == "" {
continue
}

// 前缀过滤
if len(e.prefixes) > 0 {
prefix, ok := matchPrefix(e.prefixes, k)
if !ok || k == prefix {
continue
}
k = strings.TrimPrefix(k, prefix)
k = strings.TrimPrefix(k, "_")
}

if k != "" {
kvs = append(kvs, &config.KeyValue{
Key: k,
Value: []byte(v),
})
}
}

return kvs
}

使用示例:

1
2
3
4
5
6
7
8
9
10
11
12
// 只加载 KRATOS_ 前缀的环境变量
c := config.New(
config.WithSource(
env.NewSource("KRATOS_"),
),
)

// 环境变量 KRATOS_SERVER_ADDR=0.0.0.0:8080
// Env Source 去掉前缀 KRATOS_ 后,Key 为 "SERVER_ADDR",Format 为空
// Decoder 按 "." 分隔 key 展开,"SERVER_ADDR" 不含 ".",所以最终配置为:
// SERVER_ADDR = "0.0.0.0:8080"
// 注意:如果希望展开为嵌套结构,环境变量名需要用 "." 分隔,如 KRATOS_server.addr=0.0.0.0:8080

配置热更新

Watch 机制

Config 通过 Watch 机制实现配置热更新:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// config/config.go
func (c *config) watch(w Watcher) {
for {
// 1. 阻塞等待配置变化
kvs, err := w.Next()
if err != nil {
if errors.Is(err, context.Canceled) {
log.Infof("watcher's ctx cancel : %v", err)
return
}
time.Sleep(time.Second)
log.Errorf("failed to watch next config: %v", err)
continue
}

// 2. 合并新配置
if err := c.reader.Merge(kvs...); err != nil {
log.Errorf("failed to merge next config: %v", err)
continue
}

// 3. 解析占位符
if err := c.reader.Resolve(); err != nil {
log.Errorf("failed to resolve next config: %v", err)
continue
}

// 4. 更新缓存并通知观察者
c.cached.Range(func(key, value any) bool {
k := key.(string)
v := value.(Value)

// 获取新值
if n, ok := c.reader.Value(k); ok &&
reflect.TypeOf(n.Load()) == reflect.TypeOf(v.Load()) &&
!reflect.DeepEqual(n.Load(), v.Load()) {

// 更新缓存
v.Store(n.Load())

// 通知观察者
if o, ok := c.observers.Load(k); ok {
o.(Observer)(k, v)
}
}
return true
})
}
}

File Watcher 实现

File Watcher 使用 fsnotify 监听文件变化:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
// config/file/watcher.go
type watcher struct {
f *file
fw *fsnotify.Watcher

ctx context.Context
cancel context.CancelFunc
}

func newWatcher(f *file) (config.Watcher, error) {
// 1. 创建 fsnotify 监听器
fw, err := fsnotify.NewWatcher()
if err != nil {
return nil, err
}

// 2. 监听文件或目录
if err := fw.Add(f.path); err != nil {
return nil, err
}

ctx, cancel := context.WithCancel(context.Background())
return &watcher{
f: f,
fw: fw,
ctx: ctx,
cancel: cancel,
}, nil
}

func (w *watcher) Next() ([]*config.KeyValue, error) {
select {
case <-w.ctx.Done():
return nil, w.ctx.Err()

case event := <-w.fw.Events:
// 处理文件重命名(编辑器保存时会触发)
if event.Op == fsnotify.Rename {
if _, err := os.Stat(event.Name); err == nil || os.IsExist(err) {
if err := w.fw.Add(event.Name); err != nil {
return nil, err
}
}
}

// 确定要加载的文件路径
fi, err := os.Stat(w.f.path)
if err != nil {
return nil, err
}

path := w.f.path
if fi.IsDir() {
path = filepath.Join(w.f.path, filepath.Base(event.Name))
}

// 加载文件
kv, err := w.f.loadFile(path)
if err != nil {
return nil, err
}

return []*config.KeyValue{kv}, nil

case err := <-w.fw.Errors:
return nil, err
}
}

func (w *watcher) Stop() error {
w.cancel()
return w.fw.Close()
}

监听配置变化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// config/config.go
func (c *config) Watch(key string, o Observer) error {
// 1. 检查 key 是否存在
if v := c.Value(key); v.Load() == nil {
return ErrNotFound
}

// 2. 注册观察者
c.observers.Store(key, o)
return nil
}

// Observer 类型
type Observer func(key string, value Value)

使用示例:

1
2
3
4
// 监听配置变化
c.Watch("server.http.addr", func(key string, value config.Value) {
fmt.Printf("配置变化: %s = %v\n", key, value.Load())
})

热更新流程图

1
2
3
4
5
6
7
8
9
10
11
12
13
文件变化

fsnotify 检测到事件

Watcher.Next() 返回新的键值对

Reader.Merge() 合并新配置

Reader.Resolve() 解析占位符

更新缓存

通知观察者

Demo 示例

让我们通过几个完整的示例来演示配置模块的使用。

基本用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package main

import (
"fmt"
"log"

"github.com/go-kratos/kratos/v2/config"
"github.com/go-kratos/kratos/v2/config/file"
)

func main() {
// 创建配置实例
c := config.New(
config.WithSource(
file.NewSource("configs/server.yaml"),
),
)
defer c.Close()

// 加载配置
if err := c.Load(); err != nil {
log.Fatalf("加载配置失败: %v", err)
}

// 获取配置值
addr := c.Value("server.http.addr")
fmt.Printf("HTTP 地址: %s\n", addr.String())

timeout := c.Value("server.http.timeout")
fmt.Printf("超时时间: %v\n", timeout.Load())
}

配置合并

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 从多个配置文件加载,后加载的会覆盖前面的
c := config.New(
config.WithSource(
file.NewSource("configs/base.yaml"), // 基础配置
file.NewSource("configs/dev.yaml"), // 开发环境配置(覆盖基础配置)
),
)
defer c.Close()

if err := c.Load(); err != nil {
log.Fatalf("加载配置失败: %v", err)
}

// 验证配置合并
dbSource := c.Value("data.database.source")
fmt.Printf("数据库连接: %s\n", dbSource.String())

Scan 到结构体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
c := config.New(
config.WithSource(
file.NewSource("configs/server.yaml"),
),
)
defer c.Close()

if err := c.Load(); err != nil {
log.Fatalf("加载配置失败: %v", err)
}

// Scan 到 map
var serverConf map[string]any
if err := c.Scan(&serverConf); err != nil {
log.Fatalf("Scan 失败: %v", err)
}
fmt.Printf("完整配置: %v\n", serverConf)

// Scan 到结构体
type HTTPConfig struct {
Addr string `json:"addr"`
Timeout string `json:"timeout"`
}

var httpConf HTTPConfig
if err := c.Value("server.http").Scan(&httpConf); err != nil {
log.Printf("Scan 失败: %v", err)
} else {
fmt.Printf("HTTP 配置: %+v\n", httpConf)
}

配置热更新

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
c := config.New(
config.WithSource(
file.NewSource("configs/server.yaml"),
),
)
defer c.Close()

if err := c.Load(); err != nil {
log.Fatalf("加载配置失败: %v", err)
}

// 监听配置变化
err := c.Watch("server.http.addr", func(key string, value config.Value) {
fmt.Printf("配置变化: %s = %v\n", key, value.Load())
})
if err != nil {
log.Printf("Watch 失败: %v", err)
}

fmt.Println("正在监听配置变化...")
// 修改 configs/server.yaml 文件,会自动触发回调

多源配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import (
"github.com/go-kratos/kratos/v2/config"
"github.com/go-kratos/kratos/v2/config/file"
"github.com/go-kratos/kratos/v2/config/env"
)

// 同时从文件和环境变量加载配置
c := config.New(
config.WithSource(
file.NewSource("configs/server.yaml"), // 文件配置
env.NewSource("KRATOS_"), // 环境变量(KRATOS_ 前缀)
),
)
defer c.Close()

if err := c.Load(); err != nil {
log.Fatalf("加载配置失败: %v", err)
}

// 环境变量 KRATOS_server.http.addr=0.0.0.0:9000
// Env Source 去掉前缀 KRATOS_ 后 Key 为 "server.http.addr"
// Decoder 按 "." 分隔展开为嵌套 map,可覆盖文件中的 server.http.addr
addr := c.Value("server.http.addr")
fmt.Printf("HTTP 地址: %s\n", addr.String())

扩展配置源

Kratos 配置模块支持扩展自定义配置源。只需要实现 Source 接口即可。Kratos 社区已经提供了多个配置源的实现:

  • contrib/config/apollo:Apollo 配置中心
  • contrib/config/consul:Consul 配置中心
  • contrib/config/etcd:etcd 配置中心
  • contrib/config/nacos:Nacos 配置中心
  • contrib/config/kubernetes:Kubernetes ConfigMap

小结

Kratos 配置模块是一个分层抽象、灵活可扩展的配置管理系统,支持配置合并、多种配置源、源热更新、占位符解析等功能。通过研究该模块的代码,我们可以学习到一个简易、通用的配置管理系统应该如何实现。