异步

异步方法

平时调用一个方法,那个方法运行完,才会调用接下来的逻辑,这种方式是同步的方式
异步方法是指,调用一个方法后,这个方法可能没有运行完,我让这个方法接着运行,但是我不管了,我去做我自己的事,过一段时间我来拿这个方法的运行结果,或者干脆不需要这个运行结果了。(当然异步方法也可能运行完了,也就是说异步方法调用结束,这个异步方法可能在运行,也可能运行结束了)
方法的实现基于任务并行库,也就是异步方法可能有一段逻辑是在其他线程里面运行的,也可能不涉及线程,实现了并发性

example
现在有个需求,在wpf里面,用户点击按钮以后,后台统计某个网页上有多少个字,统计完显示在按钮上

        private void OnButtonClicked(object sender, RoutedEventArgs e)
        {
            DisplayWebSiteLength();
        }

       async void DisplayWebSiteLength()
        {
            b1.Content = "Fetching...";
            var client = new HttpClient();
            string text = await client.GetStringAsync("https://baidu.com");
            b1.Content = text.Length.ToString();
        }

调用异步方法是正常调用的,在获取网页链接的时候,页面也没有卡顿,按钮上的字也是正常调用的
带UI的应用程序的线程法则
1.不要在UI线程中执行任何长耗时操作
2.不要在UI线程外访问UI线程的控件
(如果在其他线程里通过Dispatch.beginInvoke更新控件,前端还未更新时,相关引用内容发生了变化,那么就可能拿到不想拿到的数据)
client.GetStringAsync("https://baidu.com");返回一个task<string>
在await之前,方法还是同步执行的
然后执行了await,当执行到这里的时候,代码会检查是否已经得到执行结果,如果已经得到就继续同步执行,这里没有拿到结果,方法就立刻返回了,也就是说,如果OnButtonClicked()里面,在调用 DisplayWebSiteLength()之后如果还有别的逻辑,会继续执行,而这个时候DisplayWebSiteLength可能还没有跑完,这个就是异步方法的概念。
而await 之后的逻辑,会在await结果之后继续执行,也就是说在但看这个方法内部,还是按照顺序执行的

大致流程

  1. ui 线程 调用DisplayWebSiteLength()
  2. await 在其他线程里面执行,创建一个延续(延续的内容是await task后面的内容),UI线程返回
    3.UI线程执行OnButtonClicked方法的内容(这里没有内容)
    4.await等到结果了,这个时候通知ui线程,ui线程继续执行DisplayWebSiteLength()方法里面内容(之前创建的延续)
    如果任务没有完成 ,这里延续的实际运行的效果类似如下
var result = await expression;
statement(s);
var awaiter = expression.GetAwaiter();
awaiter.OnColmpeted(()=>{
var result = awaiter.GetResult();
statement(s);
});

返回线程

如果代码运行在富客户端的UI线程上,那么同步上下文会将执行恢复到UI线程上。
否则,执行过程会恢复到任务所在的那个线程上运行(不会有线程切换开销)
如果希望不弹回UI线程上,那么可以用ConfigeAwait()来设置
需要注意的是,如果await同步完成了,那么后面的延续依旧会在当前线程执行

Async

await不能单独存在,所有Await的地方其实都进行了一个异步操作,所以相对应的其调用的地方加上Async关键字
Async并无实际作用,只是告诉编译器,这是个异步方法,也让代码更易读
同时,旧版本的await可能作为变量名标识符来使用,添加Async关键字后,编译器可以支持旧版本的代码:没有加Async的话,就是标识符,加了Async,里面的await就是新的关键字
异步方法的返回值只能是

  • Task类型(返回一个令牌,通知消费者任务的执行状态)
  • Task<TResult>类型(返回一个令牌及其结果)
  • void(一般只会用于事件,调用方不关心事件何时完成,只是告知有个事件要处理一下,然后就不管了)
    Async不能用out或者ref参数,因为这是异步方法,不确定什么时候完成,而这两个参数会和调用方交互。也就是可能出现方法执行完这两个参数还没赋值的情况,这种情况下调用方能访问这两个参数就不合理

await

await后面接可等待模式的,最常见的是任务
会进行拆分,比如等待的如果是Task<TResult>,那么await结束获得的就是TResult的结果,实际await后,方法可能会立刻返回,而后面的语句会作为延续等await的结果拿到之后执行,返回之后主线程就忙自己的事情了。等到await拿到结果再来处理延续
可等待模式:

  • 有个GetAwaiter()方法返回awaiter类型
  • awaiter类型实现了INotifyCompletion接口,该接口中有个OnCompleted(Action)方法
  • awaiter有个bool可读属性IsCompleted
  • awaiter有个GetResult()方法(可以是void)
    不能把await用于不安全的上下文中,也不能用在锁中(锁本身:某个资源可能引发竞争,需要锁起来。await:这个操作本身不知道什么时候完成,这两个逻辑本身就是冲突的,如果写出这样的代码就该考虑重新设计代码了)

等待异步完成(阻塞)

wait一个异步操作,或者访问他的结果,会导致当前线程阻塞等待异步操作完成
检索属性的值 Task.Status 不会在任务完成之前阻止调用线程。
访问exception也不会阻塞,如果当前还没有异常发生则会返回null
如果访问一个async的Result,而异步方法又没完成,那么可能会引起死锁,尤其是在带UI的富文本应用上

同步完成

异步方法也可能会同步完成,如果Await的task立刻返回了,那么接下来的代码就会以同步的方式完成,这个时候编译器不会创建延续,后面的代码也会直接在当前线程里面执行

Console.WirteLine(await GetWebPageAsync("http://baidu.com"));
var awaiter = GetWebPageAsync("http://baidu.com").GetAwaiter();
if(awaiter.IsCompleted)
  Console.WriteLine(awaiter.GetResult());
else
  awaiter.OnCompleted(()=>Console.WriteLine(awaiter.GetResult()))

即使以同步方式返回,但是依旧有一些开销,普通的家用PC在20纳秒左右(2019年),但是回弹到线程池会有1-2微秒的开销,回弹到UI循环可能花费十倍上面的时间

死锁

注意:在异步代码外面执行Task.Wait()或者调用task.Result(),可能会引起死锁。
异步代码返回 ,主线程的逻辑执行task.wait(),等待异步代码的task结束。
异步代码执行await执行完了,通知主线程:我这忙完了,你来处理一下延续
主线程:忙着等异步线程忙完。
异步代码:忙着等主线程来处理。
发生死锁

不使用await async实现并发

  1. 实现粗粒度的并发
int GetPrimesCount(int start,int count)
{
   return  ParallelEnumerable.Range(start, count)
      .Count(n => Enumerable.Range(2, (int)Math.Sqrt(n) - 1).All(i => n % i > 0));
}
void DisplayPrimeCounts()
{
   for(int i=0;i<10;i++)
  {
    Comsole.WriteLine(GetPrimesCount(i*1000000+2),1000000)+
    " primes between "+(i*1000000) + " and "+ ((i+1)*1000000-1));
  }
}

Task.Run(()=>DisplayPrimeCounts())

上面的直接实现,DisplayPrimeCounts方法整个是一个粗粒度的并发,任务只能一起完成


  1. 将GetPrimesCount改为异步方法
    GetPrimesCount本身是个长耗时的操作,如果直接调用就会阻塞线程,不阻塞线程只能调用Task.Run(),我们希望把Task.Run封装在方法里面,返回一个task,告诉调用方这是个异步方法,因为里面不涉及数据交互,所以也不会有线程安全问题,所以这样是安全的
Task<int> GetPrimeCountAsync(int start,int count)
{
  return Task.Run(()=>
   ParallelEnumerable.Range(start, count)
      .Count(n => Enumerable.Range(2, (int)Math.Sqrt(n) - 1).All(i => n % i > 0)));
}

但是不能在for循环里面直接调用这个异步方法,等task有结果输出,
因为这样的话这些Task实际上会先后开始,一起执行
首先这些task不一定一起结束,所以输出可能有顺序问题
其次,他们内部已经是并发的了,再次并发也不会有性能优势

  for(int i=0;i<10;i++)
  {
     var awaiter = GetPrimesCountAsync(i*1000000+2),1000000).GetAwaoter();
     awaiter.OnCompleted(()=>Console.WriteLine(awaiter.GetResult()))
  }

所以在这种情况下,只能让一次循环结束,再去开启另一次循环

void DisplayPrimeCountsFrom(int i)
{
var awaiter = GetPrimesCountAsync(i*1000000+2,1000000).GetAwaiter();
awaiter.onCompleted(()=>
  {
    Console.WriteLine(awaiter.GetResult());
    if(++i<10)
     DisplayPrimeCountsFrom(i) ;
  })}

  1. 让整个for循环整体成为异步方法
    现在for里面,整个DisplayPrimeCountsFrom会立刻返回,所以如果想将其改造成异步方法,就需要自定义Task结果了,也就是需要用到TaskCompletionSource
Task DisplayPrimeCountsAsync()
{
var machine = new PrimesStateMachine();
machine.DisplayPrimeCountsFrom(0);
return machine. Task;
}
class PrimesStateMachine
{
  TaskCompletionSource<object> _tcs = new TaskCompletionSource<object>();
  public Task Task { get { return _tcs.Task; }}
  public void DisplayPrimeCountsFrom (int i)
  {
    var awaiter = GetPrimesCountAsync (i*1000000+2, 1000000).GetAwaiter();
    awaiter.0nCompleted (()=>
      Console.Writeline (awaiter.GetResult());
      if (++i <10) DisplayPrimeCountsFrom (i);
      else { Console.Writeline ("Done"); _tcs.SetResult (null);}
    });
  }
}


使用await和async就简单多了

async Task DisplayPrimeCountsAsync()
{
   for(int i=0;i<10;i++)
  {
    Comsole.WriteLine( awaiter GetPrimesCount(i*1000000+2),1000000)+
    " primes between "+(i*1000000) + " and "+ ((i+1)*1000000-1));
  }
}

异常

如果任务出错,那么await到结果的时候会抛出原来的异常,但是如果调用result属性会抛出一个组合异常,里面的异常才是任务里面的异常
async异步方法里面可以正常捕获异常并处理,正常将await的结果包围即可
如果没有await,没有访问task的结果,那么即使包在try catch里面,调用方也捕获不到异常
如果async void的,那么也是捕获不到异常的,因为没有返回一个task
有些框架是有集中式异常处理事件的,比如WPF里面的Application.DispatcherUnhandledException,大致逻辑实现是在TryCatch语句块里面触发UI事件,还支持捕获返回键是Void的异步函数抛出的异常。(如果返回值是task,异常会导致task状态失败,编译器不会去捕获这个异常,也就是产生了未观测的异常)
只有async方法才会有编译器帮忙的一系列操作,比如,将返回值和异常封装进Task里面,如果是普通的方法,只返回一个Task,那么实际上如果方法内部同步的抛出异常,外面就能立刻接收到,而不是像async一样,得到一个失败的task,不去await或者不访问结果就拿不到数据

应用范围

可以await一个非泛型的任务
await Task.Delay(5000);
await后面需要跟一个实现了GetAwaiter()方法的对象,且该方法返回值需要

  1. 实现INotifyCompletion.OnCompleted方法
  2. 返回恰当类型的GetResult()方法
  3. 有个bool类型的IsCompleted属性
    await不能在锁里面出现,实际上也不应当在锁里面出现,因为await表示我不知道什么时候结束,而锁是锁定某个资源等待其结束不让别人用,如果这两个同时存在本身就有逻辑上的冲突
    不能在unsafe上下文中

伪并发

在富文本的环境下,使用Await时,默认情况下,await的任务会并发处理,但是await的结果和延续会处理在UI线程,如果有多个任务,那么就是任务结束时回到UI线程,然后又重开线程去做任务,而没有占用UI线程的时候,UI线程会做自己的事。
只有在必要的时候租用UI的时间,只有在await的过程中才会发生抢占,简化了线程安全性。真正的并发发生在调用栈的底层,真正并发的代码避免访问共享变量或者UI的控件
工作线程传递消息给UI,哪怕用了Dispatch避免阻塞,也会有变量共享的风险,因为同时在UI线程和工作线程访问了这个变量,如果这个变量发生了变化,而UI又还未处理这个消息,就会导致显示错误。

返回值

异步方法如果本身不需要返回值,只返回一个Task表明执行的状态的话,那么直接把void变成Task就行,方法里也不需要显式返回。如果需要返回结果,则需要Task泛型包装一下,比如Task<int>,返回一个int值即可

并发

直接调用异步方法,不等待就实现了并发
考虑一下两个版本代码和效果的区别

var task1 = ....
var task2= ....
await task1;
await task2;

task1和task2将会并发执行

var task1 = ....
await task1;
var task2= ....
await task2;

task1和task2将分开执行,task2在task1结束之后才开始
这里面的重点在于,任务在创建的时候就开始了,await只是在等待任务结束
由于方法在await以外还是在当前线程执行了的(如果await里面再次await了也一样,里面的异步方法也是一样)所以是线程安全的,可以放心的共享变量

异步lambda表达式(异步匿名方法)

只要在lambda表达式前面加上async关键字即可

Func<Task<int>> unnamed = async ()=>{
  await Task.Delay(1000);
  Console.WriteLine("Foo");
  return 123
}
var answer =await unnamed();

注意这个返回值,原先的结果返回的是Int,也就是返回的是Func<int>
但是异步匿名方法返回的是Task<int>,也就是这个结果还没出来

 var a = Enumerable.Range(0, 1000).Sum(i => 2*i );
 Enumerable<Task<int>> a2= Enumerable.Range(0, 1000).Select(async i=> 2*i);

上面的处理只是举个例子,因为还是同步完成的,都没有await,实际调用时可能求一个数的阶乘或者最大素数,花费时间可能较长,这个时候需要用Task包围,就需要await了
不能直接调用Sum了,因为结果还没出来,同样的也不能调用where了
上面这个是最常见的用法,使用select语句将结果处理或者转换

myButon.Click += async(sender,args)=>{
  await Task.Delay(1000);
  Console.WriteLine("Foo");
}

需要注意的是,查询运算符的延迟执行
其他的,异步匿名方法其实也是异步方法和lambda的组合,也可以捕获变量,添加参数,异步方法的限制也有,运算符的延迟执行(如select)也有,也就是说单单select的话,task不会立刻执行

异步流

异步流基于 IAsyncEnumerable<out T>接口

用yield return 和await可以创建一个异步流,其返回值为IAsyncEnumerable<T>接口

async IAsyncEnumerable<int> RangeAsync(int start,int count,int delay)
{
  for (int i =start;i <start +count;i++)
  {
    await Task.Delay(delay);
    yield return i;
  }
}
await foreach(var number in RangeAsync(0,10,500))
  Console.WriteLine(number);

优化及建议

缓存Task
考虑以下需求:并发获取页面长度,调用GetWebAsync,为了避免重复访问相同网址产生的额外开销 ,需要将结果记录下来,如果用一个Dictionary储存网址和结果,可以避免部分重复访问。但是如果同一时间有多个相同网址的并发请求,由于还没有结果,所以每个请求都会实际去调用,还是会有额外的开销
更加巧妙的办法是存储Dictionary<string,Task<string>>,这样如果表里面有,直接访问task地result即可

static Dictionary<string,Task<string>> _cache = new Dictionary<string,Task<string>>();
Task<string> GetWebAsync(string uri)
{
if(_cache.TryGetValue(uri,out var downloadTask))
   return downloadTask;
return _cache[uri] = new WebClient().DownloadStringTaskAsync(uri);
}

如果任务已经有,那么会同步完成,所以效率很高
以上代码如果只在一个线程中调用或者在同步上下文中调用,那么不会有问题
如果是多线程可能会都调用,那么还需要考虑在Dictionary那里加锁

lock(_cache)
{
    if(_cache.TryGetValue(uri,out var downloadTask))
       return downloadTask;
    return _cache[uri] = new WebClient().DownloadStringTaskAsync(uri);
}

由于锁本身不会等待任务执行完毕,只是在检查缓存,开始新任务,更新缓存的一小段时间加了锁,所以也不会有线程安全问题。
ValueTask
上面的代码里面,已经很棒了,但是每次并发都会创建新的Task,因为是引用类型,创建实例还是需要在堆上分配内存并进行后续回收。所以还是会有性能压力,上面的例子无法优化
如果大部分情况下,DownloadStringTaskAsync能够同步完成,那么这种情况用ValueTask更加合适,ValueTask在能同步完成的情况下不会创建对象,而是创建 值对象,只有无法 同步完成的时候才会创建普通的Task,这种情况下性能不会有改善,甚至可能下降
ValueTask使用的时候需要小心(因为为了性能做成了值类型,有些操作有风险)

  • 不能多次await同一个valuetask对象
  • 在操作没有结束前不能调用GetAwaiter().GetResult()方法
  • 如果一定要进行操作的话可以先as task
  • 最简单的操作是只进行await操作
    减少上下文切换开销
    使用ConfigureAwait来避免消息重复回弹到UI消息循环中,会阻止延续提交到同步上下文中,将开销降低到上下文切换的基本开销
    (尤其适用于编写程序库,因为程序库不会有共享变量的需求,所以不需要简化线程安全问题,也不需要访问UI控件)
    需要注意的是,如果await的异步方法同步完成了,那么延续依然会在当前同步上下文执行,如果延续里面还有await的话,也要加上ConfigureAwait,不然还是会回弹到同步上下文,所以最好都加上
    考虑以下代码(A在UI线程上调用)
async void A()
{
    ... //在UI线程上执行 (假设是UI线程的调用)
    await B();  // 等待B(),B()结束后继续在UI线程上执行
    ... //在UI线程上执行
}

async void B()
{
    ... //在UI线程上执行
    for(int i = 0; i < 1000; ++i)
        await C();  // 等待C(),每次循环结束后继续在UI线程上执行
    ... //在UI线程上执行
}

async void C()
{
    ... //在UI线程上执行
    await ...;  // 等待异步操作,异步操作结束后继续在UI线程上执行
    ... //在UI线程上执行
}

上面的逻辑在UI线程上执行,是因为在await之前和之后,代码还是以同步形式运行的,只有await里面的逻辑会在其他线程完成,也就是前面提到的伪并发

 async void A()
        {
            // 在UI线程上执行
            Console.WriteLine($"A start: {Thread.CurrentThread.ManagedThreadId}");
            await B();
            // 在UI线程上执行
            Console.WriteLine($"A end: {Thread.CurrentThread.ManagedThreadId}");
        }

        async Task B()
        {
            // 在UI线程上执行
            Console.WriteLine($"B start: {Thread.CurrentThread.ManagedThreadId}");
            for (int i = 0; i < 3; ++i)
            {
                await C().ConfigureAwait(false);
            }
            // 在线程池线程上执行(这里也属于C的延续)
            Console.WriteLine($"B end: {Thread.CurrentThread.ManagedThreadId}");
        }

        async Task C()
        {
            // 在UI线程上执行(第一遍)
            //在线程池上执行(第二遍属于第一遍的延续)
            Console.WriteLine($"C start: {Thread.CurrentThread.ManagedThreadId}");
            await Task.Delay(1); // 模拟异步操作

            // 在UI线程上执行(第一遍)
            //在线程池上执行(第二遍属于第一遍的延续,不要求回弹到UI线程了)
            Console.WriteLine($"C end: {Thread.CurrentThread.ManagedThreadId}");
        }

不要混用同步异步
不要将同步方法包装成异步方法,也不要将异步方法包装成同步方法,一个是使用不规范,另一种是调用方的误解可能会导致异常

取消操作

主要通过CancellationToken类来实现。这个机制允许你在执行任务的过程中请求取消任务,并让任务有机会响应取消请求。
在异步方法设计时,就需要考虑其可能的取消操作,尤其是调用了底层的其他异步方法时,完全可以将令牌传递下去。取消操作应该处理好资源的回收

获取token

首先,需要一个CancellationTokenSource来生成CancellationToken。

CancellationTokenSource cts = new CancellationTokenSource();
CancellationToken token = cts.Token;

使用token

然后,将token传递给任务

async Task LongRunningOperationAsync(CancellationToken token)
{
    for (int i = 0; i < 10; i++)
    {
        // 检查取消请求
        token.ThrowIfCancellationRequested();
        Console.WriteLine($"Task is running. Iteration: {i}");

        try
        {
            // 模拟异步操作
            await Task.Delay(1000, token);
        }
        catch (TaskCanceledException)
        {
            // 捕获异步操作的取消异常
            Console.WriteLine("Delay was cancelled.");
            throw; // 重新抛出以便外部捕获
        }
    }
}

这里如果任务取消了,会抛出一个TaskCanceledException,任务的状态会变成canceled(是抛出该异常或者该异常的子类,就会变成canceled,也就是说任务也可以自己通过抛出异常的方式Cancel而不需要特定的令牌)
也支持访问IsCancellationRequested属性来判断任务有没有取消,再执行具体的逻辑

async Task LongRunningOperationAsync(CancellationToken token)
    {
        for (int i = 0; i < 10; i++)
        {
            // 手动检查取消请求
            if (token.IsCancellationRequested)
            {
                Console.WriteLine("Cancellation requested, exiting task.");
                // 可在此进行任何清理操作
                break;
            }

            Console.WriteLine($"Task is running. Iteration: {i}");

            try
            {
                // 模拟异步操作
                await Task.Delay(1000);
            }
            catch (TaskCanceledException)
            {
                // 捕获异步操作的取消异常
                Console.WriteLine("Delay was cancelled.");
                throw; // 重新抛出以便外部捕获
            }
        }

        // 在任务退出前,通知取消
        if (token.IsCancellationRequested)
        {
            // 如果需要,任务结束时抛出OperationCanceledException以便外部捕获
            token.ThrowIfCancellationRequested();
        }
    }
}

执行取消

直接调用 CancellationTokenSource对象上的Cancel()方法即可,这个是在异步方法外调用的,这边也可以访问IsCancellationRequested属性来判断是否已经发起取消请求了
也可以在创建CancellationTokenSource就传入一个时间间隔,让其在一定时间后取消

CancellationTokenSource cts = new CancellationTokenSource();

也可以传入一个token,这个token取消的时候我也取消

CancellationTokenSource cts1 = new CancellationTokenSource();
CancellationTokenSource cts2 = new CancellationTokenSource();

CancellationTokenSource linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cts1.Token, cts2.Token);
CancellationToken linkedToken = linkedCts.Token;

如果cts1或cts2任意一个被取消,linkedCts也会被取消,从而取消关联的任务。cts1和2可以分别是用户手动取消和操作超时

注册取消回调方法

CancellationToken提供了一个Register方法,允许在取消请求发出时执行特定的回调操作。(可以进行一些清理操作或记录日志)

CancellationTokenSource cts = new CancellationTokenSource();
CancellationToken token = cts.Token;

// 注册取消回调
token.Register(() =>
{
    Console.WriteLine("Cancellation has been requested.");
});

分离CancellationTokenSource和CancellationToken

  1. 单一职责原则(SRP):
    CancellationTokenSource:负责发起和管理取消操作。它提供方法来请求取消(例如Cancel方法),以及检查取消状态(例如IsCancellationRequested属性)。
    CancellationToken:负责传递取消请求的通知。它主要用于消费者,提供取消通知的能力,而不允许发起取消操作。消费者不应该能自己取消
  2. 线程安全
    通过将取消的发起者和接收者分开,可以更好地控制权限。只有拥有CancellationTokenSource的代码可以发起取消操作,而其他代码只能通过CancellationToken来响应取消。(不存在我们两个消费者都用这个token,然后你把这个token取消了,导致我也被迫取消的情况)
    这可以防止错误地在不该取消的地方调用Cancel方法,提高了系统的健壮性。
  3. 灵活性
    一个CancellationTokenSource生成的CancellationToken可以被多个地方同时使用,允许同一个取消请求被多个任务或操作共享。这在一些复杂场景中非常有用,例如多个任务需要同步取消时。
    这种设计还支持嵌套的取消令牌。例如,一个操作可以接收一个父级的CancellationToken,同时创建自己的CancellationTokenSource以便控制自己的取消逻辑。
    也可以直接把这个令牌传递给自己的子任务,这样一旦取消子任务也会收到请取消请求(会沿着调用栈向下传递)

进度报告

使用IProgress<T>接口和Progress类来安全的报告进度,会确保报告的委托在同步上下文中报告,如果没有同步上下文,那么会在线程池中报告(在同步上下文中报告意味着你可以直接使用UI控件)

var pro = new Progress<int> (i=>Console.WriteLine(i+" %"));
Await Foo(pro)
Task Foo(IProgress<int> progress)
{
  return Task.Run(()=
  {
    for(int i=0;i<1000;i++)
    {
      if(i%10=0)
        progress.Report(i/10);
       //some logic
    }
   });
}

TAP

.Net Core提供了很多返回任务的异步方法,其中打本份都采用了基于任务的异步模式(Task-based Asynchronous Pattern,TAP)

  • 有个Async后缀
  • 返回一个热Task
  • 有些支持取消或进度报告
  • 快速返回调用者
  • 对于I/O密集型任务不绑定线程

任务组合器

Task.WhenAny
任务组中任何一个任务结束,则返回任务的结果是该任务,也就是会返回Task<Task>,其他任务会继续执行,注意返回的是任务,也就是说如果想直接拿到结果,那么需要await两次

 static async Task Main(string[] args)
    {
        // 创建三个任务,模拟从不同服务器获取数据
        var task1 = GetDataFromServerAsync("Server 1", 3000);
        var task2 = GetDataFromServerAsync("Server 2", 2000);
        var task3 = GetDataFromServerAsync("Server 3", 1000);

        // 使用 Task.WhenAny 等待任意一个任务完成
        var completedTask = await Task.WhenAny(task1, task2, task3);

        // 根据完成的任务执行相应操作
        if (completedTask == task1)
        {
            Console.WriteLine($"Task from {await task1} completed first.");
        }
        else if (completedTask == task2)
        {
            Console.WriteLine($"Task from {await task2} completed first.");
        }
        else if (completedTask == task3)
        {
            Console.WriteLine($"Task from {task3.Result} completed first.");
        }
    }

    // 模拟一个异步方法,从服务器获取数据
    static async Task<string> GetDataFromServerAsync(string serverName, int delay)
    {
        await Task.Delay(delay); // 模拟异步操作的延迟
        return serverName;
    }

用处

  • 在不支持超时或取消的操作中添加超时或取消功能(原本的实际还在运行,新建一个超时停止的任务或者支持取消的任务,然后调用WhenAny将这两个任务组合)
  • 最快响应 从多个源或者服务中获取最快得到的数据
    如果任务的返回值类型不一致,那么只会得到一个Task,而不是Task<T>,也就是无法获取结果,WhenAll也一样

Task.WhenAll()
返回一个任务,该任务只有当所有任务全部完成或出错才完成

await Task.WhenAll(t1,t2,t3)

效果上等同于(如果任务顺利结束,但是前者的效率会稍微高一些)

var tt1 = t1();var tt2 = t2();var tt3=t3();
await tt1; await tt2; await tt3;

但是一旦发生异常,比如t1失败,后者由于await tt1就会抛出异常,导致后面await tt2 和tt3不执行,这样的话如果tt2或tt3产生了异常就没有观测到
而WhenAll会等待所有任务完成,即使出错。如果多个任务发生异常,那么访问其result抛出的组合异常里面会有多个异常,但是如果直接await,这个时候只能得到第一个异常
所以想要得到所有异常

var all = Task.WhenAll(t1,t2,t3);
try{await all;}
catch{
Console.WriteLine(all.Exception?.InnerExceptions?.Count)
}

对于一系列任务Task<TResult>调用WhenAll,会返回一个Task<TResult[]>,不同于WhenAny,这个不需要await拆两次

async Task<int> GetTotalSize(string[] uris)
{
  var downloadTasks = uris.Select(uri=> new WebClient().DownloadDataTaskAsync(uri));
  var results = await Task.WhenAll(downloadTasks);
  return results.Sum(s=>s.Length);
}

上面的所有任务结束以后,才会统计长度将结果加起来
可以使用异步匿名函数改进

     async Task<int> GetTotalSize(string[] uris)
        {
            IEnumerable<Task<int>> result = uris.Select(async uri => (await new WebClient().DownloadDataTaskAsync(uri)).Length);
            var results = await Task.WhenAll(result);
            return results.Sum();
        }

自定义组合器

可以根据自己需要自定义组合任务

  • 定时任务
static async Task<TResult> WithTimeout<TResult>(this Task<TResult> task, TimeSpan timeout)
{
    var cancelSource = new CancellationTokenSource();
    var delay = Task.Delay(timeout, cancelSource.Token);
    Task winner = await Task.WhenAny(task, delay).ConfigureAwait(false);
    if (winner == task)
    {
        cancelSource.Cancel();
    }
    else
    {
        throw new TimeoutException();
    }
    return await task.ConfigureAwait(false);  // Unwrap result/re-throw
}
  • 为原来不支持取消的添加支持(只是使用方感觉支持了)
static Task<TResult> WithCancellation<TResult>(this Task<TResult> task, CancellationToken cancelToken)
{
    var tcs = new TaskCompletionSource<TResult>();
    var reg = cancelToken.Register(() => tcs.TrySetCanceled());
    task.ContinueWith(ant =>
    {
        reg.Dispose();
        if (ant.IsCanceled)
            tcs.TrySetCanceled();
        else if (ant.IsFaulted)
            tcs.TrySetException(ant.Exception.InnerException);
        else
            tcs.TrySetResult(ant.Result);
    });
    return tcs.Task;
}
  • WhenAll修改版,有一个任务出错最终任务也立刻会出错
async Task<TResult[]> WhenAllOrError<TResult>(params Task<TResult> [] tasks)
{
var killJoy = new TaskCompletionSource<TResult[]>();
foreach(var task in tasks)
{
  task.ContinueWith(ant=>{
  if(ant.IsCanceled)
    killJoy.TrySetCanceled();
  else if(ant.IsFaulted)
    killJoy.TrySetException(ant.Exception.InnerException);
});
return await await Task.WhenAny(killJoy.Task,Task.WhenAll(tasks)).ConfigureAwait(false);
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容