Spark Shuffle原理详解

1. shuffle原理

概述:Shuffle描述着数据从map task输出到reduce task输入的这段过程。在分布式情况下,reduce task需要跨节点取拉取其他节点的map task记过。这一过程将会产生网络资源、内存、磁盘IO的消耗。

1.1 mapreduce的shuffle原理

1.1.1 map task端操作

每个map task都有一个内存缓冲区(默认是100MB),存储着map的输出结果,当缓冲区块满的时候需要将缓冲区的数据以一个临时文件的方式存放到磁盘,当整个map task结束后再对磁盘中这个map task产生的所有临时文件做合并,生成最终的正式输出文件,然后等待reduce task来拉取数据。
Spill过程:这个从内存往磁盘写数据的过程被成为Spill(溢写)。整个缓冲区有个溢写的比例spill.percent(默认是0.8),当达到阈值时map task可以继续往剩余的memory写,同时溢写线程锁定已用memory,先对key(序列化的字节)做排序,若果client程序设置了Combiner,那么在溢写的过程中就会进行局部聚合。
Merge过程:每次溢写都会生成一个临时文件,在map task真正完成时会将这些文件归并成一个文件,这个过程叫做Merge。

1.1.2 reduce task端操作

当所有的map task执行完成,对应节点的reduce task开始启动,简单地说,此阶段就是不断拉取(Fetcher)每个map task所在节点的最终结果,然后不断地merge形成reduce task的输入文件。
Copy阶段:Reduce进程启动一些数据copy线程(Fetch)通过HTTP协议拉取map阶段输出文件。
Merge过程: Copy过来的数据会先放到内存缓冲区(基于JVM的heap size设置),如果内存缓冲区不足也会发生map task的spill(sort 默认, combine可选),多个溢写文件时也会发生map task的merge
关于排序方法:
在Map阶段,k-v溢写时,采用的正是快排;而溢出文件的合并使用的则是归并;在Reduce阶段,通过shuffle从Map获取的文件合并的时候采用的也是归并。

1.2 spark现在的SortShuffleManager

SortShuffleManager运行原理

SortShuffleManager运行机制主要分成两种:

  • 一种是普通运行机制
  • 另一种是bypass运行机制
    当shuffle read task的数量小于等于spark.shuffle.sort.bypassMergeThreshold参数的值时(默认为200),就会起用bypass机制
普通运行机制

下图说明了普通的SortShuffleManager原理。在该模式下,数据会先写入一个内存数据结构中,此时根据不同的shuffle算子,可能选用不同的数据结构。如果是reduceByKey这种聚合类的shuffle算子,那么会选用Map数据结构,如果是join就使用Array,Map是边聚合边写内存,array是直接写内存。接着,每写一条数据进入内存数据结构之后,就会判断一下,是否达到了某个临界阈值。如果达到临界阈值的话,那么就会尝试将内存数据结构中的数据溢写到磁盘,然后清空数据结构。
在溢写到磁盘文件之前,会先根据key对内存数据结构中已有的数据进行排序。排序过后,会分批将数据写入磁盘文件。默认batch的数量是10000条,也就是说,排序号的数据,会以每批1万条数据的形式分批写入磁盘文件。写入磁盘文件是通过Java的BufferedOutputStream(缓冲输出流)实现的。首先会将数据缓冲在内存中,当内存缓冲满溢之后,再一次写入磁盘文件中。
一个task将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写操作,也就会产生多个临时文件。最终会将之前所有的临时文件进行合并,这就是merge过程,一个task就只对应一个磁盘文件,也就意味着该task为下游stage的task准备的数据都在这一个文件中,因此还会单独写一份索引文件,其中标识了下游各个task的数据在文件中的start offset与end offset。
SortShuffleManager由于有一个磁盘文件merge过程,因此大大减少了文件数量。


image.png
bypass运行机制

bypass运行机制触发条件:

  • shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值(默认为200)
  • 不是排序类的shuffle算子
    此时task会为每个下游task都创建一个临时磁盘文件,并将数据按key进行hash后根据key的hash值,将key写如对应的磁盘文件中。当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件。最后同样会将所有的临时磁盘文件合并成一个磁盘文件,并创建一个单独的索引文件。
    该过程的磁盘写机制其实跟未经优化的HashShuffleManager是一模一样的,因为都要创建数量惊人的磁盘文件,只是在最后会做一个磁盘文件合并而已。因此少量的最终磁盘文件,也让该机制相对未经优化的HashShuffleManager来说,shuffle read的性能更高。
    该机制与普通SortShuffleManager运行机制不同在于:
  • 第一,磁盘写机制不同
  • 第二,不会进行排序
    也就是说,启用该机制的最大好处在于:shuffle write过程中,不需要进行数据的排序操作,也就是节省掉了这部分性能开销。
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容