介绍
在前面的文章中,我们涵盖了Future的基本操作,包括集联,执行,以及创建。但是到目前为止,我们对Future的了解并不能帮助我们解决一些实际的问题。在part3中,我们使用简单粗暴的方法实现了Future的parking以及即时unparking,这个小技巧可以帮助我们利用Future处理一些流程处理的事情,但是仍然离真实的场景相距甚远。在本文中,我们将实现一个更加贴合实际的例子。
A timer future
在part3中,我们创建了一个最简单的Timer Future,我们依然从这个例子着手,但是这一次我们不会立即unparking这个Future的task,而是持续保持Parked状态,直到其达到了完成状态。实现这个功能最简单的做法是引入另一个线程,这个线程将持续等待,直到一段时间之后unpark我们之前设置的任务。
这种机制类似于模拟了一个异步IO的过程,在异步事情完成时,会收到相应的通知。为了简单起见,我们仅创建一个单线程的reactor,在等待期间我们可以执行其他的事情。
Timer revised
我们定义一个简单的结构体,仅包含超时时间以及当前的运行状态。
pub struct WaitInAnotherThread {
end_time: DateTime<Utc>,
running: bool,
}
impl WaitInAnotherThread {
pub fn new(how_long: Duration) -> WaitInAnotherThread {
WaitInAnotherThread {
end_time: Utc::now() + how_long,
running: false,
}
}
}
在上面的定义中,DateTime与Duration这两个类型定义来自于chronos crate。
Spin wait
我们实现的等待函数如下:
pub fn wait_spin(&self) {
while Utc::now() < self.end_time {}
println!("the time has come == {:?}!", self.end_time);
}
这个函数仅仅简单的比较当前时间与超时时间,这种方法有效,并且检测方式非常精确,缺点是过度消耗cpu资源。
fn main() {
let wiat = WaitInAnotherThread::new(Duration::seconds(30));
println!("wait spin started");
wiat.wait_spin();
println!("wait spin completed");
}
从图中可以看出core 8的利用率是100%,这与part3中的例子是一致的。
spin-wait这种方式非常精确但是极度浪费,应该仅用在等待时间非常短,或者是没有选择的情况下使用。
Sleep wait
通常OS提供sleep函数来park用户线程,睡眠X秒的意思实际上是告诉cpu在这段时间内,不要调度当前的线程。在sleep期间,如果cpu还有足够的资源,可以执行其他的任务。在Rust中,通过std::thread::sleep()
来实现线程睡眠。
我们基于sleep改造前面的等待函数:
pub fn wait_blocking(&self) {
while Utc::now() < self.end_time {
let delta_sec = self.end_time.timestamp() - Utc::now().timestamp();
if delta_sec > 0 {
thread::sleep(::std::time::Duration::from_secs(delta_sec as u64));
}
}
println!("the time has come == {:?}!", self.end_time);
}
在上面的函数中,我们通过减去已经睡眠的时间来得到尚需继续睡眠的时间,由于timestamp
函数精度不高,整体准确度将低于spin-wait方法。调用方法如下:
let wiat = WaitInAnotherThread::new(Duration::seconds(30));
println!("wait blocking started");
wiat.wait_blocking();
println!("wait blocking completed");
修改之后的性能取得了极大的改善,cpu消耗如下:
改善了性能,但是我们怎样在Future中使用呢。
Future
我们先实现一个简单的Future:
impl Future for WaitInAnotherThread {
type Item = ();
type Error = Box<Error>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
while Utc::now() < self.end_time {
let delta_sec = self.end_time.timestamp() - Utc::now().timestamp();
if delta_sec > 0 {
thread::sleep(::std::time::Duration::from_secs(delta_sec as u64));
}
}
println!("the time has come == {:?}!", self.end_time);
Ok(Async::Ready(())
}
在poll函数中,通过sleep来睡眠等待,并不消耗cpu,但是这种方法将会阻塞reactor,将会影响到其他的Future。这种写法是不提倡的。
在Rust中,Future应尽量避免使用阻塞函数。
一个好的reactor使用习惯应该遵循以下原则:
1 当需要等待超时事件时,应该停止当前任务
2 不要阻塞当前线程
3 异步事件完成时通知reactor。
我们将在另一个线程中引入sleep函数来满足以上原则。这个睡眠函数不消耗资源,由于运行在看另一个线程中,所以不受当前线程的影响。当sleep线程完成时,将会唤醒当前任务,同时通知reactor。
我们一步步来实现上述想法。
impl Future for WaitInAnotherThread {
type Item = ();
type Error = Box<Error>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if Utc::now() < self.end_time {
println!("not ready yet! parking the task.");
if !self.running {
println!("side thread not running! starting now!");
self.run(task::current());
self.running = true;
}
Ok(Async::NotReady)
} else {
println!("ready! the task will complete.");
Ok(Async::Ready(()))
}
}
}
我们仅需要启动一次睡眠线程,所以使用一个running
变量来标记。有一点需要切记,在Future的poll函数被调用之前,我们的任务并不会被执行,这一点与我们的需求是一致的。此外,在启动睡眠线程之前,我们还检查了超时时间是否已过,如果已经超时,将不会创建睡眠线程。
如果未超时,且睡眠线程创建完成后,我们向reactor申请park我们的任务,通过返回Ok(Async::NotReady)
完成。与part3相反,在这里我们并不会立即unpark,这个工作将由睡眠线程来完成。在其他的实现中,比如IO,这样的park行为可能由OS来完成。
睡眠线程的实现如下:
fn run(&mut self, task: task::Task) {
let lend = self.end_time;
thread::spawn(move || {
while Utc::now() < lend {
let delta_sec = lend.timestamp() - Utc::now().timestamp();
if delta_sec > 0 {
thread::sleep(::std::time::Duration::from_secs(delta_sec as u64));
}
task.notify();
}
println!("the time has come == {:?}!", lend);
});
}
有两点值得注意。首先,我们将task
传递给了睡眠线程,原因是我们我们不能在另外的线程中访问到Task::current()
。其次,我们不能将self移动到闭包中,所以我们需要转移所有权至lend变量。为啥这样做呢?原因是Rust中的线程需要实现具有'static
生命周期的Send Trait
。task自身的实现同时满足申明周期的要求,所以我们可以将task传递至闭包中,但是我们实现的结构体并不满足条件,所以我们使用了end_time
的一个拷贝。
这种方法意味这在线程启动后,将不能修改超时时间。
调用方式如下:
fn main() {
let mut reactor = Core::new().unwrap();
let wiat = WaitInAnotherThread::new(Duration::seconds(3));
println!("wait future started");
let ret = reactor.run(wiat).unwrap();
println!("wait future completed. ret == {:?}", ret);
}
输出如下:
Finished dev [unoptimized + debuginfo] target(s) in 0.96 secs
Running `target/debug/tst_fut_complete`
wait future started
not ready yet! parking the task.
side thread not running! starting now!
the time has come == 2017-11-21T12:55:23.397862771Z!
ready! the task will complete.
wait future completed. ret == ()
我们来回顾下几个关键的流程:
1 请求reactor启动我们的Future
2 我们的Future发现当前未超时,于是:
2.1 par task
2.2 启动辅助线程
3 辅助线程在一段时间后被唤醒,并且执行:
3.1 通知reactor某一个task能够被unpark
3.2 退出自身线程
4 reactor唤醒被park的线程
5 task完成自身执行,并且:
5.1 告诉reactor当前task执行完成
5.2 返回执行结果
6 reactor将task的返回值返回给run函数的调用者
总结来看,整个过程是很有逻辑条理的。
Conclusion
本文的例子是一个几乎接近于真实情况的Future实现。由于没有引入阻塞过程,所以能够满足reactor的要求,也不消耗额外的资源。这个例子并没有实现具体的任务,尽管Rust已经实现了内建的timeout机制,但是,了解内部的原理对提升我们自身有着很大的帮助。
实际上,通常情况下,大多数程序员并不会手写Future,而是使用包含了相应功能的库,但是了解Future的内部原理仍然很重要。
下一个主题是Streams
,我们将尝试创建不会阻塞reactor的Iterators。
appendix
完整代码如下:
extern crate chrono;
extern crate futures;
extern crate tokio_core;
use chrono::prelude::*;
use chrono::*;
use futures::prelude::*;
use futures::*;
use std::error::Error;
use std::thread::{sleep, spawn};
use tokio_core::reactor::Core;
pub struct WaitInAnotherThread {
end_time: DateTime<Utc>,
running: bool,
}
impl WaitInAnotherThread {
pub fn new(how_long: Duration) -> WaitInAnotherThread {
WaitInAnotherThread {
end_time: Utc::now() + how_long,
running: false,
}
}
fn run(&mut self, task: task::Task) {
let lend = self.end_time;
spawn(move || {
while Utc::now() < lend {
let delta_sec = lend.timestamp() - Utc::now().timestamp();
if delta_sec > 0 {
sleep(::std::time::Duration::from_secs(delta_sec as u64));
}
task.notify();
}
println!("the time has come == {:?}!", lend);
});
}
}
impl Future for WaitInAnotherThread {
type Item = ();
type Error = Box<Error>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if Utc::now() < self.end_time {
println!("not ready yet! parking the task.");
if !self.running {
println!("side thread not running! starting now!");
self.run(task::current());
self.running = true;
}
Ok(Async::NotReady)
} else {
println!("ready! the task will complete.");
Ok(Async::Ready(()))
}
}
}
fn main() {
let mut reactor = Core::new().unwrap();
let wiat = WaitInAnotherThread::new(Duration::seconds(3));
println!("wait future started");
let ret = reactor.run(wiat).unwrap();
println!("wait future completed. ret == {:?}", ret);
}