0%

Go 语言学习笔记(4):并发

Go 语言最大的特色就是其对并发编程的支持。这篇文章重点介绍 Go 语言的并发机制 go routine,以及如何通过 go channel 实现各个 go routine 之间的通信。

并发的含义

首先需要了解并发(concurrency)和并行(parallesim)的区别。并发是指逻辑上具备同时处理多个任务的能力,并行是指物理上在同一时刻执行多个并发任务。通常我们说程序是并发设计的,指的是它允许多个任务同时执行,但实际上并不一定真在同一时刻运行。在单核处理器上,它们能以间隔方式切执行。而并发则依赖多核处理器等物理设备,让多个任务真正在同一时刻执行,它代表了当前程序运行状态。并行是并发设计的理想执行模式。

多线程或多进程是并行的基本条件,但单线程也可以用协程做到并发。尽管协程在单个线程上通过主动切换来实现多任务并发,但是它也有自己的优势,除了将因阻塞而浪费的时间找回来外,还免去了线程切换的开销,有着不错的执行效率。协程上运行的多个任务本质上依旧串行的,加上可控自主调度,并不需要做同步处理。

以上方式都有各自的适用场景,通常情况下用多进程实现分布式和负载均衡,减轻单进程垃圾回收压力。用多线程(LWP)抢夺更多的处理器资源;用协程来提高处理器时间片利用率。

go-routine

goroutine 并不是简单的协程。运行时会创建出多个线程来执行并发任务,且任务单元可以被调度到其他线程并行执行。goroutine 这更像是多线程与协程的综合体,能最大限度的提升执行效率,发挥多核能力

只需要在函数调用前添加 go 关键字即可创建一个并发任务。关键字 go 并非执行并发操作,而是创建一个并发任务单元。新建的任务被放置在系统队列中,等待调度器安排合适的系统线程去获取执行权。当前调用路径不会阻塞,不会等待该任务启动,且运行时也并不保证并发任务的执行次序。

每个任务单元除了保存函数指针、调用参数外,还会分配所需要执行的栈空间。相比系统默认 MB 级别的线程栈,goroutine 自定义栈初始仅需要 2KB。因此可以创建成千上万个并发任务,自定义栈采取按需分配策略,在需要时进行扩容,最大能到 GB 规模。

与 defer 一样,goroutine 也会因为延迟执行而立即计算并复制执行参数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package main

import "time"

var c int

func counter() int {
c++
return c
}

func main() {
a := 100

go func(x, y int) {
time.Sleep(time.Second)
println("go:", x, y)
}(a, counter())

a += 100
println("main:", a, counter())
time.Sleep(time.Second * 3)
}
1
2
3
$ ./goroutine
main: 200 2
go: 100 1

进程退出时并不会等待并发任务结束,可用通道(channel)阻塞,然后发出退出信号

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package main

import "time"

func main() {
exit := make(chan struct{})

go func() {
time.Sleep(time.Second)
println("goroutine done.")
close(exit)
}()

println("main...")
<-exit
println("main exit.")
}

这里除了关闭通道外,写入数据也可以解除阻塞。如果要等待多个任务结束,推荐使用 sync.WaitGroup,通过设定计数器,让每个 goroutine 在退出前递减,直至归零时解除阻塞

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package main

import (
"sync"
"time"
)

func main() {
var wg sync.WaitGroup

for i := 0; i < 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()

time.Sleep(time.Second)
println("goroutine", id, "done.")
}(i)
}

println("main...")
wg.Wait()
println("main exit.")
}

可以在多处使用 Wait 阻塞,它们都能接收到通知。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package main

import (
"fmt"
"sync"
"time"
)

func main() {
var wg sync.WaitGroup
wg.Add(1)

go func() {
wg.Wait()
println("wait exit.")
}()

go func() {
time.Sleep(time.Second)
println("done.")
wg.Done()
}()

wg.Wait()
fmt.Println("main exit.")
}

运行时可能会创建很多线程,但任何时候仅有有限的几个线程参与并发任务执行,该数量默认与处理器的核数相等,可以用 runtime.GOMAXPROCS 函数(或环境变量)修改。如果参数小于 1,GOMAXPROCS 仅返回当前设置值,不做任何调整。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package main

import (
"math"
"runtime"
"sync"
)

func count() {
x := 0
for i := 0; i < math.MaxUint32; i++ {
x += i
}

println(x)
}

func test(n int) {
for i := 0; i < n; i++ {
count()
}
}

func test2(n int) {
var wg sync.WaitGroup
wg.Add(n)

for i := 0; i < n; i++ {
go func() {
count()
wg.Done()
}()
}

wg.Wait()
}

func main() {
n := runtime.GOMAXPROCS(0)
// test(n)
test2(n)
}

与线程不同的是,goroutine 任务无法设置优先级,无法获取编号,没有局部存储(TLS),甚至连返回值都会被抛弃。但是除了优先级外,其他功能都很容易实现。如果需要使用 map 作为局部存储,建议做同步处理,因为运行时会对其做并发读写检查。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package main

import (
"fmt"
"sync"
)

func main() {
var wg sync.WaitGroup
var gs [5]struct {
id int
result int
}

for i := 0; i < len(gs); i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()

gs[id].id = id
gs[id].result = (id + 1) * 100
}(i)
}

wg.Wait()
fmt.Printf("%+v\n", gs)
}

使用 Gosched 可以暂停当前任务,释放线程去执行其他任务。当任务被放回队列,等待下次调度时恢复执行。其实该函数很少被使用,因为 runtime 会主动向长时间运行的任务发出抢占调度。但是主动切换也还是有使用时机的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package main

import (
"runtime"
)

func main() {
runtime.GOMAXPROCS(1)
exit := make(chan struct{})

go func() {
defer close(exit)

go func() {
println("b")
}()

for i := 0; i < 4; i++ {
println("a:", i)

if i == 1 {
runtime.Gosched()
}
}

}()

<-exit
}
1
2
3
4
5
6
$ ./gosched
a: 0
a: 1
b
a: 2
a: 3

Goexit 立即终止当前任务,runtime 确保所有已注册延迟调用被执行。该函数不会影响其他其他并发任务,不会引发 panic,这样也就无法捕获。如果是在 main.main 里调用 Goexit,它会等待其他任务结束,然后让进程直接崩溃。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package main

import "runtime"

func main() {
exit := make(chan struct{})

go func() {
defer close(exit)
defer println("a")

func() {
defer func() {
println("b", recover() == nil)
}()

func() {
println("c")
runtime.Goexit()
println("c done.")
}()

println("a done.")
}()
}()

<-exit
}
1
2
3
4
$ ./goexit
c
b true
a

无论当前处于哪一层,Goexit 都能立即终止整个调用栈,这与 return 仅退出当前函数不同。标准库函数 os.Exit 可终止进程,但不会执行延迟调用。

通道

Go 并未实现严格的并发安全,允许全局变量、指针、引用类型这些非安全内存共享操作,就需要开发人员自行维护数据一致性和完整性。Go 鼓励使用 CSP(Communicating Sequential Process) 通道,以通信来代替内存共享,实现并发安全。通过消息来避免竞态的模型除了 CSP,还有 Actor,但是两者有较大的区别:

作为 CSP 核心,通道(channel)是显式的,要求操作双方必须知道数据类型和具体通道,并不关心另一端操作者身份和数量。可是如果另一端未准备妥当,或者消息未能及时处理,会阻塞当前端。而 Actor 是透明的,它不在乎数据类型和通道,只需要知道接收者信箱即可。默认就是异步模式,发送方对消息是否被接收和处理并不关心。

从底层实现上来看,通道只是一个队列,同步模式下,发送方和接收方双方配对,然后直接复制数据给对方。如果配对失败,则置入等待队列,直到另一方出现后才会被唤醒。异步模式抢夺的则是数据缓冲槽,发送方要求有空槽可供写入,而接收方则要求有缓冲数据可读。需求不符时,同样加入等待队列,直到有一方写入数据或腾出空槽后被唤醒。

除传递消息(数据)外,通道还常被用作事件通知:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package main

func main() {
done := make(chan struct{})
c := make(chan string)

go func() {
s := <-c
println(s)
close(done)
}()

c <- "hi"
<-done
}
1
2
$ ./message
hi

同步模式必须有配对操作的 goroutine 出现,否则会一直阻塞,而异步模式在缓冲区未满或数据未读完前,不会阻塞。多数时候异步通道有助于提升性能,减少排队阻塞。缓冲区大小是内部属性,不属于类型组成部分。另外通道变量本身就是指针,可用相等操作符判断是否为同一对象或者 nil。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package main

import (
"fmt"
"unsafe"
)

func main() {
var a, b chan int = make(chan int, 3), make(chan int)
var c chan bool

println(a == b)
println(c == nil)
fmt.Printf("%p, %d\n", a, unsafe.Sizeof(a))
}
1
2
3
4
$ ./channel_pointer
false
true
0xc0000c2000, 8

内置函数 cap 和 len 返回缓冲区大小和当前已缓冲数量,而对于同步通道都返回 0,据此可以判断通道是同步还是异步。除使用简单的发送和接收操作符外,还可以使用 ok-idiom 或 range 模式处理数据。及时要 close 函数关闭通道引发结束通知,否则可能会导致死锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package main

func main() {
done := make(chan struct{})
c := make(chan int)

go func() {
defer close(done)

for {
x, ok := <-c
if !ok {
return
}
println(x)
}
}()

c <- 1
c <- 2
c <- 3
close(c)
<-done
}

对于循环接收数据,range 模式更简洁一些,如下代码循环获取消息,直到通道关闭

1
2
3
for x := range c {
println(x)
}

通知可以是群体性的,而且未必就是通知结束,可以是任何需要表达的事件。对一次性的事件用 close() 效率更好,没有多余开销,而连续或多样性的事件,可传递不同数据标志实现。还可以用 sync.Cond 实现单播或广播事件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package main

import (
"sync"
"time"
"fmt"
)

func main() {
var wg sync.WaitGroup
ready := make(chan struct{})

for i := 0; i < 3; i++ {
wg.Add(1)

go func(id int) {
defer wg.Done()

println(id, ": ready")
<-ready
println(id, ": runing...")
}(i)
}

time.Sleep(time.Second)
fmt.Println("Read? Go")
close(ready)
wg.Wait()
}
1
2
3
4
5
6
7
8
$ ./ready
0 : ready
1 : ready
2 : ready
Read? Go
0 : runing...
2 : runing...
1 : runing...

对于 closed 或 nil 通道,发送和接收操作都有相应规则:

  • 向已经关闭通道发送数据,会引发 panic
  • 向已经关闭的通道接收数,会先返回已缓冲数据或零值
  • 无论收发,nil 通道都会阻塞
  • 重复关闭,或者关闭 nil 通道都会引发 panic 错误
1
2
3
4
5
6
7
8
9
10
11
12
13
14
package main

func main() {
c := make(chan int, 3)

c <- 10
c <- 20
close(c)

for i := 0; i < cap(c)+1; i++ {
x, ok := <-c
println(i, ":", ok, x)
}
}

通道默认是双向的,并不区分发送和接收端。但是某些时候我们可以限制收发操作的方向来获得更加严谨的操作逻辑。尽管可以用 make 创建单向通道,但是没有任何意义,通常使用类型转换来获取单向通道,并分别赋予操作双方。不能在单向通道上做逆向操作,同样 close 不能用于接收端。另外也无法将单向通道转换回去。不能在单向通道上做逆向操作,同样 close 不能用于接收端。另外也无法将单向通道转换回去。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package main

import (
"sync"
)

func main() {
var wg sync.WaitGroup
wg.Add(2)

c := make(chan int)
var send chan<- int = c
var recv <-chan int = c

go func() {
defer wg.Done()

for x := range recv {
println(x)
}
}()

go func() {
defer wg.Done()
defer close(c)

for i := 0; i < 3; i++ {
send <- i
}
}()

wg.Wait()
}

如果要同时处理多个通道,可以使用 select 语句,它会随机选择一个可用通道做收发操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package main

import (
"sync"
)

func main() {
var wg sync.WaitGroup
wg.Add(2)

a, b := make(chan int), make(chan int)
go func() {
defer wg.Done()

for {
var (
name string
x int
ok bool
)

select {
case x, ok = <-a:
name = "a"
case x, ok = <-b:
name = "b"
}

if !ok {
return
}
println(name, x)
}
}()

go func() {
defer wg.Done()
defer close(a)
defer close(b)

for i := 0; i < 10; i++ {
select {
case a <- i:
case b <- i * 10:
}
}
}()

wg.Wait()
}

在这个例子中,如果要等待全部通道消息处理结束(closed),可将已完成通道设置为 nil。这样它就会阻塞,不再被 select 选中。

即便是同一通道,也会随机选择 case 执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package main

import (
"sync"
)

func main() {
var wg sync.WaitGroup
wg.Add(2)

c := make(chan int)

go func() {
defer wg.Done()

for {
var v int
var ok bool

select {
case v, ok = <-c:
println("a1:", v)
case v, ok = <-c:
println("a2:", v)
}

if !ok {
return
}
}
}()

go func() {
defer wg.Done()
defer close(c)

for i := 0; i < 10; i++ {
select {
case c <- i:
case c <- i * 10:
}
}
}()

wg.Wait()
}
1
2
3
4
5
6
7
8
9
10
11
12
$ ./random_case
a2: 0
a2: 1
a1: 20
a2: 3
a2: 40
a1: 50
a1: 60
a2: 70
a2: 8
a2: 90
a2: 0

当所有通道都不可用时,select 会执行 default 语句。如此可以避开 select 阻塞,但是需要注意处理外层循环,以免陷入空耗。可以用 default 处理一些默认逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package main

import (
"fmt"
"time"
)

func main() {
done := make(chan struct{})
c := make(chan int)

go func() {
defer close(done)
for {
select {
case x, ok := <-c:
if !ok {
return
}
fmt.Println("data:", x)
default:
}

fmt.Println(time.Now())
time.Sleep(time.Second)
}
}()

time.Sleep(time.Second * 5)
c <- 100
close(c)
<-done
}
1
2
3
4
5
6
7
8
./select_default
2022-04-30 18:15:56.63143 +0800 CST m=+0.000501161
2022-04-30 18:15:57.632224 +0800 CST m=+1.001320266
2022-04-30 18:15:58.633664 +0800 CST m=+2.002784534
2022-04-30 18:15:59.635089 +0800 CST m=+3.004233906
2022-04-30 18:16:00.637374 +0800 CST m=+4.006543814
data: 100
2022-04-30 18:16:01.637463 +0800 CST m=+5.006656539

通常使用工厂方法将 goroutine 和通道绑定:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package main

import "sync"

type receiver struct {
sync.WaitGroup
data chan int
}

func newReceiver() *receiver {
r := &receiver{
data: make(chan int),
}

r.Add(1)
go func() {
defer r.Done()
for x := range r.data {
println("recv:", x)
}
}()

return r
}

func main() {
r := newReceiver()
r.data <- 1
r.data <- 2

close(r.data)
r.Wait()
}

由于通道本身就是一个并发安全的队列,因此可以用作 ID generator、Pool 等用途,也可以用通道实现信号量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package main

import (
"fmt"
"runtime"
"sync"
"time"
)

func main() {
runtime.GOMAXPROCS(4)
var wg sync.WaitGroup

sem := make(chan struct{}, 2)

for i := 0; i < 5; i++ {
wg.Add(1)

go func(id int) {
defer wg.Done()

sem <- struct{}{}

defer func() { <-sem }()

time.Sleep(time.Second * 2)
fmt.Println(id, time.Now())
}(i)
}

wg.Wait()
}
1
2
3
4
5
6
$ ./sem
4 2021-10-18 22:48:54.498796 +0800 CST m=+2.000343021
1 2021-10-18 22:48:54.498794 +0800 CST m=+2.000341319
0 2021-10-18 22:48:56.503329 +0800 CST m=+4.004879555
3 2021-10-18 22:48:56.503319 +0800 CST m=+4.004869469
2 2021-10-18 22:48:58.503499 +0800 CST m=+6.005052770

标准库 time 提供了 timeout 和 tick channel 实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package main

import (
"fmt"
"os"
"time"
)

func main() {
go func() {
for {
select {
case <-time.After(time.Second * 5):
fmt.Println("timeout...")
os.Exit(0)
}
}
}()

go func() {
tick := time.Tick(time.Second)

for {
select {
case <-tick:
fmt.Println(time.Now())
}
}
}()

// use nil channel to block process
<-(chan struct{})(nil)
}

如下通过捕获 INT、TERM 信号,实现一个简易的 atexit 函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package main

import (
"os"
"os/signal"
"sync"
"syscall"
)

var exits = &struct {
sync.RWMutex
funcs []func()
signals chan os.Signal
}{}

func atexit(f func()) {
exits.Lock()
defer exits.Unlock()
exits.funcs = append(exits.funcs, f)
}

func waitExit() {
if exits.signals == nil {
exits.signals = make(chan os.Signal)
signal.Notify(exits.signals, syscall.SIGINT, syscall.SIGTERM)
}

exits.RLock()
for _, f := range exits.funcs {
defer f()
}
exits.RUnlock()

<-exits.signals
}

func main() {
atexit(func() { println("exit1...") })
atexit(func() { println("exit2...") })

waitExit()
}

将发往通道的数据打包,减少传输次数,可以有效提升性能。从实现上来说,通道队列依旧使用锁同步机制,单次获取更多数据(批处理),可改善因频繁加锁造成的性能问题。

通道可能会引发 goroutine leak 问题,它是指 goroutine 处于发送或接收阻塞状态,但是一直没有被唤醒。垃圾回收器并不会回收这类资源,导致它们会在等待队列中长久休眠,形成资源泄露。

同步

通道并非用来取代锁的,它们有各自不同的使用场景。通道倾向于解决逻辑层次的并发处理架构,而锁则用来保护局部范围内的数据安全。标准库 sync 提供了互斥和读写锁、原子操作等,它们的使用并不复杂,但是有几个地方需要注意:

  • 将 Mutex 作为匿名字段时,相关方法必须实现为 point-receiver,否则会因为锁复制机制而失效
  • 应该将 Mutex 锁粒度控制在最小范围内,及早释放
  • Mutex 不支持递归锁,即便在同一 goroutine 下也会导致死锁

相关建议:

  • 对性能要求较高时,应该避免使用 defer Unlock
  • 读写并发时,用 RWMutex 性能会更好一些
  • 对单个数据读写保护,可以尝试使用原子操作
  • 执行严格测试,尽可能打开数据竞争检查