【读书笔记】.Net并行编程高级教程--Parallel

一直觉得自己对并发了解不够深入,特别是看了《代码整洁之道》觉得自己有必要好好学学并发编程,因为性能也是衡量代码整洁的一大标准。而且在《失控》这本书中也多次提到并发,不管是计算机还是生物都并发处理着各种事物。人真是奇怪,当你关注一个事情的时候,你会发现周围的事物中就常出现那个事情。所以好奇心驱使下学习并发。便有了此文。

一、理解硬件线程和软件线程

多核处理器带有一个以上的物理内核--物理内核是真正的独立处理单元,多个物理内核使得多条指令能够同时并行运行。硬件线程也称为逻辑内核,一个物理内核可以使用超线程技术提供多个硬件线程。所以一个硬件线程并不代表一个物理内核;Windows中每个运行的程序都是一个进程,每一个进程都会创建并运行一个或多个线程,这些线程称为软件线程。硬件线程就像是一条泳道,而软件线程就是在其中游泳的人。

二、并行场合

.Net Framework4 引入了新的Task Parallel Library(任务并行库,TPL),它支持数据并行、任务并行和流水线。让开发人员应付不同的并行场合。

数据并行:有大量数据需要处理,并且必须对每一份数据执行同样的操作。比如通过256bit的密钥对100个Unicode字符串进行AES算法加密。

任务并行:通过任务并发运行不同的操作。例如生成文件散列码,加密字符串,创建缩略图。

流水线:这是任务并行和数据并行的结合体。

TPL引入了System.Threading.Tasks ,主类是Task,这个类表示一个异步的并发的操作,然而我们不一定要使用Task类的实例,可以使用Parallel静态类。它提供了Parallel.Invoke, Parallel.For Parallel.Forecah 三个方法。

三、Parallel.Invoke

试图让很多方法并行运行的最简单的方法就是使用Parallel类的Invoke方法。例如有四个方法:

WatchMovie

HaveDinner

ReadBook

WriteBlog

通过下面的代码就可以使用并行。

System.Threading.Tasks.Parallel.Invoke(WatchMovie, HaveDinner, ReadBook, WriteBlog);

这段代码会创建指向每一个方法的委托。Invoke方法接受一个Action的参数组。

public static void Invoke(params Action[] actions);

用lambda表达式或匿名委托可以达到同样的效果。

System.Threading.Tasks.Parallel.Invoke(() => WatchMovie(), () => HaveDinner(), () =>ReadBook(),delegate() { WriteBlog(); });

1.没有特定的执行顺序。

Parallel.Invoke方法只有在4个方法全部完成之后才会返回。它至少需要4个硬件线程才足以让这4个方法并发运行。但并不保证这4个方法能够同时启动运行,如果一个或者多个内核处于繁忙状态,那么底层的调度逻辑可能会延迟某些方法的初始化执行。

给方法加上延时,就可以看到必须等待最长的方法执行完成才回到主方法。

staticvoidMain(string[] args)

{

System.Threading.Tasks.Parallel.Invoke(WatchMovie, HaveDinner, ReadBook,

WriteBlog);

Console.WriteLine("执行完成");

Console.ReadKey();

}staticvoidWatchMovie()

{

Thread.Sleep(5000);

Console.WriteLine("看电影");

}staticvoidHaveDinner()

{

Thread.Sleep(1000);

Console.WriteLine("吃晚饭");

}staticvoidReadBook()

{

Thread.Sleep(2000);

Console.WriteLine("读书");

}staticvoidWriteBlog()

{

Thread.Sleep(3000);

Console.WriteLine("写博客");

}

View Code

这样会造成很多逻辑内核处于长时间闲置状态。

四、Parallel.For

Parallel.For为固定数目的独立For循环迭代提供了负载均衡 (即将工作分发到不同的任务中执行,这样所有的任务在大部分时间都可以保持繁忙) 的并行执行。从而能尽可能地充分利用所有的可用的内核。

我们比较下下面两个方法,一个使用For循环,一个使用Parallel.For  都是生成密钥在转换为十六进制字符串。

privatestaticvoidGenerateAESKeys()

{varsw =Stopwatch.StartNew();for(inti =0; i < NUM_AES_KEYS; i++)

{varaesM =newAesManaged();

aesM.GenerateKey();byte[] result =aesM.Key;stringhexStr =ConverToHexString(result);

}

Console.WriteLine("AES:"+sw.Elapsed.ToString());

}privatestaticvoidParallelGenerateAESKeys()        {varsw =Stopwatch.StartNew();            System.Threading.Tasks.Parallel.For(1, NUM_AES_KEYS +1, (inti) =>{varaesM =newAesManaged();

aesM.GenerateKey();byte[] result =aesM.Key;stringhexStr =ConverToHexString(result);

});

Console.WriteLine("Parallel_AES:"+sw.Elapsed.ToString());

}

privatestaticintNUM_AES_KEYS =100000;staticvoidMain(string[] args)

{

Console.WriteLine("执行"+NUM_AES_KEYS+"次:");GenerateAESKeys();

ParallelGenerateAESKeys();Console.ReadKey();        }

执行1000000次

这里并行的时间是串行的一半。

五、Parallel.ForEach

在Parallel.For中,有时候对既有循环进行优化可能会是一个非常复杂的任务。Parallel.ForEach为固定数目的独立For Each循环迭代提供了负载均衡的并行执行,且支持自定义分区器,让使用者可以完全掌握数据分发。实质就是将所有要处理的数据区分为多个部分,然后并行运行这些串行循环。

修改上面的代码:

System.Threading.Tasks.Parallel.ForEach(Partitioner.Create(1, NUM_AES_KEYS +1), range =>{varaesM =newAesManaged();

Console.WriteLine("AES Range({0},{1} 循环开始时间:{2})",range.Item1,range.Item2,DateTime.Now.TimeOfDay);for(inti = range.Item1; i < range.Item2; i++)

{

aesM.GenerateKey();byte[] result =aesM.Key;stringhexStr =ConverToHexString(result);

}

Console.WriteLine("AES:"+sw.Elapsed.ToString());

});

从执行结果可以看出,分了13个段执行的。

第二次执行还是13个段。速度上稍微有差异。开始没有指定分区数,Partitioner.Create使用的是内置默认值。

而且我们发现这些分区并不是同时执行的,大致是分了三个时间段执行。而且执行顺序是不同的。总的时间和Parallel.For的方法差不多。

publicstaticParallelLoopResult ForEach(Partitioner source, Action body)

Parallel.ForEach方法定义了source和Body两个参数。source是指分区器。提供了分解为多个分区的数据源。body是要调用的委托。它接受每一个已定义的分区作为参数。一共有20多个重载,在上面的例子中,分区的类型为Tuple,是一个二元组类型。此外,返回一个ParallelLoopResult的值。

Partitioner.Create 创建分区是根据逻辑内核数及其他因素决定。

publicstaticOrderablePartitioner> Create(intfromInclusive,inttoExclusive)

{intnum =3;if(toExclusive <=fromInclusive)thrownewArgumentOutOfRangeException("toExclusive");intrangeSize = (toExclusive - fromInclusive) / (PlatformHelper.ProcessorCount *num);if(rangeSize ==0)

rangeSize=1;returnPartitioner.Create>(Partitioner.CreateRanges(fromInclusive, toExclusive, rangeSize), EnumerablePartitionerOptions.NoBuffering);

}

因此我们可以修改分区数目,rangesize大致为250000左右。也就是说我的逻辑内核是4.

varrangesize= (int) (NUM_AES_KEYS/Environment.ProcessorCount) +1;

System.Threading.Tasks.Parallel.ForEach(Partitioner.Create(1, NUM_AES_KEYS +1,rangesize), range =>

再次执行:

分区变成了四个,时间上没有多大差别(第一个时间是串行时间)。我们看见这四个分区几乎是同时执行的。大部分情况下,TPL在幕后使用的负载均衡机制都是非常高效的,然而对分区的控制便于使用者对自己的工作负载进行分析,来改进整体的性能。

Parallel.ForEach也能对IEnumerable集合进行重构。Enumerable.Range生产了序列化的数目。但这样就没有上面的分区效果。

privatestaticvoidParallelForEachGenerateMD5HasHes()

{varsw =Stopwatch.StartNew();            System.Threading.Tasks.Parallel.ForEach(Enumerable.Range(1, NUM_AES_KEYS), number =>{varmd5M =MD5.Create();byte[] data = Encoding.Unicode.GetBytes(Environment.UserName +number);byte[] result =md5M.ComputeHash(data);stringhexString =ConverToHexString(result);

});

Console.WriteLine("MD5:"+sw.Elapsed.ToString());

}

六、从循环中退出

和串行运行中的break不同,ParallelLoopState 提供了两个方法用于停止Parallel.For 和 Parallel.ForEach的执行。

Break:让循环在执行了当前迭代后尽快停止执行。比如执行到100了,那么循环会处理掉所有小于100的迭代。

Stop:让循环尽快停止执行。如果执行到了100的迭代,那不能保证处理完所有小于100的迭代。

修改上面的方法:执行3秒后退出。

privatestaticvoidParallelLoopResult(ParallelLoopResult loopResult)

{stringtext;if(loopResult.IsCompleted)

{

text="循环完成";

}else{if(loopResult.LowestBreakIteration.HasValue)

{

text="Break终止";

}else{

text="Stop 终止";

}

}

Console.WriteLine(text);

}privatestaticvoidParallelForEachGenerateMD5HasHesBreak()

{varsw =Stopwatch.StartNew();varloopresult= System.Threading.Tasks.Parallel.ForEach(Enumerable.Range(1, NUM_AES_KEYS),(intnumber,ParallelLoopState loopState)=>{varmd5M =MD5.Create();byte[] data = Encoding.Unicode.GetBytes(Environment.UserName +number);byte[] result =md5M.ComputeHash(data);stringhexString =ConverToHexString(result);if(sw.Elapsed.Seconds >3){loopState.Stop();}            });            ParallelLoopResult(loopresult);            Console.WriteLine("MD5:"+sw.Elapsed);

}

七、捕捉并行循环中发生的异常。

当并行迭代中调用的委托抛出异常,这个异常没有在委托中被捕获到时,就会变成一组异常,新的System.AggregateException负责处理这一组异常。

privatestaticvoidParallelForEachGenerateMD5HasHesException()

{varsw =Stopwatch.StartNew();varloopresult =newParallelLoopResult();try{

loopresult= System.Threading.Tasks.Parallel.ForEach(Enumerable.Range(1, NUM_AES_KEYS), (number, loopState) =>{varmd5M =MD5.Create();byte[] data = Encoding.Unicode.GetBytes(Environment.UserName +number);byte[] result =md5M.ComputeHash(data);stringhexString =ConverToHexString(result);if(sw.Elapsed.Seconds >3)

{thrownewTimeoutException("执行超过三秒");

}

});

}catch(AggregateException ex){foreach(varinnerExinex.InnerExceptions)

{

Console.WriteLine(innerEx.ToString());

}

}

ParallelLoopResult(loopresult);

Console.WriteLine("MD5:"+sw.Elapsed);

}

结果:

异常出现了好几次。

八、指定并行度。

TPL的方法总会试图利用所有可用的逻辑内核来实现最好的结果,但有时候你并不希望在并行循环中使用所有的内核。比如你需要留出一个不参与并行计算的内核,来创建能够响应用户的应用程序,而且这个内核需要帮助你运行代码中的其他部分。这个时候一种好的解决方法就是指定最大并行度。

这需要创建一个ParallelOptions的实例,设置MaxDegreeOfParallelism的值。

privatestaticvoidParallelMaxDegree(intmaxDegree)

{varparallelOptions =newParallelOptions();

parallelOptions.MaxDegreeOfParallelism=maxDegree;varsw =Stopwatch.StartNew();

System.Threading.Tasks.Parallel.For(1, NUM_AES_KEYS +1, parallelOptions, (inti) =>{varaesM =newAesManaged();

aesM.GenerateKey();byte[] result =aesM.Key;stringhexStr =ConverToHexString(result);

});

Console.WriteLine("AES:"+sw.Elapsed.ToString());

}

调用:如果在四核微处理器上运行,那么将使用3个内核。

ParallelMaxDegree(Environment.ProcessorCount -1);

时间上大致慢了0.2秒(第一次Parallel.For 3.18s)。

小结:这次学习了Parallel相关方法以及如何退出并行循环和捕获异常、设置并行度,还有并行相关的知识。园子里也有类似的博客。但作为自己知识的管理,在这里梳理一遍。

园友的博客:8天玩转并发

阅读书籍:《C#并行编程高级教程

诚邀喜欢看书,也喜欢分享书籍(不限技术书籍)的朋友加入书山有路群q:452450927 。大家推荐的书籍太多,喊你来读。

人的核心竞争力超过一半来自不紧不慢的事——读书、锻炼身体、与智者交友,以及业余爱好。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,884评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,755评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,369评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,799评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,910评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,096评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,159评论 3 411
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,917评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,360评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,673评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,814评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,509评论 4 334
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,156评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,882评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,123评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,641评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,728评论 2 351

推荐阅读更多精彩内容