这篇内容主要是对底层探秘: Future 执行器与任务调度教程的一些个人理解记录。
Async 简介
现代的编程语言一般都对并发提供了支持,每种语言可能提供不同的并发模型,包括多线程、事件驱动、协程、async/await 等。在 Rust 中同时提供了多线程编程和 async 编程。
Rust 中提供的 async 模型,相比较其他语言还是有些区别:
- Future 在 Rust 中是惰性的,只有在被轮询(poll)时才会运行,因此丢弃一个 future 会阻止它未来再被运行,可以将Future理解为一个在未来某个时间点被调度执行的任务;
- Async 在 Rust 中使用开销是零, 意味着只有你能看到的代码(自己的代码)才有性能损耗,你看不到的(async 内部实现)都没有性能损耗;
- Rust 没有内置异步调用所必需的运行时,但是无需担心,Rust 社区生态中已经提供了非常优异的运行时实现,如 tokio;
- 运行时同时支持单线程和多线程。
事实上, async 底层也是基于线程实现,但是它基于线程封装了一个运行时,可以将多个任务映射到少量线程上,然后将线程切换变成了任务切换,后者仅仅是内存中的访问,因此要高效的多。
Future 惰性简单理解
对“Future 在 Rust 中是惰性的”我们通过下面的程序来理解下。
#[test]
fn future_test() {
f1();
}
pub async fn f1() {
println!("f1");
f2();
}
pub async fn f2() {
println!("f2");
}
running 1 test
test tmp::only_test::future_test ... ok
successes:
successes:
tmp::only_test::future_test
test result: ok. 1 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.00s
我们运行上面的程序,会发现不会打印任何信息。同时编译器会有这样的提示
unused implementer of
futures::Future
that must be used futures do nothing unless you.await
or poll them#[warn(unused_must_use)]
on by default。
这就是上面所说“Future 在 Rust 中是惰性的”。如果没有使用 .await 或者其他方式去推它一把,它不会自己主动执行。
想要 Future 执行,需要使用 .wait 或者使用 futures 中的 block_on 函数。让我们改写下代码
#[test]
fn future_test() {
block_on(f1());
}
pub async fn f1() {
println!("f1");
f2().await;
}
pub async fn f2() {
println!("f2");
}
running 1 test
f1
f2
test tmp::only_test::future_test ... ok
successes:
successes:
tmp::only_test::future_test
test result: ok. 1 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.00s
可以看到程序如我们期望,进行了信息打印。
那么 Future 到底是如何被执行的呢?这个就是我们这篇内容需要去了解的。
Future 执行过程
首先放一张整体的执行流程大图,这个是对文章中代码执行流程的梳理。不同于书中的分析过程,我试着从自己的视角来串一下执行流程,希望能说的更清晰一点。这里是先沿着黑线再到蓝线来进行串联。
Spawner
/// `Spawner`负责创建新的`Future`然后将它发送到任务通道中
#[derive(Clone)]
struct Spawner {
task_sender: SyncSender<Arc<Task>>,
}
// 方法用于接收 Future , 然后将它放入任务通道中
impl Spawner {
fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) {
let future = future.boxed();
let task = Arc::new(Task {
future: Mutex::new(Some(future)),
task_sender: self.task_sender.clone(),
});
self.task_sender.send(task).expect("任务队列已满");
}
}
首先是 Spawner,顾名思义,Spawner 就是生产任务的。Spawner 其实很简单,只有一个 spawn 方法,接收 Future 类型的传入参数 ,将传入的 Future 包装成一个 Task 对象,然后放入到 Channel 中。
Executor
/// 任务执行器,负责从通道中接收任务然后执行
struct Executor {
ready_queue: Receiver<Arc<Task>>,
}
// 执行器将从通道中获取任务,然后进行 poll 执行:
impl Executor {
fn run(&self) {
while let Ok(task) = self.ready_queue.recv() {
// 获取一个future,若它还没有完成(仍然是Some,不是None),则对它进行一次poll并尝试完成它
let mut future_slot = task.future.lock().unwrap();
if let Some(mut future) = future_slot.take() {
// 基于任务自身创建一个 `LocalWaker`
let waker = waker_ref(&task);
let context = &mut Context::from_waker(&*waker);
// `BoxFuture<T>`是`Pin<Box<dyn Future<Output = T> + Send + 'static>>`的类型别名
// 通过调用`as_mut`方法,可以将上面的类型转换成`Pin<&mut dyn Future + Send + 'static>`
if future.as_mut().poll(context).is_pending() {
// Future还没执行完,因此将它放回任务中,等待下次被poll
*future_slot = Some(future);
}
}
}
}
}
Executor 的 run 方法一直循环从 Channel 中拉取 Task。在获取到 Task 之后,将使用其引用创建一个 Waker 对象,这个对象很重要,是后面流程串联的重要内容。
接着使用 Waker 对象创建一个 Context 对象,然后将这个对象作为参数调用 future.poll() 方法。如果 poll() 的结果是 Pending,那么会将 Future 对象继续放入 Task 中,等待下次执行。否则,这个 Task 的执行流程到此结束。
Future
SharedState
/// 在Future和等待的线程间共享状态
struct SharedState {
/// 定时(睡眠)是否结束
completed: bool,
/// 当睡眠结束后,线程可以用`waker`通知`TimerFuture`来唤醒任务
waker: Option<Waker>,
}
SharedState 并没有什么特别的逻辑,只有两个简单的属性,作为辅助 Future 的状态容器而已。
Future#poll
pub struct TimerFuture {
shared_state: Arc<Mutex<SharedState>>,
}
impl Future for TimerFuture {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// 通过检查共享状态,来确定定时器是否已经完成
let mut shared_state = self.shared_state.lock().unwrap();
if shared_state.completed {
Poll::Ready(())
} else {
// 设置`waker`,这样新线程在睡眠(计时)结束后可以唤醒当前的任务,接着再次对`Future`进行`poll`操作,
//
// 下面的`clone`每次被`poll`时都会发生一次,实际上,应该是只`clone`一次更加合理。
// 选择每次都`clone`的原因是: `TimerFuture`可以在执行器的不同任务间移动,如果只克隆一次,
// 那么获取到的`waker`可能已经被篡改并指向了其它任务,最终导致执行器运行了错误的任务
shared_state.waker = Some(cx.waker().clone());
Poll::Pending
}
}
}
终于到了今天的主角:Future,首先看 Future 类中的 poll() 方法。是不跟预想中的不一样?这个方法可以说,实在是太简单了。只是根据持有的 SharedState 对象判断了下是否完成而已,再没有任何其他逻辑。在状态为未完成的时候,会将 poll() 方法的入参 Context 对象中持有的 Waker 对象赋值给 SharedState,这个可以算作是唯一的逻辑。
在我看来这个才是 Future 调度器设计的精巧之处,因为只有这样极其简单的逻辑,才能支撑起 Executor 的高效执行。作为一个高效的 Executor,每时每刻都要接收海量的 Task 输入,如果这个 poll() 执行比较耗时,势必降低 Task 的调度效率,造成积压。
Future#new
impl TimerFuture {
/// 创建一个新的`TimerFuture`,在指定的时间结束后,该`Future`可以完成
pub fn new(duration: Duration) -> Self {
let shared_state = Arc::new(Mutex::new(SharedState {
completed: false,
waker: None,
}));
// 创建新线程
let thread_shared_state = shared_state.clone();
thread::spawn(move || {
// 睡眠指定时间实现计时功能
thread::sleep(duration);
let mut shared_state = thread_shared_state.lock().unwrap();
// 通知执行器定时器已经完成,可以继续`poll`对应的`Future`了
shared_state.completed = true;
if let Some(waker) = shared_state.waker.take() {
waker.wake()
}
});
TimerFuture { shared_state }
}
}
再看 TimerFuture 的 new() 方法,这个方法是 TimerFuture 的实例化方法。这里首先新建了一个 SharedState 对象,然后新建了一个线程。新建的线程 sleep 了指定时间,主要是为了模拟了 Future 中的耗时操作。在耗时操作结束后,将 SharedState 对象改成已完成状态,标记这个 Future 已经完成。因为 Future 已经执行完成,所需要调用 Waker 对象的 wake() 方法,来唤醒任务继续执行。
Task
/// 一个Future,它可以调度自己(将自己放入任务通道中),然后等待执行器去`poll`
struct Task {
/// 进行中的Future,在未来的某个时间点会被完成
///
/// 按理来说`Mutex`在这里是多余的,因为我们只有一个线程来执行任务。但是由于
/// Rust并不聪明,它无法知道`Future`只会在一个线程内被修改,并不会被跨线程修改。因此
/// 我们需要使用`Mutex`来满足这个笨笨的编译器对线程安全的执着。
///
/// 如果是生产级的执行器实现,不会使用`Mutex`,因为会带来性能上的开销,取而代之的是使用`UnsafeCell`
future: Mutex<Option<BoxFuture<'static, ()>>>,
/// 可以将该任务自身放回到任务通道中,等待执行器的poll
task_sender: SyncSender<Arc<Task>>,
}
// 在执行器 poll 一个 Future 之前,首先需要调用 wake 方法进行唤醒,然后再由 Waker 负责调度该任务并将其放入任务通道中
// 当任务实现了 ArcWake 特征后,它就变成了 Waker ,在调用 wake() 对其唤醒后会将任务复制一份所有权( Arc ),然后将其发送到任务通道中
impl ArcWake for Task {
fn wake_by_ref(arc_self: &Arc<Self>) {
// 通过发送任务到任务管道的方式来实现`wake`,这样`wake`后,任务就能被执行器`poll`
let cloned = arc_self.clone();
arc_self
.task_sender
.send(cloned)
.expect("任务队列已满");
}
}
Task 类可以看做是对 Future 的简单包装。需要注意的是 Task 实现了 ArcWake 特征,意味着 Task 也是一个 Waker。看下 wake_by_ref 方法,只是简单的把自己又放入 Channel 中而已。不过,这就够了,有了这个操作就可以实现流程的完整闭环。因为我们前面已经知道,有 Executor 负责不断的从 Channel 中拉取任务来执行。
测试运行
fn new_executor_and_spawner() -> (Executor, Spawner) {
// 任务通道允许的最大缓冲数(任务队列的最大长度)
// 当前的实现仅仅是为了简单,在实际的执行中,并不会这么使用
const MAX_QUEUED_TASKS: usize = 10_000;
let (task_sender, ready_queue) = mpsc::sync_channel(MAX_QUEUED_TASKS);
(Executor { ready_queue }, Spawner { task_sender })
}
#[test]
fn future_test() {
let (executor, spawner) = new_executor_and_spawner();
// 生成一个任务
spawner.spawn(async {
println!("{} howdy!", Local::now().format(LONG_DATE_PATTERN));
// 创建定时器Future,并等待它完成
TimerFuture::new(Duration::new(10, 0)).await;
println!("{} done!", Local::now().format(LONG_DATE_PATTERN));
});
// drop掉任务,这样执行器就知道任务已经完成,不会再有新的任务进来
drop(spawner);
// 运行执行器直到任务队列为空
// 任务运行后,会先打印`howdy!`, 暂停10秒,接着打印 `done!`
executor.run();
println!("{} executor over!", Local::now().format(LONG_DATE_PATTERN));
// let b = async {};
}
running 1 test
2024-05-29 10:11:48 howdy!
2024-05-29 10:11:58 done!
2024-05-29 10:11:58 executor over!
test tmp::future_test::future_test ... ok
successes:
successes:
tmp::future_test::future_test
test result: ok. 1 passed; 0 failed; 0 ignored; 0 measured; 1 filtered out; finished in 10.00s
最后,我们来测试下这个调度器的运行情况,可以看到执行的效果也完全符合预期。
到这里大家应该就可以理清整个流程了吧,实际的 Future 调度过程应该比这个复杂很多,但是整体逻辑估计不会差很多。整个过程的设计还是非常精巧,非常值得我们去学习。