一、初识Partitioner
在认识Partitioner之前我们先来回顾一下MapReduce流程中,Map阶段的五个步骤。如下图所示:
我们可以通过上图看到step1.3就是一个Partition操作。其主要作用是计算应该将哪些Key放到同一个Reduce中去。其次从图上我们可以得知Partition的操作是基于map的输出结果的,而且分区操作的对象是key。
接下来让我们一起看看官方文档对这个Partition的解读吧。
该文档主要描述了:Partition的主类名为Class Partitioner<KEY,VALUE>其直属子类有BinaryPartitioner, HashPartitioner, KeyFieldBasedPartitioner, TotalOrderPartitioner这几个类。
并且说明了Partitioner直接采用了map的输出,其中分区的典型方式是使用hash函数进行分区将key相同的数据分为一组。上述的其子类 HashPartitioner就实现了这个功能。
/** Partition keys by their {@link Object#hashCode()}. */
public class HashPartitioner<K, V> extends Partitioner<K, V> {
/** Use {@link Object#hashCode()} to partition. */
public int getPartition(K key, V value, int numReduceTasks){
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}//其中key和value为map输出,numReduceTasks是reduce的数量。
我们可以看到上面的代码中最关键的一行为:key.hashCode() & Integer.MAX_VALUE) % numReduceTasks。那么为什么要用hashcode与上最大整形呢?这是因为如果Key为Text的话,Text的hashcode方法跟String的基本一致,都是采用的Horner公式计算,得到一个int整数。但是,如果string太大的话这个int整数值可能会溢出变成负数,所以和整数的上限值Integer.MAX_VALUE(即0111111111111111)进行与运算,然后再对reduce任务个数取余,这样就可以让key均匀分布在reduce上。
二、自定义Partitioner
现在我们有一下天气数据,我们需要按要求将每年的最高气温找出来。
1949-10-01 14:21:02 34C
1949-10-02 14:21:12 36C
1950-02-02 11:21:12 32C
1950-05-02 11:31:12 37C
1951-12-02 11:31:12 23C
1950-12-02 11:31:12 47C
1950-12-02 11:31:12 27C
1951-06-02 11:31:12 48C
1951-07-02 11:31:12 45C
如果需要将每年的最高气温找出,那我们就必然需要将气温数据按年分组。
public class wdPartitioner extends Partitioner<Text, Text>{
@Override public int getPartition(Text key, Text value, int numReduceTasks) {
String str = key.toString().split("-")[0];
int num = Integer.parseInt(str);
if(num == 1949){ return 1%numReduceTasks; }
else if(num == 1950) { return 2%numReduceTasks; }
else if(num == 1951){ return 3%numReduceTasks; }
return 1%numReduceTasks; }
}
可以看到这里由于年份只有这三年所以就按照年份来进行判断了,如果年份较多需要分成很多组那么就可以写成return (num.hashCode() & Integer.MAX_VALUE) % numReduceTasks;在处理不同的数据的时候我们可以继承不同的子类以方便我们运算。其子类用法请参考
总结:分区Partitioner主要作用在于以下两点
(1)根据业务需要,产生多个输出文件;
(2)多个reduce任务并发运行,提高整体job的运行效率;
(3)好的Partition可以有效的避免数据倾斜;