【前言】Combiners和Partitioner都是mapperReduce编程中mapper和reduce的中间步骤,他们的出现给MR计算的效率以及业务功能有很大的提高
Combiners编程的作用:
首先,Combiners编程其实本质上就是一个Reduce,只不过它特殊在其实map阶段的reduce。它主要活动在mapper之后和reduce之前,主要将mapper产生的大量输出提前先做一次合并或者过滤,以减少传输到reduce的数据量,从而坚强reduce的压力,提高效率。
如果不用combiner,那么,所有的结果都是reduce完成,效率会相对低下。使用combiner,先完成的map会在本地聚合,提升速度。
注意:Combiner的输入时mapper的输出,Combiner 的输出是reduce的输入。Combiner绝不能改变最终的计算结果。所以从我的想法来看,Combiner只应该用于那种Reduce的输入key/value与输出key/value类型完全一致,且不影响最终结果的场景。比如累加,最大值等。
Combiner编程也可以对数据进行过滤,以达到节省网路的传输提高效率
代码
//继承reduce类,实现reduce方法
public class WCCombiners extends Reducer<Text, LongWritable, Text, LongWritable>{
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context)
throws IOException, InterruptedException {
//define a counter
long counter = 0;
//loop
for(LongWritable l : values){
counter += l.get();
}
//write
context.write(key, new LongWritable(counter));
}
}
Partitioner编程的作用:
将mapper(如果使用了combiner的话就是combiner)输出的key/value拆分为分片(shard),每个reducer对应一个分片。默认情况下,MR调用Hashpartitioner类,如果程序员编写了自己的partition类,那么就使用自己编写的partition编程进行数据分,以达到map阶段的数据分区切片,从而防止reduce阶段的数据倾斜问题,实现负载均衡。
hashpartition的算法(先计算key的散列值(通常为md5值)。然后通过reducer个数执行取模运算:key.hashCode%(reducer个数)。这种方式不仅能够随机地将整个key空间平均分发给每个reducer,同时也能确保不同mapper产生的相同key能被分发到同一个reducer。)
目的
如果对数据的整体有很好的了解,可以使用自定义Partitioner来达到reducer的负载均衡,提高效率。
使用范围
必须提前知道有多少个分区。一般设置的分区数量要比实际需要的分区数量大,否则会报错,当然最好相等。
注意
在自定义partitioner时一定要注意防止数据倾斜。
代码
//partitioner分区,继承Partitioner复写getPartition方法
public static class ProviderPartitioner extends Partitioner<Text, DataBean> {
private static Map<String,Integer> providermap = new HashMap<String,Integer>();
static{
/**
* 假如我们要把电话号码用运营商来分开
* 1:联通
* 2:电信
* 3:移动
*
* 在真实项目中,这里可以看成查数据库
*/
providermap.put("135", 1);
providermap.put("136", 1);
providermap.put("137", 1);
providermap.put("138", 1);
providermap.put("139", 1);
providermap.put("150", 2);
providermap.put("159", 2);
providermap.put("182", 3);
providermap.put("183", 3);
}
//Partitioner编程的输入参数是map的输出,因为它在map与reduce之间
@Override
public int getPartition(Text key, DataBean value, int numPartitions) {
String account = key.toString();
String sub_acc = account.substring(0,3);
Integer code = providermap.get(sub_acc);
//如果不是三家运营商,则code设置为0 表示其他
if(code == null){
code = 0;
}
return code;
}
}