0%

Rust 权威指南(15):无畏并发

安全且高效地处理并发编程是 Rust 的另一个主要目标。借助所有权和类型检查,许多并发问题可以在 Rust 中暴露为编译时错误而不是运行时错误。Rust 的这一特性称为 无畏并发(fearless concurrency),无畏并发让你编写出没有诡异缺陷的代码,并且易于重构而不会引入新的缺陷。

使用线程同时运行代码

现有的编程语言使用了不同的方式来实现线程:

  • 许多操作系统都提供了用于创建新线程的 API,这种直接利用操作系统 API 来创建线程的模型称为 1:1 模型,这意味着一个操作系统线程对应一个语言线程
  • 也有很多编程语言提供了自身特有的线程实现,这种由程序语言提供的线程通常被称为 green thread,使用绿色线程的语言会在不同数量的系统线程环境下运行它们。为此,绿色线程也被称为 M:N 模型,表示 M 个绿色线程对应 N 个系统线程,M 与 N 不必相等

Rust 在设计时考虑到是否需要提供运行时(runtime,这里是指语言中那些被包含在每一个可执行文件中的代码)支持。编程语言总会包含一定数量的运行时代码,Rust 会尽可能地保持几乎没有运行时的状态,这使得我们可以方便地与 C 语言进行交互并获得较高的性能

由于绿色线程 M:N 模型需要一个较大的运行时来管理线程,因此 Rust 标准库只提供了 1:1 线程模型实现,Rust 社区有许多支持 M:N 模型的第三方包。

使用 spawn 创建新的线程

通过调用 thread::spawn 函数来创建线程,它接收一个闭包作为参数,该闭包包含了我们想要在新线程中运行的代码。如下是一个简单示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
use std::thread;
use std::time::Duration;


fn main() {
thread::spawn(|| {
for i in 1..10 {
println!("number {} from the spawned thread", i);
thread::sleep(Duration::from_millis(1))

}
});

for i in 1..5 {
println!("number {} from the main thread", i);
thread::sleep(Duration::from_millis(1))

}

}

在这个程序中,线程的执行顺序无法保证,它由操作系统的调度策略决定。而且主线程运行结束后,创建出的新线程也会停止(不管它的打印任务是否完成)。

使用 join 句柄等待所有线程结束

thread::spawn 的返回值类型是一个自持有所有权的 JoinHandle,调用它的 join 方法可以阻塞当前线程直到对应的新线程运行结束。如下通过 join 方法改进上述代码,保证新线程能够在 main 函数退出前执行完毕:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
use std::thread;
use std::time::Duration;


fn main() {
let handle = thread::spawn(|| {
for i in 1..10 {
println!("number {} from the spawned thread", i);
thread::sleep(Duration::from_millis(1))

}
});

for i in 1..5 {
println!("number {} from the main thread", i);
thread::sleep(Duration::from_millis(1))

}

handle.join().unwrap()
}

在线程句柄上调用 join 函数会阻塞当前线程,直到句柄代表的线程结束。在并发编程中,诸如在哪里调用 join 等微小的细节也会影响到多个线程是否能够同时运行。

在线程中使用 move 闭包

move 闭包常用来与 thread::spawn 函数配合使用,它允许你在某个线程中使用来自另一个线程的数据。之前介绍过,可以在闭包的参数列表前使用 move 关键字来强制闭包从外部环境中捕获值的所有权。这一技术在我们创建新线程时尤其有用,它可以跨线程地传递某些值的所有权。

在如下代码就无法编译通过,因为 Rust 在推导出如何捕获 v 后决定让闭包借用 v,因为闭包只需要使用 v 的引用。但是这就出现一个问题:Rust 不知道新线程会运行多久,所以它就无法确定 v 的引用是否一直有效。

1
2
3
4
5
6
7
8
9
10
11
use std::thread;

fn main() {
let v = vec![1, 2, 3];

let handle = thread::spawn(|| {
println!("Here's a vector: {:?}", v);
});

handle.join().unwrap()
}

通过在闭包前添加 move 关键字,会强制闭包获得它所需值的所有权,而不仅仅是基于 Rust 的推导来获得值的借用。虽然 move 关键字覆盖了 Rust 的默认借用规则,但这并不意味着它会允许我们去违反任何的所有权规则。

使用消息传递在线程间传递数据

使用消息传递机制来保证并发安全正在变得越来越流行。在这种机制中,线程或者 actor 之间通过给彼此发送包含数据的消息来进行通信。Rust 在标准库中实现了一个名为 channel 的编程概念,它可以被用来实现基于消息传递的并发机制。

使用 mpsc::channel 函数创建一个新的通道,mspc 是 multiple producer, single consumer 的缩写。Rust 标准库的实现方式使得通道可以拥有多个生产内容的发送端,但是只能拥有一个消耗内容的接收端。它返回一个包含发送端和接收端的元组:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
use std::thread;
use std::sync::mpsc;

fn main() {
let (tx, rx) = mpsc::channel();

thread::spawn(move || {
let val = String::from("hi");
tx.send(val).unwrap()
});

let received = rx.recv().unwrap();
println!("Got {}", received);
}
  • 新线程必须拥有通道发送端的所有权才能通过通道发送消息,所以这里使用了 move 关键字将 tx 移动到了闭包环境中
  • send/recv 方法返回 Result<T, E> 类型的值作为结果
  • recv 会阻塞调用线程直到有值传入通道,而 try_recv 则不会阻塞线程,它会立即返回 Result<T, E>

可以将 rx 视作迭代器,而不再显式地调用 recv 函数,通道关闭时会退出循环。如下是一个示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
use std::thread;
use std::sync::mpsc;
use std::time::Duration;

fn main() {
let (tx, rx) = mpsc::channel();

thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];

for v in vals {
tx.send(v).unwrap();
thread::sleep(Duration::from_secs(1));
}
});

for received in rx {
println!("Got {}", received);
}
}

通道和所有权转移

所有权规则在消息传递过程中扮演了至关重要的角色,它可以让我们编写出安全的并发代码。send 函数会获取参数的所有权,并在参数传递时将所有权转移给接收者。这可以阻止我们意外地使用已经发送的值,所有权系统会在编译时确保程序的每个部分都是符合规则的。

通过克隆发送者创建多个生产者

mspc 是 多生产者、单消费者,我们可以通过克隆通道的发送端来创建出多个能够发送值到同一个接收端的线程。如下是一个示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
use std::thread;
use std::sync::mpsc;
use std::time::Duration;

fn main() {
let (tx, rx) = mpsc::channel();

let tx1 = mpsc::Sender::clone(&tx);

thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];

for v in vals {
tx1.send(v).unwrap();
thread::sleep(Duration::from_secs(1));
}
});

thread::spawn(move || {
let vals = vec![
String::from("more"),
String::from("messages"),
String::from("for"),
String::from("you"),
];

for v in vals {
tx.send(v).unwrap();
thread::sleep(Duration::from_secs(1));
}
});

for received in rx {
println!("Got {}", received);
}
}

共享状态的并发

从某种程度上说,任何编程语言中的通道都有些类似于单一所有权的概念,因为你不应该在传递给通道后再次使用它。而基于共享内存的并发通信机制则更类似于多重所有权的概念:多个线程可以同时访问相同的内存地址。

互斥体一次只允许一个线程访问数据

互斥体(mutex)是 mutual exclusion 的缩写,一个 mutex 在任意时刻只允许一个线程访问数据。为了访问互斥体中的数据,线程必须首先发出信号来获取互斥体的锁。锁是互斥体的一部分,这种数据结构被用来记录当前谁拥有数据的唯一访问权。通过锁机制,互斥体守护了它所持有的数据。

使用互斥体需要牢记以下规则:

  • 必须在使用数据前尝试获取锁
  • 必须在使用完互斥体守护的数据后释放锁

在 Rust 中由于类型系统和所有权规则的帮助,可以保证自己不会在加锁和解锁两个步骤中出现错误。如下是一个简单示例:

1
2
3
4
5
6
7
8
9
10
11
12
use std::sync::Mutex;

fn main() {
let m = Mutex::new(5);

{
let mut num = m.lock().unwrap();
*num = 6;
}

println!("m = {:?}", m);
}
  • 使用 new 创建 Mutex 实例,为了访问 Mutex 实例中的数据,需要首先调用它的 lock 方法来获取锁
  • 对 lock 的调用可能会失败,所以这里使用 unwrap 对结果进行处理
  • 一旦获取锁,就可以将它的返回值 num 视作一个指向内部数据的可变引用。Rust 的类型系统可以确保我们在使用 m 的值之前执行加锁操作,因为 Mutex<i32> 并不是 i32 的类型,必须加锁才能使用 i32 的值

对 lock 的调用会返回一个 MutexGuard 的智能指针,该智能指针通过实现 Deref 来指向存储在内部的数据。它还会通过实现 Drop 来完成自己离开作用域时的自动解锁操作。

如下是另一个示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
use std::sync::{Mutex, Arc};
use std::thread;

fn main() {
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];

for _ in 0..10 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}

for handle in handles {
handle.join().unwrap();
}

println!("Result: {}", *counter.lock().unwrap());
}
  • 需要注意,我们不能将 counter 的所有权移动到多个线程中
  • Rc<T> 在跨线程使用时并不安全,因为它并没有使用任何并发原语来保证修改计数的过程中不会被另一个线程打断。所以在并发场景下不能使用 Rc<T> 来实现多重所有权
  • Arc<T> 类似于 Rc<T>,但是可以保证自己可以被安全地应用于并发场景。A 代表 Atomic,表明自己是一个原子引用计数类型
  • 我们需要付出一定的性能开销才能实现线程安全,所以应该只在必要时为这种开销买单

在这个例子中,虽然 counter 本身不可变,但是我们仍然可以获取其内部值的可变引用。之前我们使用过 RefCell<T> 来修改 Rc<T> 中的内容,这里我们按照同样的方式,使用 Mutex<T> 来改变 Arc<T> 中的内容。

Mutex<T> 也会产生死锁的风险,当某个操作需要同时锁住两个资源,而两个线程分别持有其中一个锁并相互请求另外一个锁时,这两个线程就会死锁。

使用 Sync trait 和 Send trait 对并发进行扩展

Rust 语言内置的并发特性非常少,上面介绍过的并发特性几乎都是标准库的一部分,而非语言内置的。Rust 语言也内置了两个并发概念,即 std::marker 模块内的 Sync traitSend trait

允许线程之间转移所有权的 Send trait

只有实现 Send trait 的类型才可以安全地在线程之间转移所有权。除了 Rc<T> 等极少数类型,几乎所有 Rust 类型都实现了 Send trait。Rust 的类型系统与 trait 约束能够阻止我们意外地跨线程传递 Rc<T> 实例。

任何完全由 Send 类型组成的复合类型都会被自动标记为 Send。除了裸指针(后续介绍),几乎所有原生类型都满足 Send 约束。

允许多线程同时访问的 Sync trait

只有实现了 Sync trait 的类型才可以安全地被多个线程引用。对于任何类型,如果 &T 满足约束 Send,那么 T 就是满足 Sync 的。这意味着 T 的引用可以被安全地传递到另外的现成。与 Send 类似,所有原生类型都满足 Sync 约束,而完全满足 Sync 的类型组成的复合类型也都会自动识别为满足 Sync 的类型。

手动实现 Send 和 Sync 是不安全的

当某个类型完全由实现了 Send 与 Sync 的类型组成时,它就会自动实现 Send 与 Sync。因此,我们并不需要手动地为此种类型实现相关 trait。作为标签 trait,Send 与 Sync 甚至没有任何可供实现的方法。它们仅仅被用来强化与并发相关的不可变性。

手动实现这些 trait 涉及使用特殊的、不安全的 Rust 代码。当你构建的自定义并发类型包含了没有实现 Send 或者 Sync 类型时,必须非常谨慎地确保设计能够满足线程间的安全性要求。