在整个MapReducer阶段中,Map输入的文件,Reducer输出的文件都是存储在分布式文件系统中,但是Map任务处理的中间结果需要保存在本地磁盘,所以Map阶段需要考虑数据的局限性(即计算向数据靠拢)。
读源码
-
InputFormat
MapReducer框架使用InputFormat作为数据的预处理模块
public abstract class InputFormat<K, V> {
public InputFormat() {
}
// 对数据进行逻辑分片,得到一个InputSplit的列表
public abstract List<InputSplit> getSplits(JobContext var1) throws IOException, InterruptedException;
// 读取分片,转换成key/value形式
public abstract RecordReader<K, V> createRecordReader(InputSplit var1, TaskAttemptContext var2) throws IOException, InterruptedException;
}
-
InputSpli(输入分片)
InputSplit是对文件进行预处理的输入单位,是逻辑切分,只是记录了要处理数据的位置和长度
根据输入文件计算分片大小,每个分片任务对应着一个Map
分片的大小范围可以在mapred-site.xml中设置,那在每次任务中分片大小又是多大呢?
在FileInputFormat中首先比较文件大小和最大分块大小(maxSize),得到一个最小值,然后和最小分块大小(minSize)进行比较得到一个最大值,就是分块大小。
public abstract class InputSplit {
public InputSplit() {
}
//获得当前spllit长度
public abstract long getLength() throws IOException, InterruptedException;
// 获取节点地址列表(每个split的储存地址不同)
public abstract String[] getLocations() throws IOException, InterruptedException;
// 如果是空值,则全部存储在磁盘上
@Evolving
public SplitLocationInfo[] getLocationInfo() throws IOException {
return null;
}
}
-
RecordReader
RecordReader对InputSplit中的数据进行处理,加载数据并且转换成适合Map任务读取的键值形式
public abstract class RecordReader<KEYIN, VALUEIN> implements Closeable {
public RecordReader() {
}
// 初始化
public abstract void initialize(InputSplit var1, TaskAttemptContext var2) throws IOException, InterruptedException;
// 判断下一个key/value的存在
public abstract boolean nextKeyValue() throws IOException, InterruptedException;
// 获得当前key
public abstract KEYIN getCurrentKey() throws IOException, InterruptedException;
// 获得当前value
public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException;
// 获得当前处理进度(0.0~1.0)
public abstract float getProgress() throws IOException, InterruptedException;
// 关闭RecordReader
public abstract void close() throws IOException;
}
-
Mapper
根据用户定义的映射规则,输出一系列<key,value>作为中间结果,输入的key和value类需要支持序列化操作,即继承Writable,key的类同时也必须实现WritableComparable
默认输入key类型 LongWritable 记录数据分片的偏移位置
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
public Mapper() {
}
//预处理模块
protected void setup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
}
//重写模块以满足业务需求
protected void map(KEYIN key, VALUEIN value, Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
context.write(key, value);
}
// 扫尾工作
protected void cleanup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
}
//驱动
public void run(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
this.setup(context);
// 对输入的每一对key/value调用Map方法
try {
while(context.nextKeyValue()) {
this.map(context.getCurrentKey(), context.getCurrentValue(), context);
}
} finally {
this.cleanup(context);
}
}
// 设置Context
public abstract class Context implements MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
public Context() {
}
}
}
改写LongWrite为Text
想要改写首先我们要看Map默认的输入类型设置
job.setInputFormatClass(TextInputFormat.class);
将输入key改变成Text我们要设置
job.setInputFormatClass(KeyValueTextInputFormat.class);
我们先来看一下TextInputFormat的源码
public class TextInputFormat extends FileInputFormat<LongWritable, Text> {
public TextInputFormat() {
}
// 定义文本的读取方式,返回一个RecoredReader <LongWritable,Text>
public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
String delimiter = context.getConfiguration().get("textinputformat.record.delimiter");
byte[] recordDelimiterBytes = null;
if (null != delimiter) {
recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
}
// 返回一个LineRecordReader(继承自RecordReader)
return new LineRecordReader(recordDelimiterBytes);
}
// 判断是否分片,true进行分片
protected boolean isSplitable(JobContext context, Path file) {
// 判断是否加入压缩
CompressionCodec codec = (new CompressionCodecFactory(context.getConfiguration())).getCodec(file);
return null == codec ? true : codec instanceof SplittableCompressionCodec;
}
}
在TextInputFormat中重写了FileInputFormat类中的isSplitable()并进行了压缩判断
protected boolean isSplitable(JobContext context, Path filename) {
return true;
}
我们再看一下KeyValueTextInputFormat的源码
public class KeyValueTextInputFormat extends FileInputFormat<Text, Text> {
public KeyValueTextInputFormat() {
}
// 判断是否分片,并进行压缩判断
protected boolean isSplitable(JobContext context, Path file) {
CompressionCodec codec = (new CompressionCodecFactory(context.getConfiguration())).getCodec(file);
return null == codec ? true : codec instanceof SplittableCompressionCodec;
}
// 定义文本的读取方式,返回一个RecoredReader <Text,Text>
public RecordReader<Text, Text> createRecordReader(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
context.setStatus(genericSplit.toString());
return new KeyValueLineRecordReader(context.getConfiguration());
}
}
总结:FileInputFormat是所以使用文件作为其数据源的InputFormat的子类,他有6个子类分别是
- TextInputFormat 每次读入一行数据 key:该行的偏移量 value: 行内容
- KeyValueTextInputFormat 按照分隔符分割为key和value,如果不存在key则是一行数据,value为空
分隔符设置,默认为“\t”conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");//设置“,”为分隔符
- NLineInputFormat 可以实现Mapper收到固定行数
NLineInputFormat.setNumLinesPerSplit(job, 20); // 设置行数
- SequenceFileInputFormat 为Hadoop顺序文件设计,存储二进制<key,value>
- CombineFileInputFormat 为小文件设计,可以将多个文件打包到一个分片中
- FixedLengthInputFormat
多路径输入
我们可以通过设置setInputPaths()设置多个路径
FileInputFormat.setInputPaths(new Path(),new Path(),new Path());
也可以通过设置MultipleInputs来实现多个文件指定不同的Mapper
MultipleInputs.addInputPath(job, new Path(""), TextInputFormat.class,MapA.class);
MultipleInputs.addInputPath(job, new Path(""), TextInputFormat.class,MapB.class);
小文件处理
- 文件压缩
-
自定义分片
- 继承FileInputFormat类
- 重写里面的isSplitable 改成返回false 取消默认分片规则
- 重写createRecordReader 方法 指定新的规则
- 编写一个类继承RecordReader
- 重写里面的方法
- 设置你定义的InputFormat