3 并发编程

不同的编程语言使用不同的线程模型,rust标准库使用1:1线程实现,这代表程序的每一个语言级线程使用一个系统线程。在线程间通信方面,rust提供了不同程度抽象的工具,比如通道、互斥锁和原子类型,我们会在后面分别介绍它们。

3.1 多线程同时运行代码

为了创建一个新线程,需要调用 thread::spawn 函数并传递一个闭包,并在其中包含希望在新线程运行的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
use std::thread;
use std::time::Duration;

fn main() {
thread::spawn(|| {
for i in 1..10 {
println!("hi number {} from the spawned thread!", i);
// 强制线程停止执行一小段时间(1ms)
thread::sleep(Duration::from_millis(1));
}
});

for i in 1..5 {
println!("hi number {} from the main thread!", i);
// 强制线程停止执行一小段时间(1ms)
thread::sleep(Duration::from_millis(1));
}
}

注意当rust程序的主线程结束时,新线程也会结束,而不管其是否执行完毕。这个程序的输出可能每次都略有不同,不过它大体上看起来像这样:

1
2
3
4
5
6
7
8
9
hi number 1 from the main thread!
hi number 1 from the spawned thread!
hi number 2 from the spawned thread!
hi number 2 from the main thread!
hi number 3 from the main thread!
hi number 3 from the spawned thread!
hi number 4 from the spawned thread!
hi number 4 from the main thread!
hi number 5 from the spawned thread!

调用thread::sleep 会强制线程停止执行一小段时间,这会允许其它不同的线程运行。这些线程可能会轮流运行,不过并不保证如此:这依赖操作系统如何调度线程。在这里,主线程首先打印,即便新创建线程的打印语句位于程序的开头,甚至即便我们告诉新建的线程打印直到 i=9,它在主线程结束之前也只打印到了5。

由于主线程结束,新线程中的代码大部分时候会提早结束,在一些情况下下,由于无法保证线程运行的顺序,我们甚至不能实际保证新建线程会被执行。

使用join等待所有线程结束

可以通过将 thread::spawn 的返回值储存在变量中来修复新建线程部分没有执行或者完全没有执行的问题。thread::spawn的返回值类型是 JoinHandleJoinHandle 是一个拥有所有权的值,当对其调用 join 方法时,它会等待其线程结束。将上一节的程序改造一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
use std::thread;
use std::time::Duration;

fn main() {
let handle = thread::spawn(|| {
for i in 1..10 {
println!("hi number {} from the spawned thread!", i);
thread::sleep(Duration::from_millis(1));
}
});

for i in 1..5 {
println!("hi number {} from the main thread!", i);
thread::sleep(Duration::from_millis(1));
}

handle.join().unwrap();
}

这里获取到创建的线程的 JoinHandle 类型的变量handle,并调用 join 方法来确保新建线程在 main 退出前结束运行。通过调用handlejoin 会阻塞当前线程直到handle所代表的线程结束。因为我们将 join 调用放在了主线程的 for 循环之后,因此这段代码应该会产生这样的输出:

1
2
3
4
5
6
7
8
9
10
11
12
13
hi number 1 from the main thread!
hi number 1 from the spawned thread!
hi number 2 from the spawned thread!
hi number 2 from the main thread!
hi number 3 from the main thread!
hi number 3 from the spawned thread!
hi number 4 from the main thread!
hi number 4 from the spawned thread!
hi number 5 from the spawned thread!
hi number 6 from the spawned thread!
hi number 7 from the spawned thread!
hi number 8 from the spawned thread!
hi number 9 from the spawned thread!

这两个线程仍然会交替执行,不过主线程会由于 handle.join() 调用,会等待直到新建线程执行完毕。由于大部分情况下两个线程会交替执行,使用join可以防止主线程先于其它线程结束。如果调整join方法调用的位置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
use std::thread;
use std::time::Duration;

fn main() {
let handle = thread::spawn(|| {
for i in 1..10 {
println!("hi number {} from the spawned thread!", i);
thread::sleep(Duration::from_millis(1));
}
});

handle.join().unwrap();

for i in 1..5 {
println!("hi number {} from the main thread!", i);
thread::sleep(Duration::from_millis(1));
}
}
/*
hi number 1 from the spawned thread!
hi number 2 from the spawned thread!
hi number 3 from the spawned thread!
hi number 4 from the spawned thread!
hi number 5 from the spawned thread!
hi number 6 from the spawned thread!
hi number 7 from the spawned thread!
hi number 8 from the spawned thread!
hi number 9 from the spawned thread!
hi number 1 from the main thread!
hi number 2 from the main thread!
hi number 3 from the main thread!
hi number 4 from the main thread!
*/

主线程会等待直到新建线程执行完毕之后,才开始执行 for 循环,所以输出将不会交替出现。因此,在使用时,需要注意join使用的位置。

join会获取线程闭包的返回值,比如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
use std::thread;
use std::time::{Duration, Instant};

fn main() {
let mut handles = vec![];
for i in 0..10 {
handles.push(thread::spawn(move || {
let start = Instant::now();
thread::sleep(Duration::from_millis(250));
println!("thread {} is complete", i);
start.elapsed().as_millis()
}));
}

let mut results: Vec<u128> = vec![];
for handle in handles {
results.push(handle.join().unwrap());
}

if results.len() != 10 {
panic!("Oh no! All the spawned threads did not finish!");
}

println!();
for (i, result) in results.into_iter().enumerate() {
println!("thread {} took {}ms", i, result);
}
}

线程结束时会返回所耗费的时间start.elapsed().as_millis(),使用handle.join会获得一个Result<T>,这里的T就是我们所返回的类型。

线程与move闭包

在介绍闭包的获取所有权小节,我们已经介绍了一个使用move的例子:

1
2
3
4
5
6
7
8
9
10
use std::thread;

fn main() {
let list = vec![1, 2, 3];
println!("Before defining closure: {:?}", list);

thread::spawn(move || println!("From thread: {:?}", list))
.join()
.unwrap();
}

先来看看如果去掉move会发生什么:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
use std::thread;

fn main() {
let list = vec![1, 2, 3];
println!("Before defining closure: {:?}", list);

thread::spawn(|| println!("From thread: {:?}", list))
.join()
.unwrap();
}
/*
error[E0373]: closure may outlive the current function, but it borrows `list`, which is owned by the current function
--> src\main.rs:7:19
|
7 | thread::spawn(|| println!("From thread: {:?}", list))
| ^^ ---- `list` is borrowed here
| |
| may outlive borrowed value `list`
|
note: function requires argument type to outlive `'static`
--> src\main.rs:7:5
|
7 | thread::spawn(|| println!("From thread: {:?}", list))
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
help: to force the closure to take ownership of `list` (and any other referenced variables), use the `move` keyword
|
7 | thread::spawn(move || println!("From thread: {:?}", list))
| ++++
*/

这段代码本身没有问题,闭包内仅仅打印了动态数组list,这只需要它的不可变引用。但是rust无法确定新的线程会执行多久——这是由操作系统决定的——所以也无法知晓 list 的引用是否一直有效。换句话说,存在一种可能,在新线程执行过程中,主线程的list已经失效(被移动或被drop清理),此时list的引用已经无效了。当然,这段代码也可能正常运行,但rust编译器是保守的,当可能出现问题,也可能不出现问题时,rust选择报错,拒绝编译。

下面就是一个可能会出现问题的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
use std::thread;

fn main() {
let v = vec![1, 2, 3];

let handle = thread::spawn(|| {
println!("Here's a vector: {:?}", v);
});

drop(v); // oh no!

handle.join().unwrap();
}

如果这段代码能够通过编译,则新建线程则可能会立刻被转移到后台并完全没有机会运行。新建线程内部有一个 v 的引用,不过主线程立刻就使用 drop 丢弃了 v(实际上是转移了所有权),接着当新建线程开始执行,v 已不再有效,所以其引用也是无效的。

通过在闭包之前增加 move 关键字,我们强制闭包获取其使用的值的所有权,而不是任由rust推断它应该借用值。

嵌套线程的执行与结束

如果尝试在子线程中嵌套创建新线程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
use std::thread;
use std::time::Duration;
fn main() {
// 创建一个线程A
let new_thread = thread::spawn(move || {
// 再创建一个线程B
thread::spawn(move || {
loop {
println!("I am a new thread.");
}
})
});

// 等待新创建的线程执行完成
new_thread.join().unwrap();
println!("Child thread is finish!");

// 睡眠一段时间,看子线程创建的子线程是否还在运行
thread::sleep(Duration::from_millis(100));
}

以上代码中,main 线程创建了一个新的线程 A,同时该新线程又创建了一个新的线程 BA 线程在创建完 B 线程后就立即结束了,而 B 线程则在不停地循环输出。这说明,创建出的B线程是独立运行的,出于安全性的考虑,rust没有提供直接杀死线程的接口,线程B需要等到主线程结束后自动结束。

线程屏障(Barrier)

在rust中,可以使用 Barrier 让多个线程都执行到某个点后,才继续一起往后执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
use std::sync::{Arc, Barrier};
use std::thread;

fn main() {
let mut handles = Vec::with_capacity(4);
let barrier = Arc::new(Barrier::new(4));

for _ in 0..4 {
let b = barrier.clone();
handles.push(thread::spawn(move|| {
println!("before wait");
b.wait();
println!("after wait");
}));
}

for handle in handles {
handle.join().unwrap();
}
}
/* 输出结果:
before wait
before wait
before wait
before wait
after wait
after wait
after wait
after wait
*/

上面代码,我们在线程打印出 before wait 后增加了一个屏障,目的就是等所有的线程都打印出before wait后,各个线程再继续执行。

线程局部变量(Thread Local Variable)

rust提供了标准库和第三方库支持线程局部变量。

使用 thread_local 宏可以初始化线程局部变量,然后在线程内部使用该变量的 with 方法获取变量值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
use std::cell::RefCell;
use std::thread;

fn main() {
thread_local!(static FOO: RefCell<u32> = RefCell::new(1));

FOO.with(|f| {
assert_eq!(*f.borrow(), 1);
*f.borrow_mut() = 2;
});

// 每个线程开始时都会拿到线程局部变量的FOO的初始值
let t = thread::spawn(move|| {
FOO.with(|f| {
assert_eq!(*f.borrow(), 1);
*f.borrow_mut() = 3;
});
});

// 等待线程完成
t.join().unwrap();

// 尽管子线程中修改为了3,我们在这里依然拥有main线程中的局部值:2
FOO.with(|f| {
assert_eq!(*f.borrow(), 2);
});
}

上面代码中,FOO 即是我们创建的线程局部变量,每个新的线程访问它时,都会使用它的初始值作为开始,各个线程中的 FOO 值彼此互不干扰。注意 FOO 使用 static 声明为生命周期为 'static 的静态变量。

另外线程中对 FOO 的使用是通过借用的方式,但是若我们需要每个线程独自获取它的拷贝,最后进行汇总,就有些强人所难。

你还可以在结构体中使用线程局部变量:

1
2
3
4
5
6
7
8
9
10
11
12
use std::cell::RefCell;

struct Foo;
impl Foo {
thread_local! {
static FOO: RefCell<usize> = RefCell::new(0);
}
}

fn main() {
Foo::FOO.with(|x| println!("{:?}", x));
}

或者通过引用的方式使用它:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
use std::cell::RefCell;
use std::thread::LocalKey;

thread_local! {
static FOO: RefCell<usize> = RefCell::new(0);
}
struct Bar {
foo: &'static LocalKey<RefCell<usize>>,
}
impl Bar {
fn constructor() -> Self {
Self {
foo: &FOO,
}
}
}

使用第三方库 thread-local ,可以解决只能使用引用的问题,每个线程持有值的独立拷贝。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
use thread_local::ThreadLocal;
use std::sync::Arc;
use std::cell::Cell;
use std::thread;

fn main() {
let tls = Arc::new(ThreadLocal::new());

// 创建多个线程
for _ in 0..5 {
let tls2 = tls.clone();
thread::spawn(move || {
// 将计数器加1
let cell = tls2.get_or(|| Cell::new(0));
cell.set(cell.get() + 1);
}).join().unwrap();
}

// 一旦所有子线程结束,收集它们的线程局部变量中的计数器值,然后进行求和
let tls = Arc::try_unwrap(tls).unwrap();
let total = tls.into_iter().fold(0, |x, y| x + y.get());

// 和为5
assert_eq!(total, 5);
}

该库不仅仅使用了值的拷贝,而且还能自动把多个拷贝汇总到一个迭代器中,最后进行求和。

用条件控制线程的挂起和执行

std::sync::Condvar 类型实现了条件变量,它有 wait方法 和 notify_all方法,wait方法会阻塞到直到有其他线程调用 notify_all 方法或者 notify_one方法。当等待的条件到来时,可以使用notify_all 方法或者 notify_one方法通知其他线程,为了进入休眠等待条件变为 true,使用一个while循环加上wait方法是标准习惯用法。但是wait 方法会按值获取 MutexGuard 对象,使用它,并在成功时返回一个新的 MutexGuard,关于MutexGuard,它是一个智能指针,后面会介绍到。详见互斥锁Mutex

条件变量(Condition Variables)经常和 Mutex 一起使用,可以让线程挂起,直到某个条件发生后再继续执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
use std::thread;
use std::sync::{Arc, Mutex, Condvar};

fn main() {
let pair = Arc::new((Mutex::new(false), Condvar::new()));
let pair2 = pair.clone();

thread::spawn(move|| {
let &(ref lock, ref cvar) = &*pair2;
let mut started = lock.lock().unwrap();
println!("changing started");
*started = true;
cvar.notify_one();
});

let &(ref lock, ref cvar) = &*pair;
let mut started = lock.lock().unwrap();
while !*started {
started = cvar.wait(started).unwrap();
}

println!("started changed");
}

上述代码流程如下:

  1. main 线程首先进入 while 循环,调用 wait 方法挂起等待子线程的通知,并释放了锁 started
  2. 子线程获取到锁,并将其修改为 true,然后调用条件变量的 notify_one 方法来通知主线程继续执行

确保函数只被调用一次

有时,我们会需要某个函数在多线程环境下只被调用一次,例如初始化全局变量,无论是哪个线程先调用函数来初始化,都会保证全局变量只会被初始化一次,随后的其它线程调用就会忽略该函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
use std::thread;
use std::sync::Once;

static mut VAL: usize = 0;
static INIT: Once = Once::new();

fn main() {
let handle1 = thread::spawn(move || {
INIT.call_once(|| {
unsafe {
VAL = 1;
}
});
});

let handle2 = thread::spawn(move || {
INIT.call_once(|| {
unsafe {
VAL = 2;
}
});
});

handle1.join().unwrap();
handle2.join().unwrap();

println!("{}", unsafe { VAL });
}

代码运行的结果取决于哪个线程先调用 INIT.call_once (虽然代码具有先后顺序,但是线程的初始化顺序并无法被保证!因为线程初始化是异步的,且耗时较久),若 handle1 先,则输出 1,否则输出 2

call_once方法让执行初始化过程一次,并且只执行一次。如果当前有另一个初始化过程正在运行,线程将阻止该方法被调用。当这个函数返回时,保证一些初始化已经运行并完成,它还保证由执行的闭包所执行的任何内存写入都能被其他线程在这时可靠地观察到。

3.2 使用消息传递在线程间通信

消息传递(message passing)是一个确保安全并发的方式。线程通过发送包含数据的消息来相互沟通,这个思想来源于 Go 编程语言文档 中的口号:

Do not communicate by sharing memory; instead, share memory by communicating.

不要通过共享内存来通讯;而是通过通讯来共享内存。

为了实现消息传递并发,rust标准库提供了一个通道(channel)。通道是一个通用编程概念,它是可以数据从一个线程发送到另一个线程单向管道。换句话说,通道是一个线程安全的队列。

使用通道发送和接收消息

1
2
3
4
5
6
7
8
9
10
11
12
13
use std::sync::mpsc;
use std::thread;

fn main() {
let (tx, rx) = mpsc::channel();

thread::spawn(move || {
let val = String::from("hi");
tx.send(val).unwrap();
});

println!("receive {} in main thread.", rx.recv().unwrap());
}

这里使用 mpsc::channel 函数创建一个新的信道;mpsc 是多个生产者,单个消费者(multiple producer, single consumer)的缩写。简而言之,rust标准库实现信道的方式意味着一个信道可以有多个产生值的发送(sending)端,但只能有一个消费这些值的接收(receiving)端。

mpsc::channel 函数返回一个元组:第一个元素是发送端 – 发送者,而第二个元素是接收端 – 接收者。由于历史原因,txrx 通常作为发送者(transmitter)和接收者(receiver)的缩写,所以这就是我们将用来绑定这两端变量的名字。这里使用了一个 let 语句和模式来解构了此元组。

这里txrx的类型由编译器自动推导:tx.send("hi")发送了String,因此它们分别是mpsc::Sender<String>mpsc::Receiver<String>类型,由于内部是泛型实现,一旦类型被推导确定,该通道就只能传递对应类型的值。

使用 thread::spawn 来创建一个新线程并使用 movetx 移动到闭包中,这样新建线程就拥有 tx 了。新建线程需要拥有信道的发送端以便能向信道发送消息。信道的发送端有一个 send 方法用来获取需要放入信道的值。send 方法返回一个 Result<T, E> 类型,所以如果接收端已经被丢弃了,将没有发送值的目标,发送操作会返回错误,这里调用 unwrap 在出错的时候产生panic。

主线程使用rx作为通道的接收者,有两个方法:recvtry_recv。这里,我们使用了 recv,它是 receive 的缩写。这个方法会阻塞主线程执行直到从信道中接收一个值。一旦发送了一个值,recv 会在一个 Result<T, E> 中返回它。当信道发送端关闭,recv 会返回一个错误表明不会再有新的值到来了。同样的,对于recv方法来说,当发送者被丢弃时,也会接收到一个错误,这里同样使用unwrap 在出错的时快速处理。

try_recv 不会阻塞,相反它立刻返回一个 Result<T, E>Ok 值包含可用的信息,而 Err 值代表此时没有任何消息。如果线程在等待消息过程中还有其他工作时使用 try_recv 很有用:可以编写一个循环来频繁调用 try_recv,在有可用消息时进行处理,其余时候则处理一会其他工作直到再次检查。

出于简单的考虑,这个例子使用了 recv。主线程中除了等待消息之外没有任何其他工作,所以阻塞主线程是合适的。

最终代码会输出如下结果:

1
receive hi in main thread.

通道与所有权转移

所有权规则在消息传递中扮演了重要角色,其有助于我们编写安全的并发代码。现在让我们做一个试验来看看信道与所有权如何一同协作以避免产生问题:我们将尝试在新建线程中的信道中发送完 val 值之后再使用它。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
use std::sync::mpsc;
use std::thread;

fn main() {
let (tx, rx) = mpsc::channel();

thread::spawn(move || {
let val = String::from("hi");
tx.send(val).unwrap();
println!("val is {}", val); // borrow of moved value: `val`
});

let received = rx.recv().unwrap();
println!("Got: {}", received);
}
/*
|
8 | let val = String::from("hi");
| --- move occurs because `val` has type `String`, which does not implement the `Copy` trait
9 | tx.send(val).unwrap();
| --- value moved here
10 | println!("val is {}", val);
| ^^^ value borrowed here after move
*/

这里尝试在通过 tx.send 发送 val 到信道中之后将其打印出来。显然这会造成问题,一旦将值发送到另一个线程后,那个线程可能会在我们再次使用它之前就将其修改或者丢弃。其他线程对值可能的修改会由于不一致或不存在的数据而导致错误或意外的结果。rust拒绝编译,说明使用通道来传输数据时仍然需要遵循所有权规则,这可以防止在发送后再次意外地使用这个值。

发送多个值并让接收者循环等待

这次,尝试发送多个值,观察接收者的等待:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
let (tx, rx) = mpsc::channel();

thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];

for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});

for received in rx {
println!("Got: {}", received);
}
}

这一次,在新建线程中有一个字符串vector希望发送到主线程。我们遍历它们,单独的发送每一个字符串并通过一个 Duration 值调用 thread::sleep 函数来暂停一秒。

在主线程中,不再显式调用 recv 函数,rx 是一个迭代器,可以使用for 遍历它:

1
2
3
for received in rx {
println!("Got: {}", received);
}

还可以使用如下方式:

1
2
3
while let Ok(received) = rx.recv() {
println!("Got: {}", received);
}

这两种方式是等价的,无论是用哪种,对于每一个接收到的值,我们将其打印出来。如果到达循环顶部时通道恰好是空的,则接收线程将阻塞,直到其他线程发送一个值。当通道为空且发送者已被丢弃时,循环将正常退出。

运行整个代码,将得到如下输出,每输出一行都会暂停一秒:

1
2
3
4
Got: hi
Got: from
Got: the
Got: thread

通过克隆发送者来创建多个生产者

之前提到了mpsc是多生产者单消费者的缩写,可以运用 mpsc来创建向同一接收者发送值的多个线程。这可以通过克隆发送者来做到:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
let (tx, rx) = mpsc::channel();

let tx1 = tx.clone();
thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];

for val in vals {
tx1.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});

thread::spawn(move || {
let vals = vec![
String::from("more"),
String::from("messages"),
String::from("for"),
String::from("you"),
];

for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});

for received in rx {
println!("Got: {}", received);
}
}

这一次,在创建新线程之前,我们对发送者调用了 clone 方法。这会给我们一个可以传递给第一个新建线程的发送端句柄。我们会将原始的通道发送端传递给第二个新建线程。这样就会有两个线程,每个线程将向信道的接收端发送不同的消息。

如果运行这些代码,你可能会看到这样的输出:

1
2
3
4
5
6
7
8
Got: hi
Got: more
Got: from
Got: messages
Got: the
Got: for
Got: thread
Got: you

你可能会看到这些值以不同的顺序出现,这依赖于你的系统。这也是并发既有趣又困难的原因。如果通过 thread::sleep 做实验,在不同的线程中提供不同的值,就会发现他们的运行更加不确定,且每次都会产生不同的输出。

不过,同一通道的消息是有顺序的,因为它本质上是队列。

同步通道

前面通过mpsc::channel函数创建的通道是异步的,发送一条消息,即使消息没有被接收,异步通道也不会阻塞。

rust还有另外一种同步通道mpsc::sync_channel,同步通道与异步通道基本相同,有一点区别:异步通道的内部缓冲区(Buffer)是无限的,在创建时无需指定大小;而同步通道在创建时需要指定内部缓冲区(Buffer)的大小。

使用通道收发消息的程序可能会遇到这样的情况:发送值的速度超过了接收和处理的速度。这会导致越来越多的消息在内部缓冲区中累积。更糟糕的是,发送线程继续运行,占用 CPU 和其他系统资源来发送更多的值,而这些资源正好在接收端最需要这些资源。

更准确地描述就是:在数据流从上游生产者向下游消费者传输的过程中,上游生产速度大于下游消费速度,导致下游的 Buffer溢出,这种现象叫做背压(Backpressure)。

借鉴了Unix系统中管道的处理方法,Unix 系统上的每个管道也有固定的大小,如果一个进程试图写入一个暂时已满的管道,系统会阻塞该进程直到管道中有空间。rust也是如此:当内部缓冲区满时,同步通道将阻塞等待缓冲区打开。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx)= mpsc::sync_channel(0);

let handle = thread::spawn(move || {
println!("before sending");
tx.send(1).unwrap();
println!("after sending");
});

println!("main thread sleeping...");
thread::sleep(Duration::from_secs(3));

println!("receive {}", rx.recv().unwrap());
handle.join().unwrap();
}

这里使用mpsc::sync_channel(0)创建了一个内部缓冲区为0的通道,这是有效的,如果将缓冲区设为0,它将成为会合通道(rendezvous channel),其中的每个消息均会阻塞,直到recv接收它。

上面的代码就演示了这个过程,它的输出结果如下:

1
2
3
4
main thread sleeping...
before sending
receive 1
after sending

新创建的线程发送了一条消息,由于我们在主线程睡眠了3秒,主线程没有接收这个值,因此通道阻塞。直到rx.recv方法执行将值接收后,阻塞被解除。

需要注意的是,同步通道并不总是阻塞的,只有Buffer满后才会阻塞。比如,使用mpsc::sync_channel(10)代表缓冲区大小为10,只有发送消息时缓冲区已经有10条数据的情况下,新的消息才会阻塞。

无限的缓存空间

上一节提到了,异步通道的内部缓冲区(Buffer)是无限的(infinite buffer),rust是如何实现的?

类似于Vec<T>是无限大小的一样,这里的无限不是真正意义上的“无限”,你可以把元素添加到动态数组中,但是当你的物理设备资源耗尽时,无法分配足够的堆内存,此时程序可能会崩溃。通道也是同理,异步通道虽然能非常高效且不会造成发送线程的阻塞,但是存在消息未及时消费,最终内存过大的问题。在实际项目中,可以考虑使用一个带缓冲值的同步通道来避免这种风险。

传输多种类型的数据

一个消息通道只能传输一种类型的数据,如果想要传输多种类型的数据,可以为每个类型创建一个通道,也可以使用枚举类型来实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
use std::sync::mpsc::{self, Receiver, Sender};

enum Fruit {
Apple(u8),
Orange(String)
}

fn main() {
let (tx, rx): (Sender<Fruit>, Receiver<Fruit>) = mpsc::channel();

tx.send(Fruit::Orange("sweet".to_string())).unwrap();
tx.send(Fruit::Apple(2)).unwrap();

for _ in 0..2 {
match rx.recv().unwrap() {
Fruit::Apple(count) => println!("received {} apples", count),
Fruit::Orange(flavor) => println!("received {} oranges", flavor),
}
}
}

如上所示,枚举类型还能让我们带上想要传输的数据,但是有一点需要注意,rust按照枚举中占用内存最大的那个成员进行内存对齐,这意味着就算传输的是枚举中占用内存最小的成员,它占用的内存依然和最大的成员相同,可能会造成内存上的浪费。

使用mpmc的第三方库

如果你需要mpmc或者需要更高的性能,可以考虑第三方库:

  • crossbeam-channel,老牌强库,功能较全,性能较强,之前是独立的库,但是后面合并到了crossbeam 主仓库中
  • flume,官方给出的性能数据某些场景要比 crossbeam 更好些

3.3 共享状态并发

虽然消息传递是一个很好的处理并发的方式,但并不是唯一一个。另一种方式是让多个线程拥有相同的共享数据。

在某种程度上,任何编程语言中的通道都类似于单所有权,因为一旦将一个值传送到通道中,将无法再使用这个值。共享内存类似于多所有权:多个线程可以同时访问相同的内存位置。智能指针可以使多所有权成为可能,然而这会增加额外的复杂性,因为需要以某种方式管理这些不同的所有者。rust的类型系统和所有权规则极大的协助了正确地管理这些所有权。作为一个例子,让我们看看互斥器,一个更为常见的共享内存并发原语。

互斥锁Mutex

互斥锁(mutex)是 mutual exclusion 的缩写,也就是说,任意时刻,其只允许一个线程访问某些数据。在其他语言中,互斥锁的使用需要注意两点:

  • 在需要互斥访问之前(进入临界区)获得锁
  • 处理完毕数据之后(离开临界区),释放锁

使用互斥锁比较复杂,这也是许多人热衷于通道的原因。然而,在rust中,得益于类型系统和所有权,我们不会在锁和解锁上出错。

出于简单的考虑,我们从在单线程上下文使用互斥锁Mutex开始:

1
2
3
4
5
6
7
8
9
10
11
12
use std::sync::Mutex;

fn main() {
let m = Mutex::new(5);

{
let mut num = m.lock().unwrap();
*num = 6;
}

println!("m = {:?}", m);
}

像很多类型一样,我们使用关联函数 new 来创建一个 Mutex<T>,和Box<T>类似,数据被Mutex<T>所拥有,要访问内部的数据,需要使用方法m.lock()m申请一个锁,这个调用会阻塞当前线程,直到我们拥有锁为止。如果另一个线程拥有锁,并且那个线程panic了,则 lock 调用会失败。在这种情况下,没人能够再获取锁,所以这里选择 unwrap 并在遇到这种情况时使线程panic。当多个线程同时访问该数据时,只有一个线程能获取到锁,其它线程只能阻塞等待,这样就保证了数据能被安全的修改。

一旦获取了锁,就可以将返回值(在这里是num)视为一个其内部数据的可变引用。类型系统确保了我们在使用 m 中的值之前获取锁。m 的类型是 Mutex<i32> 而不是 i32,所以必须获取锁才能使用这个 i32 值。我们是不会忘记这么做的,因为反之类型系统不允许访问内部的 i32 值。

Mutex<T> 是一个智能指针,更准确的说,调用 lock 方法返回一个叫做 MutexGuard<T> 的智能指针。这个智能指针:

  • 实现了Deref特征,会被自动解引用后获得一个引用类型,该引用指向Mutex内部的数据
  • 实现了Drop特征,在离开作用域时自动释放锁,以便其它线程能继续获取锁,为此,我们不会忘记释放锁并阻塞其它线程的风险,因为锁的释放是自动发生的。正是由于此,你需要做的仅仅是做好锁的作用域管理。

丢弃了锁之后,可以打印出互斥锁的值,并发现能够将其内部的 i32 改为6

多线程中使用互斥锁

上一节的例子中仅仅演示了单线程下的互斥锁,现在来尝试使用 Mutex<T> 在多个线程间共享值。我们将启动十个线程,并在各个线程中对同一个计数器值加一,这样计数器将从 0 变为 10。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
use std::sync::Mutex;
use std::thread;

fn main() {
let counter = Mutex::new(0);
let mut handles = vec![];

for _ in 0..10 {
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();

*num += 1;
});
handles.push(handle);
}

for handle in handles {
handle.join().unwrap();
}

println!("Result: {}", *counter.lock().unwrap());
}

这里创建了一个 counter 变量来存放内含 i32Mutex<T>,接下来遍历 range 创建了 10 个线程。使用了 thread::spawn 并对所有线程使用了相同的闭包:它们每一个都将调用 lock 方法来获取 Mutex<T> 上的锁,接着将互斥锁中的值加一。当一个线程结束执行,num 会离开闭包作用域并释放锁,这样另一个线程就可以获取它了。

在主线程中,收集了所有线程的JoinHandle,调用 join 方法来确保所有线程都会结束。这时,主线程会获取锁并打印出程序的结果。

然而这段代码会报错:

1
2
3
4
5
6
7
8
   |
5 | let counter = Mutex::new(0);
| ------- move occurs because `counter` has type `Mutex<i32>`, which does not implement the `Copy` trait
...
9 | let handle = thread::spawn(move || {
| ^^^^^^^ value moved into closure here, in previous iteration of loop
10 | let mut num = counter.lock().unwrap();
| ------- use occurs due to use in closure

错误信息表明 counter 值在上一次循环中被移动了。rust告诉我们不能将 counter 锁的所有权移动到多个线程中。

多线程和多所有权

要想让多个所有者拥有值,在前面Rc<T>引用计数智能指针章节中,介绍过使用智能指针 Rc<T> 来创建引用计数,以便拥有多所有者。现在来尝试一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
use std::rc::Rc;
use std::sync::Mutex;
use std::thread;

fn main() {
let counter = Rc::new(Mutex::new(0));
let mut handles = vec![];

for _ in 0..10 {
let counter = Rc::clone(&counter);
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();

*num += 1;
});
handles.push(handle);
}

for handle in handles {
handle.join().unwrap();
}

println!("Result: {}", *counter.lock().unwrap());
}

这次将 Mutex<T> 封装进 Rc<T> 中并在将所有权移入线程之前克隆了 Rc<T>。但是rust仍然报错:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
error[E0277]: `Rc<Mutex<i32>>` cannot be sent between threads safely
--> src\main.rs:11:36
|
11 | let handle = thread::spawn(move || {
| ------------- ^------
| | |
| ______________________|_____________within this `[closure@src\main.rs:11:36: 11:43]`
| | |
| | required by a bound introduced by this call
12 | | let mut num = counter.lock().unwrap();
13 | |
14 | | *num += 1;
15 | | });
| |_________^ `Rc<Mutex<i32>>` cannot be sent between threads safely
|
= help: within `[closure@src\main.rs:11:36: 11:43]`, the trait `Send` is not implemented for `Rc<Mutex<i32>>`
note: required because it's used within this closure
--> src\main.rs:11:36
|
11 | let handle = thread::spawn(move || {
| ^^^^^^^
note: required by a bound in `spawn`
--> C:\Users\aaa\.rustup\toolchains\stable-x86_64-pc-windows-msvc\lib/rustlib/src/rust\library\std\src\thread\mod.rs:704:8
|
704 | F: Send + 'static,
| ^^^^ required by this bound in `spawn`

报错中出现了一些提示,第一行错误:

1
`Rc<Mutex<i32>>` cannot be sent between threads safely

提示Rc<T>不能安全地在线程之间发送。然后是:

1
the trait `Send` is not implemented for `Rc<Mutex<i32>>`

提示Rc<T>类型没有实现Send特征。

看来,Rc<T> 并不能安全的在线程间共享。当 Rc<T> 管理引用计数时,它必须在每一个 clone 调用时增加计数,并在每一个克隆被丢弃时减少计数。Rc<T> 并没有使用任何并发原语,来确保改变计数的操作不会被其他线程打断。在计数出错时可能会导致诡异的 bug,比如可能会造成内存泄漏,或在使用结束之前就丢弃一个值。我们所需要的是一个完全类似 Rc<T>,又以一种线程安全的方式改变引用计数的类型。

线程安全的引用计数Arc<T>

Arc<T>正是一个类似 Rc<T> 保证了并发安全的引用计数。字母A代表原子性(atomic),所以这是一个原子引用计数(atomically reference counted)类型。原子类型提供线程之间的基本共享内存通信,可以在标准库中查看更详细的定义:std::sync::atomic,但现在我们只需要知道原子类型就像基本类型一样可以安全的在线程间共享。

Arc<T>Rc<T> 有着相同的 API,所以修改程序中的 use 行和 new 调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];

for _ in 0..10 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();

*num += 1;
});
handles.push(handle);
}

for handle in handles {
handle.join().unwrap();
}

println!("Result: {}", *counter.lock().unwrap());
}

代码最终可以正常运行,这会打印出:

1
Result: 10

你可能会好奇为什么不是所有的原始类型都是原子性的,为什么不是所有标准库中的类型都默认使用 Arc<T> 实现。原因在于线程安全带有性能惩罚,rust希望只在必要时才为此买单。如果只是在单线程中对值进行操作,原子性提供的保证并无必要,代码可以减少这部分性能损失,以运行的更快。

内部可变性

在之前组合使用Rc<T>RefCell<T>来拥有多个可变数据所有者小节中,介绍了Rc<T>RefCell<T>的结合,可以实现单线程中的内部可变性。

现在,我们介绍了它们线程安全的版本,即:组合使用 Mutex<T>Arc<T> 在多线程中实现内部可变性。

死锁

在rust中有多种方式可能造成死锁,了解这些方式有助于你提前规避可能的风险:

首先是单线程死锁,比较好避免:

1
2
3
4
5
6
7
use std::sync::Mutex;

fn main() {
let data = Mutex::new(0);
let d1 = data.lock();
let d2 = data.lock();
} // d1锁在此处释放

只要你在另一个锁还未被释放时去申请新的锁,线程就会被阻塞,当代码复杂后,这种情况可能就没有那么显眼。

多线程死锁就稍微复杂一些,当我们拥有两个锁,且两个线程各自使用了其中一个锁,然后试图去访问另一个锁时,就可能发生死锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
use std::{sync::{Mutex, MutexGuard}, thread};
use std::thread::sleep;
use std::time::Duration;

// 第三方库
use lazy_static::lazy_static;
// lazy_static! {} 中的代码并不会在编译时初始化静态量,它会在首次调用时,执行代码,来初始化。也就是所谓的延迟计算。
lazy_static! {
static ref MUTEX1: Mutex<i64> = Mutex::new(0);
static ref MUTEX2: Mutex<i64> = Mutex::new(0);
}

fn main() {
// 存放子线程的句柄
let mut children = vec![];
for i_thread in 0..2 {
children.push(thread::spawn(move || {
for _ in 0..1 {
// 线程1
if i_thread % 2 == 0 {
// 锁住MUTEX1
let guard: MutexGuard<i64> = MUTEX1.lock().unwrap();

println!("线程 {} 锁住了MUTEX1,接着准备去锁MUTEX2 !", i_thread);

// 当前线程睡眠一小会儿,等待线程2锁住MUTEX2
sleep(Duration::from_millis(10));

// 去锁MUTEX2
let guard = MUTEX2.lock().unwrap();
// 线程2
} else {
// 锁住MUTEX2
let _guard = MUTEX2.lock().unwrap();

println!("线程 {} 锁住了MUTEX2, 准备去锁MUTEX1", i_thread);

let _guard = MUTEX1.lock().unwrap();
}
}
}));
}

// 等子线程完成
for child in children {
let _ = child.join();
}

println!("死锁没有发生");
}

在上面的描述中,我们用了“可能“二字,原因在于死锁在这段代码中不是必然发生的,总有一次运行你能看到最后一行打印输出。这是由于子线程的初始化顺序和执行速度并不确定,我们无法确定哪个线程中的锁先被执行,因此也无法确定两个线程对锁的具体使用顺序。

但是,可以简单的说明下死锁发生的必然条件:线程 1 锁住了MUTEX1并且线程2锁住了MUTEX2,然后线程 1 试图去访问MUTEX2,同时线程2试图去访问MUTEX1,就会死锁。 因为线程 2 需要等待线程 1 释放MUTEX1后,才会释放MUTEX2,而与此同时,线程 1 需要等待线程 2 释放MUTEX2后才能释放MUTEX1,这种情况造成了两个线程都无法释放对方需要的锁,最终死锁。

但有些情况下不会发生死锁:线程 2 在线程 1 锁MUTEX1之前,就已经全部执行完了,随之线程 2 的MUTEX2MUTEX1被全部释放,线程 1 对锁的获取将不再有竞争者。 同理,线程 1 若全部被执行完,那线程 2 也不会被锁,因此我们在线程 1 中间加一个睡眠,增加死锁发生的概率。如果你在线程 2 中同样的位置也增加一个睡眠,那死锁将必然发生。

在计算机专业课《操作系统》中对这种死锁的产生和预防都有比较详细的讲解,感兴趣的读者可以自行搜搜看。

另外,与lock方法不同,还有一个try_lock方法,这个方法会尝试获取一次锁,如果无法获取会返回一个错误,因此不会发生阻塞。当try_lock失败时,会报出一个错误:Err("WouldBlock"),接着线程中的剩余代码会继续执行,不会被阻塞。

读写锁 RwLock

Mutex会对每次读写都进行加锁,但某些时候,我们需要大量的并发读,Mutex就无法满足需求了,此时就可以使用RwLock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
use std::sync::RwLock;

fn main() {
let lock = RwLock::new(5);

// 同一时间允许多个读
{
let r1 = lock.read().unwrap();
let r2 = lock.read().unwrap();
assert_eq!(*r1, 5);
assert_eq!(*r2, 5);
} // 读锁在此处被drop

// 同一时间只允许一个写
{
let mut w = lock.write().unwrap();
*w += 1;
assert_eq!(*w, 6);

// 以下代码会panic,因为读和写不允许同时存在
// 写锁w直到该语句块结束才被释放,因此下面的读锁依然处于`w`的作用域中
// let r1 = lock.read();
// println!("{:?}",r1);
}// 写锁在此处被drop
}

RwLock在使用上和Mutex区别不大,需要注意的是,当读写同时发生时,程序会直接panic,因为会发生死锁。我们可以使用try_writetry_read来尝试进行一次写/读,若失败则返回错误Err("WouldBlock")

总结下RwLock:

  1. 同时允许多个读,但最多只能有一个写
  2. 读和写不能同时存在
  3. 读可以使用readtry_read,写使用writetry_write。在实际项目中,try_xxx会安全的多

Mutex还是RwLock

首先简单性上Mutex完胜,因为使用RwLock要操心几个问题:

  • 读和写不能同时发生,如果使用try_xxx解决,就必须做大量的错误处理和失败重试机制
  • 当读多写少时,写操作可能会因为一直无法获得锁导致连续多次失败:见writer starvation
  • RwLock 其实是操作系统提供的,实现原理要比Mutex复杂的多,因此单就锁的性能而言,比不上原生实现的Mutex

再来简单总结下两者的使用场景:

  • 追求高并发读取时,使用RwLock,因为Mutex一次只允许一个线程去读取
  • 如果要保证写操作的成功性,使用Mutex
  • 不知道哪个合适,统一使用Mutex

需要注意的是,RwLock虽然看上去貌似提供了高并发读取的能力,但这个不能说明它的性能比Mutex高,事实上Mutex性能要好不少,后者唯一的问题也仅仅在于不能并发读取。

一个常见的、错误的使用RwLock的场景就是使用HashMap进行简单读写,因为HashMap的读和写都非常快,RwLock的复杂实现和相对低的性能反而会导致整体性能的降低,因此一般来说更适合使用Mutex

总之,如果你要使用RwLock要确保满足以下两个条件:并发读,且需要对读到的资源进行“长时间“的操作,HashMap也许满足了并发读的需求,但是往往并不能满足后者:“长时间“的操作。

用条件变量Condvar控制线程的同步

Mutex用于解决资源安全访问的问题,但是我们还需要一个手段来解决资源访问顺序的问题。rust考虑到了这一点,为我们提供了条件变量(Condition Variables),它经常和Mutex一起使用,可以让线程挂起,直到某个条件发生后再继续执行,其实Condvar我们在之前的用条件控制线程的挂起和执行小节已经介绍过,现在再来看一个不同的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
use std::sync::{Arc,Mutex,Condvar};
use std::thread::{spawn,sleep};
use std::time::Duration;

fn main() {
let flag = Arc::new(Mutex::new(false));
let cond = Arc::new(Condvar::new());
let cflag = flag.clone();
let ccond = cond.clone();

let hdl = spawn(move || {
let mut m = { *cflag.lock().unwrap() };
let mut counter = 0;

while counter < 3 {
while !m {
m = *ccond.wait(cflag.lock().unwrap()).unwrap();
}

{
m = false;
*cflag.lock().unwrap() = false;
}

counter += 1;
println!("inner counter: {}", counter);
}
});

let mut counter = 0;
loop {
sleep(Duration::from_millis(1000));
*flag.lock().unwrap() = true;
counter += 1;
if counter > 3 {
break;
}
println!("outside counter: {}", counter);
cond.notify_one();
}
hdl.join().unwrap();
println!("{:?}", flag);
}

通过主线程来触发子线程实现交替打印输出:

1
2
3
4
5
6
7
outside counter: 1
inner counter: 1
outside counter: 2
inner counter: 2
outside counter: 3
inner counter: 3
Mutex { data: true, poisoned: false, .. }

信号量 Semaphore

信号量的概念是由荷兰计算机科学家艾兹赫尔·戴克斯特拉(Edsger W. Dijkstra)发明的,在很多书中将信号量称为PV操作或PV原语。V操作会增加信号标S的数值,P操作会减少它。在rust中,使用信号量可以用来控制同时访问特定资源的线程数量,通过协调各个线程,以保证合理的使用资源。

rust在标准库中有提供一个信号量实现,但是已经被弃用了。因此这里推荐使用tokio中提供的Semaphore实现:tokio::sync::Semaphore

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
use std::sync::Arc;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
let semaphore = Arc::new(Semaphore::new(3));
let mut join_handles = Vec::new();

for _ in 0..5 {
let permit = semaphore.clone().acquire_owned().await.unwrap();
join_handles.push(tokio::spawn(async move {
//
// 在这里执行任务...
//
drop(permit);
}));
}

for handle in join_handles {
handle.await.unwrap();
}
}

上面代码创建了一个容量为 3 的信号量,当正在执行的任务超过 3 时,剩下的任务需要等待正在执行任务完成并减少信号量后到 3 以内时,才能继续执行。acquire_owned就相当于P操作,drop(permit)就相当于V操作。通过PV操作来控制各个线程资源的使用。

3.4 内存模型与atomic

Mutex用起来简单,但是无法并发读,RwLock可以并发读,但是使用场景较为受限且性能不够。atomicstd::sync::atomic)则是包含用于无锁并发编程的原子类型,另外,atomic是所有锁实现的基础。

原子指的是一系列不可被 CPU 上下文交换的机器指令,这些指令组合在一起就形成了原子操作。在多核 CPU 下,当某个 CPU 核心开始运行原子操作时,会先暂停其它 CPU 内核对内存的操作,以保证原子操作不会被其它 CPU 内核所干扰。

由于原子操作是通过指令提供的支持,因此它的性能相比锁和消息传递会好很多。相比较于锁而言,原子类型不需要开发者处理加锁和释放锁的问题,同时支持修改,读取等操作,还具备较高的并发性能,几乎所有的语言都支持原子类型。

内存模型

在具体介绍atomic之前,先介绍一下内存模型。

(1)顺序一致性SC

看一个例子。假设有下面一个简单的程序,运行两个线程,其中AB初始值都是0:

线程 1线程 2
1.A=13.B=2
2.println!(“{}”,B)4.println!(“{}”,A)

这个程序的运行结果是什么呢?这应该有多种情况:

  1. 先执行线程1,线程1结束后,再执行线程2

    语句的顺序为1,2,3,4,运行结果为:0,1

  2. 先执行线程2,线程2结束后,再执行线程1

    语句的顺序为3,4,1,2,运行结果为:0,2

前两种情况是串行执行的,下面来看线程交替执行的情况:

  1. 语句的顺序为1,3,2,4(或3,1,2,4),运行结果为:2,1
  2. 语句的顺序为1,3,4,2(或3,1,4,2),运行结果为:1,2

另外,不应该出现的是另一种结果:0,0,这是因为我们直观地认为这段程序符合顺序一致性。

假设程序能够输出0,0,那么对于两个println!语句来说,要想输出0,0,那么就必须要求:

  • 语句2要比语句3早执行
  • 语句4要比语句1早执行

另外,我们其实隐式地规定了线程内部的执行顺序:线程内的语句的执行顺序一定是按照程序规定的顺序(从上到下依次)执行的,这也符合人的阅读习惯。因此还有:

  • 语句1要比语句2早执行
  • 语句3要比语句4早执行

现在,要满足这4条规则,我们无法给出一个语句序列,使得程序能够运行,因此假设不成立,程序不能输出0,0

这就是顺序一致性(Sequential Consistency)的思想,当并行运行的多个线程操作单个内存,一切都必须按顺序发生。不存在两个事件可以同时发生,因为它们都在访问一个内存。

顺序一致性的规则可以总结为两条:

  • 每个线程内的语句都是按照程序规定的顺序依次执行的
  • 在线程之间,语句可以是交错执行的,但是所有线程所看到的总体执行顺序一样

对于第一条很好理解。第二条的意思是,线程之间语句可以交替执行,但无论从线程1还是线程2看上去,执行顺序都是相同的。拿开头的例子来说,如果线程1看到的语句顺序是1,3,2,4,那么对于线程2也是如此。

对于顺序一致性,其实上就像开关一样。假设有N个线程,在每个步骤中,开关选择一个线程中的指令运行并运行完毕,然后在下一个步骤再去选择一个线程…这样看来,在同一时间只有一个线程在工作,因此它非常慢,我们一次只能运行一条指令,失去了让多个线程并行运行的优点。

(2)全存储排序TSO

全存储排序(Total Store Ordering)模型是这样的,现代CPU往往都具有多核心,多级缓存,对于一个写入操作的完成,把值写入CPU核心的缓冲区即可,而无需等到它真正写入内存。

还是以SC的这个例子说明:

线程 1线程 2
1.A=13.B=2
2.println!(“{}”,B)4.println!(“{}”,A)

在全存储排序模型下,执行一个写入操作返回时,并不意味着内存中的值立即被修改:

image-20230311181133205

与其等待A=1依次经过缓存写入内存再返回,不如将其写入缓冲区之后直接返回。由于存储缓冲区在CPU核心上,因此访问速度非常快。在之后的某个时间,缓存层次结构将从存储缓冲区中提取写入并将其传播到缓存中,以便它对其他线程可见。

因此,可能出现以下情况:

  1. 执行操作1,写入A=1到CPU的存储缓冲区之后立即返回,并没有更新到所有CPU都能访问到的内存中。
  2. 执行操作3,写入B=2到CPU的存储缓冲区之后立即返回,并没有更新到所有CPU都能访问到的内存中。
  3. 执行操作4,读取内存中A的值,此时还没有被更新,所以读取到0
  4. 执行操作2,读取内存中B的值,此时还没有被更新,所以读取到0

因此,在引入了存储缓冲区之后,在顺序一致性模型下不能输出的0,0也可以输出了。

并且,这种缓冲区保留了单线程的预期行为。考虑单线程的代码:

1
2
3
// A初始为0
let A = 1; // 操作1
println!("{}", A); // 操作2

image-20230311181657746

写入A=1到CPU的存储缓冲区之后立即返回,执行println!时难道也会是0吗?并不,由于操作2的读取需要查看操作1的值,程序保留了预期的行为。如果去内存读取,那么只能拿到一个旧值,但是因为线程在同一个CPU上运行,读取可以直接检查存储缓冲区,查看它是否包含对正在读取的位置的写入,然后使用该值。因此,即使使用存储缓冲区,该程序也能正确打印出1

TSO保留了SC的第一条规则,即每个线程内的语句都按照程序规定的顺序依次执行。但TSO允许使用缓冲区,这些缓冲区减少了写入延迟。使语句更快地返回,程序的执行速度显著加快。SC的限制比较严格,TSO则弱化了它。

事实上,几乎每个现代架构都包含一个存储缓冲区,因此,内存模型至少与TSO一样弱。

(3)松弛型内存模型

前两种内存模型相对严格,限制了现代处理器架构下广泛使用的优化措施。例如,无法使用编译器和处理器的重排序优化松弛型内存模型(Relaxed memory models)则可以使用编译器和处理器的重排序优化,这导致程序的执行顺序并不与程序员看到的顺序为准。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
static mut X: u64 = 0;
static mut Y: u64 = 1;

fn main() {
... // A

unsafe {
... // B
X = 1;
... // C
Y = 3;
... // D
X = 2;
... // E
}
}

假如在CD代码片段中,根本没有用到X = 1,那么编译器很可能会将X = 1X = 2进行合并:

1
2
3
4
5
6
7
8
9
10
 ...     // A

unsafe {
... // B
X = 2;
... // C
Y = 3;
... // D
... // E
}

若代码A中创建了一个新的线程用于读取全局静态变量X,则该线程将无法读取到X = 1的结果,因为在编译阶段就已经被优化掉。

因此,松弛型内存模型中,编译器可以在满足程序单线程执行结果的情况下对代码进行重排序,这就导致程序的执行顺序不一定和代码中编写的一样。

内存栅栏(内存屏障)

由于多种内存模型的出现,导致执行指令并不需要真正写入内存就可以返回,继续执行其它指令。但有些时候我们确实需要保证值写入内存后才返回。另外,不同的CPU架构下,所使用的内存模型也不同,有些架构没有做到内存强一致性,并且CPU也可能会对指令做重排优化。这就导致同样的代码在一些CPU上执行时,出现与预期不符的情况,就像前面SC模型中不可能出现的0,0结果可以在松弛型内存模型出现一样。但程序员是面向抽象机编程的,不应该因架构导致的结果不同而修改代码,甚至有些时候根本难以发现这些错误。

因此,引入了内存栅栏(memory barrier)。内存栅栏保证了在执行内存栅栏前的所有的内存操作的结果都写入到内存中(全局共享),在这之后继续执行其它指令。可以将其理解为手动的SC:在程序执行的某些点的前后保持顺序一致性,确保操作不会乱序。

换句话说,程序员只需要恰当地使用内存栅栏去标记出在并发程序中需要同步的变量和操作,就可以显式地告诉编译器和CPU不要对这些部分作出违反顺序一致性的优化。而对于程序的剩余部分,可以进行优化。这样既保证了正确性又保证了CPU和编译器可以做尽可能多的性能优化。

限定内存顺序

现在,我们来看看rust中提供的限定内存顺序的memory barriers,详见std::sync::atomic::Ordering,rust的内存顺序 与 C++ 20 相同,它们可以看成是对编译器和CPU内存序的控制的接口。

1
2
3
4
5
6
7
pub enum Ordering {
Relaxed,
Release,
Acquire,
AcqRel,
SeqCst,
}

Ordering是一个枚举,它们对一致性的要求逐渐增强:

  • Relaxed,对应的松弛型内存模型,对应于 C++ 20 中的 memory_order_relaxed
    • 针对一个变量的读写操作是原子操作
    • 不同线程之间针对该变量的访问操作先后顺序不能得到保证,即有可能乱序
  • Release,用来修饰一个写操作,表示在本线程中,在本行代码之前,有任何读写内存的操作,都不能重新排序到本行语句之后。对应于 C++ 20 中的 memory_order_release
  • Acquire,用来修饰一个读操作,表示在本线程中,在本行代码之后,有任何读写内存的操作,都不能重新排序到本行语句之前。对应于 C++ 20 中的 memory_order_acquire
  • AcqRel,同时拥有ReleaseAcquire的保证。对应于 C++ 20 中的 memory_order_acq_rel
  • SeqCst,顺序一致性,保证所有线程都可以按相同的顺序看到所有顺序一致的操作,对应于 C++ 20 中的 memory_order_seq_cst

原子类型

std::sync::atomic中目前支持12种原子类型:

  • AtomicBool
  • AtomicI8
  • AtomicI16
  • AtomicI32
  • AtomicI64
  • AtomicIsize
  • AtomicPtr
  • AtomicU8
  • AtomicU16
  • AtomicU32
  • AtomicU64
  • AtomicUsize

每个原子类型对应其普通类型,只不过它们的操作是原子性的。拿其中的一个AtomicI8举例,它实现了一系列操作,比如加法、减法、存值、取值等,比如store方法和load的函数签名:

1
2
pub fn store(&self, val: i8, order: Ordering)
pub fn load(&self, order: Ordering) -> i8

注意第二个参数都是Ordering类型,它描述的就是该操作的内存顺序,或者说需要使用的内存栅栏。

限定内存顺序的实例

下面,通过一个例子来总结前面的知识。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread::{self, JoinHandle};
const IS_OK: AtomicBool = AtomicBool::new(false);
static mut NUM: i32 = 0;
fn release() -> JoinHandle<()> {
thread::spawn(move || {
unsafe {
NUM = 100; // 1.
}
IS_OK.store(true, Ordering::Relaxed); // 2.
})
}

fn aquire() -> JoinHandle<()> {
thread::spawn(move || {
IS_OK.load(Ordering::Relaxed); // 3.
unsafe {
println!("{}", NUM); // 4.
}
})
}
fn main() {
let handle_r = release();
let handle_a = aquire();
handle_a.join();
handle_r.join();
}

对于release函数,如果采用松弛型内存模型,也就是Relaxed,那么在编译优化或者CPU优化的过程中,可能会发生2先于1的情况:

1
2
3
4
5
6
7
8
fn release() -> JoinHandle<()> {
thread::spawn(move || {
IS_OK.store(true, Ordering::Relaxed); // 2.
unsafe {
NUM = 100; // 1.
}
})
}

为了保证写入的内存序,我们改用Release

1
2
3
4
5
6
7
8
9
fn release() -> JoinHandle<()> {
thread::spawn(move || {
let s = 5; // t.
unsafe {
NUM = 100; // 1.
}
IS_OK.store(true, Ordering::Release); // 2.
})
}

这样可以告诉编译器和CPU,保证2一定在1后执行。另外在这里注意,对于1t之间可以被乱序。

假设我们想让其它线程看到1,2的顺序,使用Relaxed是不够的,因为完全可能有下面的情况发生:

1
2
3
4
5
6
7
8
fn aquire() -> JoinHandle<()> {
thread::spawn(move || {
unsafe {
println!("{}", NUM); // 4.
}
IS_OK.load(Ordering::Relaxed); // 3.
})
}

因此需要对aquire修改:

1
2
3
4
5
6
7
8
fn aquire() -> JoinHandle<()> {
thread::spawn(move || {
IS_OK.load(Ordering::Acquire); // 3.
unsafe {
println!("{}", NUM); // 4.
}
})
}

Acquire保证了后续的内存操作都不能放到这条指令之前,因此这段代码应该可以正确地输出最终结果100。但事实并非如此,在测试过程中,发现了两处问题,首先要使用static IS_OK而不能用const IS_OK,否则无法成功地修改原子类型的值;其次,要使用while !IS_OK.load(Ordering::Acquire) {}循环确保IS_OKtrue,修改后的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread::{self, JoinHandle};
// 这里要用static,不可以使用const
static IS_OK: AtomicBool = AtomicBool::new(false);
static mut NUM: i32 = 0;
fn release() -> JoinHandle<()> {
thread::spawn(move || {
unsafe {
NUM = 100; // 1.
}
IS_OK.store(true, Ordering::Release); // 2.
})
}

fn aquire() -> JoinHandle<()> {
thread::spawn(move || {
// 确保IS_OK为true
while !IS_OK.load(Ordering::Acquire) {} // 3.

unsafe {
println!("{}", NUM); // 4.
}
})
}
fn main() {
let handle_r = release();
let handle_a = aquire();
handle_a.join().unwrap();
handle_r.join().unwrap();
}

这一小节中的很多内容与操作系统、编译器和CPU本身有关,并不是rust所独有的,比如内存模型、内存屏障,以及与C++20相同的memory order,如果有些内容不理解,不妨看看C++的实现或者内存模型的更多资料。

3.5 使用 SyncSend 特征的可扩展并发

我们之前讨论的几乎所有内容,都属于标准库,而不是rust语言本身的内容。由于不需要语言提供并发相关的基础设施,并发方案不受标准库或语言所限:我们可以编写自己的或使用别人编写的并发功能。然而有两个并发概念是内嵌于语言中的:std::marker 中的 SyncSend 特征,它们称为标记特征(marker trait)。它们的作用分别是:

  • 实现Send的类型可以在线程间安全的传递其所有权
  • 实现Sync的类型可以在线程间安全的共享(通过引用)

通过Send允许在线程间转移所有权

Send 标记特征表明实现了 Send 的类型值的所有权可以在线程间传送。任何完全由 Send 的类型组成的类型会自动被标记为 Send。几乎所有rust类型都是 Send 的。不过有一些例外,包括 Rc<T>:这是不能 Send 的,因为如果克隆了 Rc<T> 的值并尝试将克隆的所有权转移到另一个线程,这两个线程都可能同时更新引用计数。为此,Rc<T> 被实现为用于单线程场景,这时不需要为拥有线程安全的引用计数而付出性能代价。

因此,正如线程安全的引用计数介绍的,rust类型系统和特征约束确保永远也不会意外的将不安全的 Rc<T> 在线程间发送。而使用标记为 SendArc<T> 时,就没有问题了。

另外一个不能 Send 的是裸指针。关于裸指针的介绍,见解引用裸指针

另外,互斥锁通过lock创建的std::sync::MutexGuard也没有实现Send

Sync允许多线程访问

Sync 标记特征表明一个实现了 Sync 的类型可以安全的在多个线程中拥有其值的引用。换一种方式来说,对于任意类型 T,如果 &TT 的不可变引用)是 Send 的话 T 就是 Sync 的,这意味着其引用就可以安全的发送到另一个线程。类似于 Send 的情况,基本类型是 Sync 的,完全由 Sync 的类型组成的类型也是 Sync 的。

智能指针 Rc<T> 也不是 Sync 的,原因与 Send 相同。RefCell<T>Cell<T> 系列类型不是 Sync 的,RefCell<T> 在运行时所进行的借用检查也不是线程安全的。Mutex<T>Sync 的,它可以被用来在多线程中共享访问。

我们曾经介绍过读写锁 RwLock,作为例子,我们来看看它的实现:

1
unsafe impl<T: ?Sized + Send + Sync> Sync for RwLock<T> {}

首先RwLock可以在线程间安全的共享,那它肯定是实现了Sync,并且,RwLock可以并发读,说明其中的值T必定也可以在线程间共享,那T必定要实现Sync,在上面的特征约束中,可以看到T的特征约束中就有一个Sync特征。

对于互斥锁来说,它不允许多线程中共享引用,通过它的实现:

1
unsafe impl<T: ?Sized + Send> Sync for Mutex<T> {}

可以看出,T的特征约束中果然没有Sync

再来对比一下Rc<T>Arc<T>,看看Arc<T>为何可以在多线程使用:

1
2
3
4
5
6
7
// Rc源码片段
impl<T: ?Sized> !marker::Send for Rc<T> {}
impl<T: ?Sized> !marker::Sync for Rc<T> {}

// Arc源码片段
unsafe impl<T: ?Sized + Sync + Send> Send for Arc<T> {}
unsafe impl<T: ?Sized + Sync + Send> Sync for Arc<T> {}

!代表移除特征的相应实现,上面代码中Rc<T>SendSync特征被特地移除了实现,而Arc<T>则相反,实现了Sync + Send

这些源码中的?Sized,我们将在后面介绍,详见Sized 特征

手动实现SendSync是不安全的

通常并不需要手动实现 SendSync特征,因为由 SendSync 的类型组成的类型,自动就是 SendSync 的。因为他们是标记特征,甚至都不需要实现任何方法。它们只是用来加强并发相关的不可变性的。

手动实现这些标记特征涉及到编写不安全的rust代码。关于不安全的代码,在后面不安全的rust一章中会进行介绍。

3.6 为裸指针实现 SendSync

裸指针本身就没有任何安全保证,因此它没有实现 SendSync,这意味着下面代码会报错:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
use std::thread;
fn main() {
let p = 5 as *mut u8;
let t = thread::spawn(move || {
println!("{:?}",p);
});

t.join().unwrap();
}
/*
error[E0277]: `*mut u8` cannot be sent between threads safely
--> src\main.rs:4:27
|
4 | let t = thread::spawn(move || {
| ------------- ^------
| | |
| _____________|_____________within this `[closure@src\main.rs:4:27: 4:34]`
| | |
| | required by a bound introduced by this call
5 | | println!("{:?}",p);
6 | | });
| |_____^ `*mut u8` cannot be sent between threads safely
|
= help: within `[closure@src\main.rs:4:27: 4:34]`, the trait `Send` is not implemented for `*mut u8`
*/

我们无法为其直接实现Send特征,但是可以用newtype(我们将在后面介绍,详见newtype):struct MyBox(*mut u8)。由于复合类型中有一个成员没实现Send,该复合类型就不是Send,因此我们需要手动为它实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
use std::thread;

#[derive(Debug)]
struct MyBox(*mut u8);
unsafe impl Send for MyBox {}
fn main() {
let p = MyBox(5 as *mut u8);
let t = thread::spawn(move || {
println!("{:?}",p);
});

t.join().unwrap();
}

此时,我们的指针已经可以在多线程间转移所有权,需要注意的就是,SendSyncunsafe特征,实现时需要用unsafe代码块包裹。

下面为裸指针实现 Sync,由于Sync是多线程间共享一个值,你可能会这么实现:

1
2
3
4
5
6
7
8
9
use std::thread;
fn main() {
let v = 5;
let t = thread::spawn(|| {
println!("{:?}",&v);
});

t.join().unwrap();
}

正如之前线程与move闭包提到的,这里没有使用move,线程如果直接去借用其它线程的变量,会报错:closure may outlive the current function。原因在于编译器无法确定主线程main和子线程t谁的生命周期更长,特别是当两个线程都是子线程时,没有任何人知道哪个子线程会先结束。

因此需要配合Arc去使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
use std::thread;
use std::sync::Arc;
use std::sync::Mutex;

#[derive(Debug)]
struct MyBox(*const u8);
unsafe impl Send for MyBox {}

fn main() {
let b = &MyBox(5 as *const u8);
let v = Arc::new(Mutex::new(b));
let t = thread::spawn(move || {
let _v1 = v.lock().unwrap();
});

t.join().unwrap();
}
/*
error[E0277]: `*const u8` cannot be shared between threads safely
--> src\main.rs:12:27
|
12 | let t = thread::spawn(move || {
| _____________-------------_^
| | |
| | required by a bound introduced by this call
13 | | let _v1 = v.lock().unwrap();
14 | | });
| |_____^ `*const u8` cannot be shared between threads safely
|
= help: within `MyBox`, the trait `Sync` is not implemented for `*const u8`
*/

上面代码将智能指针v的所有权转移给新线程,同时v包含了一个引用类型b,当在新的线程中试图获取内部的引用时,就会报错。

这是因为我们访问的引用实际上还是对主线程中的数据的借用,转移进来的仅仅是外层的智能指针引用。解决方法就是为MyBox实现Sync:

1
unsafe impl Sync for MyBox {}