Rust修行之Future篇-part3

本文翻译自Rust futures: an uneducated, short and hopefully not boring tutorial - Part 3 - The reactor

介绍

在这一篇文章中,我们将深入reactor内部的实现原理。在前面的文章中,我们多次使用reactor来执行Future,但是我们并不关心内部的实现。

Reactor与Loop

简单的说,reactor就是一个Loop,为了解释这个概念,可以参考一个老套的例子。

你向心仪的女孩发了封邮件,邀请她陪你一起看电影,发完邮件后,你肯定会十分忐忑,会不停的查邮箱,一遍又一遍的查,直到得到回复。

rust的reactor的运行原理类似于这个过程。将Future提交给reactor之后,reactor将不断的检查该Future,直到这个Future运行结束,或者是出现错误。reactor执行Future是通过调用poll函数来完成的,每一个Future都需要实现该函数,具体来说,就是返回一个Poll<T,E>结构体。事实上,reactor并不会无止境的调用poll函数,我们用一个例子来进行分析。

从零开始实现Future

为了认识reactor,我们从零实现一个Future,也就是手动实现Futuretrait。我们实现的Future功能很简单,在超时之后返回。

我们的定义WaitForIt如下:


#[derive(Debug)]

struct WaitForIt {

    message: String,

    until: DateTime<Utc>,

    polls: u64,

}

这个结构体保存有超时时间,一个用户自定义的字符串,以及已经被调用的次数。我们实现的new函数如下:


impl WaitForIt {

    pub fn new(message: String, delay: Duration) -> WaitForIt {

        WaitForIt {

            polls: 0,

            message: message,

            until: Utc::now() + delay,

        }

    }

}

这个new函数将初始化一个WaitForIt实例。

现在,我们开始实现Futuretrait,要做的事情也就是实现poll函数。


impl Future for WaitForIt {

    type Item = String;

    type Error = Box<Error>;

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {

        let now = Utc::now();

        if self.until < now {

            Ok(Async::Ready(

                format!("{} after {} polls!", self.message, self.polls),

            ))

        } else {

            self.polls += 1;

            println!("not ready yet --> {:?}", self);

            Ok(Async::NotReady)

        }

    }

}

来看这几行:


type Item = String;

type Error = Box<Error>;

在rust里面,这样的类型被叫做关联类型, 意思就是,Future在将来完成时返回的值(或者错误)。在我们的例子中,WaitForIt最终返回一个String,或者是Box<Error>

看poll的函数的定义:


fn poll(&mut self) -> Poll<Self::Item, Self::Error> {

在这个定义中,Self::ItemSelf::Error是两个关联类型的占位符,函数的定义与下面等价:


fn poll(&mut self) -> Poll<String, Box<Error>>

逻辑部分如下:


let now = Utc::now();

if self.until < now {

  // Tell reactor we are ready!

} else {

  // Tell reactor we are not ready! Come back later!

}

在poll函数中,我们怎么告诉reactor当前Future的执行状态呢,换句话说,reactor怎么直到这个Future已经完成了呢?方法很简单,我们通过Ok枚举携带Async::NotReady表征Future未完成,通过Ok枚举携带Async::Ready表征Future已完成。

poll函数改造如下:


impl Future for WaitForIt {

    type Item = String;

    type Error = Box<Error>;

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {

        let now = Utc::now();

        if self.until < now {

            Ok(Async::Ready(

                format!("{} after {} polls!", self.message, self.polls),

            ))

        } else {

            self.polls += 1;

            println!("not ready yet --> {:?}", self);

            Ok(Async::NotReady)

        }

    }

}

我们在main函数中创建reactor来执行Future。


fn main() {

    let mut reactor = Core::new().unwrap();

    let wfi_1 = WaitForIt::new("I'm done:".to_owned(), Duration::seconds(1));

    println!("wfi_1 == {:?}", wfi_1);

    let ret = reactor.run(wfi_1).unwrap();

    println!("ret == {:?}", ret);

}

我们预期Future在1秒钟之后完成,运行结果如下:


Running `target/debug/tst_fut_create`

wfi_1 == WaitForIt { message: "I\'m done:", until: 2017-11-07T16:07:06.382232234Z, polls: 0 }

not ready yet --> WaitForIt { message: "I\'m done:", until: 2017-11-07T16:07:06.382232234Z, polls: 1 }

运行结果貌似不符合预期,仅运行了一次就停住了,但是并没有产生额外的CPU消耗,这是为什么呢?

fcpu

在rust中,一个Future poll函数提交给reactor后,视为这个Future停放在了这个reactor中,而reactor并不会再次调用这个poll函数,除非显式的告知需要被再次调用。在我们的例子中,reactor会立即调用WaitForIt中的poll函数,但是返回值是Async::NotReady,所以这个poll函数将会被停放在这个reactor中。如果没有相应的机制告诉reactor解除停放,那么poll函数将永远不会被再次调用。在这个过程中,reactor处于空闲状态,并不会额外消耗CPU。由于没有去直接查询运行是否完成的状态,所以这种方式的效率很高。在上面的邮件例子中,我们可以让邮箱在收到回复之后通知我们,这样就没有必要不断的去查邮箱了。

另一个更有意义的例子是网络收包过程,在不确定报文什么时候到达的情况下,我们可以阻塞线程等待报文到来,也可以在等待的过程中做其他的事情。

解除停放

有些时候,我们需要解除poll函数的停放,应该怎样修改WaitForIt的实现呢?有很多外部事件可以用来解除停放,比如键盘事件或者网络报文到达,在我们的例子中,我们需要手动触发。


futures::task::current().notify();

Future修改如下:


impl Future for WaitForIt {

    type Item = String;

    type Error = Box<Error>;

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {

        let now = Utc::now();

        if self.until < now {

            Ok(Async::Ready(

                format!("{} after {} polls!", self.message, self.polls),

            ))

        } else {

            self.polls += 1;

            println!("not ready yet --> {:?}", self);

            futures::task::current().notify();

            Ok(Async::NotReady)

        }

    }

}

现在可以可以看到,Future不会停下来。

frun

代码运行结束后,poll函数在1秒内被调用了超过50k次。这是严重的资源浪费,所以,应该仅在事件明确发生的时候,才应该解除停放。

fcpu1

到目前为止,我们的loop是单线程的,如果有需要,可以使用多线程来运行Future。

Joining

reactor有一个有用的特性是可以并行运行多个Future,通过这种方式,我们可以更加高效的利用单线程loop,如果一个Future被停放了,另一个Future将获得执行机会。

我们复用WaitForIt,定义两个Future,然后并发的执行这两个Future:


let wfi_1 = WaitForIt::new("I'm done:".to_owned(), Duration::seconds(1));

println!("wfi_1 == {:?}", wfi_1);

let wfi_2 = WaitForIt::new("I'm done too:".to_owned(), Duration::seconds(1));

println!("wfi_2 == {:?}", wfi_2);

我们通过futures::future::join_all来并发执行Future,join_all的输入是一个Future迭代器,我们先创建一个vector:


let v = vec![wfi_1, wfi_2];

然后创建联合:


let sel = join_all(v);

完整的代码如下:


fn main() {

    let mut reactor = Core::new().unwrap();

    let wfi_1 = WaitForIt::new("I'm done:".to_owned(), Duration::seconds(1));

    println!("wfi_1 == {:?}", wfi_1);

    let wfi_2 = WaitForIt::new("I'm done too:".to_owned(), Duration::seconds(1));

    println!("wfi_2 == {:?}", wfi_2);

    let v = vec![wfi_1, wfi_2];

    let sel = join_all(v);

    let ret = reactor.run(sel).unwrap();

    println!("ret == {:?}", ret);

}

运行结果如下:

frun1

关键点是两个请求是交错的,第一个Future被调用后,第二个Future被调用,然后是第一个,接着是第二个,依此类推,直到两个Future最终完成。

Select

futuretrait有很多辅助性的函数,除了join_all之外,还有一个是selectselect函数运行两个Future,返回第一个完成的Future。这种方法在实现超时时很有用,我们举一个例子:


fn main() {

    let mut reactor = Core::new().unwrap();

    let wfi_1 = WaitForIt::new("I'm done:".to_owned(), Duration::seconds(1));

    println!("wfi_1 == {:?}", wfi_1);

    let wfi_2 = WaitForIt::new("I'm done too:".to_owned(), Duration::seconds(2));

    println!("wfi_2 == {:?}", wfi_2);

    let v = vec![wfi_1, wfi_2];

    let sel = select_all(v);

    let ret = reactor.run(sel).unwrap();

    println!("ret == {:?}", ret);

}

尾声

在下一篇文章中,我们将介绍更加有实际意义的future,不消耗额外的CPU资源,同时更加符合reactor的要求。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
禁止转载,如需转载请通过简信或评论联系作者。
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,794评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,050评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,587评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,861评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,901评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,898评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,832评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,617评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,077评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,349评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,483评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,199评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,824评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,442评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,632评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,474评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,393评论 2 352