spark性能优化五:Shuffle调优

1、概述

  • 1、什么样的情况下,会发生shuffle?
    在spark中,主要是以下几个算子:groupByKey、reduceByKey、countByKey、join等。

  • 2、shuffle是什么?
    是把分布在集群中各个节点上的数据中的同一个key对应的values,都给集中到同一个节点上,更严谨的说法是,集中到一个节点的一个executor中的一个task上。
    然后,集中一个key对应的values之后,才能进行后面的计算处理,<key, Iterable<value>>;
    reduceByKey:算子函数去对values集合进行reduce操作,最后变成一个value;
    countByKey:需在一个task中,获取到一个key对应的所有的value,然后进行计数,统计总共有多少个value;
    join:RDD<key,value> ,RDD<key,value>,只要是两个RDD中,key相同对应的2个value,都得到一个节点的executor的task中,给我们进行处理。

shuffle,一定是分为两个stage来完成的。因为这其实是个逆向的过程,不是stage决定shuffle,是shuffle决定stage。
在某个action触发job的时候,DADScheduler会负责划分job为多个stage,划分的依据,就是如果发现会有触发shuffle操作的算子,比如reduceByKey,就将这个操作的前半部分以及之前所有的RDD和transformation操作,划分为一个stage;shuffle操作的后半部分,以及后面的,直到action为止的RDD和transformation操作,划分为另一个stage。(详细过程,见DAGScheduler的stage划分算法)

每个task(shuffle前半部分stage的task)都会创建下一个stage的task数量相同的文件,比如下一个stage会有100个task,那么当前stage每个task都会创建100份文件,会将同一个key对应的values,一定是写入同一个文件中的;不容节点上的task,也是同样的操作,也会创建100份文件。

shuffle的后半部分stage的task,每个task都会去从各个节点上的task写的属于自己的那一份文件中,拉取key value对;然后task会有一个内存缓冲区,会用HashMap进行key values的汇聚,(key,values);

之后,task会用我们自己定义的聚合函数,比如reduceByKey(+),把所有values进行一对一的累加,聚合出来的最终的值。就完成了shuffle。

shuffle前半部分的task在写入数据到磁盘文件之前,都会先写入一个一个的内存缓冲,内存缓冲溢满之后,再spill溢写到磁盘文件中。

2、shuffle调优之合并map端输出文件

new SparkConf().set("spark.shuffle.consolidateFiles", "true")
默认情况下,是不开启shuffle map端输出文件合并的机制,这时候就会发生大量map端输出文件的操作,严重影响性能,详细过程,见之前的shuffle原理;

合并map端输出文件,对spark性能的影响:

  • 1、map task写入磁盘文件的IO,减少 task数量/cpu core数量 倍;
  • 2、第二个stage,原本要拉取的第一个stage的数量文件,同样也减少 task数量/cpu core数量 倍;

shuffle中的写磁盘操作,基本上就是shuffle中性能消耗最为严重的部分;磁盘IO对性能和spark作业执行速度的影响,是及其惊人的,基本上,spark作业的性能,都消耗在shuffle中了,虽然不只是shuffle的map端输出文件这一个部分,但是这里也是非常大的一个性能消耗点。

最后,分享一下,实际在生产环境中,使用了spark.shuffle.consolidateFiles机制以后,实际的性能调优的效果;
环境配置:100个节点,100个executor;每个executor有2个cpu core;总共1000个task,每个executor平均10个task;
spark作业,5个小时 -> 2~3个小时。性能的提升,还是相当的客观的。

3、shuffle调优之调节map端内存缓冲与reduce端内存占比

map端内存缓冲:spark.shuffle.file.buffer,默认32k
reduce端内存占比:spark.shuffle.memoryFraction,0.2
很多资料、网上视频,都会说,这两个参数,是调节shuffle性能的不二选择,很有效果的样子,实际上,不是这样的。
以实际的生产经验来说,这两个参数没有那么重要,往往来说,shuffle的性能不是因为这方面的原因导致的;
但是,有一点点效果的,这两个shuffle调优的小点,其实也是需要跟其他的大量的小点(broadcast,数据本地化等待时长等)配合起来使用,一点一点的提升性能,最终很多个性能调优的小点的效果,汇集在一起之后,那么就会有可以看见的还算不错的性能调优的效果。

image.png
  • 原理:
    默认情况下,shuffle的map task,输出到磁盘文件的时候,统一都会先写入每个task自己关联的一个内存缓冲区,默认是32k,每一次,当内存缓冲区溢满之后,才会进行spill操作,溢写到磁盘文件中去。

    reduce端task,在拉取到数据之后,会用hashMap的数据格式,来对各个key对应的values进行汇聚。
    reduce task,在进行汇聚、聚合等操作的时候,实际上,使用的就是自己对应的executor的内存,默认executor内存中划分给reduce task进行聚合的比例,是0.2;所以当拉取过来的数据很多时,内存中可能放不下,这时就会将数据溢写spill到磁盘文件中去;

  • 出现的问题
    默认,map端内存缓冲是每个task,32kb。
    默认,reduce端聚合内存比例,是0.2,也就是20%。

    如果map端的task,处理的数据量比较大,但是呢,你的内存缓冲大小是固定的。可能会出现什么样的情况?

    每个task就处理320kb,32kb,总共会向磁盘溢写320 / 32 = 10次。每个task处理32000kb,32kb,总共会向磁盘溢写32000 / 32 = 1000次。

    在map task处理的数据量比较大的情况下,而你的task的内存缓冲默认是比较小的,32kb。可能会造成多次的map端往磁盘文件的spill溢写操作,发生大量的磁盘IO,从而降低性能。

    reduce端聚合内存,占比。默认是0.2。如果数据量比较大,reduce task拉取过来的数据很多,那么就会频繁发生reduce端聚合内存不够用,频繁发生spill操作,溢写到磁盘上去。而且最要命的是,磁盘上溢写的数据量越大,后面在进行聚合操作的时候,很可能会多次读取磁盘中的数据,进行聚合。

    默认不调优,在数据量比较大的情况下,可能频繁地发生reduce端的磁盘文件的读写。

  • 调优:
    这两个点之所以放在一起调优,是因为他们俩是有关联的。数据量变大,map端肯定会出点问题;reduce端肯定也会出点问题;出的问题是一样的,都是磁盘IO频繁,变多,影响性能。

    调节map task内存缓冲:spark.shuffle.file.buffer,默认32k
    调节reduce端聚合内存占比:spark.shuffle.memoryFraction,0.2

    在实际生产环境中,我们在什么时候来调节两个参数?

    看Spark UI,如果采用standalone模式,那么狠简单,你的spark跑起来,会显示一个Spark UI的地址,4040的端口,进去看,依次点击进去,可以看到,你的每个stage的详情,有哪些executor,有哪些task,每个task的shuffle write和shuffle read的量,shuffle的磁盘和内存,读写的数据量;如果是用的yarn模式来提交,从yarn的界面进去,点击对应的application,进入Spark UI,查看详情。

    如果发现shuffle 磁盘的write和read,很大。这个时候,就意味着最好调节一些shuffle的参数。进行调优。首先当然是考虑开启map端输出文件合并机制。

    调节上面说的那两个参数。调节的时候的原则。spark.shuffle.file.buffer,每次扩大一倍,然后看看效果,64,128;spark.shuffle.memoryFraction,每次提高0.1,看看效果。

    不能调节的太大,太大了以后过犹不及,因为内存资源是有限的,你这里调节的太大了,其他环节的内存使用就会有问题了。

4、shuffle调优之ShuffleManager

spark.shuffle.manager:hash(最老)、sort(1.2版本引入)、tungsten-sort(钨丝,1.5引入)

这三个ShuffleManager之间的区别:

  • 1、SortShuffleManager会对每个reduce task要处理的数据进行排序(默认的)
  • 2、SortShuffleManager会避免像HashShuffleManager那样,默认就去创建多份磁盘文件,一个task,只会写入一个磁盘文件,不同reduce task的数据,用offset来划分界定
  • 3、tungsten-sort shuffleManager,官网上说,钨丝sort shuffle manager,效果跟sort shuffle manager是差不多的。但是,唯一的不同之处在于,钨丝manager,是使用了自己实现的一套内存管理机制,性能上有很大的提升, 而且可以避免shuffle过程中产生的大量的OOM,GC,等等内存相关的异常。

注意,SortShuffleManager的使用的时候,spark.shuffle.sort.bypassMergeThreshold(默认为200),只有reduce task数量大于这个阈值,sort排序才能生效。就是当reduce task数量少于等于200,map创建的输出文件小于等于200时,最后会将所有的输出文件合并为一份文件,但是不会进行sort排序,这样做的好处,就是避免了sort排序,节省了性能开销。而且还能将多个reduce task的文件合并成一份文件。节省了reduce task拉取数据的时候的磁盘IO的开销。

那么hash、sort、tungsten-sort这三种shuffleManager如何选择呢?

  • 1、需不需要数据默认就让spark给你进行排序?就好像mapreduce,默认就是有按照key的排序。如果不需要的话,其实还是建议搭建就使用最基本的HashShuffleManager,因为最开始就是考虑的是不排序,换取高性能;
  • 2、什么时候需要用sort shuffle manager?如果你需要你的那些数据按key排序了,那么就选择这种吧,而且要注意,reduce task的数量应该是不超过200的,这样sort、merge(多个文件合并成一个)的机制,才能生效。但是这里要注意,你一定要自己考量一下,有没有必要在shuffle的过程中,就做这个事情,毕竟对性能是有影响的。
  • 3、如果你不需要排序,而且你希望你的每个task输出的文件最终是会合并成一份的,你自己认为可以减少性能开销;可以去调节bypassMergeThreshold这个阈值,比如你的reduce task数量是500,默认阈值是200,所以默认还是会进行sort和直接merge的;可以将阈值调节成550,不会进行sort,按照hash的做法,每个reduce task创建一份输出文件,最后合并成一份文件。(一定要提醒大家,这个参数,其实我们通常不会在生产环境里去使用,也没有经过验证说,这样的方式,到底有多少性能的提升)
  • 4、如果你想选用sort based shuffle manager,而且你们公司的spark版本比较高,是1.5.x版本的,那么可以考虑去尝试使用tungsten-sort shuffle manager。看看性能的提升与稳定性怎么样。
spark.shuffle.manager:hash、sort、tungsten-sort

new SparkConf().set("spark.shuffle.manager", "hash")
new SparkConf().set("spark.shuffle.manager", "tungsten-sort")

// 默认就是,new SparkConf().set("spark.shuffle.manager", "sort")
new SparkConf().set("spark.shuffle.sort.bypassMergeThreshold", "550")

最后总结:
1、在生产环境中,不建议大家贸然使用第三点和第四点:
2、如果你不想要你的数据在shuffle时排序,那么就自己设置一下,用hash shuffle manager。
3、如果你的确是需要你的数据在shuffle时进行排序的,那么就默认不用动,默认就是sort shuffle manager;如果你压根儿不care是否排序这个事儿,那么就默认让他就是sort的。调节一些其他的参数(consolidation机制)。(80%,都是用这种)

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,539评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,911评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,337评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,723评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,795评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,762评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,742评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,508评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,954评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,247评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,404评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,104评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,736评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,352评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,557评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,371评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,292评论 2 352

推荐阅读更多精彩内容