由于Shuffle涉及到了磁盘的读写和网络的传输,因此Shuffle性能的高低直接影响到了整个程序的运行效率。
1. MapReduce Shuffle
- Hadoop的核心思想是MapReduce,Shuffle是MapReduce的核心。
- Shuffle的主要工作是从Map结束到Reduce开始之间的过程。
- Shuffle阶段又可以分为Map端的Shuffle和Reduce端的Shuffle。
- MapReduce的shuffle过程中在Map端会进行一个Sort,也会在Reduce端对Map的结果在进行一次排序。
注意:这个有序是指Key值有序,对于value依旧是无序的,如果想对value进行排序,需要借鉴二次排序的算法。
二次排序的理论是利用MR的全局排序的功能,将value和key值合并,作为一个新的Key值,然后由MR的机制进行Key的排序,这个方法类似于在处理数据倾斜的时候在Key值上加随机数的方法。
- MapReduce的shuffle有一个很重要的特点:全局排序。
1.1 Map端的Shuffle

- 频繁的磁盘I/O操作会严重的降低效率,因此“中间结果”不会立马写入磁盘,而是优先存储到map节点的“环形内存缓冲区”
- 在写入的过程中进行分区(partition),也就是对于每个键值对来说,都增加了一个partition属性值,然后连同键值对一起序列化成字节数组写入到缓冲区(缓冲区采用的就是字节数组,默认大小为100M)
- 当写入的数据量达到预先设置的阈值后便会启动溢写出线程将缓冲区中的那部分数据溢出写(spill)到磁盘的临时文件中,并在写入前根据key进行排序(sort)和合并(combine,可选操作)
- 当整个map任务完成溢出写后,会对磁盘中这个map任务产生的所有临时文件(spill文件)进行归并(merge)操作生成最终的正式输出文件,此时的归并是将所有spill文件中的相同partition合并到一起,并对各个partition中的数据再进行一次排序(sort),生成key和对应的value-list,文件归并时,如果溢写文件数量超过参数min.num.spills.for.combine的值(默认为3)时,可以再次进行合并。
- 最终生成的文件也会存储在TaskTracker能够访问的位置。每个reduce task不间断的通过RPC从JobTracker那里获取map task是否完成的信息,如果得到的信息是map task已经完成,那么Shuffle的后半段开始启动。
1.2 Reduce端的Shuffle
Reduce端的shuffle主要包括三个阶段,copy、merge和reduce。

每个reduce task负责处理一个分区的文件,以下是reduce task的处理流程:
- reduce task从每个map task的结果文件中拉取对应分区的数据。因为数据在map阶段已经是分好区了,并且会有一个额外的索引文件记录每个分区的起始偏移量。所以reduce task取数的时候直接根据偏移量去拉取数据就ok。
- reduce task从每个map task拉取分区数据的时候会进行再次合并,排序,按照自定义的reducer的逻辑代码去处理。
- 最后就是Reduce过程了,在这个过程中产生了最终的输出结果,并将其写到HDFS上。
1.3 为什么要排序
- key存在combine操作,排序之后相同的key放到一块显然方便做合并操作。
- reduce task是按key去处理数据的。 如果没有排序那必须从所有数据中把当前相同key的所有value数据拿出来,然后进行reduce逻辑处理。显然每个key到这个逻辑都需要做一次全量数据扫描,影响性能,有了排序很方便的得到一个key对于的value集合。
- reduce task按key去处理数据时,如果key按顺序排序,那么reduce task就按key顺序去读取,显然当读到的key是文件末尾的key?那么就标志数据处理完毕。如果没有排序那还得有其他逻辑来记录哪些key处理完了,哪些key没有处理完。
1.4 为什么要合并文件
- 因为内存放不下就会溢写文件,就会发生多次溢写,形成很多小文件,如果不合并,显然会小文件泛滥,集群需要资源开销去管理这些小文件数据(不合并将导致小文件问题)。
- 任务去读取文件的数增多,打开的文件句柄数也会增多。
- mapreduce是全局有序。单个文件有序,不代表全局有序,只有把小文件合并一起排序才会全局有序。???是吗???
2. Spark Shuffle
Spark的Shuffle是在MapReduce Shuffle基础上进行的调优。其实就是对排序、合并逻辑做了一些优化。在Spark中Shuffle write相当于MapReduce 的map,Shuffle read相当于MapReduce 的reduce。
宽依赖之间会划分stage,而Stage之间就是Shuffle
- 宽依赖:父RDD的每个分区都可能被多个子RDD分区所使用,子RDD分区通常对应所有的父RDD分区
- 窄依赖:父RDD的每个分区只被子RDD的一个分区所使用,子RDD分区通常对应常数个父RDD分区
- 区分宽窄依赖的原因:
- 窄依赖允许在一个集群节点上以流水线的方式计算所有父分区,
而宽依赖则需要首先计算好所有父分区数据,然后在节点之间进行Shuffle,这与MapReduce类似。 - 窄依赖能够更有效地进行失效节点的恢复,即只需要重新计算丢失分区的父分区,而且不同节点之间可以并行计算;而对于一个宽依赖的Lineage图,单个节点失效可能导致这个RDD的所有祖先丢失部分分区,因而需要整体重新计算。
在Spark中,ShuffleManager(Shuffle管理器)负责shuffle过程的执行、计算和处理。
ShuffleManager有2种实现方式:HashShuffleManager和SortShuffleManager
- 窄依赖允许在一个集群节点上以流水线的方式计算所有父分区,
2.1 Hash Shuffle
SortShuffleManager相较于HashShuffleManager来说,有了一定的改进。主要就在于,每个Task在进行shuffle操作时,虽然也会产生较多的临时磁盘文件,但是最后会将所有的临时文件合并(merge)成一个磁盘文件,因此每个Task就只有一个磁盘文件。在下一个stage的shuffle read task拉取自己的数据时,只要根据索引读取每个磁盘文件中的部分数据即可。
2.2 Sort Shuffle
SortShuffleManager的运行机制主要分成两种,一种是普通运行机制,另一种是bypass运行机制。当shuffle read task的数量小于等于spark.shuffle.sort.bypassMergeThreshold参数的值时(默认为200),就会启用bypass机制。
-
普通机制的Sort Shuffle
普通机制的Sort Shuffle
- 数据会先写入一个内存数据结构中
- 每写一条数据进入内存数据结构之后,就会判断一下,是否达到了某个临界阈值。如果达到临界阈值的话,那么就会尝试将内存数据结构中的数据溢写到磁盘,然后清空内存数据结构。
- 在溢写到磁盘文件之前,会先根据key对内存数据结构中已有的数据进行排序。排序过后,会分批将数据写入磁盘文件。
- 一个task将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写操作,也会产生多个临时文件。最后会将之前所有的临时磁盘文件都进行合并,由于一个task就只对应一个磁盘文件因此还会单独写一份索引文件,其中标识了下游各个task的数据在文件中的start offset与end offset。
SortShuffleManager由于有一个磁盘文件merge的过程,因此大大减少了文件数量,由于每个task最终只有一个磁盘文件所以文件个数等于上游shuffle write个数。- bypass机制的Sort Shuffle
bypass运行机制的触发条件如下:
1)shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值,默认值200。
2)不是聚合类的shuffle算子(比如reduceByKey)。
而该机制与普通SortShuffleManager运行机制的不同在于:
第一,磁盘写机制不同;
第二,不会进行排序。也就是说,启用该机制的最大好处在于,shuffle write过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。
bypass机制的Sort Shuffle
- bypass机制的Sort Shuffle
SortShuffleManager相较于HashShuffleManager来说,有了一定的改进。主要就在于,每个Task在进行shuffle操作时,虽然也会产生较多的临时磁盘文件,但是最后会将所有的临时文件合并(merge)成一个磁盘文件,因此每个Task就只有一个磁盘文件。在下一个stage的shuffle read task拉取自己的数据时,只要根据索引读取每个磁盘文件中的部分数据即可。
Spark与MapReduce Shuffle的异同
- 从整体功能上看,两者并没有大的差别。 都是将 mapper(Spark 里是 ShuffleMapTask)的输出进行 partition,不同的 partition 送到不同的 reducer(Spark 里 reducer 可能是下一个 stage 里的 ShuffleMapTask,也可能是 ResultTask)。Reducer 以内存作缓冲区,边 shuffle 边 aggregate 数据,等到数据 aggregate 好以后进行 reduce(Spark 里可能是后续的一系列操作)。
- 从流程的上看,两者差别不小。 Hadoop MapReduce 是 sort-based,进入 combine和 reduce的 records 必须先 sort。这样的好处在于 combine/reduce可以处理大规模的数据,因为其输入数据可以通过外排得到(mapper 对每段数据先做排序,reducer 的 shuffle 对排好序的每段数据做归并)。以前 Spark 默认选择的是 hash-based,通常使用 HashMap 来对 shuffle 来的数据进行合并,不会对数据进行提前排序。如果用户需要经过排序的数据,那么需要自己调用类似 sortByKey的操作。在Spark 1.2之后,sort-based变为默认的Shuffle实现。
- 从流程实现角度来看,两者也有不少差别。 Hadoop MapReduce 将处理流程划分出明显的几个阶段:map, spill, merge, shuffle, sort, reduce等。每个阶段各司其职,可以按照过程式的编程思想来逐一实现每个阶段的功能。在 Spark 中,没有这样功能明确的阶段,只有不同的 stage 和一系列的 transformation,所以 spill, merge, aggregate 等操作需要蕴含在 transformation中。

