Me as a method
用直觉与逻辑寻求好解释,用好解释丰富直觉与逻辑


深入 Rust - 异步编程内幕

Posted on

函数

让我们从函数说起。

一个普通函数,我们调用它的时候,它会执行完成并返回最终结果。

一个异步函数,我们调用它的时候,或许它会执行完成返回最终结果: Ready(result),又或许它没能执行完成,返回一个需要等待的"标志": Pending

为了使得异步函数执行完成,我们需要(隔一段时间就)反复调用它,直到拿到最终结果。

问题是,为什么会出现这种函数?

答案是: Non-Blocking I/O

Non-Blocking I/O

程序调用链的尽头是系统调用。

系统调用默认是阻塞 (Blocking) 的,也就是程序卡在那里等内核返回。

正常情况下阻塞调用不会有太大问题,但对网络 I/O 操作而言,因为涉及到和外部系统交互且传输时延大,这种阻塞调用会导致 CPU 闲置。

于是可以采用非阻塞 (Non-Blocking) 的方式,这时候 I/O 操作可能拿到结果,也可能返回错误(未 ready),但不会卡住。

如果 I/O 操作返回了"未 Ready"。那么程序可以继续去干一些别的计算,过一段时间再来尝试 I/O 操作。

这个 Non-Blocking I/O 的调用,就是异步函数的原型。

异步函数 Future

接口 (interface)

一个最底层的函数往往是因为要进行 Non-blocking IO 而被 "传染" 成异步函数。 它的函数接口可以写成如下的样子:

fn poll(...) -> Poll;

enum Poll<T> {
    Ready(T),
    Pending,
}

把它命名为 poll, 是因为这个函数可以被反复调用(轮训),直到 Poll::Ready

Rust 的语境下,异步函数被命名为 Future: 这个 poll 函数被设计成一个 Future Trait。文档参考

组合 (composition)

如上文所说,一个最底层函数因为 Non-blocking I/O 而被 "传染" 成使用异步接口。

这种传染还会继续发生,调用异步函数的函数,只能是异步函数。因为向下调用的异步函数可能会返回 Pending,这时候只能向上返回 Pending。

我们考虑一个普通函数组合 (字母代表不同的函数调用,函数 A 的结果是由函数 B 和 函数 C 的结果相加而得):

A = B + C

如果 B 和 C 双方或者任意一方是异步函数,那么只能改成:

A.poll = B.poll + C.poll  // B 和 C 都是异步函数
A.poll = B.poll + C  // 只有 B 是异步函数

如果我们把这里的 .poll 换一个更恰当的名字: .await,则有:

A.await = B.await + C.await

为了区别普通函数和异步函数,我们给异步函数在定义时使用 async 关键字,调用时使用 await 关键字,则有:

async A = B.await + C.await

这就是 async/await 的原型了。

总结

  1. Future Trait: 最底层通过实现一个 poll 方法,提供基本的 future (叶子 Future);
  2. Async/Await: 上层通过层层组合 future,产生新的 future。

Async Runtime

上文提到 Future.await 会返回 Pending。返回 Pending 是为了不卡住。而不卡住是为了可以执行别的 Future,最大化利用 CPU。

也就是说这里出现了一个 Future 调度执行的需求,一旦某个 Future.await 返回了 Pending, 那么就把它挂起,改为执行别的 Future,等合适的时候,再回头执行之前挂起的 Future。

我们管这个调度执行 Futures 的组件叫 Async Runtime

Async Runtime 反复切换 Future,反复执行 Future, 直到 Future 拿到最终结果。

Runtime 设计思路

Async Runtime 的主要功能是调度执行 Futures。于是调度策略变得非常重要。

调度策略主要要解决如下问题:

上文提到,Future 在执行的时候会返回 Pending 而被挂起。那么,这些挂起的 Futures 什么时候应该被再次执行?

如果简单地轮训盲试,有效性可能会很差,白白浪费 CPU 资源。

最好的方式,当然是想办法告诉 Runtime 某个 Future ready 了,应该被再次 poll。

这个办法是有的,为了区别起见,我们把 Async Runtime 分成两个部分:

  1. Executor: 负责调度执行 Futures
  2. Reactor: 负责通知 Executor 某个 Future ready 了

Reactor

Reactor 的责任是通知 Executor 某个 Future ready 了,那么 Reactor 怎么知道 Future ready 的?

答案是:操作系统告诉它的。

不严谨地说,Reactor 的原型是 I/O Multiplexing

所谓 I/O Multiplexing 就是用一个专用的线程,去阻塞等待多个 I/O sources (比如 socket)。

当有 I/O sources 可以进行 I/O 操作时,通知和这个 I/O source 有关的 handler 进行 I/O 操作 (Demultiplexing)。

不同的操作系统提供了不同的系统调用来做这件事,现在一般使用的是 epoll/kqueue/IOCP。 (可参考 Mio 的实现)

上文提到,Future 的最底层操作往往是 Non-blocking I/O,I/O 操作意味着有一个 I/O source (比如 socket), 这个 I/O source 可以通过 I/O Multiplexing 让另一个线程去监听,当它 ready 时,就可以通知 Executor 唤醒和它有关联的 Futures。

换言之,程序的尽头是一堆 I/O sources,每个 I/O source 其实是和一组 Futures 所关联的。我们通过 Multiplexing 的方式监听 I/O sources 的状态,就可以准确地唤醒和它关联的 Futures。

Runtime 架构

future-runtime

  1. Executor 调用 future.poll, 并传入一个 callback(注:这个 callback 也叫 Waker,是通过 Future 层层传递最终注册到 Reactor 让它唤醒 Future 的)

    1. Future(最底层的叶子 Future) 使用 non-blocking 模式对 I/O source 操作,直到它返回 err 表示无法继续。
    2. 把这个 I/O source 和 callback 注册到 Reactor 进行监听 (以便知道什么时候可以继续)。
  2. Reactor 通过操作系统集中监听多个 I/O sources。

  3. I/O 事件到达 I/O source

  4. Reactor 调用 callback(Waker) 唤醒对应 Future。

结语

Rust 异步是基于 Future Trait 标准的生态系统。具体实现是多元化的,比如有不同的异步运行时实现。

这里希望提供一些启发性的理解角度,具体的细节请参考其他文档。