最全的MapReduce框架原理,方便以后复习。知识点来自尚硅谷的课程学习。课程链接
一、InputFormat数据输入
1. 切片与MapTask并行度决定机制
数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。实际存储在磁盘上,还是按照HDFS将数据分成一个一个Block进行存储。
MapTask的并行速度决定Map阶段的任务处理并发度,进而影响到整个Job的处理速度。
- 当切片大小为100M时,不同节点之间需要数据传输,耗费大量IO,不高效;
- 当切分时,只针对单个文件进行切分,不考虑文件之间的大小;
- MapTask的数量由切片数量决定,切片的大小默认是块大小。
2. Job提交流程源码解析
waitForCompletion();
// 1 建立连接
connect();
// 1.1 创建提交Job的代理
new Cluster(getConfiguration());
// 1.2 判断是本地yarn还是远程yarn
initialize(jobTrackAddr, conf);
// 2 提交job
submitter.submitJobInternal(Job.this, cluster)
// 2.1 创建给集群提交数据的Stag路径
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
// 2.2 获取jobid,并创建Job路径
JobID jobId = submitClient.getNewJobID();
// 2.3 拷贝jar包到集群
copyAndConfigureFiles(job, submitJobDir);
rUpLoader.uploadFiles(job, jobSubmitDir);
// 2.4 计算切片,生成切片规划文件
writeSplits(job, submitJobDir);
maps = writeNewSplits(job, jobSubmitDir);
input.getSplits(job);
// 2.5 向Stag路径写XML配置文件
writeConf(conf, submitJobFile);
conf.writeXml(out);
// 2.6 提交Job,返回提交状态
status = submitClint.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
3. FileInputFormat切片机制
- 根据计算公式,切片大小默认为块大小,本地模式切片大小为32M;
- 公式中,默认
minSize=1
,maxSize=Long.MAXValue
; - 切片大小设置:maxSize调的小于blockSize,则切片会变小。minSize调的比blockSize大,则可以让切片变大;
- MrAppMaster根据切片数量计算MapTask个数;
- 获取切片的文件名:
String name = inputSplit.getPath().getName();
- 根据文件类型获取切片信息:
FileSplit inputSplit = (FileSplit) context.getInputSplit();
4. CombineTextInputFormat切片机制
框架默认的TextInputFormat切片机制是对任务按照文件进行切片,当有大量小文件时,就会产生大量的MapTask,处理效率及其低下。
CombineTextFormat用于小文件过多的场景,从逻辑上将多个小文件规划到一个切片中,交给一个MapTask处理。
- 虚拟存储切片最大值可以任意设置,但是要根据实际的小文件大小来具体设置:
combineTextFormat.setMaxInputSplitSize(job, 4194304); // 4M
- 主要分为两个过程:虚拟存储过程和切片过程。
- 当存储时当文件大于4M,会判断是否比2*4M大。
- 在实际使用时,需要处理很多小文件时,可以按如下操作,在Driver类中增加如下信息:
// 如果不设置InputFormat,默认是TextInputFormat.class
job.setInputFormatClass(CombineTextInputForamt.class);
// 设置虚拟存储切片最大值
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);
5. FileInputFormat实现类
在运行MapReduce程序时,针对不同格式的输入文件,MapReduce是如何读取这些数据的?
FileInputFormat常见的接口实现类包括:
- TextInputFormat
- KeyValueTextInputFormat
- NLineInputFormat
- CombineTextInputFormat
- 自定义的InputFormat
5.1 TextInputFormat
TextinputFormat是默认的FileInputFormat实现类。按行读取每条记录,健是存储该行在整个文件中的起始字节偏移量,LongWritable类型。值是这行的内容,不包括任何终止符(换行,回车等),Text类型。
5.2 KeyValueTextInputFormat
每一行均为一条记录,被分割符分割为key和value,默认的分割符是"\t"。可以在驱动类中进行设置:
conf.set(KeyValueLineRecoredReader,KEY_VALUE_SEPERATOR, "\t");
此时的key是每行排在分割符之前的Text序列。
5.3 NLineInputFormat
如果使用NLineInpurFormat,代表每个MapTask进程处理的InputSplit不再按照Block块去切分,而是按照NLineInputFormat指定的行数N来切分。
,如果不整除,。
此时的key-value与TextInputFormat生成的一样。可以在驱动类中进行设置:
NLineInputFormat.setNumLinesPerSplit(job, 3);
5.4 自定义InputFormat
具体步骤:
1)自定义一个类继承FileInputFormat;
2)改写RecordReader,实现自定义读取并封装为key-value;
3)在输出时使用SequenceFileOutPutFormat输出合并文件。
SequenceFile文件是Hadoop用来存储二进制形式的key-value对的文件格式,文件路径+文件名为key,文件内容为value。
自定义InputFormat案例:
代码实现:
WholeFileInputFormat.java
package Inputformat;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import java.io.IOException;
public class WholeFileInputFormat extends FileInputFormat<Text, BytesWritable> {
@Override
public RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
WholeRecordReader recordReader = new WholeRecordReader();
recordReader.initialize(split, context);
return recordReader;
}
}
WholeRecordReader.java
package Inputformat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;
public class WholeRecordReader extends RecordReader<Text, BytesWritable> {
FileSplit split;
Configuration configuration;
Text k = new Text();
BytesWritable v = new BytesWritable();
boolean isProgress = true;
// 初始化
@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
this.split = (FileSplit) split;
configuration = context.getConfiguration();
}
// 核心的业务逻辑
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (isProgress) {
byte[] buf = new byte[(int) split.getLength()];
// 1 获取Fs对象
Path path = split.getPath();
FileSystem fs = path.getFileSystem(configuration);
// 2 获取输入流
FSDataInputStream fis = fs.open(path);
// 3 拷贝
IOUtils.readFully(fis, buf, 0, buf.length);
// 4 封装v
v.set(buf, 0, buf.length);
// 5 封装k
k.set(path.toString());
// 6 关闭资源
IOUtils.closeStream(fis);
isProgress = false;
return true;
}
return false;
}
public Text getCurrentKey() throws IOException, InterruptedException {
return k;
}
public BytesWritable getCurrentValue() throws IOException, InterruptedException {
return v;
}
public float getProgress() throws IOException, InterruptedException {
return 0;
}
public void close() throws IOException {
}
}
完成相应的Mapper和Reducer类,并在Driver中增加两句:
// 4 设置输入的InputFormat
job.setInputFormatClass(WholeFileInputFormat.class);
// 5 设置输出的OutputFormat
job.setOutputFormatClass(SequenceFileOutputFormat.class);
二、Shuffle机制
Map方法之后,Reduce方法之前的数据处理过程称之为Shuffle。
1. Partition分区
分区就是将计算结果按照条件输出到不同文件中。比如按照手机号归属地将不同省份输出到不同文件中。
1.1系统默认的分区是Hash分区:
public class Hashpartitioner<K, V> extends Partitioner<K, V> {
public int getPartition(K key, V value, int numReduceTasks) {
return (key.hasCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
- 默认分区是根据key的HashCode对ReduceTasks个数取模得到的;
- 用户没法控制那个Key存储到那个分区。
1.2 自定义Partition分区
自定义步骤:
案例实现:
按照手机号前三位将统计结果写入到不同的文件。
自定义Partition类ProvincePartitioner.java
package Flowsum;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class ProvincePartitioner extends Partitioner<Text, FlowBean> {
@Override
public int getPartition(Text key, FlowBean value, int numPartitions) {
// 1 获取手机号前三位
String prePhoneNum = key.toString().substring(0, 3);
int partition = 3;
if ("136".equals(prePhoneNum)) {
partition = 0;
}else if ("137".equals(prePhoneNum)) {
partition = 1;
}else if ("138".equals(prePhoneNum)) {
partition = 2;
}
return partition;
}
}
编写Mapper和Reducer,在Driver类中增加如下代码:
// 5 设置Partition分区
job.setPartitionerClass(ProvincePartitioner.class);
job.setNumReduceTasks(4);
- 在定义分区时,分区号一定要严格按照顺序从0开始;
- 在设置分区数时:
当忘记设置或者设置为1,则最终只会产生一个文件,程序会将所有的分区数据统统写入一个文件;
当设置的分区数在1和实际分区数之间会报错(IO异常);
当设置的分区数大于实际分区数,会生成期待的结果,但是会多出超出分区个数个空文件,因为实际运行中,没有对应的数据传给多余的ReduceTask。
1.3 WritableComparable排序
排序是MapReduce中的重要操作。
MapTask和ReduceTask均会对数据按照key进行排序 ,属于Hadoop的默认行为。默认排序是按照字典顺序排序,且实现该排序的方法是快速排序。
MapReduce排序分类:
自定义排序:
bean对象作为key传输,需要实现WritableComparable接口重写compareTo方法,就可以实现排序。
示例编写:
按照电话所属区将统计数据存储到不同的文件中,并在每个文件中实现按总流量的逆序。
重写编写FlowBean.java
package Sort;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class FlowBean implements WritableComparable<FlowBean> {
private long upFlow;
private long downFlow;
private long sumFlow;
public FlowBean() {
super();
}
public FlowBean(long upFlow, long downFlow) {
this.upFlow = upFlow;
this.downFlow = downFlow;
sumFlow = upFlow + downFlow;
}
// 核心的比较
public int compareTo(FlowBean bean) {
int result;
if (sumFlow > bean.getSumFlow()) {
result = -1;
}else if(sumFlow < bean.getSumFlow()) {
result = 1;
}else{
result = 0;
}
return result;
}
// 序列化方法
public void write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(downFlow);
out.writeLong(sumFlow);
}
// 反序列化方法
public void readFields(DataInput in) throws IOException {
upFlow = in.readLong();
downFlow = in.readLong();
sumFlow = in.readLong();
}
public long getUpFlow() {
return upFlow;
}
public void setUpFlow(long upFlow) {
this.upFlow = upFlow;
}
public long getDownFlow() {
return downFlow;
}
public void setDownFlow(long downFlow) {
this.downFlow = downFlow;
}
public long getSumFlow() {
return sumFlow;
}
public void setSumFlow(long sumFlow) {
this.sumFlow = sumFlow;
}
@Override
public String toString() {
return upFlow + "\t" + downFlow + "\t" + sumFlow;
}
}
- 此时,要继承WritableComparable类;
- 重写compareTo方法,这是排序比较的核心;
编写Mapper类,FlowCountSortMapper.java
package Sort;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class FlowCountSortMapper extends Mapper<LongWritable, Text, FlowBean, Text> {
FlowBean k = new FlowBean();
Text v = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 1 获取一行
String line = value.toString();
// 2 拆分
String[] fields = line.split("\t");
// 3 封装
k.setUpFlow(Long.parseLong(fields[1])); // 流量作为key
k.setDownFlow(Long.parseLong(fields[2]));
k.setSumFlow(Long.parseLong(fields[3]));
v.set(fields[0]); // 电话号码作为Value
// 3 写出
context.write(k, v);
}
}
- 需要对数据按照总流量排序,所以Map阶段结束后的key应该是Flowbean对象,value为电话号码.
编写Reducer类,FlowCountSortReducer.java
package Sort;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class FlowCountSortReducer extends Reducer<FlowBean, Text, Text, FlowBean> {
@Override
protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
for (Text value : values) {
context.write(value, key);
}
}
}
- 排序结束后,生成的数据还是按照电话,流量展示,所以此时输出的key为电话号码,value为FlowBean。
编写Partition类,ProvincePartitioner.java
package Sort;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class ProvincePartitioner extends Partitioner<FlowBean, Text> {
public int getPartition(FlowBean flowBean, Text text, int numPartitions) {
String prePhoneNum = text.toString().substring(0, 3);
int partition = 3;
if ("136".equals(prePhoneNum)) {
partition= 0;
}else if ("137".equals(prePhoneNum)) {
partition = 1;
}else if ("138".equals(prePhoneNum)) {
partition = 2;
}
return partition;
}
}
- 在Reducer阶段,需要将不同所属区的数据写入不同文件,此时需要对数据进行Partition分区。
编写驱动类,FlowCountSortDriver.java
package Sort;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class FlowCountSortDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(FlowCountSortDriver.class);
job.setMapperClass(FlowCountSortMapper.class);
job.setReducerClass(FlowCountSortReducer.class);
job.setPartitionerClass(ProvincePartitioner.class);
job.setNumReduceTasks(4);
job.setMapOutputKeyClass(FlowBean.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
}