[Rust-async-book]--6--Executing Multiple Futures at a Time[翻译]

Up until now, we've mostly executed futures by using .await, which blocks the current task until a particular Future completes. However, real asynchronous applications often need to execute several different operations concurrently.
到目前为止,我们大多使用.await来执行futures,它会阻塞当前任务,直到特定的Future完成。然而,真正的异步应用程序通常需要同时执行几个不同的操作。
In this chapter, we'll cover some ways to execute multiple asynchronous operations at the same time:
在本章中,我们将介绍一些同时执行多个异步操作的方法:

  • join!: waits for futures to all complete (等待futures 全部完成)
  • select!: waits for one of several futures to complete (等待几个futures 中的一个完成)
  • Spawning: creates a top-level task which ambiently runs a future to completion(创建一个顶层任务,该任务执行一个future完成)
  • FuturesUnordered: a group of futures which yields the result of each subfuture (一组下级future的结果组合成一个futures )

join!

The futures::join macro makes it possible to wait for multiple different futures to complete while executing them all concurrently.
futures::join 宏允许在同时执行多个不同的futures 时等待它们完成。

When performing multiple asynchronous operations, it's tempting to simply .await them in a series:
当执行多个异步操作时,很容易做到以下几点:

async fn get_book_and_music() -> (Book, Music) {
    let book = get_book().await;
    let music = get_music().await;
    (book, music)
}

However, this will be slower than necessary, since it won't start trying to get_music until after get_book has completed. In some other languages, futures are ambiently run to completion, so two operations can be run concurrently by first calling each async fn to start the futures, and then awaiting them both:
然而,这将比必要的慢,因为它不会开始尝试get_music,直到get_book完成之后。在其他一些语言中,futures 操作以中间的方式运行到完成,因此可以同时执行两个操作,首先调用每个async fn来启动futures,然后等待它们:

// WRONG -- don't do this
async fn get_book_and_music() -> (Book, Music) {
    let book_future = get_book();
    let music_future = get_music();
    (book_future.await, music_future.await)
}

However, Rust futures won't do any work until they're actively .awaited. This means that the two code snippets above will both run book_future and music_future in series rather than running them concurrently. To correctly run the two futures concurrently, use futures::join!:
然而,Rust futures 在他们积极.awaited之前是不会做任何工作的。这意味着上面的两个代码片段将在系列中同时运行book_future和music_future,而不是同时运行它们。要同时正确运行两个futures,请使用futures::join!:

use futures::join;

async fn get_book_and_music() -> (Book, Music) {
    let book_fut = get_book();
    let music_fut = get_music();
    join!(book_fut, music_fut)
}

The value returned by join! is a tuple containing the output of each Future passed in.
join返回的值!是一个元组,包含传入的每个Future 的输出。

try_join!

For futures which return Result, consider using try_join! rather than join!. Since join! only completes once all subfutures have completed, it'll continue processing other futures even after one of its subfutures has returned an Err.
对于返回结果的futures ,考虑使用try_join!而不是join!。join!只有在所有子futures都完成后,它才会继续处理其他futures ,即使它的一个子 futures 返回了错误。

Unlike join!, try_join! will complete immediately if one of the subfutures returns an error.
不像join!, try_join! 如果其中一个子futures返回错误,将立即完成。

use futures::try_join;

async fn get_book() -> Result<Book, String> { /* ... */ Ok(Book) }
async fn get_music() -> Result<Music, String> { /* ... */ Ok(Music) }

async fn get_book_and_music() -> Result<(Book, Music), String> {
    let book_fut = get_book();
    let music_fut = get_music();
    try_join!(book_fut, music_fut)
}

Note that the futures passed to try_join! must all have the same error type. Consider using the .map_err(|e| ...) and .err_into() functions from futures::future::TryFutureExt to consolidate the error types:
请注意,已通过 try_join! 的 futures 必须都具有相同的错误类型。考虑使用futures::future::TryFutureExt中的.map_err(| e |…)和.err_into()函数来合并错误类型:


use futures::{
    future::TryFutureExt,
    try_join,
};

async fn get_book() -> Result<Book, ()> { /* ... */ Ok(Book) }
async fn get_music() -> Result<Music, String> { /* ... */ Ok(Music) }

async fn get_book_and_music() -> Result<(Book, Music), String> {
    let book_fut = get_book().map_err(|()| "Unable to get book".to_string());
    let music_fut = get_music();
    try_join!(book_fut, music_fut)
}

select!

The futures::select macro runs multiple futures simultaneously, allowing the user to respond as soon as any future completes.
futures::select 宏同时运行多个futures,允许用户在任何future完成时立即响应。

use futures::{
    future::FutureExt, // for `.fuse()`
    pin_mut,
    select,
};

async fn task_one() { /* ... */ }
async fn task_two() { /* ... */ }

async fn race_tasks() {
    let t1 = task_one().fuse();
    let t2 = task_two().fuse();

    pin_mut!(t1, t2);

    select! {
        () = t1 => println!("task one completed first"),
        () = t2 => println!("task two completed first"),
    }
}

The function above will run both t1 and t2 concurrently. When either t1 or t2 finishes, the corresponding handler will call println!, and the function will end without completing the remaining task.
上面的函数将同时运行t1和t2。当t1或t2完成时,相应的处理程序将调用println!,函数将在未完成剩余任务的情况下结束。
The basic syntax for select is <pattern> = <expression> => <code>,repeated for as many futures as you would like to select over.
select的基本语法是<pattern>=<expression>=><code>,可以重复选择任意多个未来。

default => ... and complete => ...

select also supports default and complete branches.
select还支持默认分支和完整分支。
A default branch will run if none of the futures being selected over are yet complete. A select with a default branch will therefore always return immediately, since default will be run if none of the other futures are ready.
如果选择的futures 尚未完成,则将运行默认分支。因此,具有默认分支的select将始终立即返回,因为如果没有其他futures 准备就绪,则将运行默认值。

complete branches can be used to handle the case where all futures being selected over have completed and will no longer make progress. This is often handy when looping over a select!.
完整的分支可以用来处理所有被选择的 futures 已经完成并且不再取得progress的情况。在select!上循环时,这通常很方便!

use futures::{future, select};

async fn count() {
    let mut a_fut = future::ready(4);
    let mut b_fut = future::ready(6);
    let mut total = 0;

    loop {
        select! {
            a = a_fut => total += a,
            b = b_fut => total += b,
            complete => break,
            default => unreachable!(), // never runs (futures are ready, then complete)
        };
    }
    assert_eq!(total, 10);
}

Interaction with 'Unpin' and 'FusedFuture'

One thing you may have noticed in the first example above is that we had to call .fuse() on the futures returned by the two async fns, as well as pinning them with pin_mut. Both of these calls are necessary because the futures used in select must implement both the Unpin trait and the FusedFuture trait.
在上面的第一个例子中,您可能注意到一件事,我们必须对两个异步fn返回的future调用.fuse(),并用pin_mut固定它们。这两个调用都是必需的,因为select中使用的futures必须同时实现Unpin特性和FusedFuture 特性。

Unpin is necessary because the futures used by select are not taken by value, but by mutable reference. By not taking ownership of the future, uncompleted futures can be used again after the call to select.
Unpin 是必要的,因为select使用的futures 不是按值获取的,而是按可变引用获取的。通过不取得future的所有权,未完成的 futures 可以在调用选择后再次使用。

Similarly, the FusedFuture trait is required because select must not poll a future after it has completed. FusedFuture is implemented by futures which track whether or not they have completed. This makes it possible to use select in a loop, only polling the futures which still have yet to complete. This can be seen in the example above, where a_fut or b_fut will have completed the second time through the loop. Because the future returned by future::ready implements FusedFuture, it's able to tell select not to poll it again.
类似地,FusedFuture特性是必需的,因为select在完成后不能 poll a future。FusedFuture由跟踪它们是否完成的futures实现。这使得在一个循环中使用select成为可能,只轮询那些还没有完成的未来。这可以在上面的示例中看到,其中a_fut或b_fut将完成第二次循环。因为future::ready返回的FusedFuture实现了FusedFuture,所以它可以告诉select不要再次轮询它。

Note that streams have a corresponding FusedStream trait. Streams which implement this trait or have been wrapped using .fuse() will yield FusedFuture futures from their .next() / .try_next() combinators.
请注意,streams 具有相应的FusedStream特性。实现该特性或使用.fuse()进行包装的流将从它们的.next() / .try_next()组合子中产生FusedFuture futures 。

use futures::{
    stream::{Stream, StreamExt, FusedStream},
    select,
};

async fn add_two_streams(
    mut s1: impl Stream<Item = u8> + FusedStream + Unpin,
    mut s2: impl Stream<Item = u8> + FusedStream + Unpin,
) -> u8 {
    let mut total = 0;

    loop {
        let item = select! {
            x = s1.next() => x,
            x = s2.next() => x,
            complete => break,
        };
        if let Some(next_num) = item {
            total += next_num;
        }
    }

    total
}

Concurrent tasks in a 'select' loop with 'Fuse' and 'FuturesUnordered'

One somewhat hard-to-discover but handy function is Fuse::terminated(), which allows constructing an empty future which is already terminated, and can later be filled in with a future that needs to be run.
一个有点难发现但很方便的函数是Fuse::terminated(),它允许构造一个已经终止的空future ,然后可以填充一个需要运行的future 。

This can be handy when there's a task that needs to be run during a select loop but which is created inside the select loop itself.

当有一个任务需要在select循环中运行,但是它是在select循环本身中创建的时候,这是很方便的。

Note the use of the .select_next_some() function. This can be used with select to only run the branch for Some() values returned from the stream, ignoring Nones.
注意.select_next_some()函数的用法。这可以与select一起使用,以便只对从流返回的Some(
) 值运行分支,而忽略none。

use futures::{
    future::{Fuse, FusedFuture, FutureExt},
    stream::{FusedStream, Stream, StreamExt},
    pin_mut,
    select,
};

async fn get_new_num() -> u8 { /* ... */ 5 }

async fn run_on_new_num(_: u8) { /* ... */ }

async fn run_loop(
    mut interval_timer: impl Stream<Item = ()> + FusedStream + Unpin,
    starting_num: u8,
) {
    let run_on_new_num_fut = run_on_new_num(starting_num).fuse();
    let get_new_num_fut = Fuse::terminated();
    pin_mut!(run_on_new_num_fut, get_new_num_fut);
    loop {
        select! {
            () = interval_timer.select_next_some() => {
                // The timer has elapsed. Start a new `get_new_num_fut`
                // if one was not already running.
                if get_new_num_fut.is_terminated() {
                    get_new_num_fut.set(get_new_num().fuse());
                }
            },
            new_num = get_new_num_fut => {
                // A new number has arrived-- start a new `run_on_new_num_fut`,
                // dropping the old one.
                run_on_new_num_fut.set(run_on_new_num(new_num).fuse());
            },
            // Run the `run_on_new_num_fut`
            () = run_on_new_num_fut => {},
            // panic if everything completed, since the `interval_timer` should
            // keep yielding values indefinitely.
            complete => panic!("`interval_timer` completed unexpectedly"),
        }
    }
}

When many copies of the same future need to be run simultaneously, use the FuturesUnordered type. The following example is similar to the one above, but will run each copy of run_on_new_num_fut to completion, rather than aborting them when a new one is created. It will also print out a value returned by run_on_new_num_fut.
当需要同时运行同一 future 的多个副本时,请使用FuturesUnordered类型。下面的示例与上面的示例类似,但将在新的num fut上运行run_的每个副本以完成,而不是在创建新副本时中止它们。它还将打印run_on_new_num_fut返回的值。

use futures::{
    future::{Fuse, FusedFuture, FutureExt},
    stream::{FusedStream, FuturesUnordered, Stream, StreamExt},
    pin_mut,
    select,
};

async fn get_new_num() -> u8 { /* ... */ 5 }

async fn run_on_new_num(_: u8) -> u8 { /* ... */ 5 }

// Runs `run_on_new_num` with the latest number
// retrieved from `get_new_num`.
//
// `get_new_num` is re-run every time a timer elapses,
// immediately cancelling the currently running
// `run_on_new_num` and replacing it with the newly
// returned value.
async fn run_loop(
    mut interval_timer: impl Stream<Item = ()> + FusedStream + Unpin,
    starting_num: u8,
) {
    let mut run_on_new_num_futs = FuturesUnordered::new();
    run_on_new_num_futs.push(run_on_new_num(starting_num));
    let get_new_num_fut = Fuse::terminated();
    pin_mut!(get_new_num_fut);
    loop {
        select! {
            () = interval_timer.select_next_some() => {
                // The timer has elapsed. Start a new `get_new_num_fut`
                // if one was not already running.
                if get_new_num_fut.is_terminated() {
                    get_new_num_fut.set(get_new_num().fuse());
                }
            },
            new_num = get_new_num_fut => {
                // A new number has arrived-- start a new `run_on_new_num_fut`.
                run_on_new_num_futs.push(run_on_new_num(new_num));
            },
            // Run the `run_on_new_num_futs` and check if any have completed
            res = run_on_new_num_futs.select_next_some() => {
                println!("run_on_new_num_fut returned {:?}", res);
            },
            // panic if everything completed, since the `interval_timer` should
            // keep yielding values indefinitely.
            complete => panic!("`interval_timer` completed unexpectedly"),
        }
    }
}
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • pyspark.sql模块 模块上下文 Spark SQL和DataFrames的重要类: pyspark.sql...
    mpro阅读 9,504评论 0 13
  • Chapter 1 In the year 1878, I took my degree of Doctor of...
    foxgti阅读 3,781评论 0 6
  • 好多年没有这么认认真真的看过电视剧了,生怕错过一个镜头( ˃᷄˶˶̫˶˂᷅ )。 看第一集的时候以为李达康是那个通...
    泥鳅Love乌鱼阅读 373评论 1 1
  • 由于码字原因,我需要看很多种类的书增长见闻,自己本身也喜欢看书,所以看起书来荤素不忌,各类都读。有几个朋友见了,就...
    喝牛奶的猫阅读 935评论 1 22
  • 一到营口同学朋友两口子自驾车热情接站直奔晚餐地点~小渔船营口分店。该店给我的第一印象是生意挺好,从两个方面可...
    哈哈镜6567阅读 190评论 0 2