Rust修行之Future篇-part4

本文翻译自Rust futures: an uneducated, short and hopefully not boring tutorial - Part 4 - A "real" future from scratch

介绍

在前面的文章中,我们涵盖了Future的基本操作,包括集联,执行,以及创建。但是到目前为止,我们对Future的了解并不能帮助我们解决一些实际的问题。在part3中,我们使用简单粗暴的方法实现了Future的parking以及即时unparking,这个小技巧可以帮助我们利用Future处理一些流程处理的事情,但是仍然离真实的场景相距甚远。在本文中,我们将实现一个更加贴合实际的例子。

A timer future

在part3中,我们创建了一个最简单的Timer Future,我们依然从这个例子着手,但是这一次我们不会立即unparking这个Future的task,而是持续保持Parked状态,直到其达到了完成状态。实现这个功能最简单的做法是引入另一个线程,这个线程将持续等待,直到一段时间之后unpark我们之前设置的任务。

这种机制类似于模拟了一个异步IO的过程,在异步事情完成时,会收到相应的通知。为了简单起见,我们仅创建一个单线程的reactor,在等待期间我们可以执行其他的事情。

Timer revised

我们定义一个简单的结构体,仅包含超时时间以及当前的运行状态。


pub struct WaitInAnotherThread {

    end_time: DateTime<Utc>,

    running: bool,

}

impl WaitInAnotherThread {

    pub fn new(how_long: Duration) -> WaitInAnotherThread {

        WaitInAnotherThread {

            end_time: Utc::now() + how_long,

            running: false,

        }

    }

}

在上面的定义中,DateTime与Duration这两个类型定义来自于chronos crate。

Spin wait

我们实现的等待函数如下:


pub fn wait_spin(&self) {

    while Utc::now() < self.end_time {}

    println!("the time has come == {:?}!", self.end_time);

}

这个函数仅仅简单的比较当前时间与超时时间,这种方法有效,并且检测方式非常精确,缺点是过度消耗cpu资源。


fn main() {

    let wiat = WaitInAnotherThread::new(Duration::seconds(30));

    println!("wait spin started");

    wiat.wait_spin();

    println!("wait spin completed");

}

fcpu

从图中可以看出core 8的利用率是100%,这与part3中的例子是一致的。

spin-wait这种方式非常精确但是极度浪费,应该仅用在等待时间非常短,或者是没有选择的情况下使用。

Sleep wait

通常OS提供sleep函数来park用户线程,睡眠X秒的意思实际上是告诉cpu在这段时间内,不要调度当前的线程。在sleep期间,如果cpu还有足够的资源,可以执行其他的任务。在Rust中,通过std::thread::sleep()来实现线程睡眠。

我们基于sleep改造前面的等待函数:


pub fn wait_blocking(&self) {

    while Utc::now() < self.end_time {

        let delta_sec = self.end_time.timestamp() - Utc::now().timestamp();

        if delta_sec > 0 {

            thread::sleep(::std::time::Duration::from_secs(delta_sec as u64));

        }

    }

    println!("the time has come == {:?}!", self.end_time);

}

在上面的函数中,我们通过减去已经睡眠的时间来得到尚需继续睡眠的时间,由于timestamp函数精度不高,整体准确度将低于spin-wait方法。调用方法如下:


let wiat = WaitInAnotherThread::new(Duration::seconds(30));

println!("wait blocking started");

wiat.wait_blocking();

println!("wait blocking completed");

修改之后的性能取得了极大的改善,cpu消耗如下:


fcpu1

改善了性能,但是我们怎样在Future中使用呢。

Future

我们先实现一个简单的Future:


impl Future for WaitInAnotherThread {

    type Item = ();

    type Error = Box<Error>;

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {

        while Utc::now() < self.end_time {

            let delta_sec = self.end_time.timestamp() - Utc::now().timestamp();

            if delta_sec > 0 {

                thread::sleep(::std::time::Duration::from_secs(delta_sec as u64));

            }

        }

        println!("the time has come == {:?}!", self.end_time);

        Ok(Async::Ready(())

}

在poll函数中,通过sleep来睡眠等待,并不消耗cpu,但是这种方法将会阻塞reactor,将会影响到其他的Future。这种写法是不提倡的。

在Rust中,Future应尽量避免使用阻塞函数。

一个好的reactor使用习惯应该遵循以下原则:

1 当需要等待超时事件时,应该停止当前任务

2 不要阻塞当前线程

3 异步事件完成时通知reactor。

我们将在另一个线程中引入sleep函数来满足以上原则。这个睡眠函数不消耗资源,由于运行在看另一个线程中,所以不受当前线程的影响。当sleep线程完成时,将会唤醒当前任务,同时通知reactor。

我们一步步来实现上述想法。


impl Future for WaitInAnotherThread {

    type Item = ();

    type Error = Box<Error>;

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {

        if Utc::now() < self.end_time {

            println!("not ready yet! parking the task.");

            if !self.running {

                println!("side thread not running! starting now!");

                self.run(task::current());

                self.running = true;

            }

            Ok(Async::NotReady)

        } else {

            println!("ready! the task will complete.");

            Ok(Async::Ready(()))

        }

    }

}

我们仅需要启动一次睡眠线程,所以使用一个running变量来标记。有一点需要切记,在Future的poll函数被调用之前,我们的任务并不会被执行,这一点与我们的需求是一致的。此外,在启动睡眠线程之前,我们还检查了超时时间是否已过,如果已经超时,将不会创建睡眠线程。

如果未超时,且睡眠线程创建完成后,我们向reactor申请park我们的任务,通过返回Ok(Async::NotReady)完成。与part3相反,在这里我们并不会立即unpark,这个工作将由睡眠线程来完成。在其他的实现中,比如IO,这样的park行为可能由OS来完成。

睡眠线程的实现如下:


fn run(&mut self, task: task::Task) {

    let lend = self.end_time;

    thread::spawn(move || {

        while Utc::now() < lend {

            let delta_sec = lend.timestamp() - Utc::now().timestamp();

            if delta_sec > 0 {

                thread::sleep(::std::time::Duration::from_secs(delta_sec as u64));

            }

            task.notify();

        }

        println!("the time has come == {:?}!", lend);

    });

}

有两点值得注意。首先,我们将task传递给了睡眠线程,原因是我们我们不能在另外的线程中访问到Task::current()。其次,我们不能将self移动到闭包中,所以我们需要转移所有权至lend变量。为啥这样做呢?原因是Rust中的线程需要实现具有'static生命周期的Send Trait。task自身的实现同时满足申明周期的要求,所以我们可以将task传递至闭包中,但是我们实现的结构体并不满足条件,所以我们使用了end_time的一个拷贝。

这种方法意味这在线程启动后,将不能修改超时时间。

调用方式如下:


fn main() {

    let mut reactor = Core::new().unwrap();

    let wiat = WaitInAnotherThread::new(Duration::seconds(3));

    println!("wait future started");

    let ret = reactor.run(wiat).unwrap();

    println!("wait future completed. ret == {:?}", ret);

}

输出如下:


Finished dev [unoptimized + debuginfo] target(s) in 0.96 secs

    Running `target/debug/tst_fut_complete`

wait future started

not ready yet! parking the task.

side thread not running! starting now!

the time has come == 2017-11-21T12:55:23.397862771Z!

ready! the task will complete.

wait future completed. ret == ()

我们来回顾下几个关键的流程:

1 请求reactor启动我们的Future

2 我们的Future发现当前未超时,于是:

2.1 par task

2.2 启动辅助线程

3 辅助线程在一段时间后被唤醒,并且执行:

3.1 通知reactor某一个task能够被unpark

3.2 退出自身线程

4 reactor唤醒被park的线程

5 task完成自身执行,并且:

5.1 告诉reactor当前task执行完成

5.2 返回执行结果

6 reactor将task的返回值返回给run函数的调用者

总结来看,整个过程是很有逻辑条理的。

Conclusion

本文的例子是一个几乎接近于真实情况的Future实现。由于没有引入阻塞过程,所以能够满足reactor的要求,也不消耗额外的资源。这个例子并没有实现具体的任务,尽管Rust已经实现了内建的timeout机制,但是,了解内部的原理对提升我们自身有着很大的帮助。

实际上,通常情况下,大多数程序员并不会手写Future,而是使用包含了相应功能的库,但是了解Future的内部原理仍然很重要。

下一个主题是Streams,我们将尝试创建不会阻塞reactor的Iterators。

appendix

完整代码如下:


extern crate chrono;

extern crate futures;

extern crate tokio_core;

use chrono::prelude::*;

use chrono::*;

use futures::prelude::*;

use futures::*;

use std::error::Error;

use std::thread::{sleep, spawn};

use tokio_core::reactor::Core;

pub struct WaitInAnotherThread {

    end_time: DateTime<Utc>,

    running: bool,

}

impl WaitInAnotherThread {

    pub fn new(how_long: Duration) -> WaitInAnotherThread {

        WaitInAnotherThread {

            end_time: Utc::now() + how_long,

            running: false,

        }

    }

    fn run(&mut self, task: task::Task) {

      let lend = self.end_time;

        spawn(move || {

            while Utc::now() < lend {

                let delta_sec = lend.timestamp() - Utc::now().timestamp();

                if delta_sec > 0 {

                    sleep(::std::time::Duration::from_secs(delta_sec as u64));

                }

                task.notify();

            }

            println!("the time has come == {:?}!", lend);

        });

    }

}

impl Future for WaitInAnotherThread {

    type Item = ();

    type Error = Box<Error>;

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {

        if Utc::now() < self.end_time {

            println!("not ready yet! parking the task.");

            if !self.running {

                println!("side thread not running! starting now!");

                self.run(task::current());

                self.running = true;

            }

            Ok(Async::NotReady)

        } else {

            println!("ready! the task will complete.");

            Ok(Async::Ready(()))

        }

    }

}

fn main() {

    let mut reactor = Core::new().unwrap();

    let wiat = WaitInAnotherThread::new(Duration::seconds(3));

    println!("wait future started");

    let ret = reactor.run(wiat).unwrap();

    println!("wait future completed. ret == {:?}", ret);

}

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
禁止转载,如需转载请通过简信或评论联系作者。
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,928评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,192评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,468评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,186评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,295评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,374评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,403评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,186评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,610评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,906评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,075评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,755评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,393评论 3 320
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,079评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,313评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,934评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,963评论 2 351