Shuffle
Shuffle描述着数据从map task输出到reduce task输入的这段过程。在分布式情况下,reduce task需要跨节点去拉取其它节点上的map task结果。这一过程将会产生网络资源消耗和内存,磁盘IO的消耗。
MapReduce Shuffle
Shuffle是MapReduce框架中的一个特定的阶段,介于Map阶段和Reduce阶段之间,当Map的输出结果要被Reduce使用时,输出结果需要按关键字值(key)哈希,并且分发到每一个Reducer上,这个过程就是Shuffle。直观来讲,Spark Shuffle机制是将一组无规则的数据转换为一组具有一定规则数据的过程。由于Shuffle涉及了磁盘的读写和网络的传输,因此Shuffle性能的高低直接影响整个程序的运行效率。
Shuffle阶段有两种阀值设置。第一,获取来自map的结果数据的时候,根据数据大小(file.out的大小)自然划分到内存或者是磁盘;第二,内存和磁盘能够保存的文件数目有阀值,超出阀值,会对文件进行merge操作,即小文件合并成为大文件。Shuffle过程:
1)获取完成的Map Task列表。
2)进行数据的远程拷贝(http get的方法),根据数据文件的大小自然划分到内存或者是磁盘。
3)当内存或者磁盘的文件较多时,进行文件合并。
Spark Shuffle
Shuffle在Spark中即是把父RDD中的K-V对按照Key进行重新分区,从而得到一个新的RDD。也就是说原本同属于父RDD同一个分区的数据需要进入到子RDD的不同的分区。
Spark的Shuffle主要是两个阶段:Shuffle Write和Shuffle Fetch
-
Shuffle Write
- 将各个节点数据写入到指定分区
1、根据下一个Stage分区数分成相应的Bucket 2、将Bucket写入磁盘
- 将各个节点数据写入到指定分区
-
Shuffle Fetch
- 获取各个分区发送的数据
1、在存储有Shuffle数据节点的磁盘Fetch需要的数据 2、Fetch到本地之后进行自定义的聚集函数操作
- 获取各个分区发送的数据
需要注意的是reducer进行数据的fetch操作是等到所有的ShuffleMapTask执行完才开始进行的,因为所有的ShuffleMapTask可能不在同一个stage里面,而stage执行后提交是要在父stage执行提交之后才能进行的,所以fetch操作并不是FileSegment产生就执行的。fetch一旦开始,就会边fetch边处理(reduce)。MapReduce shuffle阶段就是边fetch边使用combine()进行处理,但是combine()处理的是部分数据。MapReduce不能做到边fetch边reduce处理,因为MapReduce为了让进入reduce()的records有序,必须等到全部数据都shuffle-sort后再开始reduce()。然而,Spark不要求shuffle后的数据全局有序,因此没必要等到全部数据shuffle完成后再处理。
注:
1. Shuffle的中间结果会被写入到磁盘
2.默认情况下Shuffle并不会更改分区的数量
3.Hadoop的shuffle过程是明显的几个阶段:map(),spill,merge,shuffle,sort,reduce()等,是按照流程顺次执行的,属于push类型;但是,Spark不一样,因为Spark的Shuffle过程是算子驱动的,具有懒执行的特点,属于pull类型。
MapReduce Shuffle 与Spark Shuffle对比