默认情况下,mr只对key排序。我们所说的全排序,即对key的全排序。
1、使用一个reducer
这个是最容易想到的思路
优点是实现简单,
缺点也很明显,一个reduce有可能比较慢,无法利用分布式的优点
2、重写Partioner类。
通过重写Partition类,把key在一个范围内的发往一个固定的reduce,这样在一个reduce内key是全排序的,在reduce之间按照序号也是排好序的。比如key代表的是一个年龄。我们可以把数据输出到10个reduer。1-10岁之间发往第0个reduce,11-20发往第2个reduce,以此类推。
缺点是这种划分可能不均匀。
这里就需要关注Partition这个阶段,Partition阶段是针对每个Reduce,需要创建一个分区,然后把Map的输出结果映射到特定的分区中。这个分区中可能会有N个Key对应的数据,但是一个Key的所有数据只能在一个分区中。在实现全排序的过程中,如果只有一个reduce,也就是只有一个Partition,那么所有Map的输出都会经过一个Partition到一个reduce里,在一个reduce里可以根据compareTo(也可以采用其他比较算法)来排序,实现全排序。但是这种情况就让MapReduce失去了分布式计算的光环。
所以全排序的大概思路为:确保Partition之间是有序的就OK了,即保证Partition1的最大值小于Partition2的最小值就OK了。
即便这样做也还是有个问题:Partition的分布不均,可能导致某些Partition处理的数据量远大于其他Partition处理的数据量。
而实现全排序的核心步骤为:取样和Partition。
先“取样”,保证Partition得更均匀:
- 对Math.min(10, splits.length)个split(输入分片)进行随机取样,对每个split取10000个样,总共10万个样
- 10万个样排序,根据reducer的数量(n),取出间隔平均的n-1个样
- 将这个n-1个样写入partitionFile(_partition.lst,是一个SequenceFile),key是取的样,值是nullValue
- 将partitionFile写入DistributedCache
整个全排序的详细介绍可参照:http://www.iteye.com/topic/709986
3,、使用TotalOrderPartition
我们知道Mapreduce框架在feed数据给reducer之前会对map output key排序,这种排序机制保证了每一个reducer局部有序,hadoop 默认的partitioner是HashPartitioner,它依赖于output key的hashcode,使得相同key会去相同reducer,但是不保证全局有序,如果想要获得全局排序结果(比如获取top N, bottom N),就需要用到TotalOrderPartitioner了,它保证了相同key去相同reducer的同时也保证了全局有序。
TotalOrderPartitioner依赖于一个partition file来distribute keys,partition file是一个实现计算好的sequence file,如果我们设置的reducer number是N,那么这个文件包含(N-1)个key分割点,并且是基于key comparator排好序的。TotalOrderPartitioner会检查每一个key属于哪一个reducer的范围内,然后决定分发给哪一个reducer。
InputSampler类的writePartitionFile方法会对input files取样并创建partition file。有三种取样方法:
RandomSampler 随机取样
IntervalSampler 从s个split里面按照一定间隔取样,通常适用于有序数据
SplitSampler 从s个split中选取前n条记录取样