深入 Rust - 异步编程内幕
Posted on
函数
让我们从函数说起。
一个普通函数,我们调用它的时候,它会执行完成并返回最终结果。
一个异步函数,我们调用它的时候,或许它会执行完成返回最终结果: Ready(result)
,又或许它没能执行完成,返回一个需要等待的"标志": Pending
。
为了使得异步函数执行完成,我们需要(隔一段时间就)反复调用它,直到拿到最终结果。
问题是,为什么会出现这种函数?
答案是: Non-Blocking I/O
Non-Blocking I/O
程序调用链的尽头是系统调用。
系统调用默认是阻塞 (Blocking) 的,也就是程序卡在那里等内核返回。
正常情况下阻塞调用不会有太大问题,但对网络 I/O 操作而言,因为涉及到和外部系统交互且传输时延大,这种阻塞调用会导致 CPU 闲置。
于是可以采用非阻塞 (Non-Blocking) 的方式,这时候 I/O 操作可能拿到结果,也可能返回错误(未 ready),但不会卡住。
如果 I/O 操作返回了"未 Ready"。那么程序可以继续去干一些别的计算,过一段时间再来尝试 I/O 操作。
这个 Non-Blocking I/O 的调用,就是异步函数的原型。
异步函数 Future
接口 (interface)
一个最底层的函数往往是因为要进行 Non-blocking IO 而被 "传染" 成异步函数。 它的函数接口可以写成如下的样子:
fn poll(...) -> Poll;
enum Poll<T> {
Ready(T),
Pending,
}
把它命名为 poll, 是因为这个函数可以被反复调用(轮训),直到 Poll::Ready
。
Rust 的语境下,异步函数被命名为 Future
: 这个 poll 函数被设计成一个 Future
Trait。文档参考
组合 (composition)
如上文所说,一个最底层函数因为 Non-blocking I/O 而被 "传染" 成使用异步接口。
这种传染还会继续发生,调用异步函数的函数,只能是异步函数。因为向下调用的异步函数可能会返回 Pending,这时候只能向上返回 Pending。
我们考虑一个普通函数组合 (字母代表不同的函数调用,函数 A 的结果是由函数 B 和 函数 C 的结果相加而得):
A = B + C
如果 B 和 C 双方或者任意一方是异步函数,那么只能改成:
A.poll = B.poll + C.poll // B 和 C 都是异步函数
或
A.poll = B.poll + C // 只有 B 是异步函数
如果我们把这里的 .poll
换一个更恰当的名字: .await
,则有:
A.await = B.await + C.await
为了区别普通函数和异步函数,我们给异步函数在定义时使用 async
关键字,调用时使用 await
关键字,则有:
async A = B.await + C.await
这就是 async/await
的原型了。
总结
- Future Trait: 最底层通过实现一个 poll 方法,提供基本的 future (叶子 Future);
- Async/Await: 上层通过层层组合 future,产生新的 future。
Async Runtime
上文提到 Future.await
会返回 Pending
。返回 Pending
是为了不卡住。而不卡住是为了可以执行别的 Future,最大化利用 CPU。
也就是说这里出现了一个 Future 调度执行的需求,一旦某个 Future.await
返回了 Pending
, 那么就把它挂起,改为执行别的 Future,等合适的时候,再回头执行之前挂起的 Future。
我们管这个调度执行 Futures 的组件叫 Async Runtime
。
Async Runtime
反复切换 Future,反复执行 Future, 直到 Future 拿到最终结果。
Runtime 设计思路
Async Runtime 的主要功能是调度执行 Futures。于是调度策略变得非常重要。
调度策略主要要解决如下问题:
上文提到,Future 在执行的时候会返回 Pending 而被挂起。那么,这些挂起的 Futures 什么时候应该被再次执行?
如果简单地轮训盲试,有效性可能会很差,白白浪费 CPU 资源。
最好的方式,当然是想办法告诉 Runtime 某个 Future ready 了,应该被再次 poll。
这个办法是有的,为了区别起见,我们把 Async Runtime 分成两个部分:
- Executor: 负责调度执行 Futures
- Reactor: 负责通知 Executor 某个 Future ready 了
Reactor
Reactor 的责任是通知 Executor 某个 Future ready 了,那么 Reactor 怎么知道 Future ready 的?
答案是:操作系统告诉它的。
不严谨地说,Reactor 的原型是 I/O Multiplexing。
所谓 I/O Multiplexing 就是用一个专用的线程,去阻塞等待多个 I/O sources (比如 socket)。
当有 I/O sources 可以进行 I/O 操作时,通知和这个 I/O source 有关的 handler 进行 I/O 操作 (Demultiplexing)。
不同的操作系统提供了不同的系统调用来做这件事,现在一般使用的是 epoll/kqueue/IOCP。 (可参考 Mio 的实现)
上文提到,Future 的最底层操作往往是 Non-blocking I/O,I/O 操作意味着有一个 I/O source (比如 socket), 这个 I/O source 可以通过 I/O Multiplexing 让另一个线程去监听,当它 ready 时,就可以通知 Executor 唤醒和它有关联的 Futures。
换言之,程序的尽头是一堆 I/O sources,每个 I/O source 其实是和一组 Futures 所关联的。我们通过 Multiplexing 的方式监听 I/O sources 的状态,就可以准确地唤醒和它关联的 Futures。
Runtime 架构
-
Executor 调用 future.poll, 并传入一个 callback(注:这个 callback 也叫 Waker,是通过 Future 层层传递最终注册到 Reactor 让它唤醒 Future 的)
-
- Future(最底层的叶子 Future) 使用 non-blocking 模式对 I/O source 操作,直到它返回 err 表示无法继续。
- 把这个 I/O source 和 callback 注册到 Reactor 进行监听 (以便知道什么时候可以继续)。
-
Reactor 通过操作系统集中监听多个 I/O sources。
-
I/O 事件到达 I/O source
-
Reactor 调用 callback(Waker) 唤醒对应 Future。
结语
Rust 异步是基于 Future Trait 标准的生态系统。具体实现是多元化的,比如有不同的异步运行时实现。
这里希望提供一些启发性的理解角度,具体的细节请参考其他文档。