4.MapReduce框架原理2 - shuffle combiner

1.Shuffle机制

Map方法之后,Reduce方法之前的数据处理过程称之为Shuffle。


image.png

2.Shuffle排序机制

  • hadoop排序是使用WritableComparator对象
  • 实现排序的方法:
  • 1.直接让参与对比的对象实现WritableComparable 接口,并指定泛型,实现compareTo方法,实现比较规则
  • 2.自定义一个比较器对象,需要继承WritableComparator类,重写compare的方法。注意在构造器中调用父类对当前要参与比较的对象进行实例化。当前要参与比较的对象必须要实现WritableComparable接口,最后在driver类中指定自定义的比较器对象
//自定义的比较器对象
public class FlowBeanWritableComparator extends WritableComparator {

    // 指定当前自定义的比较器对象为谁服务
   // 注意在构造器中调用父类对当前要参与比较的对象进行实例化。
    public FlowBeanWritableComparator() {
        super(FlowBean.class, true);
    }

    /**
     * 自定义比较规则
     * @param a
     * @param b
     * @return
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        FlowBean abean = (FlowBean) a;
        FlowBean bbean = (FlowBean) b;
        System.out.println("aa"+abean);
        System.out.println("bb"+bbean);
        return -abean.getSumFlow().compareTo(((FlowBean) b).getSumFlow());
    }
}


// 指定自定义的比较器对象
 job.setSortComparatorClass(FlowBeanWritableComparator.class);

3.Shuffle排序源码分析

 // 为当前Job中的对象获取比较器对象
    comparator = job.getOutputKeyComparator();
    // 获取比较器对象的核心逻辑
    public RawComparator getOutputKeyComparator() {
    // 在当前Job中获取比较器对象的class文件--> mapreduce.job.output.key.comparator.class
    Class<? extends RawComparator> theClass = getClass(
    JobContext.KEY_COMPARATOR, null, RawComparator.class);
    // 如果通过JobContext.KEY_COMPARATOR 获取到比较器对象
    if (theClass != null){
         return ReflectionUtils.newInstance(theClass, this);
    }
     // 如果通过JobContext.KEY_COMPARATOR 获取不到比较器对象
     // Hadoop 会默认获取比较器对象 通过调用WritableComparator对象的get方法获取,
     // 在获取之前有个前提 判断当前job的MapOutputKeyClass 是否实现了WritableComparable接口
    return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class), this);
            }
            

4.hadoop如何给自身的数据类型获取比较器

1). 自身的数据类型已经实现WritableComparable接口
2). 自身的数据类型对象中 已经通过构造函数创建比较器对象

 // 以Text为例
   public static class Comparator extends WritableComparator {
    public Comparator() {
    super(Text.class);
    }

3). 自身的数据类型对象中 通过静态代码块把 当前对象的class 和 它的比较器对象
放入一个Map进行了维护。

static {
    // register this comparator
    WritableComparator.define(Text.class, new Comparator());
}
                  
 public static void define(Class c, WritableComparator comparator) {
    comparators.put(c, comparator);
}

5.Shuffle的combiner流程使用和注意事项

概念:是Shuffle过程中的一个可选流程(优化手段)
可以为Map阶段计算完的数据进行提前汇总,主要考虑到 减少 从Map阶段到
Reduce阶段的数据传输的大小控制以及减少Reduce端的计算压力。
使用场景:当不考虑多个MapTask的整体数据关联关系的时候才使用。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • InputFormat数据输入 切片与MapTask并行度决定机制 问题引出MapTask的并行度决定Map阶段的...
    Groundhog阅读 211评论 0 0
  • 前言 这里放一个我学习MapReduce的编程实例项目吧,本来是想把这些分开写成多篇文章的,能够详细叙述我学习过程...
    josonLe阅读 3,125评论 0 3
  • MapReduce应用 二次排序 二次排序的需求说明 在mapreduce操作时,shuffle阶段会多次根据ke...
    依天立业阅读 723评论 0 0
  • 3.1 MapReduce工作流程 1)流程示意图 2)流程详解 上面的流程是整个mapreduce最全工作流程,...
    码农GG阅读 267评论 0 0
  • 16宿命:用概率思维提高你的胜算 以前的我是风险厌恶者,不喜欢去冒险,但是人生放弃了冒险,也就放弃了无数的可能。 ...
    yichen大刀阅读 6,098评论 0 4