引用:http://www.wangjialong.cc/2017/10/10/shuffle/
http://www.cnblogs.com/yangyquin/p/5021234.html
https://blog.csdn.net/ruidongliu/article/details/11690549
http://langyu.iteye.com/blog/992916
https://my.oschina.net/leejun2005/blog/268468
Mapareduce基础
MapReduce 框架的核心步骤主要分两部分:Map 和Reduce。
当你向MapReduce 框架提交一个计算作业时,它会首先把计算作业拆分成若干个Map 任务,然后分配到不同的节点上去执行,每一个Map 任务处理输入数据中的一部分,当Map 任务完成后,它会生成一些中间文件,这些中间文件将会作为Reduce 任务的输入数据。Reduce 任务的主要目标就是把前面若干个Map 的输出汇总到一起并输出。
总的来说:Map是映射,负责数据的过滤分发;Reduce是规约,负责数据的计算归并。
Reduce的数据来源于Map,Map的输出即是Reduce的输入,Reduce需要通过 Shuffle来获取数据。shuffle过程就是一个"GroupByKey"的过程,Shuffle 阶段负责把 map output 传递到 reduce 阶段。
Shuffle 阶段的功能是完完全全由 Hadoop framework 提供的,这里边没有任何用户的代码(即使我们有可能需要根据具体 Hadoop job 的特点配置一下这个阶段,但也非常方便)。
shuffle工作示意图
由上图(引用自shuffle和排序)所示,shuffle是一个横跨map task 和reduce task的过程,它是map 和 reduce的一个数据桥梁,负责将map输出作为输入传给reducer。在map端包括了写入缓冲区,溢出到磁盘,分区与排序等步骤;在reduce端则包括了复制数据、归并数据、reduce阶段等步骤。
map端
map函数产生输出时,出于效率的原因并不会直接写入硬盘,而是先放到一个环形内存缓存区中,并将缓存的数据按照key值进行一个预排序。缓冲区默认为100MB,该值可以通过io.sort.mb属性来调整。一旦达到阈值(io.sort.spill.percent, 默认为0.8),则后台进程开始将内容溢出到磁盘。溢出过程中,map输出仍写入缓冲区,在此期间缓冲区被填满,则会将map阻塞,知道该溢出过程结束。
每个溢出过程都会产生一个文件存到mapred.local.dir属性指定的目录中,在上图中,共产生了三个溢出到磁盘的文件。在溢出到磁盘之前,会根据reducer的数量划分分区,如图中共划分了3个分区,每个分区中,都按键进行内排序,如果指定了combiner,则在排序后的输出上进行combine,以减少溢写到磁盘和传递给reducer的数据。上图中每个溢出文件都有3个分区,每个分区内数据都是排好序的。
当map任务结束后,会将溢出到磁盘的文件进行一个合并merge,如图中,将3个文件合并成了一个文件,合并好的文件中每个分区内的数据也是排好序的。在map输出到磁盘时,可以通过设置mapred.compress.map.output为true和指定mapred.map.output.compression.codec指定压缩格式,这样可以加快溢出到磁盘的速度。
reducer是通过http方式获得输出文件的分区的,如上图中,第一个reducer获取了第一个分区。
reduce端
reducer会将各个map task上最后溢出的那个文件的对应分区复制到本地,由于map任务的完成时间可能不同,因此只要一个任务完成,reduce任务就开始复制其输出。reduce可以并行的复制map的输出,默认为5个线程,可以通过设置mapred.reduce.parallel.copies属性来改变。对于指定的作业,jobtracker(或App master)知道map输出和tasktracker的映射关系。reducer线程定期询问jobtracker以获取map输出的位置,直到获取全部的输出位置
复制完所有map输出后,就进入到上图中的“sort phase”,但它并不是一个严格意义上的排序过程,可以将它理解成归并排序中的merge过程,将若干个排好序的序列,归并成一个有序文件。这个过程根据合并因子(io.sort.factor设置,默认为10)进行,如果有50个map的输出,而合并因子为10,则每次最多合并10个map输出,因此最后会有5个中间文件。
在reduce阶段,直接将上面的5个中间文件合并成一个已排序的文件输入给reduce函数,最后的合并不需要磁盘的读写,只需要内存和磁盘片段的配合即可。
在“sort phase”时,由于最后一趟的合并总是将结果直接输入给reduce,而没有磁盘写入过程,因此可以据此进行优化。比如如果合并因子为10,有40个文件,此时不会再四趟中每次合并10个文件而得到4个文件,相反,第一趟只合并4个文件,随后的三趟每次合并10个文件,在最后一趟中,4个已合并的文件(4,10,10,10)和6个未合并的文件合并给reduce函数,这并不改变合并次数,却使得合并过程中磁盘只写入了4+10+10+10=34个文件,从而减少了磁盘的数据量。
关于是否压缩:
mapreduce.map.output.compress(default:false)设置为true进行压缩,数据会被压缩写入磁盘,读数据读的是压缩数据需要解压,在实际经验中Hive在Hadoop的运行的瓶颈一般都是IO而不是CPU,压缩一般可以10倍的减少IO操作,压缩的方式Gzip,Lzo,BZip2,Lzma等,其中Lzo是一种比较平衡选择,mapreduce.map.output.compress.codec(default:org.apache.hadoop.io.compress.DefaultCodec)参数设置。但这个过程会消耗CPU,适合IO瓶颈比较大。
分区补充:
在将map()函数处理后得到的(key,value)对写入到缓冲区之前,需要先进行分区操作,这样就能把map任务处理的结果发送给指定的reducer去执行
public class HashPartitioner<K, V> extends Partitioner<K, V> {
/** Use {@link Object#hashCode()} to partition. */
public int getPartition(K key, V value,
int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}