0%

《Go 语言精进之路》读书笔记(06):并发编程

Go 以其轻量级的并发编程思想而闻名,这篇文章将详细介绍 Go 基本执行单元:goroutine 的调度原理、Go 并发模型以及常见并发模式、Go 支持并发的原生类型–channel 的惯用使用模式等内容。

优先考虑并发设计

并发不是并行,并发(Concurrency)关乎结构,并行(Parallelism)关乎执行。Go 原生支持并发。

并发与并行

并行方案就是在处理器核数充足的情况下,启动多个单线程应用的实例,这样每个实例运行在一个核上,尽可能地利用多核计算资源。并发就是重新做应用结构设计,即将应用分解成多个基本执行单元中执行的、可能有一定关联关系的代码片段。

基于多线程模型的应用设计就是一种典型的并发程序设计,这种以操作系统线程作为承载分解后的代码片段的执行单元,由操作系统执行调度。因为操作系统线程的创建、销毁以及线程间上下文切换的代价都比较大。而且线程的接口多以标准库形式提供,线程间通信原语也不足,用户层接口较为晦涩。

Go 的设计哲学之一就是 原生并发,轻量高效。Go 没有使用操作系统线程作为承载分解后的代码片段(模块)的基本执行单元,而是实现了 goroutine 这一有 Go 运行时负责调度的用户层轻量级线程为并发程序设计提供原生支持。它具有如下特点:

  • 资源占用小,每个 goroutine 的初始栈大小仅为 2KB
  • 由 Go 运行时而不是操作系统调度,goroutine 上下文切换代价较小
  • 语言原生支持:goroutine 由 go 关键字接函数或者方法创建,函数或方法返回即表示 goroutine 退出
  • 语言内置 channel 作为 goroutine 间通信的原语,为并发设计提供强大支持

在应用的结构设计阶段,Go 的惯例是优先考虑并发设计,这样应用可以更好、更自然地适应规模化。

Go 并发设计实例

串行程序设计的弊端是:所有任务是串行顺序执行的,当计算资源在处理某个任务,其他任务就无法执行,即使此时计算资源充足。并发关乎结构,并行关乎执行,并发和并行是两个阶段的事情,并发在程序的设计和实现阶段,并行在程序的执行阶段。

Go 鼓励在程序设计时优先按照并发设计思路组织程序结构,进行独立计算的分解。只有并发设计才能让应用自然适应计算资源的规模化,并显出更大的威力。

了解 goroutine 调度原理

goroutine 是由 Go 运行时管理的用户层轻量级线程,无论是 Go 自身运行时代码还是用户层 Go 代码都是在 goroutine 中运行。Go 的运行时负责 goroutine 的调度。

goroutine 调度器

将 goroutine 按照一定算法放到 CPU 上执行的程序就称为 goroutine 调度器。一个 Go 程序对于操作系统来说只是一个用户层程序,操作系统眼中只有线程,goroutine 的调度应该要靠 Go 自己完成。

Go 1.0 版本使用了一个简单的 goroutine 调度器,称为 G-M 模型。在这个调度器中,每个 goroutine 对应运行时中的一个抽象结构 G(goroutine),而被视作物理 CPU 的操作系统线程则被抽象为另一个结构 M(machine)。G-M 模型实现起来比较简单且能正常工作,但是限制了 Go 并发程序的伸缩性。

之后,Dmitry Vyukov 实现了 G-P-M 调度模型和 work stealing 算法,这一模型一直沿用至今。该模型在 G-M 模型中间增加了一个 P(逻辑处理器),使得 goroutine 调度器具有很好的伸缩性。Dmitry Vyukov 还负责实现了 Go 抢占式调度器设计。除非极端的无限循环或者死循环,否则只要 G 调用函数,Go 运行时就有了抢占 G 的机会。

Go 提供了调度器当前状态的查看方法:使用 Go 运行时环境变量 GODEBUG。

Go 从 1.5 版本开始,将 P 的默认数量从 1 改为 CPU 核(逻辑核)的数量。runtime.GOMAXPROCS(n) 函数可以修改 P 的数量为指定值 n。

掌握 Go 并发模型和常见并发模式

不要通过共享内存来通信,而应该通过通信来共享内存。

Go 并发模型

传统语言的并发逻辑多是基于操作系统的线程,其并发执行单元之间的通信则是利用操作系统提供的线程或进程间通信的原语,例如共享内存、信号、管道、消息队列、套接字等。这种传统的基于共享内存的并发模型很难用,而且易错,尤其在大型或复杂的程序中。

Go 的并发模型借鉴了计算机科学家 Tony Hoare 提出的 CSP(Communicating Sequential Process,通信顺序进程)模型。在该模型中,输入/输出应该是基本的编程原语,数据处理逻辑(CSP 中的 P)仅需要调用输入原语获取数据,顺序处理数据,并将结果数据通过输出原语即可。因此一个符合 CSP 模型的并发程序应该是一组通过输入/输出原语连接起来的 P 集合。从这个角度看,CSP 理论不仅是一个并发参考模型,也是一种并发程序的程序组织方法。

为了实现 CSP 模型中的输入/输出原语,Go 引入了 goroutine(P)之间的通信原语 channel。goroutine 可以从 channel 获取输入数据,再将处理后额结果数据通过 channel 输出。

虽然 CSP 模型已经成为 Go 语言支持的主流并发模型,但 Go 也支持传统的基于共享内存的并发模型,并提供基本的低级同步原语(主要是 sync 包中的互斥锁、条件变量、读写锁、原子操作等)。Go 始终推荐以 CSP 模型风格构建并发程序,对于局部情况,比如性能敏感的区域或需要保护的结构体数据,可以使用更为高效的低级同步原语(例如 sync.Mutex)。

Go 常见的并发模式

Go 针对 CSP 模型提供了 3 种并发原语:

  • goroutine:对应 CSP 模型中的 P,封装了数据处理逻辑,是 Go 运行时调度的基本执行单元
  • channel:对于 CSP 模型中的输入/输出原语,用于 goroutine 之间的通信和同步
  • select:用于应对多路输入/输出,可以让 goroutine 同时协调处理多个 channel 操作

Go 使用 go 关键字 + 函数/方法创建 goroutine,在稍微复杂的并发程序中,需要考虑通过 CSP 模型输入/输出原语承载体 channel 在 goroutine 之间建立连接,例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
type T struct {

}

func spawn(f func()) chan T {
c := make(chan T)
go func() {
...
f()
...
}()

return c
}

func main() {
c := spawn(func(){})
}

channel 也是 Go 语言一等公民:channel 可以像变量一样被初始化、传递和赋值。

goroutine 的执行函数返回,即意味着 goroutine 退出。但有一些常驻后台的服务程序可能会对 goroutine 有着优雅退出的要求,这里介绍几种 goroutine 的退出模式,这里借鉴了线程模型中的术语:

分离模式:对于分离的 goroutine,创建它的 goroutine 不需要关心它的退出。这类 goroutine 在启动后即与其创建者彻底分离,其生命周期与其执行的主函数相关,函数返回即 goroutine 退出。

join 模式:goroutine 的创建者需要等待新的 goroutine 结束,如下是一个示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package main

import "time"

func work(args ...interface{}) {
if len(args) == 0 {
return
}

interval, ok := args[0].(int)
if !ok {
return
}

time.Sleep(time.Second * time.Duration(interval))
}

func spawn(fn func(args ...interface{}), args ...interface{}) chan struct{} {
c := make(chan struct{})
go func() {
fn(args...)
c <- struct{}{}
}()
return c
}

func main() {
done := spawn(work, 3)
println("spawn a worker goroutine")
_ = <-done
println("worker done")
}
  • 如果新的 goroutine 不仅要等待 goroutine 的退出,还要获取其结束状态,同样可以通过自定义类型的 channel 来实现,例如将上面示例 channel 的类型从 chan struct{} 改为 chan error

  • 有时 goroutine 的创建者可能会创建不止一个 goroutine,并且需要等待全部 goroutine 退出,可以通过 Go 语言提供的 sync.WaitGroup 实现等待多个 goroutine 退出的模式。每创建一个 goroutine,都会调用 wg.Add(1),而新创建的 goroutine 执行完毕退出前都会调用 wg.Done()。当所有 goroutine 都退出后,wg.Wait() 函数则会返回

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package main

import (
"fmt"
"sync"
"time"
)

func worker(args ...interface{}) {
if len(args) == 0 {
return
}

interval, ok := args[0].(int)
if !ok {
return
}

time.Sleep(time.Second * time.Duration(interval))
return
}

func spawnGroup(n int, fn func(args ...interface{}), args ...interface{}) chan struct{} {
c := make(chan struct{})
var wg sync.WaitGroup

for i := 0; i < n; i++ {
wg.Add(1)
go func(i int) {
name := fmt.Sprintf("worker-%d", i)
fn(args...)
println(name, "done")
wg.Done()
}(i)
}

go func() {
wg.Wait()
c <- struct{}{}
}()

return c
}

func main() {
done := spawnGroup(5, worker, 3)
println("spawn a group of workers")
_ = <-done
println("group workers done")
}
  • 支持超时机制的等待:可以通过定时器设置超时等待时间,例如:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
func main() {
done := spawnGroup(5, worker, 30)
println("spawn a group of workers")

timer := time.NewTimer(time.Second * 5)
defer timer.Stop()

select {
case <-timer.C:
println("wait group workers exit timeout!")
case <-done:
println("group workers done")
}
}

notify-and-wait 模式:很多时候 goroutine 的创建者需要主动通知那些新的 goroutine 退出,尤其是当 main goroutine 作为创建者时。我们可以通过 `notify-and-wait 模式来满足这一场景要求

  • 如下代码展示了通知并等待一个 goroutine 退出,此时 spawn 返回的 channel 的作用发生了变化,从原先的只是用于新 goroutine 发送退出信号给创建者,变成了一个双向的数据通道:即承载了创建者发送给新 goroutine 的退出信号,也承载新 goroutine 返回给创建者的退出状态
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package main

import "time"

func worker(j int) {
time.Sleep(time.Second * time.Duration(j))
}

func spawn(fn func(int)) chan string {
quit := make(chan string)

go func() {
var job chan int
for {
select {
case j := <-job:
fn(j)
case <-quit:
quit <- "ok"
}
}
}()

return quit
}

func main() {
quit := spawn(worker)
println("spawn a worker goroutine")

time.Sleep(5)
println("notify the worker to exit")
quit <- "exit"

timer := time.NewTimer(time.Second * 10)
defer timer.Stop()

select {
case status := <-quit:
println("worker done:", status)
case <-timer.C:
println("wait worker exit timeout")
}
}
  • 通知并等待多个多个 goroutine 退出:Go 语言的 channel 有一个特性是,当使用 close 函数关闭 channel 时,所有阻塞到该 channel 上的 goroutine 都会得到通知:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package main

import (
"fmt"
"sync"
"time"
)

func worker(j int) {
time.Sleep(time.Second * time.Duration(j))
}

func spawnGroup(n int, fn func(int)) chan struct{} {
quit := make(chan struct{})
job := make(chan int)
var wg sync.WaitGroup

for i := 0; i < n; i++ {
wg.Add(1)

name := fmt.Sprintf("worker-%d", i)
go func(i int) {
defer wg.Done()
for {
select {
case j, ok := <-job:
if !ok {
println(name, " done")
return
}
fn(j)
}
}
}(i)
}

go func() {
<-quit
close(job)
wg.Wait()
quit <- struct{}{}
}()

return quit
}

func main() {
quit := spawnGroup(5, worker)
println("spawn a group of workers")

time.Sleep(5 * time.Second)
println("notify the worker group to exit")
quit <- struct{}{}

timer := time.NewTimer(time.Second)
defer timer.Stop()

select {
case <-timer.C:
println("wait group workers exit timeout!")
case <-quit:
println("group workers done")
}
}

这段代码的关键是当创建者关闭 job channel 时,所有 worker goroutine 都是得到通知,此时通过 comma ok 模式获取 ok 值为 false,即表明该 channel 已经关闭

Go 中的 channel 原语使得构建管道并发模式变得容易且自然。如下展示了一个示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package main

func newNumGenerator(start, count int) <-chan int {
c := make(chan int)

go func() {
for i := start; i < start+count; i++ {
c <- i
}
close(c)
}()

return c
}

func filterOdd(i int) (int, bool) {
if i%2 == 0 {
return 0, false
}

return i, true
}

func square(i int) (int, bool) {
return i * i, true
}

func spawn(f func(int) (int, bool), in <-chan int) <-chan int {
out := make(chan int)

go func() {
for v := range in {
r, ok := f(v)
if ok {
out <- r
}
}
close(out)
}()

return out
}

func main() {
in := newNumGenerator(1, 20)
out := spawn(square, spawn(filterOdd, in))

for v := range out {
println(v)
}
}

接下来再来了解两种基于管道模式的扩展模式:

  • 扇出模式:多个功能相同的 goroutine 从同一个 channel 读取数据并处理,直到该 channel 关闭。这种情况称为扇出。使用扇出模式可以在一组 goroutine 中均衡分配工作量,从而更均衡地利用 CPU 资源
  • 扇入模式:在某个处理环节,处理程序面对不止一个输入 channel,把所有输入 channel 的数据汇聚到统一的输入 channel,然后处理程序再从这个 channel 中读取数据并处理,直到该 channel 因所有输入 channel 关闭而关闭,这种情况就称为 扇入

如下是扇出/扇入模式的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package main

import (
"fmt"
"sync"
"time"
)

func newNumGenerator(start, count int) <-chan int {
c := make(chan int)

go func() {
for i := start; i < start+count; i++ {
c <- i
}
close(c)
}()

return c
}

func filterOdd(i int) (int, bool) {
if i%2 == 0 {
return 0, false
}

return i, true
}

func square(i int) (int, bool) {
return i * i, true
}
func spawnGroup(name string, num int, f func(int) (int, bool), in <-chan int) <-chan int {
groupOut := make(chan int)
var outSlice []chan int

for i := 0; i < num; i++ {
out := make(chan int)
go func(i int) {
name := fmt.Sprintf("%s-%d", name, i)
fmt.Printf("%s begin to work...\n", name)

for v := range in {
r, ok := f(v)
if ok {
out <- r
}
}
close(out)
fmt.Printf("%s work done\n", name)
}(i)

outSlice = append(outSlice, out)
}

go func() {
var wg sync.WaitGroup
for _, out := range outSlice {
wg.Add(1)
go func(out <-chan int) {
for v := range out {
groupOut <- v
}
wg.Done()
}(out)
}

wg.Wait()
close(groupOut)
}()

return groupOut
}

func main() {
in := newNumGenerator(1, 20)
out := spawnGroup("square", 2, square, spawnGroup("filterOdd", 3, filterOdd, in))

time.Sleep(3 * time.Second)

for v := range out {
println(v)
}
}

如果需要实现超时取消模式,可以通过定时器实现,并通过 select 原语监视该定时器事件和响应 channel 上的事件。如果要在超时之后,取消已经创建的 goroutine(on-flight goroutine),我们可以使用 Go 的 context 包来实现取消模式。

了解 channel 的妙用

channel 在 Go 的 CSP 模型中,既可以实现 goroutine 间的通信,又可以实现 goroutine 间的同步。channel 在 Go 中是 一等公民,如下是 channel 的使用原语:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
c := make(chan int)
c := make(chan int, 10)
c <- x
<- c
x = <- c
x, ok = <- c

for i := range c {...}
close(c)

c := make(chan chan int)
func stream(ctx context.Context, out chan<- Value) error { ... }

func spawn(...) <-chan T

当要同时对多个 channel 进行操作时,可以使用 select

1
2
3
4
5
6
7
8
9
10
select {
case x := <- c1:
...
case y, ok := <- c2:
...
case c3 <- z:
...
default:
...
}

无缓冲 channel

无缓冲 channel 兼具通信和同步特性,在并发程序中应用广泛。当使用不带有 capacity 参数的内置 make 函数创建一个可用的无缓冲 channel:

1
c := make(chan T)

由于无缓冲的 channel 在运行时中实现不带有缓冲区,因此对无缓冲 channel 的接收和发送操作是同步的。即对于同一个无缓冲 channel,只有在对其进行接收操作的 goroutine 和对其进行发送操作的 goroutine 都存在,通信才能进行,否则单方面的操作会让对应的 goroutine 阻塞

对于无缓冲 channel:

  • 发送动作一定发生在 接收动作完成 之前
  • 接收动作一定发生在 发送动作完成 之前
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package main

var a string
var c = make(chan int)

func f() {
a = "hello"
<-c
}

func main() {
go f()
c <- 5
println(a)
}

所以无缓冲 channel 可以实现以下目的:

  • 在两个 goroutine 之间一对一地传递通知信号
  • 用来实现一对多的信号通知机制,这样的信号通知机制常被用于协调多个 goroutine 一起工作。如下代码中,关闭一个无缓冲的 channel 会让所有阻塞在该 channel 上的接收操作返回,从而实现一种一对多的广播机制。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package main

import (
"fmt"
"sync"
"time"
)

type signal struct{}

func worker(i int) {
fmt.Printf("worker %d: is working...\n", i)
time.Sleep(1 * time.Second)
fmt.Printf("worker %d: done...\n", i)
}

func spawnGroup(f func(int), num int, groupSignal <-chan signal) <-chan signal {
c := make(chan signal)
var wg sync.WaitGroup

for i := 0; i < num; i++ {
wg.Add(1)
go func(i int) {
<-groupSignal
fmt.Printf("worker %d: start to work...\n", i)
f(i)
wg.Done()
}(i)
}

go func() {
wg.Wait()
c <- signal(struct{}{})
}()

return c
}

func main() {
fmt.Println("start a group of workers...")
groupSignal := make(chan signal)
done := spawnGroup(worker, 3, groupSignal)
fmt.Println("the group of workers start to work...")
close(groupSignal)
<-done
fmt.Println("the group of workers work done!")
}

无缓冲 channel 具有同步特性,这让它在某些场合下可以替代锁,从而使得程序更加清晰

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package main

import (
"fmt"
"time"
)

type counter struct {
c chan int
i int
}

var cter counter

func InitCounter() {
cter = counter{
c: make(chan int),
}

go func() {
for {
cter.i++
cter.c <- cter.i
}
}()

println("counter init ok")
}

func Increase() int {
return <-cter.c
}

func init() {
InitCounter()
}

func main() {
for i := 0; i < 10; i++ {
go func(i int) {
v := Increase()
fmt.Println("current value %d", v)
}(i)
}

time.Sleep(5 * time.Second)
}

带缓冲 channel

带缓冲 channel 可以通过带有 capacity 参数的内置 make 函数创建:

1
c := make(chan T, capacity)

由于带缓冲 channel 在运行时的实现带有缓冲区,因此对带缓冲区 channel 的发送操作在缓冲区未满、接收操作在缓冲区非空的情况下是异步的(发送或接收无须阻塞等待)。在缓冲区满时发送操作才会阻塞;在缓冲区空时接收操作才会阻塞。

可自行设置容量、异步收发的带缓冲 channel 更适合用作消息队列,并且带缓冲 channel 在数据收发性能上明显好于无缓冲 channel

Go 并发设计的一个惯用法是将带缓冲的 channel 用作计数信号量(counting semaphore):

  • channel 中当前数据的个数可以表示当前处于活动状态的 goroutine 数量
  • channel 的容量则代表允许同时处于活动状态的 goroutine 最大数量
  • 向 channel 中发送数据表示获取一个信号量槽位
  • 从 channel 中读取一个数据表示释放一个信号量槽位

如下是一个示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package main

import (
"sync"
"time"
)

var active = make(chan struct{}, 3)
var jobs = make(chan int, 10)

func main() {
go func() {
for i := 0; i < 8; i++ {
jobs <- (i + 1)
}
close(jobs)
}()

var wg sync.WaitGroup
for j := range jobs {
wg.Add(1)
go func(j int) {
active <- struct{}{}
println("handle job", j)
time.Sleep(2 * time.Second)
<-active
wg.Done()
}(j)
}

wg.Wait()
}

len 是 Go 语言原生内置的函数,它可以接受数组、切片、map、字符串或 channel 类型的参数,并返回对应类型的长度。如果参数是 channel 类型:

  • 对于无缓冲 channel,len 总返回 0
  • 当 s 为缓冲 channel,len 总是返回当前 channel 中尚未被读取的元素个数

如果需要将判断 channel 长度逻辑与读取写入逻辑放在一起,那么必须将它们实现成一个事务,这类事务可以通过 select 实现:

1
2
3
4
5
6
7
8
func tryRecv(c <- chan int) (int, bool) {
select {
case i := <- c:
return i, true
default:
return 0, false
}
}
1
2
3
4
5
6
7
8
func trySend(c chan <- int, i int) bool {
select {
case c <- i:
return true
default:
return false
}
}

由于用到了 select 原语的 default 分支,当 channel 为空时,tryRecv 不会被阻塞,当 channel 满时,trySend 不会阻塞。

对于 多发送单接收 场景,可以在接收者中通过 len(channel) > 0 来判断 channel 中是否有数据接收。对于 多接收单发送 场景,可以在发送者中通过 len(channel) < cap(channel) 来判断 channel 中是否可以继续发送。

nil channel 的妙用

对于没有初始化的 channel(nil channel)进行读写操作将会发生阻塞。有时候利用这个 nil channel 这个特性,可以达到事半功倍的效果。例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package main

import "time"

func main() {
c1 := make(chan int)
c2 := make(chan int)

go func() {
time.Sleep(5 * time.Second)
c1 <- 1
close(c1)
}()

go func() {
time.Sleep(8 * time.Second)
c2 <- 2
close(c2)
}()

for {
select {
case x, ok := <-c1:
if !ok {
c1 = nil
} else {
println(x)
}

case x, ok := <-c2:
if !ok {
c2 = nil
} else {
println(x)
}

}

if c1 == nil && c2 == nil {
break
}
}
}

这里的关键是在判断出 c1 或 c2 被关闭后,显式地将 c1、c2 设置为 nil。否则的话,从一个已经关闭的 channel 中接收数据将永远不会被阻塞,此时就会导致 select 继续选中这个已经关闭的 channel 分支执行(此时返回该 channel 元素类型的零值)。

与select 结合使用的一些惯用法

channel 与 select 结合使用能够形成强大的表达能力。这里再简单总结下:

  • select 语句的 default 分支的语义是在其他分支均因通信未就绪而无法被选择的时候执行,这就为 default 分支赋予了一种 避免阻塞 的特性
  • 带超时机制的 select 是 Go 语言中一种常见的 select 和 channel 的组合用法,通过超时事件,既可以避免长期陷入某种操作的等待中,也可以做一些异常处理工作。例如:
1
2
3
4
5
6
7
8
func worker() {
select {
case <- c:

case <- time.After(30 * time.Second)
return
}
}
  • 在使用 timer.Timer 时,要特别注意 timer 使用后的释放,即要及时调用 timer 的 Stop 方法从最小堆中删除尚未到达过期时间的 timer 对象

  • 结合 time 包的 Ticker,可以实现带有心跳机制的 select。同样,在使用完 ticker 之后,要记得调用其 Stop 方法停止 ticker 的运作。例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
func worker() {
heartbeat := time.NewTicker(30 * time.Second)
defer heartbeat.Stop()

for {
select {
case <- c:
...
case <- heartbeat.C:
...
}
}
}

了解 sync 包的正确用法

Go 在提供 CSP 并发模型原语的同时,还通过标准库的 sync 包提供了针对传统基于共享内存并发模型的基本同步原语,包括互斥锁(sync.Mutex)、读写锁(sync.RWMutex)和条件变量(sync.Cond)等。

使用 sync 包还是 channel

虽然建议大家优先使用 CSP 并发模型进行并发程序设计,但是以下场景我们依然需要 sync 包提供的低级同步原语:

  • 需要高性能的临界区同步机制场景:channel 属于高级同步原语,其实现是构建在低级同步原语之上。因此 channel 自身的性能与低级同步原语相比要低一些。因此在需要高性能的临界区同步机制的情况下,sync 包提供的低级同步原语更为适合
  • 不想转移结构体对象所有权,但又要保证结构体内部状态数据的同步访问的场景:基于 channel 的并发设计的一个特点是,在 goroutine 间通过 channel 转移数据对象的所有权。只有拥有数据对象所有权(从 channel 接收到该数据)的 goroutine 才可以对该数据对象进行状态变更

使用 sync 包的注意事项

包括 Mutex 等 sync 包中定义的结构类型在首次使用后不应该对其进行复制操作。在使用 sync 包中的类型时,推荐通过闭包方式或者传递类型实例(或者包裹该类型的类型实例)的地址或指针的方式进行。

互斥锁还是读写锁

sync 包提供了两种用于临界区同步的原语:

  • 互斥锁(Mutex):临界区同步原语的首选,在并发量较小的情况下,互斥锁性能更好
  • 读写锁(RWMutex):适合在应用在具有一定并发量且读多写少的场合中。读写锁的读锁性能并不会随着并发量的增大而变化,但写锁在并发量大的情况下性能比互斥锁、读写锁的读锁都差

条件变量

sync.Cond 是传统的条件变量在 Go 语言中的实现,一个条件变量可以理解为一个容器,这个容器中存放着一个或一组等待着某个条件成立的 goroutine。当条件成立时,这些处于等待状态的 goroutine 将得到通知并被唤醒以继续后续的工作。条件变量是同步原语的一种,如果没有条件变量,开发人员可能需要在 goroutine 中通过连续轮询的方式检查是否满足条件,连续轮询非常消耗资源。

如下是一个示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package main

import (
"fmt"
"sync"
"time"
)

type signal struct{}

var ready bool

func worker(i int) {
fmt.Printf("worker %d: is working...\n", i)
time.Sleep(1 * time.Second)
fmt.Printf("worker %d: done\n", i)
}

func spawnGroup(f func(i int), num int, groupSignal *sync.Cond) chan signal {
c := make(chan signal)
var wg sync.WaitGroup

for i := 0; i < num; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
groupSignal.L.Lock()
for !ready {
groupSignal.Wait()
}
groupSignal.L.Unlock()

f(i)
}(i)
}

go func() {
wg.Wait()
c <- signal(struct{}{})
}()

return c
}

func main() {
fmt.Println("start a group of workers")
groupSignal := sync.NewCond(&sync.Mutex{})
c := spawnGroup(worker, 5, groupSignal)

time.Sleep(5 * time.Second)
fmt.Printf("the group of workers start to work...")

groupSignal.L.Lock()
ready = true
groupSignal.Broadcast()
groupSignal.L.Unlock()

<-c
fmt.Printf("the group of workers done\n")
}
  • sync.Cond 实例的初始化需要一个满足了实现了 sync.Locker 接口类型实例,通常直接使用 sync.Mutex
  • 各个等待条件成立的 gorotine 在加锁后判断条件是否成立,如果不成立,则调用 sync.Cond 的 Wait 方法进行等待状态。Wait 方法在 goroutine 挂起前会进行 Unlock 操作
  • main groutine 将 ready 设置为 true 后并调用 sync.Cond 的 Broadcast 方法后,各个阻塞的 goroutine 将被唤醒并从 Wait 方法中返回。Wait 方法会再次加锁让 goroutine 进入临界区,并再次判断条件数据

使用 sync.Once 实现单例模式

之前我们介绍过 init 函数,它在程序运行期间只被执行一次且满足 goroutine 安全。sync 包提供了另外一种更为灵活的机制,可以保证任意一个函数在程序运行期间只被执行一次,即 sync.Once。

sync.Once 的 仅执行一次 语义被一些包用于初始化和资源清理的过程中,以避免重复执行初始化或资源关闭操作。sync.Once 的语义十分适合实现单例模式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package main

import (
"fmt"
"sync"
"time"
)

type Foo struct{}

var once sync.Once
var instance *Foo

func GetInstance() *Foo {
defer func() {
if e := recover(); e != nil {
println("caught a panic", e)
}
}()

println("get instance")
once.Do(func() {
instance = &Foo{}
time.Sleep(3 * time.Second)
panic("panic in once.Do function")
})

return instance
}

func main() {
var wg sync.WaitGroup

for i := 0; i < 5; i++ {
wg.Add(1)
go func() {
inst := GetInstance()
fmt.Printf("the return addr of instance %p\n", inst)
wg.Done()
}()
}

wg.Wait()
}
1
2
3
4
5
6
7
8
9
10
11
12
# ./main
get instance
get instance
get instance
get instance
get instance
caught a panic (0x483c80,0x4b4720)
the return addr of instance 0x0
the return addr of instance 0x54f3e0
the return addr of instance 0x54f3e0
the return addr of instance 0x54f3e0
the return addr of instance 0x54f3e0
  • once.Do(f) 会等待 f 执行完毕后才返回,这期间其他执行 once.Do 函数的 goroutine 将会阻塞等待
  • Do 函数返回后,后续的 goroutine 再执行 Do 函数将不再执行 f 并立即返回
  • 即便函数 f 中出现 panic,sync.Once 原语也认为 once.Do 执行完毕,后续对 once.Do 的调用将不再执行 f

使用 sync.Pool 减轻垃圾回收的压力

sync.Pool 时是一个 数据对象 缓存池,它具有如下特点:

  • 放入该缓存池中的数据对象的生命是暂时的,随时都可能被垃圾回收
  • 缓存池中的数据对象是可以重复利用的,这样可以在一定程度上降低数据对象重新分配的频率,减轻 GC 的压力
  • sync.Pool 为每个 P(goroutine 调度模型中的 P)单独建立一个 local 缓存池,进一步降低高并发下对锁的争夺

sync.Pool 的一个典型应用就是建立像 bytes.Buffer 这种类型的临时缓存对象池:

1
2
3
4
5
var bufPool = sync.Pool {
New: func() interface{} {
return new(bytes.Buffer)
}
}

在实践中,为了避免大内存的 Buffer 对象被用于小数据而造成内存没有及时释放,Go 标准库采用两种方式来缓解这一问题:

  • 限制要放回缓存池中的数据对象的大小
  • 建立多级缓存池:这样就可以根据要处理的数据大小从最适合的缓存池中获取 Buffer 对象,并在完成数据处理后将对象归还到对应的池中

使用 atomic 包实现伸缩性更好的并发读取

atomic 包与原子操作

atomic 包是 Go 语言提供的原子操作(原语)的相关接口。原子操作的指令是不可中断的,它好比一个事务,要么不执行,一旦执行就一次性全部执行完毕,不可分割。因此,原子操作可用于共享数据的并发同步。

原子操作由底层硬件直接提供支持,是一种硬件实现的指令级事务。因此 atomic 包中提供的原语更接近硬件底层,也更为低级,它常用来实现更为高级的并发同步技术。

对共享整型变量的无锁读写

atomic 包提供了两大类原子接口:

  • 针对整型变量
  • 针对自定义类型
1
2
3
4
5
6
7
8
9
var n1 int64

func addSyncByAtomic(delta int64) int64 {
return atomic.AddInt64(&n1, delta)
}

func readSyncByAtomic() int64 {
return atomic.LoadInt64(&n1)
}

对共享自定义类型变量的无锁读写

atomic 通过 Value 类型的装箱操作实现了对任意自定义类型的原子操作,从而实现对共享自定义类型变量的无锁读写的支持。

例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
var config atomic.Value

type Config struct {
data string
}

func setConfig(c Config) {
config.Store(c)
}

func getConfig() Config {
return config.Load().(Config)
}

随着并发量的提升,使用 atomic 实现的共享变量的并发读写性能表现更为稳定,尤其是原子读操作。这让 atomic 包与 sync 包中的原语比起来具有更好的伸缩性和更好的性能。但是 atomic 原子操作可用来同步的范围有较大的限制,仅仅是一个整型变量或者自定义类型变量。如果需要对一个复杂的临界区数据进行同步,那么首选依旧是 sync 包中的原语。