MapReduce中的Shuffle
在MapReduce框架中,shuffle是连接Map和Reduce之间的桥梁,Map的输出要用到Reduce中必须经过shuffle这个环节,shuffle的性能高低直接影响了整个程序的性能和吞吐量。
Shuffle是MapReduce框架中的一个特定的phase,介于Map phase和Reduce phase之间,当Map的输出结果要被Reduce使用时,输出结果需要按key哈希,并且分发到每一个Reducer上去,这个过程就是shuffle。由于shuffle涉及到了磁盘的读写和网络的传输,因此shuffle性能的高低直接影响到了整个程序的运行效率。
下图描述了MapReduce算法的整个流程,其中shuffle phase是介于Map phase和Reduce phase之间:
在Hadoop, 在mapper端每次当memory buffer中的数据快满的时候, 先将memory中的数据, 按partition进行划分, 然后各自存成小文件, 这样当buffer不断的spill的时候, 就会产生大量的小文件。
所以Hadoop后面直到reduce之前做的所有的事情其实就是不断的merge, 基于文件的多路并归排序,在map端的将相同partition的merge到一起, 在reduce端, 把从mapper端copy来的数据文件进行merge, 以用于最终的reduce
多路归并排序, 达到两个目的。
merge, 把相同key的value都放到一个arraylist里面;sort, 最终的结果是按key排序的。
这个方案扩展性很好, 面对大数据也没有问题, 当然问题在效率, 毕竟需要多次进行基于文件的多路归并排序,多轮的和磁盘进行数据读写。
Spark的Shuffle机制
Spark中的Shuffle是把一组无规则的数据尽量转换成一组具有一定规则的数据。
Spark计算模型是在分布式的环境下计算的,这就不可能在单进程空间中容纳所有的计算数据来进行计算,这样数据就按照Key进行分区,分配成一块一块的小分区,打散分布在集群的各个进程的内存空间中,并不是所有计算算子都满足于按照一种方式分区进行计算。
当需要对数据进行排序存储时,就有了重新按照一定的规则对数据重新分区的必要,Shuffle就是包裹在各种需要重分区的算子之下的一个对数据进行重新组合的过程。在逻辑上还可以这样理解:由于重新分区需要知道分区规则,而分区规则按照数据的Key通过映射函数(Hash或者Range等)进行划分,由数据确定出Key的过程就是Map过程,同时Map过程也可以做数据处理,例如,在Join算法中有一个很经典的算法叫Map Side Join,就是确定数据该放到哪个分区的逻辑定义阶段。Shuffle将数据进行收集分配到指定Reduce分区,Reduce阶段根据函数对相应的分区做Reduce所需的函数处理。
Spark中Shuffle的流程
- 首先每一个Mapper会根据Reducer的数量创建出相应的bucket,bucket的数量是M×R,其中M是Map的个数,R是Reduce的个数。
- 其次Mapper产生的结果会根据设置的partition算法填充到每个bucket中去。这里的partition算法是可以自定义的,当然默认的算法是根据key哈希到不同的bucket中去。
- 当Reducer启动时,它会根据自己task的id和所依赖的Mapper的id从远端或是本地的block manager中取得相应的bucket作为Reducer的输入进行处理。
这里的bucket是一个抽象概念,在实现中每个bucket可以对应一个文件,可以对应文件的一部分或是其他等。
转载请注明作者Jason Ding及其出处
GitCafe博客主页(http://jasonding1354.gitcafe.io/)
Github博客主页(http://jasonding1354.github.io/)
CSDN博客(http://blog.csdn.net/jasonding1354)
简书主页(http://www.jianshu.com/users/2bd9b48f6ea8/latest_articles)
Google搜索jasonding1354进入我的博客主页