MapReduce(五):Shuffle机制

Shuffle机制

Map方法之后,Reduce方法之前的数据处理过程称之为Shuffle。

2.3 Shuffle机制.png

Partition分区

如何按照条件输出到不同文件(分区)中,MapReduce提供了Partitioner功能。默认采用hash值的方式。

public class HashPartitioner<K2, V2> implements Partitioner<K2, V2> {

  public void configure(JobConf job) {}

  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K2 key, V2 value,
                          int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }

}

默认分区是根据key的hashCode对ReduceTasks个数取模得到的。用户没法控制那个key存储在那个分区。

自定义Partitioner步骤

1)自定义类继承Partitioner,重写getPartition()方法

public class ProvincePartitioner extends Partitioner<Text,FlowBean> {

   @Override
   public int getPartition(Text text, FlowBean flowBean, int numPartitions) {
       String substring = text.toString().substring(0, 2);
       if("135".equals(substring)){
           return 0;
       }
       return 1;
   }
}

2)在Job驱动中,设置自定义Partitioner

        job.setPartitionerClass(ProvincePartitioner.class);

3)自定义Partition后,需要根据自定义Partitioner的逻辑设置相应数量的ReduceTask。

        job.setNumReduceTasks(2);

分区总结

1)如果ReduceTask数量>getPartition的结果数,则会多产生几个空的输出文件part-r-oooxx;

2)如果1<ReduceTask的数量<getPartition的结果数,则有一部分分区数据无法安放,会Exception

3)如果ReduceTask的数量=1,则不管MapTask端输出多少个分区文件,最终结果都交给这一个ReduceTask,最终也就只会产生一个结果文件part-r-00000;

4)分区号必须从零开始,逐一累加。

代码实战

FlowBean.java

public class FlowBean implements Writable {

    private long upFlow;
    private long downFlow;
    private long sumFlow;

    public FlowBean() {
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
        this.sumFlow = in.readLong();
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow() {
        this.sumFlow = this.upFlow + this.downFlow;
    }

    @Override
    public String toString() {
        return upFlow +
            "\t" + downFlow +
            "\t" + sumFlow;
    }
}

FlowMapper.java

public class FlowMapper extends Mapper<LongWritable, Text,Text, FlowBean> {
    private Text outK = new Text();
    private FlowBean outV = new FlowBean();
    @Override
    protected void map(LongWritable key, Text value,
        Mapper<LongWritable, Text, Text, FlowBean>.Context context)
        throws IOException, InterruptedException {
        // 1 获取一行
        String line = value.toString();
        // 2 切割
        String[] split = line.split(" ");
        System.out.println(split.length);
        // 3 抓取数据
        String phone = split[0];
        String upFlow = split[split.length-3];
        String downFlow = split[split.length-2];
        // 4 封装
        outK.set(phone);
        outV.setUpFlow(Long.parseLong(upFlow));
        outV.setDownFlow(Long.parseLong(downFlow));
        outV.setSumFlow();

        context.write(outK,outV);
    }
}

FlowReducer.java

public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
    private FlowBean outV = new FlowBean();
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values,
        Reducer<Text, FlowBean, Text, FlowBean>.Context context)
        throws IOException, InterruptedException {
        // 1 遍历集合类价值
        long totalUp = 0;
        long totalDown = 0;
        for (FlowBean value : values) {
            totalUp += value.getUpFlow();
            totalDown += value.getDownFlow();
        }
        // 3 封装
        outV.setUpFlow(totalUp);
        outV.setDownFlow(totalDown);
        outV.setSumFlow();
        // 4 写出
        context.write(key,outV);

    }
}

ProvincePartitioner.java

public class ProvincePartitioner extends Partitioner<Text,FlowBean> {

    @Override
    public int getPartition(Text text, FlowBean flowBean, int numPartitions) {
        String substring = text.toString().substring(0, 2);
        if("135".equals(substring)){
            return 0;
        }
        return 1;
    }
}

FlowDriver.java

public class FlowDriver {

    public static void main(String[] args)
        throws IOException, InterruptedException, ClassNotFoundException {
        // 1 获取job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 2 设置jar
        job.setJarByClass(FlowDriver.class);
        // 3 关联mapper和reducer
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);
        // 4 设置mapper输出的key和value类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        // 5 设置最终输出的key和value类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        job.setPartitionerClass(ProvincePartitioner.class);
        job.setNumReduceTasks(2);
        // 6 设置数据的输入路径和输出路径
        FileInputFormat.setInputPaths(job,new Path(System.getProperty("user.dir")+"/input/partitioner2"));
        FileOutputFormat.setOutputPath(job,new Path(System.getProperty("user.dir")+"/output/partitioner2"));
        // 7 提交job
        Boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

WritableComparable排序

排序是MapReduce框架中最重要的操作之一。

MapTask和ReduceTask均会对数据按照Key进行排序。该操作属于Hadoop的默认行为。任何应用程序中的数据均会被排序,而不管逻辑上是否需要。

默认排序是按照字典顺序排序,且实现该排序的方法是快速排序。

排序概述

对于MapTask,它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使用率达到一定阈值后,再对缓冲区中的数据进行一次快速排序(内存完成),并将这些有序数据溢写到磁盘上,而当数据处理完毕后,它会对磁盘上所有文件进行归并排序。

对于ReduceTask,它从每个MapTask上远程拷贝相应的数据文件,如果文件大小超过一定阈值,则溢写磁盘上,否则存储在内存中。如果磁盘上文件数目达到一定阈值,则进行一次归并排序以生成一个更大文件;如果内存中文件大小或者数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上。当所有数据拷贝完毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序。

排序分类

1)部分排序

MapReduce根据输入记录的键对数据集排序。保证输出的每个文件内部有序。

2)全排序

最终输出结果只有一个文件,且文件内部有序。实现方式是只设置一个ReduceTask。但该方法在处理大型文件时效率极低,因为一台机器处理所有文件,完全丧失了MapReduce所提供的并行架构。

3)辅助排序

在Reduce端对key进行分组。应用于:在接收的key为bean对象时,想让一个或几个字段相同(全部字段比较不同)的key进入到同一个reduce方法时,可以采用分组排序。

4)二次排序

在自定义排序过程中,如果compareTo中的判断条件为两个即为二次排序。

自定义排序WritableComparable原理分析

bean对象作为key传输,需要实现WritableComparable接口重写compareTo方法,就可以实现排序。

FlowBean.java

public class FlowBean implements WritableComparable<FlowBean> {

    private long upFlow;
    private long downFlow;
    private long sumFlow;

    public FlowBean() {
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
        this.sumFlow = in.readLong();
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow() {
        this.sumFlow = this.upFlow + this.downFlow;
    }

    @Override
    public String toString() {
        return upFlow +
            "\t" + downFlow +
            "\t" + sumFlow;
    }

    @Override
    public int compareTo(FlowBean o) {
        if (this.sumFlow > o.sumFlow) {
            return -1;
        } else if (this.sumFlow < o.sumFlow) {
            return 1;
        } else {
            if (this.upFlow > o.upFlow) {
                return 1;
            } else if (this.upFlow < o.upFlow) {
                return -1;
            } else {
                return 0;
            }
        }
    }
}

FlowMapper.java

public class FlowMapper extends Mapper<LongWritable, Text,FlowBean, Text> {
    private FlowBean  outK= new FlowBean();
    private Text outV = new Text();

    @Override
    protected void map(LongWritable key, Text value,
        Mapper<LongWritable, Text, FlowBean, Text>.Context context)
        throws IOException, InterruptedException {
        // 获取1行
        String line = value.toString();
        // 切割
        String[] split = line.split(" ");
        // 封装
        outV.set(split[0]);
        outK.setUpFlow(Long.parseLong(split[1]));
        outK.setDownFlow(Long.parseLong(split[2]));
        // 写出
        context.write(outK,outV);
    }
}

FlowReducer.java

public class FlowReducer extends Reducer<FlowBean, Text, Text, FlowBean> {
    private FlowBean outV = new FlowBean();

    @Override
    protected void reduce(FlowBean key, Iterable<Text> values,
        Reducer<FlowBean, Text, Text, FlowBean>.Context context)
        throws IOException, InterruptedException {
        for (Text value : values) {
            context.write(value,key);
        }

    }
}

FlowDriver.java

public class FlowDriver {

    public static void main(String[] args)
        throws IOException, InterruptedException, ClassNotFoundException {
        // 1 获取job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 2 设置jar
        job.setJarByClass(FlowDriver.class);
        // 3 关联mapper和reducer
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);
        // 4 设置mapper输出的key和value类型
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);
        // 5 设置最终输出的key和value类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        // 6 设置数据的输入路径和输出路径
        FileInputFormat.setInputPaths(job, new Path(System.getProperty("user.dir")+"/input/writeableComparable"));
        FileOutputFormat.setOutputPath(job, new Path(System.getProperty("user.dir")+"/output/writeableComparable"));
        // 7 提交job
        Boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

Combiner

Combiner

1)Combiner是MR程序中Mapper和Reducer之外的一种组件。

2)Combiner组件的父类就是Reducer。

3)Combiner和Reducer的区别在于运行的位置。

Combiner是在每一个MapTask所在的节点运行;

Reducer是接受全局所有Mapper的输出结果;

4)Combiner的意义就是对每一个MapTask的输出进行局部汇总,以减少网络流量。

5)Combiner能够应用的前提是不能影响最终的业务逻辑,而且,Combiner的输出kv能够跟Reducer的输入kv类型要对应起来。

6)因为Combiner代码和Reducer代码一致,可以直接设置Reducer代码为Combiner代码

案例

WordCountMapper.java

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private Text outKey = new Text();
    private IntWritable outV = new IntWritable(1);

    @Override
    public void map(LongWritable key, Text value,
        Mapper<LongWritable, Text, Text, IntWritable>.Context context)
        throws IOException, InterruptedException {
        // 1 获取一行
        String lineStr = value.toString();
        // 2 切割
        String[] words = lineStr.split(" ");
        // 3 循环写出
        for (String word : words) {
            // 封装outKey
            outKey.set(word);
            // 写出
            context.write(outKey, outV);
        }
    }
}

WordCountReducer.java

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    IntWritable outV = new IntWritable();

    @Override
    public void reduce(Text key, Iterable<IntWritable> values,
        Reducer<Text, IntWritable, Text, IntWritable>.Context context)
        throws IOException, InterruptedException {
        int sum = 0;
        // 累加
        for (IntWritable value : values) {
            sum += value.get();
        }
        outV.set(sum);
        // 写出
        context.write(key,outV);
    }
}

WordCountCombiner.java

public class WordCountCombiner extends Reducer<Text, IntWritable,Text, IntWritable> {

    private IntWritable outV = new IntWritable();
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
        Reducer<Text, IntWritable, Text, IntWritable>.Context context)
        throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        outV.set(sum);
        context.write(key,outV);

    }
}

WordCountDriver.java

public class WordCountDriver {

    public static void main(String[] args) throws Exception {
        //1 获取job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        //2 设置jar包路径
        job.setJarByClass(WordCountDriver.class);
        //3 关联mapper、reducer
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        //4 设置mapper输出的kv类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        //5 设置最终输出的kv类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setCombinerClass(WordCountCombiner.class);
        // 可以直接将Reducer设置为Combiner,因为这两处代码逻辑一致
        // job.setCombinerClass(WordCountReducer.class);
        //6 设置输入路径和输出路径
        FileInputFormat.setInputPaths(job, new Path(System.getProperty("user.dir")+"/input/combiner"));
        FileOutputFormat.setOutputPath(job, new Path(System.getProperty("user.dir")+"/output/combiner"));
        //7 提交job
        Boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

小结

本小节是重点!!!描述了Shuffle机制(在mapper之后reducer之前,如果没有reducer那么combiner将不执行)。详细描述了分区、排序以及聚合,多理解。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,386评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,142评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,704评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,702评论 1 294
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,716评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,573评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,314评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,230评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,680评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,873评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,991评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,706评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,329评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,910评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,038评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,158评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,941评论 2 355

推荐阅读更多精彩内容