MapReduce框架原理

image.png

InputFormat数据输入

切片与MapTask并行度决定机制

  1. 问题引出
    MapTask的并行度决定Map阶段的任务处理并发度,从而影响整个job的处理速度
    思考:1G的数据,启动8个MapTask,可以提高集群的并发处理能力。那么1K的数据,也启动8个MapTask,会提高集群性能吗?MapTask并行任务是不是越多越好呢?哪些因素影响了MapTask的并行度?

  2. MapTask并行度决定机制
    数据块:Block是HDFS物理上把数据分成一块一块。数据块是HDFS上数据存储单位
    数据切片:数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。数据切片是MapReduce程序计算输入数据的单位,一个切片会对应启动一个MapTask

image.png

job提交流程源码和切片源码解析

  1. job提交流程源码
        waitForCompletion()

        submit();

        // 1建立连接
        connect();
        // 1)创建提交Job的代理
        new Cluster(getConfiguration());
        // (1)判断是本地运行环境还是yarn集群运行环境
        initialize(jobTrackAddr, conf);

        // 2 提交job
        submitter.submitJobInternal(Job.this, cluster)
        // 1)创建给集群提交数据的Stag路径
        Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);

        // 2)获取jobid ,并创建Job路径
        JobID jobId = submitClient.getNewJobID();

        // 3)拷贝jar包到集群
        copyAndConfigureFiles(job, submitJobDir);
        rUploader.uploadFiles(job, jobSubmitDir);

        // 4)计算切片,生成切片规划文件
        writeSplits(job, submitJobDir);
        maps = writeNewSplits(job, jobSubmitDir);
        input.getSplits(job);

        // 5)向Stag路径写XML配置文件
        writeConf(conf, submitJobFile);
        conf.writeXml(out);

        // 6)提交Job,返回提交状态
        status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
image.png

FileInputFormat切片源码解析

  1. 程序先找到数据存储的目录
  2. 开始遍历处理(规划切片)目录下的每一个文件
  3. 遍历第一个文件
    (3.1)获取文件大小,fs.sizeOf()
    (3.2)计算切片大小
    computeSplitSize(Math.max(minSize, Math.min(blockSize, maxSize))) = blockSize = 128M
    (3.3)默认情况下,切片大小为blockSize
    (3.4)开始切,形成第一个切片,0——128M,第二个切片,129——256M,第三个切片,256——300M
    每次切片时,要判断切完剩下的部分是否大于块的1.1倍,如果不大于,那么就切分为一块
    (3.5)将切片信息写到一个切片规划文件里
    (3.6)整个切片核心过程在getSplit()方法中完成
    (3.7)InputSplit只记录了切片的元数据信息,比如起始位置,长度,以及所在的节点列表等
  4. 提交切面规划文件到YARN上,YARN的MrAppMaster就可以根据切片规划文件计算开启几个MapTask了

FileInputFormat切片机制

  1. 切片机制
    (1)简单的按照文件的内容长度切分
    (2)切片大小,默认等于Block的大小
    (3)切片时不考虑数据集整体,而是针对每一个文件进行切片
  2. 案例分析


    image.png

FileInputFormat切片大小的参数配置

  1. 源码中计算切片大小的公式
    Math.max(minSize, Math.min(maxSize, blockSize));
    mapreduce.input.fileinputformat.split.minsize = 1
    mapreduce.input.fileinputformat.split.maxsize = Long.MAX_VALUE
    因此,默认情况下,切片大小为blockSize

  2. 切片大小设置
    maxSize(切片最大值):参数如果调的比blockSize小,则会让切片变小,而且就等于配置的这个参数的值
    minSize(切片最小值):参数调的比blockSize大,则可以让切片变得比blockSize大

  3. 获取切片信息API
    // 获取切片的文件名称
    String name = inputSplit.getPath().getName();
    // 根据文件类型获取切片信息
    FileSplit inputSplit = (FileSplit)context.getInputSplit();

TextInputFormat

  1. FileInputFormat实现类
    思考:在运行MapReduce程序时,输入的文件格式包括:基于行的日志文件、二进制格式文件、数据库表等。那么,针对不同的数据类型,MapReduce是如何读取这些数据的呢?
    FileInputFormat常见的接口实现类包括:TextInputFormat、KeyValueTextInputFormat、NLineIntpuFormat、CombineTextInputFormat和自定义的InputFormat

  2. TextInputFormat
    TextInputFormat是默认的FileInputFormat实现类。按行读取每条记录。键是存储该行在整个文件中的起始字节偏移量,LongWritable类型。值是这行的内容,不包括任何行终止符(回车、换行),Text类型

CombineTextInputFormat切片机制

框架默认的TextInputFormat切片机制是对任务按文件规划切片,不管文件多小,都会是一个单独的切片,都会交给一个MapTask,这样如果有大量的小文件,就会产生大量的MapTask,处理效率极其低下

  1. 应用场景
    CombineTextInputFormat用于小文件过多的场景,它可以将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以交给一个MapTask处理
  2. 虚拟存储切片最大值设置
    CombineTextInputFormat.setMaxInputSplitSize(job, 4194304); // 4M
    注意:虚拟存储切片最大值设置最好根据实际的小文件大小情况来设置具体的值
  3. 切片机制
    生成切片过程包括:虚拟存储过程和切片过程两部分


    image.png

(1)虚拟存储过程:
将输入目录下所有文件大小,一次和设置的setMaxInputSplitSize值比较,如果不大于设置的最大值,逻辑上划分一个块。如果输入文件大于设置的最大值且大于两倍,那么以最大值切割一块;当剩余数据大小超过设置的最大值且不大于最大值两倍,此时将文件均分为2个虚拟存储块(防止出现太小切片)
例如setMaxInputSplitSize值为4M,输入文件大小为8.02M,则先逻辑上分成一个4M。剩余的大小为4.02M,如果按照4M逻辑划分,就会出现0.02M的小虚拟存储块,所以将剩余的4.02M切分成2.01M和2.01M的两个文件

(2)切片过程:
(a)判断虚拟存储的文件大小是否大于setMaxInputSplitSize值,大于等于则单独形成一个切片
(b)如果不大于则跟下一个虚拟存储文件进行合并,共同形成一个切片
(c)测试举例:有4个小文件,大小分别为:1.7M、5.1M、3.4M以及6.8M,这四个小文件,则虚拟存储之后会形成6个文件块,大小分别为1.7M、(2.55M、2.55M)、3.4M、(3.4M、3.4M),最终形成3个切片,大小分别是(1.7+2.55)M、(2.55+3.4)M、(3.4+3.4)M

MapReduce工作流程

image.png
image.png

Shuffle机制

Shuffle机制

Map之后,Reduce之前的数据处理过程称为Shuffle


image.png

Partition分区

  1. 问题引出
    要求将统计结果按照条件输出到不同文件中(分区)。比如:将统计结果按照手机归属地不同省份输出到不同文件中(分区)

  2. 默认Partition分区

public class HashPartitioner<K, V> extends Partitioner<K, V> {
    public HashPartitioner() {
    }

    public int getPartition(K key, V value, int numReduceTasks) {
        return (key.hashCode() & 2147483647) % numReduceTasks;
    }
}

默认分区是根据key的hashCode()对ReduceTasks个数取模得到的,用户没法控制哪个key存储到哪个分区

  1. 自定义Partitioner步骤
    (1)自定义类继承Partitioner,重写getPartition()方法
public class ClustomPartitioner extends Partitioner<Text, FlowBean> {
        @Override
        public int getPartition(Text key, FlowBean value, int numPartitions){
                // 控制分区代码逻辑
                return partition;
        }
}

(2)在job驱动中,设置自定义Partitioner
job.setPartitionerClass(ClustomPartitioner.class);
(3)自定义Partitioner后,要根据自定义的Partitioner的逻辑设置响应数量的ReduceTask
job.setReduceTasks(5);

Partition分区案例实操

  1. 需求
    将统计结果按照手机归属地不同省份输出到不同文件中
    (1)输入数据
1   13736230513 192.196.100.1   www.atguigu.com 2481    24681   200
2   13846544121 192.196.100.2           264 0   200
3   13956435636 192.196.100.3           132 1512    200
4   13966251146 192.168.100.1           240 0   404
5   18271575951 192.168.100.2   www.atguigu.com 1527    2106    200
6   84188413    192.168.100.3   www.atguigu.com 4116    1432    200
7   13590439668 192.168.100.4           1116    954 200
8   15910133277 192.168.100.5   www.hao123.com  3156    2936    200
9   13729199489 192.168.100.6           240 0   200
10  13630577991 192.168.100.7   www.shouhu.com  6960    690 200
11  15043685818 192.168.100.8   www.baidu.com   3659    3538    200
12  15959002129 192.168.100.9   www.atguigu.com 1938    180 500
13  13560439638 192.168.100.10          918 4938    200
14  13470253144 192.168.100.11          180 180 200
15  13682846555 192.168.100.12  www.qq.com  1938    2910    200
16  13992314666 192.168.100.13  www.gaga.com    3008    3720    200
17  13509468723 192.168.100.14  www.qinghua.com 7335    110349  404
18  18390173782 192.168.100.15  www.sogou.com   9531    2412    200
19  13975057813 192.168.100.16  www.baidu.com   11058   48243   200
20  13768778790 192.168.100.17          120 120 200
21  13568436656 192.168.100.18  www.alibaba.com 2481    24681   200
22  13568436656 192.168.100.19          1116    954 200

(2)期望输出数据
手机号136、137、138、139开头都分别放到一个独立的文件中,其他开头的放到一个文件中

  1. 分析


    image.png
  2. 编码
    Partitioner

public class ProvincePartitioner extends Partitioner<Text, FlowBean> {
    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) {
        String phone = key.toString();
        String prePhone = phone.substring(0, 3);

        int partition;
        switch (prePhone){
            case "136":
                partition = 0;
                break;
            case "137":
                partition = 1;
                break;
            case "138":
                partition = 2;
                break;
            case "139":
                partition = 3;
                break;
            default:
                partition = 4;
        }

        return partition;
    }
}

修改Driver

public class FlowCountDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 获取job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 设置jar路径
        job.setJarByClass(FlowCountDriver.class);
        // 关联mapper、reducer
        job.setMapperClass(FlowCountMapper.class);
        job.setReducerClass(FlowCountReducer.class);
        // 设置map阶段输出的类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        // 设置最终输出的类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // 绑定Partitioner
        job.setPartitionerClass(ProvincePartitioner.class);
        // 设置reduce任务数,根据自定义Partitioner确定
        job.setNumReduceTasks(5);
        // 设置输入输出路径
        FileInputFormat.setInputPaths(job, new Path("data/phone_data.txt"));
        FileOutputFormat.setOutputPath(job, new Path("phone_result2"));
        // 提交job
        boolean flag = job.waitForCompletion(true);
        System.exit(flag ? 0 : 1);
    }
}
  1. 分区总结
    (1)如果ReduceTask的数量 > getPartition()的结果数,则会多产生几个空的输出文件
    (2)如果1 < ReduceTask < getPartition()的结果数,则有一部分分区数据无处安放,抛异常
    (3)如果ReduceTask的数量 = 1,则不管MapTask段输出多少个分区文件,最终结果都会交给这一个ReduceTask,最终也就只会产生一个结果文件
    (4)分区号必须从0开始,逐一累加

  2. 案例分析
    例如:自定义分区数为5,则
    (1)job.setNumReduceTasks(1); 正常运行,只产生一个输出文件
    (2)job.setNumReduceTasks(2); 抛异常
    (3)job.setNumReduceTasks(5); 与预期一样
    (4)job.setNumReduceTasks(6); 程序正常运行,但会产生一个空文件

WritableComparable排序

概述

排序是MapReduce框架中最重要的操作之一
MapTask和ReduceTask均会对数据按照key进行排序,该操作属于Hadoop的默认行为。任何应用程序中的数据均会被排序,而不管逻辑上是否需要
默认排序是按照字段顺序排序,且实现该排序的方法是快速排序

对于MapTask,它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使用率达到一定阈值后,在对缓冲区中的数据进行一次快速排序,并将这些有序数据溢写到磁盘上,而当数据处理完毕后,他会对磁盘上所有文件进行归并排序
对于ReduceTask,他从每个MapTask上远程拷贝相应的数据文件,如果文件大小超过一定阈值,则溢写到磁盘上,否则存储在内存中,如果磁盘上文件数目达到一定阈值,则进行一次归并排序以生成一个更大的文件;如果内存中文件大小或者数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上。当所有数据拷贝完毕后,ReduceTask同意对内存和磁盘上的所有数据进行一次归并排序。

分类排序
  1. 部分排序
    MapReduce根据输入记录的键对数据排序。保证输出的每个文件内部有序
  2. 全排序
    最终输出结果只有一个文件,且文件内部有序。实现方式是只设置一个ReduceTask。但该方法在处理大型文件时效率极低,因为一台机器处理所有文件,完全丧失了MapReduce所提供的并行架构
  3. 辅助排序
    在Reduce端对key进行分组。应用于:在接收的key为bean对象时,想让一个或几个字段相同(全部字段不同)的key进入到同一个reduce方法,可以采用分组排序
  4. 二次排序
    在自定义排序过程中,如果compareTo中的判断条件为两个即为二次排序
自定义WritableComparable原理分析

bean对象作为key传输,需要实现WriableComparable接口,重写compareTo方法,就可以实现排序

@Override
public int compareTo(FlowBean bean) {
        int result;
        if(this.sumFlow > bean.getSumFlow()) {
                result = -1;
        } else if(this.sumFlow < bean.getSumFlow()) {
                result = 1;
        } else {
                result = 0;
        }

        return resule;
}

WritableComparable排序案例实操(全排序)

  1. 需求
    根据手机号的案例,将结果根据总流量倒序排序

  2. 需求分析


    image.png
  3. 编码
    FlowBean
    在原有基础上,改为实现WriableComparable接口,并重写compareTo方法

public class FlowBean implements WritableComparable<FlowBean> {
    private long upFlow;
    private long downFlow;
    private long sumFlow;

    // 空参构造
    public FlowBean() {
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    public void setSumFlow() {
        this.sumFlow = this.upFlow + this.downFlow;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    @Override
    public int compareTo(FlowBean bean) {
        int result;

        if (this.sumFlow > bean.getSumFlow()) {
            result = -1;
        } else if(this.sumFlow < bean.getSumFlow()) {
            result = 1;
        } else {
            result = 0;
        }

        return result;
    }
}

Mapper

public class FlowCountMapper extends Mapper<LongWritable, Text, FlowBean, Text> {
    private Text outValue = new Text();
    private FlowBean outKey = new FlowBean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 获取一行
        String line = value.toString();

        // 切分
        String[] split = line.split("\t");

        // 封装
        String phone = split[0];
        outValue.set(phone);

        long up = Long.parseLong(split[1]);
        long down = Long.parseLong(split[2]);
        long sum = Long.parseLong(split[3]);

        outKey.setUpFlow(up);
        outKey.setDownFlow(down);
        outKey.setSumFlow(sum);

        // 写出
        context.write(outKey, outValue);
    }
}

Reducer

public class FlowCountReducer extends Reducer<FlowBean, Text, Text, FlowBean> {
    private FlowBean outValue = new FlowBean();

    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // 循环遍历,防止不同手机号流量相同,循环过程中写出
        for (Text value : values) {
            context.write(value, key);
        }
    }
}

Driver
注意修改Mapper的输出类型,并将之前案例的输出作为输入

public class FlowCountDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 获取job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 设置jar路径
        job.setJarByClass(FlowCountDriver.class);
        // 关联mapper、reducer
        job.setMapperClass(FlowCountMapper.class);
        job.setReducerClass(FlowCountReducer.class);
        // 设置map阶段输出的类型
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);
        // 设置最终输出的类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        // 设置输入输出路径
        FileInputFormat.setInputPaths(job, new Path("phone_result"));
        FileOutputFormat.setOutputPath(job, new Path("phone_compare_result"));
        // 提交job
        boolean flag = job.waitForCompletion(true);
        System.exit(flag ? 0 : 1);
    }
}

Combiner合并

  1. Combiner是MapReduce程序中Mapper和Reducer之外的一种组件

  2. Combiner组件的父类就是Reducer

  3. Combiner和Reducer的区别在于运行的位置
    Combiner是在每一个MapTask所在的节点运行
    Reducer是接收全局所有Mapper的输出结果

  4. Combiner的意义就是对每一个MapTask的输出进行局部汇总,以减小网络传输量

  5. Combiner能够应用的前提是不能影响最终的业务逻辑,而且,Combiner的输出KV应该和Reducer的输入KV类型对应起来

  6. 自定义Combiner实现步骤
    (1)自定义一个Combiner继承Reducer,重写reduce方法

public class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable outValue = new IntWritable();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
                int sum = 0;
                for (IntWritable value : values) {
                        sum += value.get();
                }

                outValue.set(sum);

                context.write(key, outValue);
        }
}

(2)在驱动类中进行设置
job.setCombinerClass(WordCountCombiner.class);

另一种实现方式:
我们发现新编写的Combiner和原始的Reducer内容一致,而Combiner和Reducer都继承自Reducer,所以我们可以将原有的Reducer作为Combiner使用
job.setCombinerClass(WordCountReducer.class)

OutputFormat数据输出

OutputFormat接口实现类

OutputFormat是MapReduce输出的基类,所有实现MapReduce输出都实现了OutputFormat接口。下面我们介绍几种常见的OutputForamt实现类

  1. OutputFormat实现类


    image.png
  1. 默认输出格式TextOutputFormat
  2. 自定义OutputFormat
    应用场景:输出数据到MySQL/HBASE/ElasticSearch等存储中
    步骤:
    (1) 自定义一个类继承OutputFormat
    (2)改写RecordWriter,具体改写输出数据的方法write()

自定义OutputFormat案例

  1. 需求
    过滤输入的log日志,包含atguigu的网站输出到atguigu.log,不包含的输出的other.log
    数据
http://www.baidu.com
http://www.google.com
http://cn.bing.com
http://www.taobao.com
http://www.sohu.com
http://www.sina.com
http://www.sin2a.com
http://www.sin2desa.com
http://www.sindsafa.com
  1. 需求分析


    image.png
  2. 编码
    Mapper

public class LogMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 不作任何处理
        context.write(value, NullWritable.get());
    }
}

Reducer

public class LogReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // 相同key的数据会被聚合,循环写出,防止丢失
        for (NullWritable value : values) {
            context.write(key, value);
        }
    }
}

OutputFormat

public class LogOutputFormat extends FileOutputFormat<Text, NullWritable> {

    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
        LogRecordWriter logRecordWriter = new LogRecordWriter(job);
        return logRecordWriter;
    }
}

RecordWriter

public class LogRecordWriter extends RecordWriter<Text, NullWritable> {

    private FSDataOutputStream atguiguOut;
    private FSDataOutputStream otherOut;

    public LogRecordWriter(TaskAttemptContext job) {
        // 创建两条流
        try {
            FileSystem fileSystem = FileSystem.get(job.getConfiguration());

            atguiguOut = fileSystem.create(new Path("log/atguigu.log"));
            otherOut = fileSystem.create(new Path("log/other.log"));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void write(Text key, NullWritable value) throws IOException, InterruptedException {
        // 具体怎么写
        String line = key.toString();
        if (line.contains("atguigu")) {
            atguiguOut.writeBytes(line + "\n");
        } else {
            otherOut.writeBytes(line + "\n");
        }
    }

    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        IOUtils.closeStreams(atguiguOut, otherOut);
    }
}

Driver

public class LogDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 获取job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 设置jar路径
        job.setJarByClass(LogDriver.class);
        // 关联mapper、reducer
        job.setMapperClass(LogMapper.class);
        job.setReducerClass(LogReducer.class);
        // 设置map输出的KV类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        // 设置最终输出的KV类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // 配置自定义OutputFormat
        job.setOutputFormatClass(LogOutputFormat.class);
        // 设置输入路径和输出路径
        FileInputFormat.setInputPaths(job, new Path("data/log.txt"));
        /*
        虽然我们设置了自定义的OutputFormat,但是自定义的OutputFormat继承自FileOutputFormat
        而FileOutputFormat要输出一个_SUCCESS文件,所以还是要指定输出路径
         */
        FileOutputFormat.setOutputPath(job, new Path("log"));
        // 提交job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

MapReduce内核源码解析

MapTask工作机制

image.png
  1. Read阶段:MapTask通过用户编写的RecordReader,从输入InputSplit中解析出一个个key/value。
  2. Map阶段:该节点主要是将解析出的key/value交给用户编写map()函数处理,并产生一系列新的key/value。
  3. Collect收集阶段:在用户编写map()函数中,当数据处理完成后,一般会调用OutputCollector.collect()输出结果。在该函数内部,它会将生成的key/value分区(调用Partitioner),并写入一个环形内存缓冲区中。
  4. Spill阶段:即“溢写”,当环形缓冲区满后,MapReduce会将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。

ReduceTask工作机制

image.png
  1. Copy阶段:ReduceTask从各个MapTask上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中
  2. Sort阶段:在远程拷贝数据的同时,ReduceTask启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。按照MapReduce语义,用户编写reduce()函数输入数据是按key进行聚集的一组数据。为了将key相同的数据聚在一起,Hadoop采用了基于排序的策略。由于各个MapTask已经实现对自己的处理结果进行了局部排序,因此,ReduceTask只需对所有数据进行一次归并排序即可。
  3. Reduce阶段:reduce()函数将计算结果写到HDFS上

ReduceTask并行度决定机制

回顾:MapTask并行度有切片个数决定,切片个数由输入文件和切片规则决定。
思考:ReduceTask并行度由谁决定

  1. 设置ReduceTask并行度个数
    ReduceTask的并行度同样影响整个Job的执行并发读和执行效率,但与MapTask的并发数由切片数决定不同,ReduceTask数量的决定是可以直接手动设置:
// 默认值为1,手动设置为4
job.setNumReduceTasks(4);
  1. 实验:测试ReduceTask多少合适
    (1)实验环境
    1个Master节点,16个Slave节点,CPU:8GHZ,内存2G
    (2)实验结果


    image.png
  2. 注意事项
    (1)ReduceTask=0,表示没有Reduce阶段,输出文件个数和Map个数一致。
    (2)ReduceTask默认值就是1,所以输出文件个数为1
    (3)如果数据分布不均匀,就有可能在Reduce阶段产生数据倾斜
    (4)ReduceTask数量并不是任意设置,还要考虑业务逻辑需求,有些情况下,需要计算全局汇总结果,就只能有一个ReduceTask
    (5)具体多少个ReduceTask,需要根据集群性能而定
    (6)如果分区数不是1,但是ReduceTask为1,不执行分区过程,因为在MapTask源码中,执行分区的前提是先判断ReduceTask个数是否为1,不大于1肯定不执行

MapTask & ReduceTask源码跟踪

MapTask

context.write(k, NullWritable.get()); //自定义的 map 方法的写出,进入
    output.write(key, value); 
        //MapTask727 行,收集方法,进入两次
        collector.collect(key, value,partitioner.getPartition(key, value, partitions));
            HashPartitioner(); //默认分区器
        collect() //MapTask1082 行 map 端所有的 kv 全部写出后会走下面的 close 方法
            close() //MapTask732 行
                collector.flush() // 溢出刷写方法,MapTask735 行,提前打个断点,进入
                    sortAndSpill() //溢写排序,MapTask1505 行,进入
                        sorter.sort() QuickSort //溢写排序方法,MapTask1625 行,进入
                    mergeParts(); //合并文件,MapTask1527 行,进入
                collector.close(); //MapTask739 行,收集器关闭,即将进入 ReduceTask

ReduceTask

if (isMapOrReduce()) //reduceTask324 行,提前打断点
    initialize() // reduceTask333 行,进入
    init(shuffleContext); // reduceTask375 行,走到这需要先给下面的打断点
         totalMaps = job.getNumMapTasks(); // ShuffleSchedulerImpl 第 120 行,提前打断点
         merger = createMergeManager(context); //合并方法,Shuffle 第 80 行
            // MergeManagerImpl 第 232 235 行,提前打断点
            this.inMemoryMerger = createInMemoryMerger(); //内存合并
            this.onDiskMerger = new OnDiskMerger(this); //磁盘合并
    rIter = shuffleConsumerPlugin.run();
        eventFetcher.start(); //开始抓取数据,Shuffle 第 107 行,提前打断点
        eventFetcher.shutDown(); //抓取结束,Shuffle 第 141 行,提前打断点
        copyPhase.complete(); //copy 阶段完成,Shuffle 第 151 行
        taskStatus.setPhase(TaskStatus.Phase.SORT); //开始排序阶段,Shuffle 第 152 行
    sortPhase.complete(); //排序阶段完成,即将进入 reduce 阶段 reduceTask382 行
reduce(); //reduce 阶段调用的就是我们自定义的 reduce 方法,会被调用多次
    cleanup(context); //reduce 完成之前,会最后调用一次 Reducer 里面的 cleanup 方法

Join多种应用

Reduce Join

Map端的主要工作:为来自不同表或文件的KV对,打标签以区别不同来源的记录。然后用连接字段作为key,其余部分和新加的标志作为value,最后进行输出
Reduce端的主要工作:在Reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在Map阶段已经打标志)分开,最后进行合并就可以了

Reduce Join案例

  1. 需求
    输入数据


    image.png

order.txt

1001    01  1
1002    02  2
1003    03  3
1004    01  4
1005    02  5
1006    03  6

pd.txt

01  小米
02  华为
03  格力

期望输出


image.png
  1. 需求分析
    通过将关联条件作为Map输出的key,将两表满足join条件的数据并携带数据所来源的文件信息,发往同一个ReduceTask,在Reduce中进行数据的串联


    image.png
  2. 编码
    TableBean

public class TableBean implements Writable {
    // 订单id
    private String id;
    // 商品id
    private String pid;
    // 商品数量
    private int amount;
    // 商品名称
    private String pname;
    // 标志字段 order pd
    private String flag;

    public TableBean() {

    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getPid() {
        return pid;
    }

    public void setPid(String pid) {
        this.pid = pid;
    }

    public int getAmount() {
        return amount;
    }

    public void setAmount(int amount) {
        this.amount = amount;
    }

    public String getPname() {
        return pname;
    }

    public void setPname(String pname) {
        this.pname = pname;
    }

    public String getFlag() {
        return flag;
    }

    public void setFlag(String flag) {
        this.flag = flag;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(id);
        out.writeUTF(pid);
        out.writeInt(amount);
        out.writeUTF(pname);
        out.writeUTF(flag);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        id = in.readUTF();
        pid = in.readUTF();
        amount = in.readInt();
        pname = in.readUTF();
        flag = in.readUTF();
    }

    @Override
    public String toString() {
        return id + "\t" + pname + "\t" + amount;
    }
}

Mapper

public class TableMapper extends Mapper<LongWritable, Text, Text, TableBean> {

    private String fileName;
    private Text outKey = new Text();
    private TableBean outValue = new TableBean();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // 初始化order pd
        FileSplit inputSplit = (FileSplit) context.getInputSplit();
        fileName = inputSplit.getPath().getName();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 获取一行
        String line = value.toString();
        System.out.println(line);


        // 判断属于哪个文件
        if (fileName.contains("order")) {
            // 切分, 防止多个文件切分逻辑不同
            String[] split = line.split("\t");
            // 订单
            /*
            1001    01  1
            1002    02  2
             */
            // 封装
            System.out.println(Arrays.toString(split));
            String id = split[0];
            String pid = split[1];
            int amount = Integer.parseInt(split[2]);

            outKey.set(pid);

            outValue.setId(id);
            outValue.setPid(pid);
            outValue.setAmount(amount);
            outValue.setPname("");
            outValue.setFlag("order");
        } else {
            // 切分
            String[] split = line.split("\t");
            // 产品
            /*
            01  小米
            02  华为
             */
            // 封装
            String pid = split[0];
            String pname = split[1];

            outKey.set(pid);
            outValue.setId("");
            outValue.setPid(pid);
            outValue.setAmount(-1);
            outValue.setPname(pname);
            outValue.setFlag("pd");
        }

        // 写出
        context.write(outKey, outValue);
    }
}

Reducer

public class TableReducer extends Reducer<Text, TableBean, TableBean, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<TableBean> values, Context context) throws IOException, InterruptedException {
        /*
        01  1001    1   order
        01  1004    4   order
        01  小米  pd
         */
        // 初始化存储
        List<TableBean> orderBeans = new ArrayList<>();
        TableBean pdBean = new TableBean();

        // 遍历
        for (TableBean value : values) {
            if ("order".equals(value.getFlag())) {
                // 订单
                TableBean tempBean = new TableBean();

                try {
                    BeanUtils.copyProperties(tempBean, value);
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                } catch (InvocationTargetException e) {
                    e.printStackTrace();
                }

                orderBeans.add(tempBean);
            } else {
                // 商品
                try {
                    BeanUtils.copyProperties(pdBean, value);
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                } catch (InvocationTargetException e) {
                    e.printStackTrace();
                }
            }
        }

        // 遍历orderBeans,为每个设置产品名
        for (TableBean orderBean : orderBeans) {
            orderBean.setPname(pdBean.getPname());
            context.write(orderBean, NullWritable.get());
        }
    }
}

Driver

public class TableDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 获取job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 设置jar路径
        job.setJarByClass(TableDriver.class);
        // 关联mapper、reducer
        job.setMapperClass(TableMapper.class);
        job.setReducerClass(TableReducer.class);
        // 设置map输出的KV类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(TableBean.class);
        // 设置最终输出的KV类型
        job.setOutputKeyClass(TableBean.class);
        job.setOutputValueClass(NullWritable.class);
        // 设置输入路径和输出路径
        FileInputFormat.setInputPaths(job, new Path("data/join"));
        FileOutputFormat.setOutputPath(job, new Path("join_result"));
        // 提交job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
  1. 总结
    缺点:这种方式,合并的操作在Reduce阶段完成,Reduce端的处理压力太大,Map节点的运算负载很低,资源利用率不高,而且在Reduce阶段会产生数据倾斜
    解决方式:使用MapJoin

Map Join

  1. 使用场景
    Map Join适用于一张表很大,一张表很小的场景

  2. 优点
    思考:在Reduce端处理过多的表,非常容易产生数据倾斜,怎么办?
    在Map端缓存多张表,提前处理业务逻辑,这样增加Map端业务,减少Reduce端数据的压力,尽可能的减少数据倾斜

  3. 具体办法:采用DistributedCache
    (1)在Mapper的setup阶段,将文件读取到缓存集合中
    (2)在Driver驱动类中加载缓存

// 缓存普通文件到Task运行的节点
job.addCacheFile(new URI("file://cache/xxx.txt"));
// 集群环境运行需要设置HDFS路径
job.addCacheFile(new URI("hdfs://ip:port/cache/xxx.txt"))

Map Join案例

  1. 需求同Reduce Join

  2. 需求分析


    image.png
  3. 编码
    Driver

public class MapJoinDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        // 获取job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 设置jar路径
        job.setJarByClass(MapJoinDriver.class);
        // 关联mapper,不需要reducer
        job.setMapperClass(MapJoinMapper.class);
        job.setNumReduceTasks(0);
        // 设置缓存
        job.addCacheFile(new URI("cache/pd.txt"));
        // 设置map输出的KV类型
        job.setMapOutputKeyClass(TableBean.class);
        job.setMapOutputValueClass(NullWritable.class);
        // 设置最终输出的KV类型
        job.setOutputKeyClass(TableBean.class);
        job.setOutputValueClass(NullWritable.class);
        // 设置输入路径和输出路径
        FileInputFormat.setInputPaths(job, new Path("data/join"));
        FileOutputFormat.setOutputPath(job, new Path("map_join_result"));
        // 提交job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

Mapper

public class MapJoinMapper extends Mapper<LongWritable, Text, TableBean, NullWritable> {
    private Map<String, String> cache = new HashMap<>();
    private TableBean outKey = new TableBean();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // 读取缓存
        URI[] cacheFiles = context.getCacheFiles();

        FileSystem fs = FileSystem.get(context.getConfiguration());

        FSDataInputStream fsDataInputStream = fs.open(new Path(cacheFiles[0]));

        // 从流中读取数据
        List<String> lines = IOUtils.readLines(fsDataInputStream, StandardCharsets.UTF_8);
        for (String line : lines) {
            String[] split = line.split("\t");
            cache.put(split[0], split[1]);
        }

        // 关流
        org.apache.hadoop.io.IOUtils.closeStreams(fsDataInputStream, fs);
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 处理order.txt
        // 获取一行
        String line = value.toString();

        // 切分
        String[] split = line.split("\t");

        /*
            1001    01  1
            1002    02  2
         */
        // 根据pid获取缓存中的pname
        // 封装
        outKey.setId(split[0]);
        outKey.setPname(cache.get(split[1]));
        outKey.setAmount(Integer.parseInt(split[2]));

        // 写出
        context.write(outKey, NullWritable.get());
    }
}

MapReduce开发总结

  1. 输入数据接口:InputFormat
    (1)默认使用的实现类是:TextInputFormat
    (2)TextInputFormat的功能逻辑是:一次读取一行文本,然后将该行的起始偏移量作为key,行内容作为value返回
    (3)CombineTextInputFormat可以把多个小文件合并成一个切片处理,提高处理效率
  2. 逻辑处理接口:Mapper
    用户根据业务需求实现其中的三个方法:map()、setup()、cleanup()
  3. Partition分区
    (1)有默认实现HashPartitioner,逻辑是根据key的哈希值和numReduce来返回一个分区号:key.hashCode() & Integer.MAX_VALUE % numReduces
    (2)如果业务上有特别的需求,可以自定义分区
  4. Comparable排序
    (1)当我们用自定义的对象作为key来输出时,就必须要实现WritableComparable接口,重写其中的compareTo方法
    (2)部分排序:对最终输出的每一个文件进行内部排序
    (3)全排序:对所有数据进行排序,通常只有一个Reduce
    (4)二次排序:排序的条件有两个
  5. Combiner
    不影响最终的业务逻辑
    提前聚合map,解决数据倾斜
  6. Reducer
    用户的业务逻辑
    setup()、reduce()、clearup()
  7. OutputFormat
    (1)默认使用的实现类是:TextOutputFormat
    (2)TextOutputFormat的功能逻辑:将每个KV对项目表文件中写一行
    (3)自定义OutputFormat
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,504评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,434评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,089评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,378评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,472评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,506评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,519评论 3 413
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,292评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,738评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,022评论 2 329
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,194评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,873评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,536评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,162评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,413评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,075评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,080评论 2 352

推荐阅读更多精彩内容