本文翻译自Rust futures: an uneducated, short and hopefully not boring tutorial - Part 3 - The reactor
介绍
在这一篇文章中,我们将深入reactor内部的实现原理。在前面的文章中,我们多次使用reactor来执行Future,但是我们并不关心内部的实现。
Reactor与Loop
简单的说,reactor就是一个Loop,为了解释这个概念,可以参考一个老套的例子。
你向心仪的女孩发了封邮件,邀请她陪你一起看电影,发完邮件后,你肯定会十分忐忑,会不停的查邮箱,一遍又一遍的查,直到得到回复。
rust的reactor的运行原理类似于这个过程。将Future提交给reactor之后,reactor将不断的检查该Future,直到这个Future运行结束,或者是出现错误。reactor执行Future是通过调用poll
函数来完成的,每一个Future都需要实现该函数,具体来说,就是返回一个Poll<T,E>
结构体。事实上,reactor并不会无止境的调用poll函数,我们用一个例子来进行分析。
从零开始实现Future
为了认识reactor,我们从零实现一个Future,也就是手动实现Future
trait。我们实现的Future功能很简单,在超时之后返回。
我们的定义WaitForIt
如下:
#[derive(Debug)]
struct WaitForIt {
message: String,
until: DateTime<Utc>,
polls: u64,
}
这个结构体保存有超时时间,一个用户自定义的字符串,以及已经被调用的次数。我们实现的new
函数如下:
impl WaitForIt {
pub fn new(message: String, delay: Duration) -> WaitForIt {
WaitForIt {
polls: 0,
message: message,
until: Utc::now() + delay,
}
}
}
这个new
函数将初始化一个WaitForIt
实例。
现在,我们开始实现Future
trait,要做的事情也就是实现poll
函数。
impl Future for WaitForIt {
type Item = String;
type Error = Box<Error>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let now = Utc::now();
if self.until < now {
Ok(Async::Ready(
format!("{} after {} polls!", self.message, self.polls),
))
} else {
self.polls += 1;
println!("not ready yet --> {:?}", self);
Ok(Async::NotReady)
}
}
}
来看这几行:
type Item = String;
type Error = Box<Error>;
在rust里面,这样的类型被叫做关联类型, 意思就是,Future在将来完成时返回的值(或者错误)。在我们的例子中,WaitForIt最终返回一个String
,或者是Box<Error>
。
看poll的函数的定义:
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
在这个定义中,Self::Item
与Self::Error
是两个关联类型的占位符,函数的定义与下面等价:
fn poll(&mut self) -> Poll<String, Box<Error>>
逻辑部分如下:
let now = Utc::now();
if self.until < now {
// Tell reactor we are ready!
} else {
// Tell reactor we are not ready! Come back later!
}
在poll函数中,我们怎么告诉reactor当前Future的执行状态呢,换句话说,reactor怎么直到这个Future已经完成了呢?方法很简单,我们通过Ok
枚举携带Async::NotReady
表征Future未完成,通过Ok
枚举携带Async::Ready
表征Future已完成。
poll函数改造如下:
impl Future for WaitForIt {
type Item = String;
type Error = Box<Error>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let now = Utc::now();
if self.until < now {
Ok(Async::Ready(
format!("{} after {} polls!", self.message, self.polls),
))
} else {
self.polls += 1;
println!("not ready yet --> {:?}", self);
Ok(Async::NotReady)
}
}
}
我们在main
函数中创建reactor
来执行Future。
fn main() {
let mut reactor = Core::new().unwrap();
let wfi_1 = WaitForIt::new("I'm done:".to_owned(), Duration::seconds(1));
println!("wfi_1 == {:?}", wfi_1);
let ret = reactor.run(wfi_1).unwrap();
println!("ret == {:?}", ret);
}
我们预期Future在1秒钟之后完成,运行结果如下:
Running `target/debug/tst_fut_create`
wfi_1 == WaitForIt { message: "I\'m done:", until: 2017-11-07T16:07:06.382232234Z, polls: 0 }
not ready yet --> WaitForIt { message: "I\'m done:", until: 2017-11-07T16:07:06.382232234Z, polls: 1 }
运行结果貌似不符合预期,仅运行了一次就停住了,但是并没有产生额外的CPU消耗,这是为什么呢?
在rust中,一个Future poll函数提交给reactor后,视为这个Future停放在了这个reactor中,而reactor并不会再次调用这个poll函数,除非显式的告知需要被再次调用。在我们的例子中,reactor会立即调用WaitForIt中的poll函数,但是返回值是Async::NotReady
,所以这个poll函数将会被停放在这个reactor中。如果没有相应的机制告诉reactor解除停放,那么poll函数将永远不会被再次调用。在这个过程中,reactor处于空闲状态,并不会额外消耗CPU。由于没有去直接查询运行是否完成的状态,所以这种方式的效率很高。在上面的邮件例子中,我们可以让邮箱在收到回复之后通知我们,这样就没有必要不断的去查邮箱了。
另一个更有意义的例子是网络收包过程,在不确定报文什么时候到达的情况下,我们可以阻塞线程等待报文到来,也可以在等待的过程中做其他的事情。
解除停放
有些时候,我们需要解除poll函数的停放,应该怎样修改WaitForIt的实现呢?有很多外部事件可以用来解除停放,比如键盘事件或者网络报文到达,在我们的例子中,我们需要手动触发。
futures::task::current().notify();
Future修改如下:
impl Future for WaitForIt {
type Item = String;
type Error = Box<Error>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let now = Utc::now();
if self.until < now {
Ok(Async::Ready(
format!("{} after {} polls!", self.message, self.polls),
))
} else {
self.polls += 1;
println!("not ready yet --> {:?}", self);
futures::task::current().notify();
Ok(Async::NotReady)
}
}
}
现在可以可以看到,Future不会停下来。
代码运行结束后,poll函数在1秒内被调用了超过50k次。这是严重的资源浪费,所以,应该仅在事件明确发生的时候,才应该解除停放。
到目前为止,我们的loop是单线程的,如果有需要,可以使用多线程来运行Future。
Joining
reactor有一个有用的特性是可以并行运行多个Future,通过这种方式,我们可以更加高效的利用单线程loop,如果一个Future被停放了,另一个Future将获得执行机会。
我们复用WaitForIt
,定义两个Future,然后并发的执行这两个Future:
let wfi_1 = WaitForIt::new("I'm done:".to_owned(), Duration::seconds(1));
println!("wfi_1 == {:?}", wfi_1);
let wfi_2 = WaitForIt::new("I'm done too:".to_owned(), Duration::seconds(1));
println!("wfi_2 == {:?}", wfi_2);
我们通过futures::future::join_all来并发执行Future,join_all
的输入是一个Future迭代器,我们先创建一个vector:
let v = vec![wfi_1, wfi_2];
然后创建联合:
let sel = join_all(v);
完整的代码如下:
fn main() {
let mut reactor = Core::new().unwrap();
let wfi_1 = WaitForIt::new("I'm done:".to_owned(), Duration::seconds(1));
println!("wfi_1 == {:?}", wfi_1);
let wfi_2 = WaitForIt::new("I'm done too:".to_owned(), Duration::seconds(1));
println!("wfi_2 == {:?}", wfi_2);
let v = vec![wfi_1, wfi_2];
let sel = join_all(v);
let ret = reactor.run(sel).unwrap();
println!("ret == {:?}", ret);
}
运行结果如下:
关键点是两个请求是交错的,第一个Future被调用后,第二个Future被调用,然后是第一个,接着是第二个,依此类推,直到两个Future最终完成。
Select
future
trait有很多辅助性的函数,除了join_all
之外,还有一个是select
。select
函数运行两个Future,返回第一个完成的Future。这种方法在实现超时时很有用,我们举一个例子:
fn main() {
let mut reactor = Core::new().unwrap();
let wfi_1 = WaitForIt::new("I'm done:".to_owned(), Duration::seconds(1));
println!("wfi_1 == {:?}", wfi_1);
let wfi_2 = WaitForIt::new("I'm done too:".to_owned(), Duration::seconds(2));
println!("wfi_2 == {:?}", wfi_2);
let v = vec![wfi_1, wfi_2];
let sel = select_all(v);
let ret = reactor.run(sel).unwrap();
println!("ret == {:?}", ret);
}
尾声
在下一篇文章中,我们将介绍更加有实际意义的future
,不消耗额外的CPU资源,同时更加符合reactor的要求。