七、Tokio

1 介绍

1.1 总览

语言和库的支持中提到过,语言本身没有提供异步运行时,而是交由第三方实现。Tokio 就是 rust 的异步运行时之一。它提供了编写网络应用程序所需的构建块。它提供了针对各种系统的灵活性,从具有数十个内核的大型服务器到小型嵌入式设备。

在顶层上,Tokio 提供了几个主要组件:

  • 用于执行异步代码的多线程运行时
  • 标准库的异步版本
  • 一个庞大的库生态系统

1.2 Tokio的特点

  1. 快速

    Tokio 速度很快,建立在 Rust 编程语言之上,而 Rust 本身也很快。这符合 Rust 的设计理念,其目标是您不应该通过手动编写等效代码来提高性能。Tokio 是一个可扩展的库,构建在 async/await 语言特性之上,而 async/await 本身也是可扩展的。在处理网络时,由于延迟的存在,处理连接的速度存在一定的限制,因此唯一的扩展方式就是同时处理多个连接。借助 async/await 语言特性,增加并发操作的数量变得非常廉价,使你能够扩展到大量的并发任务。

  2. 可靠

    Tokio 是使用 Rust 构建的,Rust 是一种赋予每个人构建可靠高效软件能力的语言。许多研究发现,大约 70% 的高严重性安全漏洞是由于内存不安全造成的。使用 Rust 可以在应用程序中消除这一类错误。Tokio 也非常注重提供一致的行为,避免出现意外情况。Tokio 的主要目标是允许用户部署可预测的软件,在日复一日的运行中表现一致,具有可靠的响应时间,并且没有不可预测的延迟波动。

  3. 简单

    借助 Rust 的 async/await 特性,编写异步应用程序的复杂性大大降低。再加上 Tokio 提供的工具和充满活力的生态系统,编写应用程序变得轻而易举。在有意义时,Tokio 遵循标准库的命名约定。这使得可以轻松地将仅使用标准库编写的代码转换为使用 Tokio 编写的代码。得益于 Rust 强大的类型系统,轻松交付正确代码的能力无与伦比。

  4. 灵活

    Tokio 提供多种运行时的变体。从多线程的工作窃取运行时到轻量级的单线程运行时,应有尽有。每个运行时都提供了许多参数,使用户可以根据自己的需求进行调整。

1.3 何时不应使用Tokio

尽管 Tokio 对于许多需要同时执行多个任务的项目非常有用,但也有一些使用情况不适合使用 Tokio。

  • 如果需要通过在多个线程上并行运行来加速 CPU 密集型计算,那么使用 Tokio 并不是一个合适的选择。Tokio 主要设计用于 IO 密集型应用程序,在这种应用程序中,每个单独的任务大部分时间都在等待 IO 操作。如果你的应用程序只需要并行运行计算任务,那么应该使用 rayon。然而,如果需要同时处理两种类型的任务,仍然可以进行“混合和匹配“的操作。
  • 读取大量文件的情况下,尽管看起来 Tokio 对于仅需要读取大量文件的项目可能很有用,但与普通线程池相比,Tokio 在这里并没有优势。这是因为操作系统通常不提供异步文件 API。
  • 在发送单个网络请求的情况下,Tokio 的优势在于需要同时执行多个任务的情况。如果你需要使用面向异步 Rust 的库(如 reqwest),但并不需要同时执行很多任务,那么最好选择该库的阻塞版本,因为这样可以简化项目。当然,使用 Tokio 仍然可以工作,但与阻塞 API 相比,并没有真正的优势。

1.4 Tokio功能标志

Tokio 由许多模块组成,这些模块提供了一系列在 Rust 中实现异步应用程序所必需的功能。

Tokio 使用一组功能标志(feature flags)来减少编译代码的量。可以只启用其中的某些功能而不是全部启用。默认情况下,Tokio 不启用任何功能,但允许用户根据自己的用例启用其中的一个子集。每个函数、结构体和特征都有一个或多个所需的功能标志,以便使用该项。以下是可用的功能标志列表:

  • full: 启用所有下面列出的所有功能,但不包括 test-utiltracing
  • rt: 启用 tokio::spawncurrent-thread 调度器和非调度器实用工具。
  • rt-multi-thread: 启用较重的多线程工作窃取调度器。
  • io-util: 启用基于 IO 的Ext 特征。
  • io-std: 启用 StdoutStdinStderr 类型。
  • net: 启用 tokio::net 类型,如 TcpStreamUnixStreamUdpSocket,以及(在类 Unix 系统上)AsyncFd 和(在 FreeBSD 上)PollAio
  • time: 启用 tokio::time 类型,并允许调度器启用内置定时器。
  • process: 启用 tokio::process 类型。
  • macros: 启用 #[tokio::main]#[tokio::test] 宏。
  • sync: 启用所有 tokio::sync 类型。
  • signal: 启用所有 tokio::signal 类型。
  • fs: 启用 tokio::fs 类型。
  • test-util: 启用基于测试的 Tokio 运行时基础设施。
  • parking_lot: 作为潜在的优化,内部使用 parking_lot crate 的同步原语。此外,在 const 上下文中构建某些原语需要此依赖项。MSRV(最低支持 Rust 版本)可能会根据所使用的 parking_lot 发布版本而增加。

注意:AsyncRead AsyncWrite 特征不需要任何特征并且始终可用。

作为初学者,最简单的入门方法是启用所有功能。通过启用 full 功能标志来执行此操作,在Cargo.toml[dependencies]中:

1
tokio = { version = "1", features = ["full"] }

这将启用所有公共 API。但要注意,这会引入许多额外的依赖项,你可能并不需要全部。

而作为库的作者,你的目标应该是提供基于 Tokio 的最轻量级 crate。为了实现这一目标,你应该确保只启用你需要的功能。这样可以使用户在使用你的 crate 时不必启用不必要的功能,比如你可能只需要 tokio::spawn 并使用 TcpStream 的库:

1
tokio = { version = "1", features = ["rt", "net"] }

2 Tokio的核心概念

在使用tokio之前,应当先理解tokio的核心概念:Runtimetask。只有理解了这两个核心概念,才能正确地、合理地使用tokio。

2.1 runtime

与其他 Rust 程序不同,异步应用程序需要运行时支持。特别是,以下运行时服务是必需的:

  • I/O 事件循环(称为驱动程序),用于驱动 I/O 资源并将 I/O 事件分派给依赖于它们的任务。
  • 调度器,用于执行使用这些 I/O 资源的任务。
  • 定时器,用于安排在一定时间后运行的工作。

Tokio 的 Runtime 将所有这些服务捆绑为单一类型,允许它们一起启动、关闭和配置。但是,通常不需要手动配置 Runtime ,用户可以直接使用 tokio::main 属性宏,它会在后台创建 Runtime

手动创建runtime

使用tokio::runtime::Runtime创建Runtime

1
2
3
4
5
6
use tokio;

fn main() {
// 创建runtime
let rt = tokio::runtime::Runtime::new().unwrap();
}

也可以使用tokio::runtime::Builder来配置并创建Runtime

1
2
3
4
5
6
7
8
9
10
11
use tokio;

fn main() {
// 创建带有线程池的runtime
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(8) // 8个工作线程
.enable_io() // 可在runtime中使用异步IO
.enable_time() // 可在runtime中使用异步计时器(timer)
.build() // 创建runtime
.unwrap();
}

tokio提供了两种工作模式的runtime:

  • 多线程(线程池)的runtime(multi thread runtime),默认情况下,它将为系统上可用的每个 CPU 核心启动一个工作线程。这往往是大多数应用程序的理想配置。多线程调度器需要 rt-multi-thread 特性标志,默认选中。
  • 单一线程的runtime(single thread runtime,也称为current thread runtime),所有任务都将在当前线程上创建和执行,这需要 rt 功能标志。只有明确指定,才能创建出单一线程的runtime。

下面的例子创建单线程的rumtime:

1
2
3
use tokio::runtime;
// 创建单一线程的runtime
let rt = runtime::Builder::new_current_thread().build().unwrap();

这里以及上述代码中出现的的rt称为runtime Handle,它可以被clone。它可以spawn()生成异步任务,这些异步任务将绑定在其所引用的runtime中,还可以block_on()enter()进入其所引用的runtime,此外,还可以生成blocking thread。

使用 tokio::main 属性宏创建runtime

tokio提供了简化的创建方式,那就是通过 tokio::main 属性宏创建runtime。

1
2
3
4
5
6
use tokio;

#[tokio::main]
async fn main() {
println!("Hello world");
}

通过#[tokio::main]属性宏注解(annotation),使得async main自身成为一个async runtime。

与之等效的不使用属性宏的代码如下:

1
2
3
4
5
6
7
8
9
fn main() {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap()
.block_on(async {
println!("Hello world");
})
}

注意几点:

  • 此宏旨在简化并针对不需要复杂设置的应用程序。如果提供的功能还不够,仍然可以使用tokio::runtime::Builder,它提供了更强大的接口。
  • 此宏可用于任何函数,而不仅仅是 main 函数。不过在非main函数上使用它会使该函数表现得好像它是同步的,每次调用它时都会启动一个新的runtime。如果经常调用该函数,最好使用tokio::runtime::Builder创建运行时,以便可以在调用之间重用运行时。

默认情况下,#[tokio::main]创建的是多线程runtime,因此它实际相当于:

1
#[tokio::main(flavor = "multi_thread"]

要使用 current_thread 运行时的单线程运行时,可以使用以下命令配置宏:

1
2
3
4
5
6
7
8
9
10
#[tokio::main(flavor = "current_thread")]

// 这等价于
fn main() {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
.block_on(async { ... })
}

还可以手动设置线程数量:

1
2
3
4
// 完整
#[tokio::main(flavor = "multi_thread", worker_threads = 2))]
// 简写
#[tokio::main(worker_threads = 2)]

多个runtime共存

可手动创建线程,并在不同线程内创建互相独立的runtime。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
use std::thread;
use std::time::Duration;
use tokio::runtime::Runtime;

fn main() {
// 在第一个线程内创建一个多线程的runtime
let t1 = thread::spawn(||{
let rt = Runtime::new().unwrap();
thread::sleep(Duration::from_secs(10));
});

// 在第二个线程内创建一个多线程的runtime
let t2 = thread::spawn(||{
let rt = Runtime::new().unwrap();
thread::sleep(Duration::from_secs(10));
});

t1.join().unwrap();
t2.join().unwrap();
}

对于4核8线程的电脑,此时总共有19个OS线程:16个worker-thread,2个spawn-thread,一个main-thread。

runtime实现了SendSync这两个特征,因此也可以将runtime包在Arc里,然后跨线程使用同一个runtime。

在runtime中执行异步任务

了解如何创建runtime之后,我们需要实际让它们执行一些任务。这些任务一般都是一些IO的任务(以发挥tokio的最大效能),比如发送或者处理网络IO等等。

在学习使用的过程中,暂时不需要实现这些复杂的任务逻辑,我们将这些任务抽象成睡眠操作,用tokio::time::sleep()代表这些异步IO。注意,std::time也提供了sleep(),但它会阻塞整个线程,而tokio::time中的sleep()则只是让它所在的任务放弃CPU并进入调度队列等待被唤醒,它不会阻塞任何线程,它所在的线程仍然可被用来执行其它异步任务。因此,在tokio的runtime中,应当使用tokio::time中的sleep()

在runtime中执行一段任务:

1
2
3
4
5
6
7
8
9
10
11
use tokio::runtime::Runtime;
use chrono::Local;

fn main() {
let rt = Runtime::new().unwrap();
rt.block_on(async {
println!("before sleep: {}", Local::now().format("%F %T.%3f"));
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
println!("after sleep: {}", Local::now().format("%F %T.%3f"));
});
}

这里使用了chrono第三方crate,用于查看开始和结束的时间,在Cargo.toml[dependencies]中添加:

1
chrono = "0.4.26"

编译运行,可以看出sleep了2秒。另外,在上面的程序中,上面调用了runtime的block_on(),该方法要求一个Future作为参数,可以像上面一样直接使用一个async {}语法来定义一个Future。每一个Future都是一个已经定义好但尚未执行的异步任务,每一个异步任务中可能会包含其它子任务。

这些异步任务被创建好后不会立即执行,需要先将它们放入到runtime环境,然后在合适的地方通过Futureawait来执行它们。await可以将已经定义好的异步任务立即加入到runtime的任务队列中等待调度执行,于此同时,await会等待该异步任务完成才返回。

将任务创建和执行拆开来看:

1
2
3
4
5
6
7
8
9
10
rt.block_on(async {
// 只是定义了Future,此时尚未执行
let task = tokio::time::sleep(tokio::time::Duration::from_secs(2));
// ...不会执行...
// ...
// 开始执行task任务,并等待它执行完成
task.await;

// 上面的任务完成之后,才会继续执行下面的代码
});

block_on会阻塞当前线程(例如阻塞住上面的main函数所在的主线程),直到其指定的异步任务树(可能有子任务)全部完成。这是运行时的入口点。

block_on的返回值为其所执行异步任务的返回值:

1
2
3
4
5
6
7
8
9
10
use tokio::{time, runtime::Runtime};

fn main() {
let rt = Runtime::new().unwrap();
let res: i32 = rt.block_on(async{
time::sleep(time::Duration::from_secs(2)).await;
3
});
println!("{}", res); // 3
}

使用spawn向runtime中添加新的异步任务

tokio::spawn用于创建一个新的异步任务,并为其返回一个 JoinHandle ,比如:

1
2
3
tokio::spawn(async {
time::sleep(time::Duration::from_secs(5)).await;
});

创建一个任务使该任务能够与其他任务并发执行。创建的任务可以在当前线程上执行,也可以发送到不同的线程执行。具体取决于当前的 Runtime 配置。 注意,该函数无法保证创建的任务会执行到完成。当runtime关闭时,所有未完成的任务都将被删除,无论该任务的生命周期如何。

另外,必须从runtime的上下文中调用此函数,在runtime内运行的任务始终在其上下文中。下面是一个错误的示例:

1
2
3
4
5
6
7
8
use tokio::{self, time};

fn main() {
// 错误,此时并不在runtime上下文中
tokio::spawn(async {
time::sleep(time::Duration::from_secs(5)).await;
});
}

必须处在runtime中,或使用 Runtime::enter 方法进入上下文,在下一小节会介绍。

但是在runtime上下文外部可以使用spawn定义任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
use std::thread;

use chrono::Local;
use tokio::{self, runtime::Runtime, time};

fn now() -> String {
Local::now().format("%F %T").to_string()
}

// 在runtime外部定义一个异步任务,且该函数返回值不是Future类型
fn async_task() {
println!("create an async task: {}", now());
tokio::spawn(async {
time::sleep(time::Duration::from_secs(10)).await;
println!("async task over: {}", now());
});
}

fn main() {
let rt1 = Runtime::new().unwrap();
rt1.block_on(async {
// 调用函数,该函数内创建了一个异步任务,将在当前runtime内执行
async_task();
});
}

runtime自身也有spawn方法,因此,也可以传递runtime(注意,要传递runtime的引用),然后使用runtime的spawn()

1
2
3
4
5
6
7
8
9
10
11
12
13
use tokio::{runtime::Runtime, time};
fn async_task(rt: &Runtime) {
rt.spawn(async {
time::sleep(time::Duration::from_secs(10)).await;
});
}

fn main() {
let rt = Runtime::new().unwrap();
rt.block_on(async {
async_task(&rt);
});
}

使用enter进入runtime上下文

block_on()是进入runtime的主要方式。但还有另一种进入runtime的方式:enter()block_on()进入runtime时,会阻塞当前线程,enter()进入runtime时,不会阻塞当前线程,它会返回一个EnterGuardEnterGuard没有其它作用,它仅仅只是声明从它开始的所有异步任务都将在runtime上下文中执行,直到删除该EnterGuard

删除EnterGuard并不会删除runtime,只是释放之前的runtime上下文声明。因此,删除EnterGuard之后,可以声明另一个EnterGuard,这可以再次进入runtime的上下文环境。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
use tokio::{self, runtime::Runtime, time};
use chrono::Local;
use std::thread;

fn now() -> String {
Local::now().format("%F %T").to_string()
}

fn main() {
let rt = Runtime::new().unwrap();

// 进入runtime,但不阻塞当前线程
let guard1 = rt.enter();

// 生成的异步任务将放入当前的runtime上下文中执行
tokio::spawn(async {
time::sleep(time::Duration::from_secs(5)).await;
println!("task1 sleep over: {}", now());
});

// 释放runtime上下文,这并不会删除runtime
drop(guard1);

// 可以再次进入runtime
let guard2 = rt.enter();
tokio::spawn(async {
time::sleep(time::Duration::from_secs(4)).await;
println!("task2 sleep over: {}", now());
});

drop(guard2);

// 阻塞当前线程,等待异步任务的完成
thread::sleep(std::time::Duration::from_secs(10));
}

runtime和异步调度

runtime提供了异步IO驱动、异步计时器等异步API,还提供了任务的调度器(scheduler)和Reactor事件循环(Event Loop)。

简单来说,runtime就是一个控制如何执行任务的调度器。当一个异步任务需要运行,在Tokio的runtime实现中,这个任务会被放入到一个队列中,当任务不能继续下去的时候,它会让出CPU,进入睡眠状态,等待下一次被唤醒,

就绪队列中的每一个任务都是可运行的任务,可随时被调度器调度选中。调度时会选择哪一个任务,是调度器根据调度算法去决定的。tokio的作者,非常友好地提供了一篇他实现tokio调度器的思路,里面详细介绍了调度器的基本知识和tokio调度器的调度策略,见:10倍提升Tokio调度器的性能

tokio的两种线程:worker thread和blocking thread

需要注意,tokio提供了两种功能的线程:

  • 用于异步任务的工作线程(worker thread)
  • 用于同步任务的阻塞线程(blocking thread)

单个线程或多个线程的runtime,指的都是工作线程,即只用于执行异步任务的线程,这些任务主要是IO密集型的任务。tokio默认会将每一个工作线程均匀地绑定到每一个CPU核心上。

但是,有些必要的任务可能会长时间计算而占用线程,甚至任务可能是同步的,它会直接阻塞整个线程(比如thread::time::sleep()),这类任务如果计算时间或阻塞时间较短,勉强可以考虑留在异步队列中,但如果任务计算时间或阻塞时间可能会较长,它们将不适合放在异步队列中,因为它们会破坏异步调度,使得同线程中的其它异步任务处于长时间等待状态,也就是说,这些异步任务可能会被饿很长一段时间。

例如,直接在runtime中执行阻塞线程的操作,由于这类阻塞操作不在tokio系统内,tokio无法识别这类线程阻塞的操作,tokio只能等待该线程阻塞操作的结束,才能重新获得那个线程的管理权。换句话说,worker thread被线程阻塞的时候,它已经脱离了tokio的控制,在一定程度上破坏了tokio的调度系统。

1
2
3
4
rt.block_on(async{
// 在runtime中,让整个线程进入睡眠,注意不是tokio::time::sleep()
std::thread::sleep(std::time::Duration::from_secs(10));
});

因此,tokio提供了这两类不同的线程。worker thread只用于执行那些异步任务,异步任务指的是不会阻塞线程的任务。而一旦遇到本该阻塞但却不会阻塞的操作(如使用tokio::time::sleep()而不是std::thread::sleep()),会直接放弃CPU,将线程交还给调度器,使该线程能够再次被调度器分配到其它异步任务。blocking thread则用于那些长时间计算的或阻塞整个线程的任务。

blocking thread默认是不存在的,只有在调用了spawn_blocking()时才会创建一个对应的blocking thread。

blocking thread不用于执行异步任务,因此runtime不会去调度管理这类线程,它们在本质上相当于一个独立的thread::spawn()创建的线程,它也不会像block_on()一样会阻塞当前线程。它和独立线程的唯一区别,是blocking thread是在runtime内的,可以在runtime内对它们使用一些异步操作,例如await。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
use std::thread;
use chrono::Local;
use tokio::{self, runtime::Runtime, time};

fn now() -> String {
Local::now().format("%F %T").to_string()
}

fn main() {
let rt1 = Runtime::new().unwrap();
// 创建一个blocking thread,可立即执行(由操作系统调度系统决定何时执行)
// 注意,不阻塞当前线程
let task = rt1.spawn_blocking(|| {
println!("in task: {}", now());
// 注意,是线程的睡眠,不是tokio的睡眠,因此会阻塞整个线程
thread::sleep(std::time::Duration::from_secs(10))
});

// 小睡1毫秒,让上面的blocking thread先运行起来
std::thread::sleep(std::time::Duration::from_millis(1));
println!("not blocking: {}", now());

// 可在runtime内等待blocking_thread的完成
rt1.block_on(async {
task.await.unwrap();
println!("after blocking task: {}", now());
});
}

输出:

1
2
3
in task: 2021-10-25 19:01:00
not blocking: 2021-10-25 19:01:00
after blocking task: 2021-10-25 19:01:10

需注意,blocking thread生成的任务虽然绑定了runtime,但是它不是异步任务,不受tokio调度系统控制。因此,如果在block_on()中生成了blocking thread或普通的线程,block_on()不会等待这些线程的完成。

1
2
3
4
5
6
rt.block_on(async{
// 生成一个blocking thread和一个独立的thread
// block on不会阻塞等待两个线程终止,因此block_on在这里会立即返回
rt.spawn_blocking(|| std::thread::sleep(std::time::Duration::from_secs(10)));
thread::spawn(||std::thread::sleep(std::time::Duration::from_secs(10)));
});

tokio允许的blocking thread队列很长(默认512个),且可以在runtime build时通过max_blocking_threads()配置最大长度。如果超出了最大队列长度,新的任务将放在一个等待队列中进行等待(比如当前已经有512个正在运行的任务,下一个任务将等待,直到有某个blocking thread空闲)。

blocking thread执行完对应任务后,并不会立即释放,而是继续保持活动状态一段时间,此时它们的状态是空闲状态。当空闲时长超出一定时间后(可在runtime build时通过thread_keep_alive()配置空闲的超时时长),该空闲线程将被释放。

blocking thread有时候是非常友好的,它像独立线程一样,但又和runtime绑定,它不受tokio的调度系统调度,tokio不会把其它任务放进该线程,也不会把该线程内的任务转移到其它线程。换言之,它有机会完完整整地发挥单个线程的全部能力,而不像worker线程一样,可能会被调度器打断。

关闭runtime

由于异步任务完全依赖于Runtime,而Runtime又是程序的一部分,它可以轻易地被删除(drop),这时Runtime会被关闭(shutdown)。

1
2
3
let rt = Runtime::new().unwrap();
...
drop(rt);

这里的变量rt,官方手册将其称为runtime的句柄(runtime handle)。

关闭Runtime时,将使得该Runtime中的所有异步任务被移除。完整的关闭过程如下:

  1. 先移除整个任务队列,保证不再产生也不再调度新任务

  2. 移除当前正在执行但尚未完成的异步任务,即终止所有的worker thread

  3. 移除Reactor,禁止接收事件通知

注意,这种删除runtime句柄的方式只会立即关闭未被阻塞的worker thread,那些已经运行起来的blocking thread以及已经阻塞整个线程的worker thread仍然会执行。但是,删除runtime又要等待runtime中的所有异步和非异步任务(会阻塞线程的任务)都完成,因此删除操作会阻塞当前线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
use std::thread;
use chrono::Local;
use tokio::{self, runtime::Runtime, time};

fn now() -> String {
Local::now().format("%F %T").to_string()
}

fn main() {
let rt = Runtime::new().unwrap();
// 一个运行5秒的blocking thread
// 删除rt时,该任务将继续运行,直到自己终止
rt.spawn_blocking(|| {
thread::sleep(std::time::Duration::from_secs(5));
println!("blocking thread task over: {}", now());
});

// 进入runtime,并生成一个运行3秒的异步任务,
// 删除rt时,该任务直接被终止
let _guard = rt.enter();
rt.spawn(async {
time::sleep(time::Duration::from_secs(3)).await;
println!("worker thread task over 1: {}", now());
});

// 进入runtime,并生成一个运行4秒的阻塞整个线程的任务
// 删除rt时,该任务继续运行,直到自己终止
rt.spawn(async {
std::thread::sleep(std::time::Duration::from_secs(4));
println!("worker thread task over 2: {}", now());
});

// 先让所有任务运行起来
std::thread::sleep(std::time::Duration::from_millis(3));

// 删除runtime句柄,将直接移除那个3秒的异步任务,
// 且阻塞5秒,直到所有已经阻塞的thread完成
drop(rt);
println!("runtime droped: {}", now());
}

输出结果(注意结果中没有异步任务中println!()输出的内容):

1
2
3
worker thread task over 2: 2021-10-25 20:08:35
blocking thread task over: 2021-10-25 20:08:36
runtime droped: 2021-10-25 20:08:36

关闭runtime可能会被阻塞,因此,如果是在某个runtime中关闭另一个runtime,将会导致当前的runtime的某个worker thread被阻塞,甚至可能会阻塞很长时间,这是异步环境不允许的。

tokio提供了另外两个关闭runtime的方式:shutdown_timeout()shutdown_background()。前者会等待指定的时间,如果正在超时时间内还未完成关闭,将强行终止runtime中的所有线程。后者是立即强行关闭runtime。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
use std::thread;
use chrono::Local;
use tokio::{self, runtime::Runtime, time};

fn now() -> String {
Local::now().format("%F %T").to_string()
}

fn main() {
let rt = Runtime::new().unwrap();

rt.spawn_blocking(|| {
thread::sleep(std::time::Duration::from_secs(5));
println!("blocking thread task over: {}", now());
});

let _guard = rt.enter();
rt.spawn(async {
time::sleep(time::Duration::from_secs(3)).await;
println!("worker thread task over 1: {}", now());
});

rt.spawn(async {
std::thread::sleep(std::time::Duration::from_secs(4));
println!("worker thread task over 2: {}", now());
});

// 先让所有任务运行起来
std::thread::sleep(std::time::Duration::from_millis(3));

// 1秒后强行关闭Runtime
rt.shutdown_timeout(std::time::Duration::from_secs(1));
println!("runtime droped: {}", now());
}

输出:

1
runtime droped: 2021-10-25 20:16:02

需要注意的是,如果强行关闭runtime,可能会使得尚未完成的任务的资源泄露(因为阻塞的任务还在运行,直到它们结束,但是此时已经无法控制它们了)。因此,应小心使用强行关闭runtime的操作。

shutdown_background()函数相当于调用 shutdown_timeout(Duration::from_nanos(0))

2.2 task

什么是tokio中的任务

任务是轻量级、非阻塞的执行单元。任务类似于 OS 线程,但它们不是由 OS 调度程序管理,而是由 Tokio 运行时管理。这种通用模式的另一个名称是绿色线程(Green thread)。如果你熟悉 Go 的 goroutines、Kotlin 的协程或 Erlang 的进程,你可以将 Tokio 的任务视为类似的东西。

这里提到的“绿色线程”指的是由运行时库或虚拟机 (VM) 而非底层操作系统 (OS) 本机调度的线程。换句话说,绿色线程是相对于OS线程而言的,OS线程由操作系统提供并调度(内核空间),而绿色线程由用户提供并调度(用户空间)。

解释了何为绿色线程后,继续回到主题,什么是任务(task)?

在rust异步编程中,可以认为每定义一个Future,就定义了一个尚未执行的task,该task放入runtime中开始运行的时候,它就是真正的task,一个真正的异步任务。任务有以下特点:

  • 任务是轻量级的。因为任务是由tokio runtime而不是操作系统调度的,所以创建新任务或在任务之间切换不需要上下文切换并且开销相当低。创建、运行和销毁大量任务的成本非常低,尤其是与 OS 线程相比。
  • 任务是协作安排的。大多数操作系统实现了抢占式的多任务处理技术。这种调度技术允许操作系统为每个线程分配一段时间运行,然后将其抢占,并暂停该线程并切换到另一个线程。相比之下,任务实现了协作式的多任务处理。在协作式多任务处理中,任务可以一直运行直到它“放弃”(yield),表示当前无法继续执行,这时Tokio运行时的调度器会切换到执行下一个任务。
  • 任务是非阻塞的。通常情况下,当一个操作系统线程执行I/O或必须与另一个线程同步时,它会被阻塞,允许操作系统调度另一个线程。而对于任务而言,当任务无法继续执行时,必须“放弃”(yield),以便Tokio运行时可以调度另一个任务。一般来说,任务不应该执行系统调用或其他可能阻塞线程的操作,否则这将阻止在同一线程上运行的其他任务被执行。

要注意,在tokio runtime中执行的并不都是异步任务,绑定在runtime中的可能是同步任务(例如一个数值计算就是一个同步任务,只是速度非常快,可忽略不计),可能会长时间计算,可能会阻塞整个线程。tokio严格区分异步任务和同步任务,只有异步任务才算是tokio task。tokio推荐的做法是将同步任务放入blocking thread中运行。

tokio::task

tokio::task模块提供了几个函数:

  • tokio::task::spawn,它和tokio::spawn是同一个函数,用于向runtime中添加新的异步任务
  • tokio::task::spawn_blocking,生成一个阻塞的线程(blocking thread)并执行指定的任务
  • tokio::task::block_in_place,在某个worker thread中执行同步任务,但是会将同线程中的其它异步任务转移走,使得异步任务不会被同步任务饥饿。此函数不能在 current_thread 运行时中使用,因为在这种情况下,没有其他工作线程可以将任务交给它,使用此函数创建的任务无法被取消,当关闭执行器时,它将无限期地等待所有阻塞操作完成。可以使用 shutdown_timeout 在特定超时后停止等待它们。请注意,这仍然不会取消任务——它们只是允许在方法返回后继续运行。
  • tokio::task::yield_now,放弃CPU,将执行权交还给 Tokio 运行时。当前任务将作为待处理任务重新添加到待处理队列的后面。将安排任何其他挂起的任务。任务继续不需要其他唤醒。通常情况下,在调用yield_now()后,不能保证运行时要调度哪个任务。特别是,运行时可以选择立即再次轮询刚刚运行yield_now()的任务,而不先轮询任何其他任务。例如,在每次轮询任务之间,运行时不会驱动IO驱动程序,这可能导致运行时立即再次轮询当前任务,即使有另一个任务正在等待来自IO驱动程序的通知也可能如此,而该任务可以继续运行。
  • tokio::task::unconstrained,将指定的异步任务声明为不受限的异步任务,它将不受tokio的协作式调度,它将一直霸占当前线程直到任务完成,不会受到tokio调度器的管理,使用它可能会使你的其他任务面临饥饿。
  • tokio::task::spawn_local,生成一个在当前线程内运行,一定不会被窃取到其它线程中运行的异步任务。这是因为生成的Future实现了!Send(即没有实现Send)。

这里的三个spawn类的方法都返回JoinHandle类型,JoinHandle类型可以通过await来等待异步任务的完成,也可以通过abort()来中断异步任务,异步任务被中断后返回JoinError类型。

task::spawn()

这个很简单,就是直接在当前的runtime中生成一个异步任务。在使用spawn向runtime中添加新的异步任务介绍过。

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
use chrono::Local;
use std::thread;
use tokio::{self, task, runtime::Runtime, time};

fn now() -> String {
Local::now().format("%F %T").to_string()
}

fn main() {
let rt = Runtime::new().unwrap();
let _guard = rt.enter();
task::spawn(async {
time::sleep(time::Duration::from_secs(3)).await;
println!("task over: {}", now());
});

thread::sleep(time::Duration::from_secs(4));
}

task::spawn_blocking()

生成一个blocking thread来执行指定的任务。这个也在上文tokio的两种线程:worker thread和blocking thread(#tokio的两种线程:worker thread和blocking thread)中介绍过。

1
2
3
4
5
6
7
let join = task::spawn_blocking(|| {
// do some compute-heavy work or call synchronous code
"blocking completed"
});

let result = join.await?;
assert_eq!(result, "blocking completed");

task::block_in_place()

block_in_place()的目的和spawn_blocking()类似。区别在于spawn_blocking()会新生成一个blocking thread来执行指定的任务,而block_in_place()是在当前worker thread中执行指定的可能会长时间运行或长时间阻塞线程的任务,但是它会先将该worker thread中已经存在的异步任务转移到其它worker thread,使得这些异步任务不会被饥饿。

显然,block_in_place()只应该在多线程runtime环境中运行,如果是单线程runtime,block_in_place会阻塞唯一的那个worker thread。

1
2
3
4
5
use tokio::task;

task::block_in_place(move || {
// do some compute-heavy work or call synchronous code
});

在block_in_place内部,可以使用block_on()enter()重新进入runtime环境。

1
2
3
4
5
6
7
8
use tokio::task;
use tokio::runtime::Handle;

task::block_in_place(move || {
Handle::current().block_on(async move {
// do something async
});
});

task::yield_now()

放弃CPU,将执行权交还给 Tokio 运行时。当前任务将作为待处理任务重新添加到待处理队列的后面。注意,调用yield_now()后还需await才立即放弃CPU,因为yield_now本身是一个异步任务。

1
2
3
4
5
6
7
8
9
10
11
12
use tokio::task;

async {
task::spawn(async {
// ...
println!("spawned task done!")
});

// Yield, allowing the newly-spawned task to execute first.
task::yield_now().await;
println!("main task done!");
}

task::unconstrained

tokio的异步任务都是受tokio调度控制的,tokio采用协作式调度策略来调度它所管理的异步任务。当异步任务中的执行到了某个本该阻塞的操作时(即使用了tokio提供的那些原本会阻塞的API,例如tokio版本的sleep()),将不会阻塞当前线程,而是进入等待队列,等待Reactor接收事件通知来唤醒该异步任务,这样当前线程会被释放给调度器,使得调度器能够继续分配其它异步任务到该线程上执行。

task::unconstrained则将指定的异步任务声明未不受限的异步任务,它将不受tokio的协作式调度,它将一直霸占当前线程直到任务完成,不会受到tokio调度器的管理,使用它可能会使你的其他任务面临饥饿,如果确实有这样的需求,建议使用block_in_place()spawn_blocking()

取消任务abort()

前面提到,三个spawn类的方法都返回JoinHandle类型,该类型用于管理任务。比如正在执行的异步任务可以随时被abort()取消,取消之后的任务返回JoinError类型。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
use tokio::{self, runtime::Runtime, time};

fn main() {
let rt = Runtime::new().unwrap();

rt.block_on(async {
let task = tokio::task::spawn(async {
time::sleep(time::Duration::from_secs(10)).await;
});

// 让上面的异步任务跑起来
time::sleep(time::Duration::from_millis(1)).await;
task.abort(); // 取消任务
// 取消任务之后,可以取得JoinError
let abort_err: JoinError = task.await.unwrap_err();
println!("{}", abort_err.is_cancelled());
})
}

如果异步任务已经完成,再对该任务执行abort()操作将没有任何效果。也就是说,没有JoinError,task.await.unwrap_err()将报错,而task.await.unwrap()不会报错。

固定在线程内执行的本地异步任务

tokio::task::LocalSet是在同一线程上执行的一组任务,也就是说,它们不会被跨线程执行。在某些情况下,运行一个或多个未实现 SendFuture是有必要的,因为在线程之间发送任务不安全。在这些情况下,本地任务集可用于安排一个或多个 !SendFuture在同一线程上一起运行。

要使用tokio::task::LocalSet,需使用LocalSet::new()先创建好一个LocalSet实例,它将生成一个独立的任务队列用来存放本地异步任务。

之后,便可以使用LocalSetspawn_local()向该队列中添加异步任务。但是,添加的异步任务不会直接执行,只有对LocalSet调用await或调用LocalSet::run_until()LocalSet::block_on()的时候,才会开始运行本地队列中的异步任务。调用后两个方法会进入LocalSet的上下文环境。

例如,使用await来运行本地异步任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
use chrono::Local;
use tokio::{self, runtime::Runtime, time};

fn now() -> String {
Local::now().format("%F %T").to_string()
}

fn main() {
let rt = Runtime::new().unwrap();
let local_tasks = tokio::task::LocalSet::new();

// 向本地任务队列中添加新的异步任务,但现在不会执行
local_tasks.spawn_local(async {
println!("local task1");
time::sleep(time::Duration::from_secs(5)).await;
println!("local task1 done");
});

local_tasks.spawn_local(async {
println!("local task2");
time::sleep(time::Duration::from_secs(5)).await;
println!("local task2 done");
});

println!("before local tasks running: {}", now());
rt.block_on(async {
// 开始执行本地任务队列中的所有异步任务,并等待它们全部完成
local_tasks.await;
});
}

此外,task::spawn_local也用于生成一个在当前线程内运行,一定不会被窃取到其它线程中运行的异步任务。它实际上是在当前的 LocalSet 上生成一个 !SendFuture。但是它的使用有个限制,必须在LocalSet上下文中才能调用。

例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
use chrono::Local;
use tokio::{self, runtime::Runtime, time};

fn now() -> String {
Local::now().format("%F %T").to_string()
}

fn main() {
let rt = Runtime::new().unwrap();
let local_tasks = tokio::task::LocalSet::new();

local_tasks.spawn_local(async {
println!("local task1");
time::sleep(time::Duration::from_secs(2)).await;
println!("local task1 done");
});

local_tasks.spawn_local(async {
println!("local task2");
time::sleep(time::Duration::from_secs(3)).await;
println!("local task2 done");
});

println!("before local tasks running: {}", now());
// LocalSet::block_on进入LocalSet上下文
local_tasks.block_on(&rt, async {
tokio::task::spawn_local(async {
println!("local task3");
time::sleep(time::Duration::from_secs(4)).await;
println!("local task3 done");
}).await.unwrap();
});
println!("all local tasks done: {}", now());
}

需要注意的是,调用LocalSet::block_on()LocalSet::run_until()时均需指定一个异步任务(Future)作为其参数,它们都会立即开始执行该异步任务以及本地任务队列中已存在的任务,但是这两个函数均只等待其参数对应的异步任务执行完成就返回。这意味着,它们返回的时候,可能还有正在执行中的本地异步任务,它们会继续保留在本地任务队列中。当再次进入LocalSet上下文或await LocalSet的时候,它们会等待调度并运行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
use chrono::Local;
use std::thread;
use tokio::{self, runtime::Runtime, time};

fn now() -> String {
Local::now().format("%F %T").to_string()
}

fn main() {
let rt = Runtime::new().unwrap();
let local_tasks = tokio::task::LocalSet::new();

local_tasks.spawn_local(async {
println!("local task1");
time::sleep(time::Duration::from_secs(2)).await;
println!("local task1 done {}", now());
});

// task2要睡眠10秒,它将被第一次local_tasks.block_on在3秒后中断
local_tasks.spawn_local(async {
println!("local task2");
time::sleep(time::Duration::from_secs(10)).await;
println!("local task2 done, {}", now());
});

println!("before local tasks running: {}", now());
local_tasks.block_on(&rt, async {
tokio::task::spawn_local(async {
println!("local task3");
time::sleep(time::Duration::from_secs(3)).await;
println!("local task3 done: {}", now());
}).await.unwrap();
});

// 线程阻塞15秒,此时task2睡眠10秒的时间已经过去了,
// 当再次进入LocalSet时,task2将可以直接被唤醒
thread::sleep(std::time::Duration::from_secs(15));

// 再次进入LocalSet
local_tasks.block_on(&rt, async {
// 先执行该任务,当遇到睡眠1秒的任务时,将出现任务切换,
// 此时,调度器将调度task2,而此时task2已经睡眠完成
println!("re enter localset context: {}", now());
time::sleep(time::Duration::from_secs(1)).await;
println!("re enter localset context done: {}", now());
});
println!("all local tasks done: {}", now());
}

输出结果:

1
2
3
4
5
6
7
8
9
10
before local tasks running: 2021-10-26 20:19:11
local task1
local task3
local task2
local task1 done 2021-10-26 20:19:13
local task3 done: 2021-10-26 20:19:14
re enter localset context: 2021-10-26 20:19:29
local task2 done, 2021-10-26 20:19:29
re enter localset context done: 2021-10-26 20:19:30
all local tasks done: 2021-10-26 20:19:30

需要注意的是,再次运行本地异步任务时,之前被中断的异步任务所等待的事件可能已经出现了,因此它们可能会被直接唤醒重新进入就绪队列等待下次轮询调度。正如上面需要睡眠10秒的task2,它会被第一次block_on中断,虽然task2已经不再执行,但是15秒之后它的睡眠完成事件已经出现,它可以在下次调度本地任务时直接被唤醒。但注意,唤醒的任务不是直接就可以被执行的,而是放入就绪队列等待调度。

这意味着,再次进入上下文时,所指定的Future中必须至少存在一个会引起调度切换的任务,否则该Future以同步的方式运行直到结束都不会给已经被唤醒的任务任何执行的机会(也就是说,上面的例子中再次进入LocalSet时,使用了await才引起了调度切换)。

将上面示例中的第二个block_on中的Future参数换成下面的async代码块,task2将不会被调度执行:

1
2
3
4
local_tasks.block_on(&rt, async {
println!("re-enter localset context, and exit context");
println!("task2 will not be scheduled");
})

下面是使用run_until()两次进入LocalSet上下文的示例,和block_on()类似,区别仅在于它只能在Runtime::block_on()内或[tokio::main]注解的main函数内部被调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
use chrono::Local;
use std::thread;
use tokio::{self, runtime::Runtime, time};

fn now() -> String {
Local::now().format("%F %T").to_string()
}

fn main() {
let rt = Runtime::new().unwrap();
let local_tasks = tokio::task::LocalSet::new();

local_tasks.spawn_local(async {
println!("local task1");
time::sleep(time::Duration::from_secs(5)).await;
println!("local task1 done {}", now());
});

println!("before local tasks running: {}", now());
rt.block_on(async {
local_tasks
.run_until(async {
println!("local task2");
time::sleep(time::Duration::from_secs(3)).await;
println!("local task2 done: {}", now());
})
.await;
});

thread::sleep(std::time::Duration::from_secs(10));
rt.block_on(async {
local_tasks
.run_until(async {
println!("local task3");
tokio::task::yield_now().await;
println!("local task3 done: {}", now());
})
.await;
});
println!("all local tasks done: {}", now());
}

输出结果:

1
2
3
4
5
6
7
8
before local tasks running: 2021-10-26 21:23:18
local task2
local task1
local task2 done: 2021-10-26 21:23:21
local task3
local task1 done 2021-10-26 21:23:31
local task3 done: 2021-10-26 21:23:31
all local tasks done: 2021-10-26 21:23:31

判断任务是否已经终止

可使用JoinHandleis_finished()方法来判断任务是否已终止,它是非阻塞的。请注意,即使在任务上调用了 abort ,此方法也可以返回 false 。这是因为取消过程可能需要一些时间,并且此方法在完成之前不会返回 true

1
2
3
4
5
6
7
8
9
let task = tokio::spawn(async {
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
});

// 立即输出 false
println!("1 {}", task.is_finished());
tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
// 输出 true
println!("2 {}", task.is_finished());

is_finished()常用于在多个任务中轮询直到其中一个任务终止。

任务集合JoinSet

tokio::task::JoinSet是在 Tokio 运行时生成的任务集合。JoinSet 可用于等待集合中部分或全部任务的完成。集合没有顺序,任务将按照完成的顺序返回。它通常用于收集一系列异步任务,并判断它们是否终止。

另外,所有任务都必须具有相同的返回类型 T 。当删除 JoinSet 时, JoinSet 中的所有任务都会立即中止。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
use tokio::task::JoinSet;

#[tokio::main]
async fn main() {
let mut set = JoinSet::new();

for i in 0..10 {
set.spawn(async move { i });
}

let mut seen = [false; 10];
while let Some(res) = set.join_next().await {
let idx = res.unwrap();
seen[idx] = true;
}

for i in 0..10 {
assert!(seen[i]);
}
}

可以看出,如果要等待多个或所有任务完成,则循环join_next()即可。如果JoinSet为空,则该方法返回None

1
while let Some(_) = set.join_next().await {}

使用JoinSetabort_all()或直接drop JoinSet,都会对所有异步任务进行abort()操作。这不会从 JoinSet 中删除任务。要等待任务完成取消,您应该循环调用 join_next ,直到 JoinSet 为空。

使用JoinSetshutdown()方法,则中止所有任务并等待它们完成关闭。调用这个方法相当于调用 abort_all ,然后循环调用 join_next ,直到返回 None

使用JoinSetdetach_all()将从此 JoinSet 中删除所有任务而不中止它们。即使JoinSet被丢弃,被detach的任务也依然会在后台运行。

2.3 tokio宏

一次运行多个Future中介绍过一些常用的宏,比如join!等,下面要介绍的是tokio实现的版本。

tokio::join!tokio::try_join!

我们可以通过await等待某个异步任务完成,无论这个任务是正常完成还是被取消。

tokio也提供了两个宏tokio::join!tokio::try_join!,它们可以用于等待多个异步任务全部完成:

  • join!等待多个并发分支,当所有分支完成时才返回
  • try_join!等待多个并发分支,要么等待所有异步任务正常完成返回,要么在遇到第一个返回 Err(_)的任务返回

这两个宏必须在异步函数、闭包和块内部使用。

join!使用举例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
async fn do_stuff_async() {
// async work
}

async fn more_async_work() {
// more here
}

#[tokio::main]
async fn main() {
let (first, second) = tokio::join!(
do_stuff_async(),
more_async_work());

// do something with the values
}

try_join!用于两个分支(OkErr):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
async fn do_stuff_async() -> Result<(), &'static str> {
// async work
}

async fn more_async_work() -> Result<(), &'static str> {
// more here
}

#[tokio::main]
async fn main() {
let res = tokio::try_join!(
do_stuff_async(),
more_async_work());

match res {
Ok((first, second)) => {
// do something with the values
}
Err(err) => {
println!("processing failed; error = {}", err);
}
}
}

try_join! 用于spawn生成的任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
use tokio::task::JoinHandle;

async fn do_stuff_async() -> Result<(), &'static str> {
// async work
}

async fn more_async_work() -> Result<(), &'static str> {
// more here
}

async fn flatten<T>(handle: JoinHandle<Result<T, &'static str>>) -> Result<T, &'static str> {
match handle.await {
Ok(Ok(result)) => Ok(result),
Ok(Err(err)) => Err(err),
Err(err) => Err("handling failed"),
}
}

#[tokio::main]
async fn main() {
let handle1 = tokio::spawn(do_stuff_async());
let handle2 = tokio::spawn(more_async_work());
match tokio::try_join!(flatten(handle1), flatten(handle2)) {
Ok(val) => {
// do something with the values
}
Err(err) => {
println!("Failed with {}.", err);
}
}
}

tokio::select!

future::select!相同,tokio::select!等待多个并发分支,在第一个分支完成时返回,同时取消其余分支。select! 宏必须在异步函数、闭包和块内部使用。

它的语法如下:

1
2
3
4
5
6
tokio::select! {
<pattern1> = <async expression 1> (, if <precondition1>)? => <handler1>, // branch 1
<pattern2> = <async expression 2> (, if <precondition2>)? => <handler2>, // branch 2
...
(else => <handler_else>)?
};

else分支是可选的,每个分支的if前置条件是可选的,如果前置条件返回 false ,则分支被禁用。提供的 <async expression> 仍然被评估,但结果 future 永远不会被轮询。在循环中使用 select! 时,此功能很有用。

去除可选项后,简化的语法为:

1
2
3
4
5
tokio::select! {
<pattern1> = <async expression 1> => <handler1>, // branch 1
<pattern2> = <async expression 2> => <handler2>, // branch 2
...
};

即,每个分支都有一个异步任务,并对异步任务完成后的返回结果进行模式匹配,如果匹配成功,则执行对应的handler。

一个简单的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
async fn do_stuff_async() {
// async work
}

async fn more_async_work() {
// more here
}

#[tokio::main]
async fn main() {
tokio::select! {
_ = do_stuff_async() => {
println!("do_stuff_async() completed first")
}
_ = more_async_work() => {
println!("more_async_work() completed first")
}
};
}

下面是官方手册对select!工作流程的描述:

  1. 评估所有分支中存在的if前置条件,如果某个分支的前置条件返回false,则禁用该分支。注意,循环时,每一轮执行的select!都会清除分支的禁用标记
  2. 收集所有分支中的异步表达式(包括已被禁用的分支),并在同一个线程中推进所有未被禁用的异步任务执行,然后等待
  3. 当某个分支的异步任务完成,将该异步任务的返回值与对应分支的模式进行匹配,如果匹配成功,则执行对应分支的handler,如果匹配失败,则禁用该分支,本次select!调用不会再考虑该分支。如果匹配失败,则重新等待下一个异步任务的完成
  4. 如果所有分支最终都被禁用,则执行else分支,如果不存在else分支,则panic

默认情况下,select!会伪随机公平地轮询每一个分支,如果确实需要让select!按照任务书写顺序去轮询,可以在select!中使用biased,比如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#[tokio::main]
async fn main() {
let mut count = 0u8;
loop {
tokio::select! {
// 如果取消biased,挑选的任务顺序将随机,可能会导致分支中的断言失败
biased;
_ = async {}, if count < 1 => { count += 1; assert_eq!(count, 1); }
_ = async {}, if count < 2 => { count += 1; assert_eq!(count, 2); }
_ = async {}, if count < 3 => { count += 1; assert_eq!(count, 3); }
_ = async {}, if count < 4 => { count += 1; assert_eq!(count, 4); }
else => { break; }
};
}
}

另外,上面的例子中将select!放进了一个loop循环中,这是很常见的用法。对于上面的例子来说,如果注释掉biased,那么在第一轮循环中,由于select!中的4个if前置条件均为true,因此按照随机的顺序推进这4个异步任务。由于上面示例中的异步任务表达式不做任何事,因此第一个被推进的异步任务会先完成,selcet!将取消剩余3个任务,假如先完成任务的分支的断言通过,那么select!返回后将进入下一轮loop循环,重新调用一次select!宏,重新评估if条件,这次将只有3个分支通过检测,不通过的那个分支将被禁用,select!将按照随机顺序推进这3个分支。

2.4 等待任一一个异步任务的终止

虽然join!() try_join!() select!()都可以等待一个或多个异步任务完成,但是有些情况下它们并不方便使用。

例如,客户端连接到服务端时,服务端为每个客户端都开启了n个异步任务,这些异步任务被收集在一个容器中(如Vec),这些任务都是长久工作的,直到客户端断开。理所当然地,应当去等待这些任务直到任意一个任务终止,然后abort()所有剩余任务,从而避免客户端断开后仍然在后台任务运行没有意义的任务,这很容易会导致内存飞速暴涨。

因为异步任务被收集在容器中,因此无法使用join!() try_join!() select!()去等待这些异步任务中的任意一个的完成。

有几种方式处理这种情况:

  1. 可以考虑使用is_finished()来轮询判断(为了避免忙等消耗CPU,建议加上轮询间隔)。
1
2
3
4
5
6
7
8
9
10
11
12
let tasks = vec![ Some_async_tasks ];
'outer: loop {
for task in &tasks {
if task.is_finished() {
break 'outer;
}
}
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
for task in tasks {
task.abort();
}
  1. 考虑使用JoinSet
  2. 考虑使用futures::future::try_join_all或者futures::stream::FuturesUnordered

3 Tokio中的time计时器

tokio::time中提供了一些用于跟踪时间的实用工具。该模块提供了多种类型,用于在设定的时间段后执行代码。需要开启time特性后才可以使用:

1
tokio = {version = "1.13", features = ["rt", "rt-multi-thread", "time"]}

该模块提供的类型如下:

  • Duration,它是std::time::Duration的重导出,它用于描述时间跨度,如3秒就是一个时间跨度
  • Instant,单调非递减时钟的测量。不透明(只能相互比较,没有办法从Instant得到“秒数”)且仅能配合 Duration 使用。例如,此刻是处在某个时间点A,下一次(例如某个时长过后),处在另一个时间点B,时间点B一定不会早于时间点A,即便修改了操作系统的时钟或硬件时钟,它也不会时光倒流的现象
  • Sleep,是一个Future,通过调用sleep()sleep_until()返回,该Future本身不工作,它只在到达某个特定的 Instant 时间点时完成
  • Interval 是一个流式的间隔计时器,通过调用interval()interval_at()返回。它使用 Duration 进行初始化,表示每隔一段时间(即指定的Duration时长)后就产生一个值
  • Timeout :封装异步任务(Future或者Stream),将上限设置为允许执行的时间量。如果任务没有及时完成,那么它会被取消并返回一个错误

3.1 时间跨度tokio::time::Duration

使用Duration表示时间跨度

3.2 时间点 tokio::time::Instant

tokio::time::Instant是对std::time::Instant的封装,添加了一些对齐功能,使其能够适用于tokio runtime,关于标准库的Instant使用Instant表示时间点

Instant是严格单调递增的,绝不会出现时光倒流的现象,即之后的时间点一定晚于之前创建的时间点。但是,tokio time提供了pause()函数可暂停时间点,还提供了advance()函数用于向后跳转到某个时间点。

tokio::time::Instant::now()用于创建代表此时此刻的时间点。Instant可以直接进行大小比较,还能执行+-操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
use tokio;
use tokio::time::Instant;
use tokio::time::Duration;

#[tokio::main]
async fn main() {
// 创建代表此时此刻的时间点
let now = Instant::now();

// Instant 加一个Duration,得到另一个Instant
let next_3_sec = now + Duration::from_secs(3);
// Instant之间的大小比较
println!("{}", now < next_3_sec); // true

// Instant减Duration,得到另一个Instant
let new_instant = next_3_sec - Duration::from_secs(2);

// Instant减另一个Instant,得到Duration
let duration = new_instant - next_3_sec;
}

此外tokio::time::Instant提供了如下几个方法:

  • from_std(): 将std::time::Instant转换为tokio::time::Instant
  • into_std(): 将tokio::time::Instant转换为std::time::Instant
  • elapsed(): 指定的时间点实例,距离此时此刻的时间点,经过的时间,返回Duration
  • duration_since(): 返回从另一Instant到当前Instant经过的时间跨度(Duration),如果该Instant晚于这一Instant,则返回0(Duration
  • checked_duration_since():返回从另一Instant到当前Instant经过的时间跨度(Duration),如果该Instant晚于这一Instant,则返回None
  • saturating_duration_since(): 返回从另一Instant到当前Instant经过的时间跨度(Duration),如果该Instant晚于这一Instant,则返回0(Duration
  • checked_add(): 同标准库
  • checked_sub(): 同标准库

3.3 超时时间tokio::time::Timeout

该结构体是通过tokio::time::timeout()tokio::time::timeout_at()返回的Future

tokio::time::timeout()可设置一个异步任务的完成超时时间,它的签名如下:

1
2
3
pub fn timeout<F>(duration: Duration, future: F) -> Timeout<F>
where
F: Future,

指定一个durationfuture,如果future在指定的超时时间内已完成,则返回该异步任务的返回值,如果未完成,则异步任务被撤销并返回Err

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
use chrono::Local;
use tokio::{self, runtime::Runtime, time};

fn now() -> String {
Local::now().format("%F %T").to_string()
}

fn main() {
let rt = Runtime::new().unwrap();
rt.block_on(async {
let res = time::timeout(time::Duration::from_secs(5), async {
println!("sleeping: {}", now());
time::sleep(time::Duration::from_secs(6)).await;
33
});

match res.await {
Err(_) => println!("task timeout: {}", now()),
Ok(data) => println!("get the res '{}': {}", data, now()),
};
});
}

得到结果:

1
2
sleeping: 2021-11-03 17:12:33
task timeout: 2021-11-03 17:12:38

如果将睡眠6秒改为睡眠4秒,那么将得到结果:

1
2
sleeping: 2021-11-03 17:13:11
get the res '33': 2021-11-03 17:13:15

再看看tokio::time::timeout_at()函数签名:

1
2
3
pub fn timeout_at<F>(deadline: Instant, future: F) -> Timeout<F>
where
F: Future,

这里指定的是指定一个deadlinefuture,该future必须在deadline之前完成,如果没有按时完成,则返回Err

取消超时可以通过dropTimeout实例来完成,不需要额外的清理或其他工作。

得到time::Timeout实例之后,可以通过它提供的get_refget_mut获得Timeout所封装的Future的可变和不可变引用,使用into_inner获得所封装的Future,这会消费掉该Future

3.4 时间间隔tokio::time::Interval

该结构体可以通过tokio::time::interval()tokio::time::interval_at()函数返回,它代表时间间隔,主要用于设置间隔一定时间的周期性任务。

tokio::time::interval_at()的签名如下:

1
pub fn interval_at(start: Instant, period: Duration) -> Interval

创建新的 Interval ,它以 period 为间隔,start参数用于控制间隔计时器的开始计时点。

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
use tokio::time::{interval_at, Duration, Instant};

#[tokio::main]
async fn main() {
let start = Instant::now() + Duration::from_millis(50);
let mut interval = interval_at(start, Duration::from_millis(10));

interval.tick().await; // ticks after 50ms
interval.tick().await; // ticks after 10ms
interval.tick().await; // ticks after 10ms

// approximately 70ms have elapsed.
}

tokio::time::interval() 的签名如下:

1
pub fn interval(period: Duration) -> Interval {}

创建以 period 为间隔的新 Interval,它在第一次被调用的时候立即开始计时,创建后这个时间间隔将无限期地执行。任何时候都可以删除 Interval 值,这将取消间隔。

下面是一个使用 interval 每两秒执行一次任务的简单示例:

1
2
3
4
5
6
7
8
9
10
11
12
use tokio::time::{self, Duration};

#[tokio::main]
async fn main() {
let mut interval = time::interval(Duration::from_millis(10));

interval.tick().await; // ticks immediately
interval.tick().await; // ticks after 10ms
interval.tick().await; // ticks after 10ms

// approximately 20ms have elapsed.
}

可以看出,这两个函数只是定义了间隔计时器的起始计时点和间隔的时长,要真正开始让它开始计时,还需要调用它的tick()方法生成一个Future任务,并调用await来执行并等待该任务的完成。

有几点需要注意:

  1. interval_at()第一个参数定义的是计时器的开始时间,这样描述不准确,它表述的是最早都要等到这个时间点才开始计时。例如,定义计时器5秒之后开始计时,但在第一次tick()之前,先睡眠了10秒,那么该计时器将在10秒后才开始,但如果第一次tick之前只睡眠了3秒,那么还需再等待2秒该tick()计时任务才会完成。
  2. 定义计时器时,要将计时器变量声明为mut,因为每次tick()时,都需要修改计时器内部的下一次计时起点。
  3. 不像其它语言中的间隔计时器,tokio的间隔计时器需要手动调用tick()方法来生成临时的异步任务。

看下面的示例,定义5秒后开始的计时器,但在第一次开始计时前,先睡眠10秒。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
use chrono::Local;
use tokio::{
self,
runtime::Runtime,
time::{self, Duration, Instant},
};

fn now() -> String {
Local::now().format("%F %T").to_string()
}

fn main() {
let rt = Runtime::new().unwrap();
rt.block_on(async {
println!("before: {}", now());

let start = Instant::now() + Duration::from_secs(5);
let interval = Duration::from_secs(1);
let mut intv = time::interval_at(start, interval);

time::sleep(Duration::from_secs(10)).await;
intv.tick().await;
println!("task 1: {}", now());
intv.tick().await;
println!("task 2: {}", now());
});
}

输出结果:

1
2
3
before: 2021-11-03 19:00:10
task 1: 2021-11-03 19:00:20
task 2: 2021-11-03 19:00:20

注意输出结果中的task 1和task 2的时间点是相同的,说明第一次tick之后,并没有等待1秒之后再执行紧跟着的tick,而是立即执行之。

简单解释一下上面示例中的计时器内部的工作流程,假设定义计时器的时间点是19:00:10:

  • 定义5秒后开始的计时器intv,该计时器内部有一个字段记录着下一次开始tick()的时间点,其值为19:00:15
  • 睡眠10秒后,时间点到了19:00:20,此时第一次执行intv.tick(),它将生成一个异步任务,执行器执行时发现此时此刻的时间点已经超过该计时器内部记录的值,于是该异步任务立即完成并进入就绪队列等待调度,同时修改计时器内部的值为19:00:16
  • 下一次执行tick的时候,此时此刻仍然是19:00:20,已经超过了该计时器内部的19:00:16,因此计时任务立即完成

这里要介绍Intervalmissed_tick_behavior方法,它返回一个tokio::time::MissedTickBehavior枚举。该枚举定义了 Interval 错过一次tick的行为。默认情况下,当错过一个tick时,Interval 会尽快触发tick,直到它及时“赶上”到它应该在的位置。具体来说,有三种策略:

  1. Brust(默认策略)

    冲刺型的计时策略,tick尽可能快,直到赶上为止。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    let mut interval = interval(Duration::from_millis(50));

    // First tick resolves immediately after creation
    // 第一次tick立即开始
    interval.tick().await;
    // 执行了一个200毫秒的任务
    task_that_takes_200_millis().await;
    // 执行完毕后,此时已经错过了tick

    // 因此我们开始加速,这一次tick会立即执行
    interval.tick().await;

    // 这一次tick应该在开始后100毫秒,已经错过了,因此立即执行
    interval.tick().await;

    // 在开始后150毫秒的tick,仍需要立即执行
    interval.tick().await;

    // 立即执行
    interval.tick().await;

    // 由于我们已经赶上了,下一次tick会正常等到开始后250毫秒开始执行
    interval.tick().await;

    这看起来像这样:

    1
    2
    Expected ticks: |     1     |     2     |     3     |     4     |     5     |     6     |
    Actual ticks: | work -----| delay | work | work | work -| work -----|
  2. Delay

    延迟性的计时策略。从调用 tick 开始,而不是从 start 开始,也就是说,当出现延迟后,仍然按部就班地每隔指定的时长计时。在内部,这种策略是在每次执行tick之后,都修改下一次计时起点为Instant::now() + Duration。因此,这种策略下的任何相邻两次的tick,其中间间隔的时长都至少达到Duration

    例如:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    use chrono::Local;
    use tokio::{self, runtime::Runtime};
    use tokio::time::{self, Duration, Instant, MissedTickBehavior};

    fn now() -> String {
    Local::now().format("%F %T").to_string()
    }

    fn main() {
    let rt = Runtime::new().unwrap();
    rt.block_on(async {
    println!("before: {}", now());

    let mut intv = time::interval_at(
    Instant::now() + Duration::from_secs(5),
    Duration::from_secs(2),
    );
    intv.set_missed_tick_behavior(MissedTickBehavior::Delay);

    time::sleep(Duration::from_secs(10)).await;

    println!("start: {}", now());
    intv.tick().await;
    println!("tick 1: {}", now());
    intv.tick().await;
    println!("tick 2: {}", now());
    intv.tick().await;
    println!("tick 3: {}", now());
    });
    }

    输出结果:

    1
    2
    3
    4
    5
    before: 2021-11-03 19:31:02
    start: 2021-11-03 19:31:12
    tick 1: 2021-11-03 19:31:12
    tick 2: 2021-11-03 19:31:14
    tick 3: 2021-11-03 19:31:16
  3. Skip

    忽略型的计时策略。这种策略总是以定义计时器时的起点为基准,类似等差数列,每一次执行tick的时间点,一定符合Start + N * Duration

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    use chrono::Local;
    use tokio::{self, runtime::Runtime};
    use tokio::time::{self, Duration, Instant, MissedTickBehavior};

    fn now() -> String {
    Local::now().format("%F %T").to_string()
    }

    fn main() {
    let rt = Runtime::new().unwrap();
    rt.block_on(async {
    println!("before: {}", now());

    let mut intv = time::interval_at(
    Instant::now() + Duration::from_secs(5),
    Duration::from_secs(2),
    );
    intv.set_missed_tick_behavior(MissedTickBehavior::Skip);

    time::sleep(Duration::from_secs(10)).await;

    println!("start: {}", now());
    intv.tick().await;
    println!("tick 1: {}", now());
    intv.tick().await;
    println!("tick 2: {}", now());
    intv.tick().await;
    println!("tick 3: {}", now());
    });
    }

    输出结果:

    1
    2
    3
    4
    5
    before: 2021-11-03 19:34:53
    start: 2021-11-03 19:35:03
    tick 1: 2021-11-03 19:35:03
    tick 2: 2021-11-03 19:35:04
    tick 3: 2021-11-03 19:35:06

上面通过interval_at()解释清楚了tokio::time::Interval的三种计时策略。但在程序中,更大的可能是使用interval()来定义间隔计时器,它等价于interval_at(Instant::now() + Duration),表示计时起点从现在开始的计时器。

此外,Interval还提供了一些方法。可以使用period()方法获取计时器的间隔时长,使用missed_tick_behavior()获取当前的计时策略,等等,见官方文档

3.5 睡眠 tokio::time::Sleep

该结构体是一个Future,由tokio::time::sleep()tokio::time::sleep_until()返回。

比如:

1
2
3
4
5
6
7
8
9
10
11
12
use tokio::{self, runtime::Runtime, time};

fn main(){
let rt = Runtime::new().unwrap();
rt.block_on(async {
// 睡眠2秒
time::sleep(time::Duration::from_secs(2)).await;

// 一直睡眠,睡到2秒后醒来
time::sleep_until(time::Instant::now() + time::Duration::from_secs(2)).await;
});
}

注意,std::thread::sleep()会阻塞当前线程,而tokio的睡眠不会阻塞当前线程,实际上tokio的睡眠在进入睡眠后不做任何事,仅仅只是立即放弃CPU,并进入任务轮询队列,等待睡眠时间终点到了之后被唤醒,然后进入就绪队列等待被调度。

注意,tokio的睡眠精度是毫秒,因此无法保证、也不应睡眠更低精度的时间。例如不要睡眠100微秒或100纳秒,这时无法保证睡眠的时长。

下面是一个睡眠10微秒的例子,多次执行,会发现基本上都要1毫秒多,去掉执行指令的时间,实际的睡眠时长大概是1毫秒。另外,将睡眠10微秒改成睡眠100微秒或1纳秒,结果也是接近的。

1
2
3
4
5
6
7
8
9
10
11
12
use tokio::{self, runtime::Runtime, time};

fn main() {
let rt = Runtime::new().unwrap();
rt.block_on(async {
let start = time::Instant::now();
// time::sleep(time::Duration::from_nanos(100)).await;
// time::sleep(time::Duration::from_micros(100)).await;
time::sleep(time::Duration::from_micros(10)).await;
println!("sleep {}", time::Instant::now().duration_since(start).as_nanos());
});
}

执行的多次,输出结果:

1
2
3
4
5
6
sleep 1174300
sleep 1202900
sleep 1161200
sleep 1393200
sleep 1306400
sleep 1285300

睡眠的最长持续时间为 68719476734 毫秒(大约 2.2 年)。此外,该类型没有实现 Unpin特征,这意味着如果你将它与 select! 一起使用或通过调用 poll 来使用,你必须先固定它。

tokio::time::Sleep有三个方法:

  • deadline():返回Instant,表示该睡眠任务的睡眠终点
  • is_elapsed():可判断此时此刻是否已经超过了该sleep任务的睡眠终点
  • reset():可用于重置睡眠任务。如果睡眠任务未完成,则直接修改睡眠终点,如果睡眠任务已经完成,则再次创建睡眠任务,等待新的终点

需要注意的是,reset()要求修改睡眠终点,因此Sleep实例需要是mut的,但这样会消费掉Sleep实例,更友好的方式是使用tokio::pin!(sleep)sleeppin在当前栈中,这样就可以调用as_mut()方法获取它的可修改版本。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
use chrono::Local;
use tokio::{self, runtime::Runtime, time};

#[allow(dead_code)]
fn now() -> String {
Local::now().format("%F %T").to_string()
}

fn main() {
let rt = Runtime::new().unwrap();
rt.block_on(async {
println!("start: {}", now());
let slp = time::sleep(time::Duration::from_secs(1));
tokio::pin!(slp);

slp.as_mut().reset(time::Instant::now() + time::Duration::from_secs(2));

slp.await;
println!("end: {}", now());
});
}

输出:

1
2
start: 2021-11-02 21:57:42
end: 2021-11-02 21:57:44

重置已完成的睡眠实例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
use chrono::Local;
use tokio::{self, runtime::Runtime, time};

#[allow(dead_code)]
fn now() -> String {
Local::now().format("%F %T").to_string()
}

fn main() {
let rt = Runtime::new().unwrap();
rt.block_on(async {
println!("start: {}", now());
let slp = time::sleep(time::Duration::from_secs(1));
tokio::pin!(slp);

//注意调用slp.as_mut().await,而不是slp.await,后者会move消费掉slp
slp.as_mut().await;
println!("end 1: {}", now());

slp.as_mut().reset(time::Instant::now() + time::Duration::from_secs(2));

slp.await;
println!("end 2: {}", now());
});
}

输出结果:

1
2
3
start: 2021-11-02 21:59:25
end 1: 2021-11-02 21:59:26
end 2: 2021-11-02 21:59:28

4 Tokio中的异步通信和同步

tokio提供了异步多任务的并发能力,也提供了异步任务之间的通信方式和同步机制。

要使用tokio的同步功能,需要打开sync功能标志。

4.1 sync模块简介

tokio::sync模块主要包含两部分功能:异步任务之间的消息传递模块以及异步任务之间的状态同步模块。

消息传递

tokio程序中最常见的同步形式是消息传递。两个任务独立运行,相互发送消息进行同步。这样做的好处是可以避免共享状态。

消息传递是使用通道(channel)实现的。通道支持将消息从一个生产者任务发送到一个或多个消费者任务。tokio提供了几种通道。每个通道支持不同的消息传递模式。当一个通道支持多个生产者时,许多单独的任务可能会发送消息。当一个通道支持多个消费者时,许多不同的独立任务可能会接收消息。具体来说,tokio提供了以下几种通道:

  • 单向通道(Oneshot Channel)支持从单个生产者发送一个值到单个消费者。这种通道通常用于将计算结果发送到等待方。
  • 多生产者单消费者通道(mpsc Channel)支持从多个生产者发送多个值到单个消费者。这种通道通常用于将工作发送到任务或接收多个计算的结果。
  • 广播通道(Broadcast Channel)支持从多个生产者发送多个值到多个消费者,每个消费者都会接收到每个值。这种通道可以用于实现发布/订阅或聊天系统中常见的扇出(Fan-out)模式。
  • 观察通道(Watch Channel)支持从单个生产者发送多个值到多个消费者。然而,该通道中只会存储最新的值。当新值被发送时,消费者会收到通知,但不能保证每个消费者都会看到所有的值。

总的来说,不同类型的通道,都有不同的使用场景。

状态同步

剩下的同步原语侧重于同步状态。它们是标准库提供版本的异步等价物。它们的操作方式与标准库的对应项类似,但会以异步方式等待,而不是阻塞线程。

有以下几种基本的同步原语:

  • 屏障(Barrier)确保多个任务在继续执行之前彼此等待程序中的某个点到达。这样多个任务可以同时继续执行。
  • 互斥锁(Mutex)是一种互斥机制,确保最多只有一个线程能够访问某些数据。
  • 通知(Notify)是一种基本的任务通知机制。它支持在不发送数据的情况下通知接收任务,此时任务会唤醒并恢复处理。
  • 读写锁(RwLock)提供了一种互斥机制,允许多个读者同时进行读取操作,而只允许一个写者进行写入操作。在某些情况下,这比互斥锁更高效。
  • 信号量(Semaphore)限制并发数量。信号量持有一定数量的许可证,任务可以请求这些许可证以进入关键部分。信号量适用于实现任何类型的限制或边界控制。

4.2 消息传递通道

一次性通道oneshot

这是一次性通道,用于在异步任务之间发送单个消息。oneshot::channel() 函数用于创建形成通道的 SenderReceiver 句柄对。

Sender 句柄由生产者使用来发送值。消费者使用 Receiver 句柄来接收值。每个句柄都可以用于单独的任务。

由于 send 方法不是异步的,因此它可以在任何地方使用。这包括在两个运行时之间发送,以及从非异步代码中使用它。

下面展示如何创建一个句柄对:

1
2
3
4
// 创建一个发送端tx和接收端rx的句柄对,在发送数据时,会自动推断出通道中的数据类型
let (tx, rx) = oneshot::channel();
// 使用turbofish语法创建一个可发送i32数据的通道
let (tx, rx) = oneshot::channel::<i32>();

创建通道之后,下面来看看如何发送消息。

发送者Sender

前面创建的txSender结构,其提供了发送消息的方法send()

1
pub fn send(self, t: T) -> Result<(), T>

此方法消耗 self ,因为在一次性通道上只能发送一个值。它没有被标记为异步,因为将消息发送到一次性通道不需要任何形式的等待。因此, 该方法可以毫无问题地用在同步和异步代码中。

发送不一定总是成功,如果在发送之前rx已经关闭,则返回Err

因此,发送数据的时候,通常会做如下检测:

1
2
3
if let Err(_) = tx.send(3) {
println!("the receiver dropped");
}

下面是一个发送消息的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
let (tx, rx) = oneshot::channel();

tokio::spawn(async move {
if let Err(_) = tx.send(3) {
println!("the receiver dropped");
}
});

match rx.await {
Ok(v) => println!("got = {:?}", v),
Err(_) => println!("the sender dropped"),
}
}

可以通过is_closed方法判断对端是否已经被drop,如果被删除则返回 true ,此时调用 send 将始终导致错误。

比如:

1
2
3
4
5
6
7
8
9
10
11
12
13
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
// 创建tx和rx
let (tx, rx) = oneshot::channel();
assert!(!tx.is_closed());
// drop掉接收端
drop(rx);
// 此时发送消息始终为Err
assert!(tx.is_closed());
assert!(tx.send("never received").is_err());
}

发送端可以通过closed方法来等待接收端关闭,基本用法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
let (mut tx, rx) = oneshot::channel::<()>();

tokio::spawn(async move {
drop(rx);
});

tx.closed().await;
println!("the receiver dropped");
}

当与 select! 配合使用时,此函数非常有用,可以在接收者不再对结果感兴趣时中止计算。其中一个分支计算要发送的数据,另一个分支为closed()等待分支,如果先计算完成,则发送计算结果,而如果是先等到了对端closed()的异步任务完成,则无需再计算浪费CPU去计算结果。例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
use tokio::sync::oneshot;
use tokio::time::{self, Duration};

async fn compute() -> String {
// Complex computation returning a `String`
}

#[tokio::main]
async fn main() {
let (mut tx, rx) = oneshot::channel();

tokio::spawn(async move {
tokio::select! {
_ = tx.closed() => {
// 先等待到了对端关闭,不做任何事,select!会自动取消其它分支的任务
}
value = compute() => {
// 先计算得到结果,则发送给对端
// 但有可能刚计算完成,尚未发送时,对端刚好关闭,因此可能发送失败
// 此处用 `_` 表示丢弃发送失败的错误
let _ = tx.send(value);
}
}
});

// 等待10s
let _ = time::timeout(Duration::from_secs(10), rx).await;
}
接收者Receiver

Receiver结构用于从关联的 Sender 接收值,该通道没有 recv 方法,因为接收者本身实现了 Future 特征。直接接收 Result<T, error::RecvError >.await Receiver 对象,因此直接通过.await即可接收数据,但是,接收数据并不一定会接收成功,比如再发送方还没有发送之前,该通道就已经关闭,接收方将失败并显示 error::RecvError

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
let (tx, rx) = oneshot::channel::<u32>();

tokio::spawn(async move {
drop(tx);
});

match rx.await {
Ok(_) => panic!("This doesn't happen"),
Err(_) => println!("the sender dropped"),
}
}

Receiver可以通过close方法关闭自己这一端,当然也可以直接drop,关闭操作是幂等的,即,如果关闭的是已经关闭的Receiver,不会产生任何影响。

调用 close 之后发生的任何 send 操作都肯定会失败。但需要注意,有可能在关闭操作完成之前,对端正好发送了一个数据,此时则应调用 try_recv 来接收值:

1
2
3
4
5
6
7
8
9
10
11
12
13
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
let (tx, mut rx) = oneshot::channel();

assert!(tx.send("will receive").is_ok());

rx.close();

let msg = rx.try_recv().unwrap();
assert_eq!(msg, "will receive");
}

try_recv()方法返回三种可能值:

  • Ok(T): 表示成功接收到通道中的数据
  • Err(TryRecvError::Empty): 表示通道为空,尚未发送任何值
  • Err(TryRecvError::Closed): 表示发送者在未发送值的情况下丢弃,或者消息已被接收

一个常用的场景为在 tokio::select! 循环中使用 Receiver ,需要在通道前面添加 &mut

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
use tokio::sync::oneshot;
use tokio::time::{interval, sleep, Duration};

#[tokio::main]
async fn main() {
let (send, mut recv) = oneshot::channel();
let mut interval = interval(Duration::from_millis(100));

tokio::spawn(async move {
sleep(Duration::from_secs(1)).await;
send.send("shut down").unwrap();
});

loop {
// select!中无需await,因为select!会自动轮询推进每一个分支的任务进度
tokio::select! {
_ = interval.tick() => println!("Another 100ms"),
msg = &mut recv => {
println!("Got message: {}", msg.unwrap());
break;
}
}
}
}

mpsc通道

mpsc 通道支持将多个值从多个生产者发送到单个消费者。此通道通常用于将工作发送到任务或接收许多计算的结果。

如果想从单个生产者向单个消费者发送多条消息,这也是你应该使用的通道。没有专用的spsc频道。

此类通道可以发送多条消息,根据通道容量的不同,可以分为有界通道和无界通道,前者可以存储的消息数量有限制,如果达到此限制,尝试发送另一条消息将等到从通道接收到消息为止;后者具有无限容量,因此 send 方法将始终立即完成,这使得 UnboundedSender 可以在同步和异步代码中使用。

std 提供的 mpsc 通道类似,通道构造函数提供单独的发送和接收句柄, SenderReceiver 用于有界通道, UnboundedSenderUnboundedReceiver 表示无界通道。如果没有消息可读取,则发送新值时将通知当前任务。 SenderUnboundedSender 允许将值发送到通道中。如果有界通道已满,则发送将被拒绝,并且当有额外容量可用时,任务将收到通知。换句话说,通道提供背压。(关于背压,见同步通道

有界通道

通过mpsc::channel()创建有界通道,需传递一个大于1的usize值作为其参数。

例如,创建一个容量为100的通道:

1
2
3
4
use tokio::sync::mpsc;

// 接收端接收数据时需修改状态,因此声明为mut
let (tx, mut rx) = mpsc::channel(100);

该通道将缓冲最多100条数量的消息。一旦缓冲区已满,尝试发送新消息将等待,直到从通道接收到消息。

Sender 上发送的所有数据都将按照发送时的顺序在 Receiver 上可用。Sender 可以从多个代码位置克隆到 send 到同一通道。但是仅支持一个接收端 Receiver

如果在尝试 sendReceiver 断开连接,则 send 方法将返回 SendError 。同样,如果 Sender 在尝试 recv 时断开连接,则 recv 方法将返回 None

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel(100);

tokio::spawn(async move {
for i in 0..10 {
if let Err(_) = tx.send(i).await {
println!("receiver dropped");
return;
}
}
});

while let Some(i) = rx.recv().await {
println!("got = {}", i);
}
}

上面的示例中,先生成了一个异步任务,该异步任务向通道中发送10个数据, Receiver 端则在循环中不断从通道中取数据。

也可以稍微修改一下,创建10个异步任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel(100);
for i in 0..10 {
let tx1 = tx.clone();
tokio::spawn(async move {
if let Err(_) = tx1.send(i).await {
println!("receiver dropped");
return;
}
});
}
// 这里要drop掉tx,否则rx会一直阻塞等待
drop(tx);
while let Some(i) = rx.recv().await {
println!("got = {}", i);
}
}

10个异步任务发送消息的顺序是未知的,因此接收到的消息无法保证顺序。另外注意上面示例中的drop(tx),因为生成的10个异步任务中都拥有clone后的Senderclone出的tx1在每个异步任务完成时自动被drop,但原始任务中还有一个Sender,如果不关闭这个Senderrx.recv()将不会返回None,而是一直等待。

下面看一个超过通道容量的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
use chrono::Local;
use tokio::{
self,
runtime::Runtime,
sync,
time::{self, Duration},
};

fn now() -> String {
Local::now().format("%F %T").to_string()
}

fn main() {
let rt = Runtime::new().unwrap();
rt.block_on(async {
let (tx, mut rx) = sync::mpsc::channel::<i32>(5);

tokio::spawn(async move {
for i in 1..=7 {
if tx.send(i).await.is_err() {
println!("receiver closed");
return;
}
println!("sended: {}, {}", i, now());
}
});

time::sleep(Duration::from_secs(1)).await;
while let Some(i) = rx.recv().await {
println!("received: {}, {}", i, now());
time::sleep(Duration::from_secs(1)).await;
}
});
}

如果通道已满,Sender通过send()发送消息时将等待。上面示例中,通道容量为5,但要发送7个数据,前5个数据会立即发送,发送第6个消息的时候将等待,直到1秒后Receiver开始从通道中每隔一秒消费一条数据。

下面简单介绍sync::mpsc::Sender提供的方法

  • capacity(): 获取当前通道的剩余容量(注意,不是初始化容量)
  • closed(): 等待Receiver端关闭,当Receiver端关闭后该等待任务会立即完成
  • is_closed(): 判断Receiver端是否已经关闭
  • send(): 向通道中发送消息,通道已满时会等待通道中的空闲位置,如果对端已关闭,则返回错误
  • send_timeout(): 向通道中发送消息,通道已满时只等待指定的时长
  • same_channel(): 如果此Sender与另外一个Sender是否属于同一通道,返回true,否则返回false
  • try_send(): 向通道中发送消息,但不等待,如果发送不成功,则返回错误
  • reserve(): 等待并申请一个通道中的空闲位置,返回一个Permit,申请的空闲位置被占位,且该位置只留给该Permit实例,之后该Permit可以直接向通道中发送消息,并释放其占位的位置。申请成功时,通道空闲容量减1,释放位置时,通道容量会加1
  • try_reserve(): 尝试申请一个空闲位置且不等待,如果无法申请,则返回错误
  • reserve_owned(): 与reserve()类似,它返回拥有所有权的OwnedPermit,但会按值所有权移动Sender
  • try_reserve_owned(): reserve_owned()的不等待版本,尝试申请空闲位置失败时会立即返回错误
  • blocking_send(): Sender可以在同步代码环境中使用该方法向异步环境发送消息

sync::mpsc::Receiver提供的方法

  • close(): 关闭Receiver端,为了保证没有消息被丢弃,在调用 close() 后,必须一直调用 recv() 直到返回 None
  • recv(): 接收消息,如果通道缓冲区中没有消息,但通道尚未关闭,则此方法将等待,直到发送消息或通道关闭,如果对端已全部关闭,则返回None
  • try_recv(): 尝试接收消息,不等待,如果无法接收消息(即通道为空或对端已关闭),则返回错误
  • blocking_recv(): Receiver可以在同步代码环境中使用该方法接收来自异步环境的消息
  • poll_recv(): 轮询以接收此频道中的下一条消息

注意,在这些方法中,try_xxx()方法都是立即返回不等待的(可以认为是同步代码),因此调用它们后无需await,只有调用那些可能需要等待的方法,调用后才需要await。例如rx.recv().awaitrx.try_recv()

Sender端可通过send_timeout()来设置一个等待通道空闲位置的超时时间,它和send()返回值一样,此外还添加一种超时错误:超时后仍然没有发送成功时将返回错误。至于返回的是什么错误,对于发送端来说不重要,重要的是发送的消息是否成功。因此,对于Sender端的条件判断,通常也仅仅只是检测is_err()

1
2
3
if tx.send_timeout(33, Duration::from_secs(1)).await.is_err() {
println!("receiver closed or timeout");
}

需要特别注意的是,Receiver端调用close()方法关闭通道后,只是半关闭状态,Receiver端仍然可以继续读取可能已经缓冲在通道中的消息,close()只能保证Sender端无法再发送普通的消息,但PermitOwnedPermit仍然可以向通道发送消息。只有通道已空且所有Sender端(包括PermitOwnedPermit)都已经关闭的情况下,recv()才会返回None,此时代表通道完全关闭。

与一次性通道类似,try_recv()在无法立即接收消息时会立即返回错误。返回的错误分为两种:

  • Err(TryRecvError::Empty): 表示通道为空,尚未发送任何值,但Sender端(包括PermitOwnedPermit)尚未全部关闭
  • Err(TryRecvError::Disconnected): 表示通道已空,且所有Sender端(包括PermitOwnedPermit)全部已经关闭

关于reserve()reserve_owned(),通过官方示例即可轻松理解:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
// 创建容量为1的通道
let (tx, mut rx) = mpsc::channel(1);
// 申请并占有唯一的空闲位置
let permit = tx.reserve().await.unwrap();
// 唯一的位置已被permit占有,tx.send()无法发送消息
assert!(tx.try_send(123).is_err());
// Permit可以通过send()方法向它占有的那个位置发送消息
permit.send(456);
// Receiver端接收到消息
assert_eq!(rx.recv().await.unwrap(), 456);


// 创建容量为1的通道
let (tx, mut rx) = mpsc::channel(1);
// tx.reserve_owned()会消费掉tx
let permit = tx.reserve_owned().await.unwrap();
// 通过permit.send()发送消息,它又返回一个Sender
let tx = permit.send(456);
assert_eq!(rx.recv().await.unwrap(), 456);
// 可以继续使用返回的Sender发送消息
tx.send(789).await.unwrap();
}
无界通道

mpsc::unbounded_channel提供无界通道,用于在异步任务之间进行通信,它无需背压,因为它可以缓存无限数量的消息,直到内存耗尽为止。

1
2
3
use tokio::sync;

let (tx, mut rx) = sync::mpsc::unbounded_channel();

由于容量无限,因此Sender端可以无需等待(无需await)地不断向通道中发送消息,这也意味着无界通道的Sender既可以在同步环境中也可以在异步环境中向通道中发送消息。只有当Receiver端已经关闭,Sender端的发送才会返回错误。

使用无界通道要关心的问题是如何避免通道积压的数据过多导致内存耗尽,比如添加监控机制,或者保证消费端的速度基本大于接收端的速度,在出现内存告警时,通过某些算法手段进行限速等等。

broadcast通道

这是一种广播通道,本质上是多生产者、多消费者广播队列,但所有消费者都可以看到每个发送的值。

使用mpsc::broadcast()创建广播通道,需要指定一个通道容量作为参数。返回发送端tx和接收端rx

1
2
3
use tokio::sync::broadcast;

let (tx, mut rx1) = broadcast::channel(16);

Sender可以克隆得到多个Sender,可以调用Sendersubscribe()方法来创建新的Receiver

当发送值时,所有 Receiver 都会收到通知并接收该值。该值在通道内存储一次,并根据每个接收器的需要进行克隆。一旦所有接收者都收到了该值的克隆,该值就会从通道中释放。

基本用法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
let (tx, mut rx1) = broadcast::channel(16);
let mut rx2 = tx.subscribe();

tokio::spawn(async move {
assert_eq!(rx1.recv().await.unwrap(), 10);
assert_eq!(rx1.recv().await.unwrap(), 20);
});

tokio::spawn(async move {
assert_eq!(rx2.recv().await.unwrap(), 10);
assert_eq!(rx2.recv().await.unwrap(), 20);
});

tx.send(10).unwrap();
tx.send(20).unwrap();
}

由于发送的消息必须保留到所有 Receiver 收到克隆为止,因此广播通道容易受到“接收缓慢”问题的影响。在这种情况下,除了一个接收器之外的所有接收器都能够按照发送的速率接收值。但由于一个接收器停止运行,值迟迟无法释放,通道逐渐被填满。

广播通道通过对通道在任何给定时间可以保留的值的数量设置硬上限来处理这种情况。该上限就是容量,作为参数传递给 channel 函数,可以把通道看做一个队列。

如果在通道填满时发送值,并不会阻塞(因此使用send无需await),而是则释放通道当前保存的最旧值(队列头的值)。释放的空间用于插入新值(在队列尾)。任何尚未看到释放值的 Receiver将在下次调用 recv 时返回 RecvError::Lagged 错误。返回 RecvError::Lagged 后,发生滞后 Receiver的位置将更新为当前通道包含的最旧值(队列头部),下一次调用 recv 将返回该值。

此行为使Receiver能够检测到它何时落后以至于数据已被丢弃。调用者可以决定如何对此做出响应:通过中止其任务或通过容忍丢失的消息并恢复使用通道。

下面是处理滞后的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
// 通道容量2
let (tx, mut rx) = broadcast::channel(2);

// 写入3个数据,将出现接收端落后于发送端的情况,
// 此时,第一个数据(10)将被剔除,剔除后,20将位于队列的头部
tx.send(10).unwrap();
tx.send(20).unwrap();
tx.send(30).unwrap();

// 落后于发送端之后的第一次recv()操作,返回RecvError::Lagged错误
assert!(rx.recv().await.is_err());

// 之后可正常获取通道中的数据
assert_eq!(20, rx.recv().await.unwrap());
assert_eq!(30, rx.recv().await.unwrap());
}

Receiver也可以使用try_recv()方法尝试在不等待的情况下返回此接收器上的待处理值,如果Sender都已关闭,则返回TryRecvError::Closed错误,如果接收端已落后,则返回TryRecvError::Lagged错误,如果通道为空,则返回TryRecvError::Empty错误。

可能会出现这样一种现象:ReceiverA已经接收了通道中的第10个消息,但另一个ReceiverB可能尚未接收第一个消息,由于第一个消息还未被全部接收者所克隆,它仍会保留在通道中并占用通道的位置,假如该通道的最大容量为10,此时Sender再发送一个消息,那么第一个数据将被释放,ReceiverB接收到消息的时候将收到RecvError::Lagged错误并永远地错过第一个消息。

watch通道

该通道是只保留最后发送的值的单生产者、多消费者通道。

通道内最多只有一个值,每次Sender发送新值时,都会覆盖旧值。此通道对于监视代码库中多个点的值更改非常有用,例如配置值的更改。

1
2
3
4
5
6
7
8
9
10
11
use tokio::sync::watch;

let (tx, mut rx) = watch::channel("hello");

tokio::spawn(async move {
while rx.changed().await.is_ok() {
println!("received = {:?}", *rx.borrow());
}
});

tx.send("world")?;

Sender端可通过subscribe()创建新的Receiver

当所有Receiver均已关闭时,send()方法将返回错误。因此,send()必须要在有Receiver存活的情况下才能发送数据。但是Sender端还有一个send_replace()方法,它可以在没有Receiver的情况下将数据写入通道,并且该方法会返回通道中原来保存的值。

无论是Sender还是Receiver,都可以通过borrow()方法取得通道中当前的值。由于可以有多个Receiver,为了避免读写时的数据不一致,watch通道内部使用了读写锁:Sender端要发送数据修改通道中的数据时,需要申请写锁,无论是Sender还是Receiver,在调用borrow()或其它一些方式访问通道数据时,都需要申请读锁。因此,访问通道数据时要尽快释放读锁,否则可能会长时间阻塞Sender端的发送操作。

如果Sender未发送数据,或者隔较长时间才发送一次数据,那么通道中的数据在一段时间内将一直保持不变。如果Receiver在这段时间内去多次读取通道,得到的结果将完全相同。但有时候,可能更需要的是等待通道中的数据已经发生变化,然后再根据新的数据做进一步操作,而不是循环不断地去读取并判断当前读取到的值是否和之前读取的旧值相同。

watch通道已经提供了这种功能:Receiver可以标记通道中的数据,记录该数据是否已经被读取过。Receiverchanged()方法用于等待通道中的数据发生变化,其内部判断过程是:如果通道中的数据已经被标记为已读取过,那么changed()将等待数据更新,如果数据未标记过已读取,那么changed()认为当前数据就是新数据,changed()会立即返回。

Receiverborrow()方法不会标记数据已经读取,所以borrow()之后调用的changed()会立即返回。但是changed()等待到新值之后,会立即将该值标记为已读取,使得下次调用changed()时会进行等待。

Receiver还有一个borrow_and_update()方法,它会读取数据并标记数据已经被读取,因此随后调用chagned()将进入等待。

最后需要注意,无论是Sender还是Receiver端,访问数据的时候都会申请读锁,要尽量快地释放读锁,以免Sender长时间无法发送数据。

4.3 状态同步

Barrier屏障

屏障使多个任务能够保持进度同步。比如,同一个任务被多个异步并发执行,但每个异步任务都需要保证其它所有任务都必须完成到进度A,才可以进行下一步。 此时可以在A位置使用屏障,这样可以保证所有任务在开始第二步之前的进度是同步的。

当然,也不一定要等待所有任务的进度都同步,可以设置等待一部分任务的进度同步。也就是说,让并发任务的进度按批次进行同步。第一批的任务进度都同步后,这一批任务将通过屏障,但是该屏障依然会阻挡下一批任务,直到下一批任务的进度都同步之后才放行。

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
use std::sync::Arc;
use tokio::sync::Barrier;

#[tokio::main]
async fn main() {
let mut handles = Vec::with_capacity(10);

// 参数10表示屏障宽度为10,只等待10个任务达到屏障点就放行这一批任务
// 也就是说,某时刻已经有9个任务在等待,当第10个任务调用wait的时候,屏障将放行这一批
let barrier = Arc::new(Barrier::new(10));

for _ in 0..10 {
let c = barrier.clone();
handles.push(tokio::spawn(async move {
println!("before wait");

// 在此设置屏障,保证10个任务都已输出before wait才继续向下执行
let wait_result = c.wait().await;
println!("after wait");
wait_result
}));
}

let mut num_leaders = 0;
for handle in handles {
let wait_result = handle.await.unwrap();
if wait_result.is_leader() {
num_leaders += 1;
}
}

assert_eq!(num_leaders, 1);
}

Barrier调用wait()方法时,返回BarrierWaitResult,该结构有一个is_leader()方法,可以用来判断某个任务是否是该批次任务中的第一个任务。每一批通过屏障的任务都只有一个leader,其余非leader任务调用is_leader()都将返回false

使用屏障时,一定要保证可以到达屏障点的并发任务数量是屏障宽度的整数倍,否则多出来的任务将一直等待。例如,将屏障的宽度设置为10(即10个任务一批),但是有15个并发任务,多出来的5个任务无法凑成完整的一批,这5个任务将一直等待。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
use std::sync::Arc;
use tokio::sync::Barrier;
use tokio::{ self, runtime::Runtime, time::{self, Duration} };

fn main() {
let rt = Runtime::new().unwrap();
rt.block_on(async {
let barrier = Arc::new(Barrier::new(10));

for i in 1..=15 {
let b = barrier.clone();
tokio::spawn(async move {
println!("data before: {}", i);

b.wait().await; // 15个任务中,多出5个任务将一直在此等待
time::sleep(Duration::from_millis(10)).await;
println!("data after: {}", i);
});
}
time::sleep(Duration::from_secs(5)).await;
});
}

在上面的例子中,可以通过屏障的任务只有10个,剩下的5个将永远阻塞,输出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
data before: 1
data before: 2
data before: 3
data before: 5
data before: 7
data before: 8
data before: 14
data before: 11
data before: 12
data before: 13
data before: 4
data before: 6
data before: 9
data before: 10
data before: 15
data after: 3
data after: 7
data after: 12
data after: 13
data after: 8
data after: 14
data after: 11
data after: 2
data after: 1
data after: 5

Mutex互斥锁

在之前介绍过多线程版本的互斥锁Mutex,下面来看看tokio版本的互斥锁tokio::sync::Mutex

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
use std::sync::Arc;
use tokio::{self, sync, runtime::Runtime, time::{self, Duration}};

fn main() {
let rt = Runtime::new().unwrap();
rt.block_on(async {
let mutex = Arc::new(sync::Mutex::new(0));

for i in 0..10 {
let lock = Arc::clone(&mutex);
tokio::spawn(async move {
let mut data = lock.lock().await;
*data += 1;
println!("task: {}, data: {}", i, data);
});
}

time::sleep(Duration::from_secs(1)).await;
});
}

总体的使用方法类似,使用new()来创建互斥锁,使用lock()来申请锁,申请锁成功时将返回MutexGuard,并通过超出作用域drop的方式来释放锁。

1
2
3
4
5
6
7
8
9
10
task: 0, data: 1
task: 2, data: 2
task: 3, data: 3
task: 6, data: 4
task: 7, data: 5
task: 5, data: 6
task: 8, data: 7
task: 9, data: 8
task: 1, data: 9
task: 4, data: 10

可以看出任务的调度顺序是随机的,但是数据加1的操作是依次完成的。

在tokio中也可以使用标准库中的互斥锁,与普遍的看法相反,在异步代码中使用标准库中的普通 Mutex 是可以的,而且通常是首选。与标准库的互斥锁相比,异步互斥锁提供的功能是能够在 .await 点上保持锁定。这使得异步互斥锁比阻塞互斥锁更加耗费资源,因此在可以使用阻塞互斥锁的情况下应该优先考虑它。异步互斥锁的主要用例是提供对 IO 资源(例如数据库连接)的共享可变访问。如果互斥锁后面的值只是数据,通常适合使用阻塞互斥锁,例如标准库中的互斥锁或 parking_lot(比 Rust 标准中的实现更小、更快、更灵活的阻塞的互斥锁) 。

什么情况下可以选择使用tokio的Mutex?当跨await的时候,可以考虑使用,因为这时使用标准库的Mutex将编译错误。当然,也有相应的解决方案。

什么是跨await?每个await都代表一个异步任务,跨await即表示该异步任务中出现了至少一个子任务。而每个异步任务都可能会被tokio内部窃取到不同的线程上执行,因此跨await时要求其父任务实现Send特征,这是因为子任务中可能会引用父任务中的数据。

例如,下面定义的异步函数中使用了标准库的互斥锁,且有子任务,这会编译错误:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
use std::sync::{Arc, Mutex, MutexGuard};
use tokio::{self, runtime::Runtime, time::{self, Duration}};

async fn add_1(mutex: &Mutex<u64>) {
let mut lock = mutex.lock().unwrap();
*lock += 1;

// 子任务,跨await,且引用了父任务中的数据
time::sleep(Duration::from_millis(*lock)).await;
}

fn main() {
let rt = Runtime::new().unwrap();
rt.block_on(async {
let mutex = Arc::new(Mutex::new(0));

for i in 0..10 {
let lock = mutex.clone();
tokio::spawn(async move {
add_1(&lock).await;
});
}

time::sleep(Duration::from_secs(1)).await;
});
}

由于标准库std::sync::MutexGuard没有实现Send,因此它并不能在线程间安全地发送,因此父任务async move{}语句块是非Send的,于是编译报错。当然,如果上面的示例中没有子任务sleep().await子任务,则没有问题,因为已经可以明确知道该Mutex所在的任务是在当前线程执行的。

在这种场景下,可以使用tokio提供的互斥锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
use std::sync::Arc;
use tokio::{ self, runtime::Runtime, sync::{Mutex, MutexGuard}, time::{self, Duration} };

async fn add_1(mutex: &Mutex<u64>) {
let mut lock = mutex.lock().await;
*lock += 1;
time::sleep(Duration::from_millis(*lock)).await;
}

fn main() {
let rt = Runtime::new().unwrap();
rt.block_on(async {
let mutex = Arc::new(Mutex::new(0));
for i in 0..10 {
let lock = mutex.clone();
tokio::spawn(async move {
add_1(&lock).await;
});
}

time::sleep(Duration::from_secs(1)).await;
});
}

前面提到tokio的互斥锁性能相对较差一些,因此可以不使用tokio锁的情况下,尽量不使用它。对于上面的需求,仍然可以继续使用标准库的Mutex,但需要做一些调整,也就是在子任务await之前,把所有未实现Send的数据都drop掉,保证子任务无法引用父任务中的任何非Send数据。

1
2
3
4
5
6
7
8
9
10
use std::sync::{Arc, Mutex, MutexGuard};

async fn add_1(mutex: &Mutex<u64>) {
{
let mut lock = mutex.lock().unwrap();
*lock += 1;
}
// 子任务,跨await,不引用父任务中的数据
time::sleep(Duration::from_millis(10)).await;
}

这种方案的主要思想是让子任务和父任务不要出现不安全的数据交叉。如果可以的话,应尽量隔离子任务和非Send数据所在的任务。上面的例子已经实现了这一点,但更好的方式是将子任务sleep().await从这个函数中移走。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
use std::sync::{Arc, Mutex};
#[allow(unused_imports)]
use tokio::{ self, runtime::Runtime, sync, time::{self, Duration}};

async fn add_1(mutex: &Mutex<u64>) -> u64 {
let mut lock = mutex.lock().unwrap();
*lock += 1;
*lock
} // 申请的互斥锁在此被释放

fn main() {
let rt = Runtime::new().unwrap();
rt.block_on(async {
let mutex = Arc::new(Mutex::new(0));

for i in 0..100 {
let lock = mutex.clone();
tokio::spawn(async move {
let n = add_1(&lock).await;
time::sleep(Duration::from_millis(n)).await;
});
}

time::sleep(Duration::from_secs(1)).await;
println!("data: {}", mutex.lock().unwrap());
});
}

另外注意,标准库的Mutex存在毒锁问题。所谓毒锁,即某个持有互斥锁的线程panic了,那么这个锁有可能永远得不到释放(除非线程panic之前已经释放),也称为被污染的锁。毒锁问题可能很严重,因为出现毒锁有可能意味着数据将从此开始不再准确,所以多数时候是直接让毒锁的panic向上传播或单独处理。但出现毒锁并不总是危险的,所以标准库也提供了对应的方案。

但tokio的互斥锁不存在毒锁问题,在持有tokio的Mutex的线程panic时,tokio的做法是直接释放锁。

RwLock读写锁

在之前介绍过多线程版本的读写锁 RwLock,下面介绍tokio版本的读写锁tokio::sync::RwLock

这种类型的锁允许在任何时间点有多个读取者或最多一个写入者。下面是官方文档中的一个示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
use tokio::sync::RwLock;

#[tokio::main]
async fn main() {
let lock = RwLock::new(5);

// 多个读锁共存
{
// read()返回RwLockReadGuard
let r1 = lock.read().await;
let r2 = lock.read().await;
assert_eq!(*r1, 5); // 对Guard解引用,即可得到其内部的值
assert_eq!(*r2, 5);
} // 读锁(r1, r2)在此释放

// 只允许一个写锁存在
{
// write()返回RwLockWriteGuard
let mut w = lock.write().await;
*w += 1;
assert_eq!(*w, 6);
} // 写锁(w)被释放
}

读写锁有几种不同的设计方式:

  • 读优先:只要有读操作申请锁,优先将锁分配给读操作。这种方式可以提供非常好的并发能力,但是大量的读操作可能会长时间阻挡写操作(饿死写者)
  • 写优先:只要有写操作申请锁,优先将锁分配给写操作。这种方式可以保证写操作不会被饿死,但会严重影响并发能力

与标准库的读写锁区别在于,tokio的读写锁的优先级策略是公平的(或写优先),以确保读者不会饿死写者。等待锁的任务采用先进先出队列,保证公平性;如果希望获取写锁的任务位于队列的头部,则在释放写锁之前不会发出读锁。这与标准库的 std::sync::RwLock 形成对比,其中优先级策略取决于操作系统的实现。

具体规则如下:

  1. 每次申请锁时都将等待,申请锁的异步任务被切换,CPU交还给调度器
  2. 如果申请的是读锁,并且此时没有写锁存在,则申请成功,对应的任务被唤醒
  3. 如果申请的是读锁,但此时有写锁(包括写锁申请)的存在,那么将等待所有的写锁释放(因为写锁总是优先)
  4. 如果申请的是写锁,如果此时没有读锁的存在,则申请成功
  5. 如果申请的是写锁,但此时有读锁的存在,那么将等待当前正在持有的读锁释放

tokio的写优先会很容易产生死锁。例如,下面的代码会产生死锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
use std::sync::Arc;
use tokio::{self, runtime::Runtime, sync::RwLock, time::{self, Duration}};

fn main() {
let rt = Runtime::new().unwrap();
rt.block_on(async {
let lock = Arc::new(RwLock::new(0));

let lock1 = lock.clone();
tokio::spawn(async move {
let n = lock1.read().await;

time::sleep(Duration::from_secs(2)).await;
let nn = lock1.read().await;
});

time::sleep(Duration::from_secs(1)).await;
let mut wn = lock.write().await;
*wn = 2;
});
}

上面示例中,按照时间的流程,首先会在子任务中申请读锁,1秒后在当前任务中申请写锁,再1秒后子任务申请读锁。

申请第一把读锁时,因为此时无锁,所以读锁n申请成功。1秒后申请写锁时,由于此时读锁n尚未释放,因此写锁申请失败,将等待。再1秒之后,继续在子任务中申请读锁nn,但是此时有写锁申请存在,因此第二次申请读锁将等待,于是读锁写锁互相等待,死锁出现。

通过这个例子可以看出,当要使用写锁时,如果要避免死锁,一定要保证同一个任务中的任意两次锁申请之间,前面已经无锁,并且写锁尽早释放。

对于上面的示例,同一个子任务中申请两次读锁,但是第二次申请读锁时,第一把读锁仍未释放,这就产生了死锁的可能。只需在第二次申请读锁前,将第一把读锁释放即可。更保险一点,在写锁写完数据后也手动释放写锁(上面的示例中写完就退出,写锁会自动释放,因此无需手动释放)。

通过上述方式来避免死锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
use std::sync::Arc;
use tokio::{self, runtime::Runtime, sync::RwLock, time::{self, Duration}};

fn main() {
let rt = Runtime::new().unwrap();
rt.block_on(async {
let lock = Arc::new(RwLock::new(0));

let lock1 = lock.clone();
tokio::spawn(async move {
let n = lock1.read().await;
drop(n); // 在申请第二把读锁前,先释放第一把读锁

time::sleep(Duration::from_secs(2)).await;
let nn = lock1.read().await;
drop(nn);
});

time::sleep(Duration::from_secs(1)).await;
let mut wn = lock.write().await;
*wn = 2;
drop(wn);
});
}

tokio还提供了一系列读写锁的方法,见官方文档

Semaphore信号量

在之前介绍过多线程时,已经介绍过tokio的信号量 Semaphore了,这里做个回顾。

使用信号量时,需在初始化时指定数量,每当任务要执行时,从中取走一个信号量,当任务完成时会归还信号量。当某个任务要执行时,如果此时信号量数量为0,则该任务将等待,直到有信号量被归还。因此,信号量通常用来提供类似于限量的功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
use chrono::Local;
use std::sync::Arc;
use tokio::{ self, runtime::Runtime, sync::Semaphore, time::{self, Duration}};

fn now() -> String {
Local::now().format("%F %T").to_string()
}

fn main() {
let rt = Runtime::new().unwrap();
rt.block_on(async {
// 3个信号量
let semaphore = Arc::new(Semaphore::new(3));

// 5个并发任务,每个任务执行前都先获取信号
// 同一时刻最多只有3个任务进行并发
for i in 1..=5 {
let semaphore = semaphore.clone();
tokio::spawn(async move {
let _permit = semaphore.acquire().await.unwrap();
println!("{}, {}", i, now());
time::sleep(Duration::from_secs(1)).await;
});
}

time::sleep(Duration::from_secs(3)).await;
});
}

tokio::sync::Semaphore提供了以下的方法:

  • new(): 创建固定数量的信号量
  • close(): 关闭信号量,关闭信号量时,将唤醒所有等待信号量的等待者
  • is_closed(): 检查信号量是否已经被关闭
  • acquire(): 获取一个信号量许可,如果信号量已经被关闭,则返回错误AcquireError
  • acquire_many(): 获取指定数量的信号量许可,如果信号量数量不够则等待,如果信号量已经被关闭,则返回AcquireError
  • add_permits(): 向当前信号量中额外添加N个信号量
  • available_permits(): 当前信号量中剩余的信号量数量
  • try_acquire(): 不等待地尝试获取一个信号量,如果信号量已经关闭,则返回TryAcquireError::Closed,如果目前信号量数量为0,则返回TryAcquireError::NoPermits
  • try_acquire_many(): 不等待地尝试获取指定数量的信号量
  • acquire_owned(): 获取一个信号量并消费掉信号量
  • acquire_many_owned(): 获取指定数量的信号量并消费掉信号量
  • try_acquire_owned(): 不等待地尝试获取信号量并消费掉信号量
  • try_acquire_many_owned(): 不等待地尝试获取指定数量的信号量并消费掉信号量

获取到的信号量许可类型是SemaphorePermit,它有一个forget()方法,该方法可以将信号量不归还给信号量,因此信号量中的信号量将永久性地减少(当然,可使用add_permits()添加)。

更多示例和细节,见官方文档

Notify通知

它提供了一种简单的通知唤醒功能。先看官方示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
use tokio::sync::Notify;
use std::sync::Arc;

#[tokio::main]
async fn main() {
let notify = Arc::new(Notify::new());
let notify2 = notify.clone();

let handle = tokio::spawn(async move {
notify2.notified().await;
println!("received notification");
});

println!("sending notification");
notify.notify_one();

// Wait for task to receive notification.
handle.await.unwrap();
}

Notify::new()创建Notify实例,Notify 可以被认为是从0个许可开始的信号量 。每当调用notified().await时,将判断此时是否有许可,如果有,则可直接执行,否则将进入等待。因此,初始化之后立即调用notified().await将会等待。

每当调用notify_one()时,将产生一个许可,多次调用也最多只有一个许可。因此,调用notify_one()之后再调用notified().await则无需等待。

如果同时有多个等待许可的等候者,释放一个许可,在其它环境中可能会产生惊群现象,即大量等候者被一次性同时唤醒去争抢一个资源,抢到的可以继续执行,而未抢到的等候者又重新被阻塞。好在,tokio的通知没有这种问题,tokio使用队列方式让等候者进行排队,先等待的总是先获取到许可,因此不会一次性唤醒所有等候者,而是只唤醒队列头部的那个等候者。

Notify还有一个notify_waiters()方法,它不会释放许可,但是它会一次性唤醒所有正在等待的等候者。严格来说,是让当前已经注册的等候者(即已经调用notified(),但是还未await)在下次等待的时候,可以直接通过,见下面的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
use tokio::sync::Notify;
use std::sync::Arc;

#[tokio::main]
async fn main() {
let notify = Arc::new(Notify::new());
let notify2 = notify.clone();

// 注册两个等候者
let notified1 = notify.notified();
let notified2 = notify.notified();

let handle = tokio::spawn(async move {
println!("sending notifications");
notify2.notify_waiters();
});

// 两个等候者的await都会直接通过
notified1.await;
notified2.await;
println!("received notifications");
}