1. InputFormat
FileInputFormat extends InputFormat (an abstract class in the org.apache.hadoop.mapreduce API), which declares two abstract methods.
- getSplits: logically splits the input files
  public abstract List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException;
- createRecordReader: returns the RecordReader that decides how each record in a file is read
  public abstract RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException;
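To make the division of labour concrete, here is a minimal, hedged sketch of a custom subclass (the class name is illustrative, not from the Hadoop source): it inherits FileInputFormat's getSplits and only supplies the second method, which is essentially what the built-in TextInputFormat does apart from delimiter handling.

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

// Illustrative subclass: getSplits (logical splitting) is inherited from
// FileInputFormat, so only the record-reading side has to be provided.
public class MyLineInputFormat extends FileInputFormat<LongWritable, Text> {
  @Override
  public RecordReader<LongWritable, Text> createRecordReader(
      InputSplit split, TaskAttemptContext context) throws IOException {
    // hand every split to the stock line-oriented reader
    return new LineRecordReader();
  }
}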
2. FileInputFormat and CombineFileInputFormat
2.1 FileInputFormat
FileInputFormat is the base class for all file-based input. It is an abstract class that mainly implements getSplits and leaves createRecordReader unimplemented.
- getSplits
public List<InputSplit> getSplits(JobContext job) throws IOException {
  .........
  // getFormatMinSplitSize() = 1
  // getMinSplitSize(job): value of mapreduce.input.fileinputformat.split.minsize, 1 if not set
  long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
  // return context.getConfiguration().getLong(SPLIT_MAXSIZE, Long.MAX_VALUE);
  // value of mapreduce.input.fileinputformat.split.maxsize, Long.MAX_VALUE if not set
  long maxSize = getMaxSplitSize(job);
  // generate the logical splits
  List<InputSplit> splits = new ArrayList<InputSplit>();
  List<FileStatus> files = listStatus(job);
  for (FileStatus file: files) {
    Path path = file.getPath();
    long length = file.getLen();
    if (length != 0) {
      BlockLocation[] blkLocations;
      if (file instanceof LocatedFileStatus) {
        blkLocations = ((LocatedFileStatus) file).getBlockLocations();
      } else {
        FileSystem fs = path.getFileSystem(job.getConfiguration());
        // new BlockLocation[] { new BlockLocation(name, host, 0, file.getLen()) } ----- offset = 0
        blkLocations = fs.getFileBlockLocations(file, 0, length);
      }
      if (isSplitable(job, path)) {
        // configured block size, 128M by default
        long blockSize = file.getBlockSize();
        // Math.max(minSize, Math.min(maxSize, blockSize));
        long splitSize = computeSplitSize(blockSize, minSize, maxSize);
        // length of the data that still has to be split
        long bytesRemaining = length;
        // SPLIT_SLOP = 1.1: a split is only cut off while the remaining data is more than 1.1 * splitSize
        while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
          // length-bytesRemaining: start offset of the data that still has to be split
          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
          // add the logical split to the splits list; every split added here has size splitSize
          splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                     blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts()));
          bytesRemaining -= splitSize;
        }
        if (bytesRemaining != 0) {
          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
          // add the remainder (less than 1.1 * splitSize) to the splits list
          splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                     blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts()));
        }
      } else {
        // if the file cannot be split, the whole file becomes a single split
        splits.add(makeSplit(path, 0, length,
                   blkLocations[0].getHosts(), blkLocations[0].getCachedHosts()));
      }
    } else {
      // add an empty split for an empty file
      splits.add(makeSplit(path, 0, length, new String[0]));
    }
  }
  // record the number of input files
  job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
  .........
  return splits;
}

protected int getBlockIndex(BlockLocation[] blkLocations, long offset) {
  for (int i = 0 ; i < blkLocations.length; i++) {
    // check whether offset falls inside this block
    if ((blkLocations[i].getOffset() <= offset)
        && (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())) {
      return i;
    }
  }
  .........
}

protected FileSplit makeSplit(Path file, long start, long length,
                              String[] hosts, String[] inMemoryHosts) {
  return new FileSplit(file, start, length, hosts, inMemoryHosts);
}

Analysis of the code above leads to the following points:
(1) Each file is split on its own; the files in a directory are not concatenated and split together. So with two files, one of 160M and one of 150M, the default settings produce 4 logical splits.
(2) A file is only split if it is larger than 1.1 times the configured split size (and the same 1.1 factor applies to each remaining tail).
(3) Whether a file can be split at all depends on the implementation of isSplitable; in FileInputFormat this method always returns true.
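To see points (1) and (2) in numbers, the following standalone sketch mirrors the computeSplitSize formula and the SPLIT_SLOP loop above (the class and helper names are mine, not Hadoop's):

public class SplitMath {
  private static final double SPLIT_SLOP = 1.1;

  // same formula as FileInputFormat.computeSplitSize
  static long computeSplitSize(long blockSize, long minSize, long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
  }

  // how many logical splits a single file of the given length produces
  static int countSplits(long length, long splitSize) {
    int splits = 0;
    long bytesRemaining = length;
    while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
      splits++;
      bytesRemaining -= splitSize;
    }
    return bytesRemaining != 0 ? splits + 1 : splits;
  }

  public static void main(String[] args) {
    long mb = 1024L * 1024L;
    // defaults: minSize = 1, maxSize = Long.MAX_VALUE, blockSize = 128M -> splitSize = 128M
    long splitSize = computeSplitSize(128 * mb, 1L, Long.MAX_VALUE);
    System.out.println(countSplits(160 * mb, splitSize)); // 2 (128M + 32M)
    System.out.println(countSplits(150 * mb, splitSize)); // 2 (128M + 22M)
    System.out.println(countSplits(130 * mb, splitSize)); // 1 (130M < 1.1 * 128M, not split)
  }
}

Because each file is handled independently, the 160M and 150M files together yield 4 splits.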
2.2 CombineFileInputFormat (analyzed in more detail later)
CombineFileInputFormat is also an abstract class; it implements getSplits and leaves createRecordReader unimplemented.
public List<InputSplit> getSplits(JobContext job) {
  long minSizeNode = 0;
  long minSizeRack = 0;
  long maxSize = 0;
  Configuration conf = job.getConfiguration();
  // per-node minimum split size: the programmatically set value wins,
  // otherwise the SPLIT_MINSIZE_PERNODE configuration value (0 = unset)
  if (minSplitSizeNode != 0) {
    minSizeNode = minSplitSizeNode;
  } else {
    minSizeNode = conf.getLong(SPLIT_MINSIZE_PERNODE, 0);
  }
  // per-rack minimum split size, resolved the same way
  if (minSplitSizeRack != 0) {
    minSizeRack = minSplitSizeRack;
  } else {
    minSizeRack = conf.getLong(SPLIT_MINSIZE_PERRACK, 0);
  }
  // maximum size of a combined split
  if (maxSplitSize != 0) {
    maxSize = maxSplitSize;
  } else {
    maxSize = conf.getLong("mapreduce.input.fileinputformat.split.maxsize", 0);
  }
  ............
  List<FileStatus> stats = listStatus(job);
  List<InputSplit> splits = new ArrayList<InputSplit>();
  if (stats.size() == 0) {
    return splits;
  }
  for (MultiPathFilter onepool : pools) {
    ArrayList<FileStatus> myPaths = new ArrayList<FileStatus>();
    for (Iterator<FileStatus> iter = stats.iterator(); iter.hasNext();) {
      FileStatus p = iter.next();
      if (onepool.accept(p.getPath())) {
        myPaths.add(p); // add it to my output set
        iter.remove();
      }
    }
    // create splits for all files in this pool.
    getMoreSplits(job, myPaths, maxSize, minSizeNode, minSizeRack, splits);
  }
  // create splits for all files that are not in any pool.
  getMoreSplits(job, stats, maxSize, minSizeNode, minSizeRack, splits);
  // free up rackToNodes map
  rackToNodes.clear();
  return splits;
}
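Before going deeper, here is a sketch of how these three sizes are usually supplied from the driver side. The per-node / per-rack property names are my assumption of what SPLIT_MINSIZE_PERNODE / SPLIT_MINSIZE_PERRACK resolve to, so verify them against your Hadoop version:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;

public class CombineInputConfigDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // upper bound of a combined split; read back as maxSize in getSplits above
    conf.setLong("mapreduce.input.fileinputformat.split.maxsize", 256L * 1024 * 1024);
    // assumed property names behind SPLIT_MINSIZE_PERNODE / SPLIT_MINSIZE_PERRACK
    conf.setLong("mapreduce.input.fileinputformat.split.minsize.per.node", 64L * 1024 * 1024);
    conf.setLong("mapreduce.input.fileinputformat.split.minsize.per.rack", 64L * 1024 * 1024);

    Job job = Job.getInstance(conf, "combine-input-demo");
    // CombineTextInputFormat is the concrete subclass touched on in section 3.2
    job.setInputFormatClass(CombineTextInputFormat.class);
  }
}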
3. The main subclasses of FileInputFormat
3.0 How a MapTask starts
- Start with the run method of the MapTask class, which hands off to the runNewMapper method
public void run(final JobConf job, final TaskUmbilicalProtocol umbilical) {
  ......
  if (useNewApi) {
    runNewMapper(job, splitMetaInfo, umbilical, reporter);
  } else {
    runOldMapper(job, splitMetaInfo, umbilical, reporter);
  }
  done(umbilical, reporter);
}
- runNewMapper: mainly obtains the Mapper, InputFormat, InputSplit, RecordReader and RecordWriter objects, then calls the Mapper's run method to execute the MapTask
The parts to focus on are how the RecordReader is obtained inside MapTask and what input.initialize(split, mapperContext) does.

void runNewMapper(final JobConf job, final TaskSplitIndex splitIndex,
                  final TaskUmbilicalProtocol umbilical, TaskReporter reporter) {
  // set up the task context
  org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
      new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, getTaskID(), reporter);
  // create the Mapper object via reflection
  org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
      (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
          ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
  // create the InputFormat object via reflection
  org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
      (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
          ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
  // rebuild the input split
  org.apache.hadoop.mapreduce.InputSplit split = null;
  split = getSplitDetails(new Path(splitIndex.getSplitLocation()), splitIndex.getStartOffset());
  // get the RecordReader object (data input)
  org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
      new NewTrackingRecordReader<INKEY,INVALUE>(split, inputFormat, reporter, taskContext);
  job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
  org.apache.hadoop.mapreduce.RecordWriter output = null;
  // get the RecordWriter (data output)
  if (job.getNumReduceTasks() == 0) {
    output = new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
  } else {
    output = new NewOutputCollector(taskContext, job, umbilical, reporter);
  }
  org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE> mapContext =
      new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(),
          input, output, committer, reporter, split);
  org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context mapperContext =
      new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(mapContext);
  try {
    // initialize the input
    input.initialize(split, mapperContext);
    mapper.run(mapperContext);
    mapPhase.complete();
    setPhase(TaskStatus.Phase.SORT);
    statusUpdate(umbilical);
    input.close();
    input = null;
    output.close(mapperContext);
    output = null;
  } finally {
    closeQuietly(input);
    closeQuietly(output, mapperContext);
  }
}
- NewTrackingRecordReader: obtains the RecordReader through the createRecordReader method of the inputFormat; with the default InputFormat this returns a LineRecordReader object
NewTrackingRecordReader(org.apache.hadoop.mapreduce.InputSplit split,
    org.apache.hadoop.mapreduce.InputFormat<K, V> inputFormat,
    TaskReporter reporter, org.apache.hadoop.mapreduce.TaskAttemptContext taskContext) {
  this.reporter = reporter;
  this.inputRecordCounter = reporter.getCounter(TaskCounter.MAP_INPUT_RECORDS);
  this.fileInputByteCounter = reporter.getCounter(FileInputFormatCounter.BYTES_READ);
  List<Statistics> matchedStats = null;
  if (split instanceof org.apache.hadoop.mapreduce.lib.input.FileSplit) {
    matchedStats = getFsStatistics(
        ((org.apache.hadoop.mapreduce.lib.input.FileSplit) split).getPath(),
        taskContext.getConfiguration());
  }
  fsStats = matchedStats;
  long bytesInPrev = getInputBytes(fsStats);
  // the key line: inputFormat is TextInputFormat by default,
  // and its createRecordReader returns a LineRecordReader object
  this.real = inputFormat.createRecordReader(split, taskContext);
  long bytesInCurr = getInputBytes(fsStats);
  fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
}
- input.initialize(split, mapperContext): input is the RecordReader object, a LineRecordReader by default. Initialization mainly opens the input stream and adjusts the position at which reading of the split starts (and, for splittable compression, its end position as well).
public void initialize(InputSplit genericSplit, TaskAttemptContext context) {
  FileSplit split = (FileSplit) genericSplit;
  Configuration job = context.getConfiguration();
  this.maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE);
  start = split.getStart();
  end = start + split.getLength();
  final Path file = split.getPath();
  // open the file and seek to the start of the split
  final FileSystem fs = file.getFileSystem(job);
  fileIn = fs.open(file);
  CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
  if (null != codec) {
    // handling for compressed input
    isCompressedInput = true;
    decompressor = CodecPool.getDecompressor(codec);
    if (codec instanceof SplittableCompressionCodec) {
      final SplitCompressionInputStream cIn =
          ((SplittableCompressionCodec) codec).createInputStream(fileIn, decompressor,
              start, end, SplittableCompressionCodec.READ_MODE.BYBLOCK);
      in = new CompressedSplitLineReader(cIn, job, this.recordDelimiterBytes);
      start = cIn.getAdjustedStart();
      end = cIn.getAdjustedEnd();
      filePosition = cIn;
    } else {
      in = new SplitLineReader(codec.createInputStream(fileIn, decompressor),
          job, this.recordDelimiterBytes);
      filePosition = fileIn;
    }
  } else {
    fileIn.seek(start);
    in = new UncompressedSplitLineReader(fileIn, job, this.recordDelimiterBytes, split.getLength());
    filePosition = fileIn;
  }
  // For every split except the first one, read one line and throw it away, then move the
  // start position to the beginning of the next line. Since each split also reads one extra
  // line past its end, the original file is reconstructed without losing or duplicating lines.
  if (start != 0) {
    start += in.readLine(new Text(), 0, maxBytesToConsume(start));
  }
  this.pos = start;
}

Once the RecordReader has been initialized, the run method of the Mapper starts executing.
- The run method in Mapper
Here context is built on the MapContextImpl object, and its nextKeyValue / getCurrentKey / getCurrentValue methods all delegate to the RecordReader.

public void run(Context context) {
  // executed once before the task processes any records
  setup(context);
  try {
    while (context.nextKeyValue()) {
      map(context.getCurrentKey(), context.getCurrentValue(), context);
    }
  } finally {
    // executed once after the task has finished
    cleanup(context);
  }
}
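To connect this loop to user code: a typical Mapper only overrides map and receives exactly the key/value pair that nextKeyValue prepared. A minimal sketch, assuming the default TextInputFormat (so keys are LongWritable byte offsets and values are Text lines from LineRecordReader); the class name is illustrative:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class TokenCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
  private static final IntWritable ONE = new IntWritable(1);
  private final Text word = new Text();

  // called once per record delivered by RecordReader.nextKeyValue()
  @Override
  protected void map(LongWritable offset, Text line, Context context)
      throws IOException, InterruptedException {
    StringTokenizer tokens = new StringTokenizer(line.toString());
    while (tokens.hasMoreTokens()) {
      word.set(tokens.nextToken());
      context.write(word, ONE);
    }
  }
}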
- The nextKeyValue / getCurrentKey / getCurrentValue methods of LineRecordReader
public boolean nextKeyValue() {
  if (key == null) {
    key = new LongWritable();
  }
  key.set(pos);
  if (value == null) {
    value = new Text();
  }
  int newSize = 0;
  // the condition allows reading one extra line past the end of the split
  while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
    if (pos == 0) {
      newSize = skipUtfByteOrderMark();
    } else {
      newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
      pos += newSize;
    }
    if ((newSize == 0) || (newSize < maxLineLength)) {
      break;
    }
  }
  if (newSize == 0) {
    key = null;
    value = null;
    return false;
  } else {
    return true;
  }
}

@Override
public LongWritable getCurrentKey() {
  return key;
}

@Override
public Text getCurrentValue() {
  return value;
}

private long getFilePosition() throws IOException {
  long retVal;
  if (isCompressedInput && null != filePosition) {
    retVal = filePosition.getPos();
  } else {
    retVal = pos;
  }
  return retVal;
}
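To see the same contract from the implementer's side, here is a hedged sketch of a custom RecordReader that delivers an entire file as a single record. It is illustrative (the class name is mine) and assumes it is paired with an InputFormat whose isSplitable returns false, so each split covers one whole file.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

// Illustrative reader: emits exactly one record per (unsplittable) file split.
public class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {
  private FileSplit split;
  private Configuration conf;
  private final BytesWritable value = new BytesWritable();
  private boolean processed = false;

  @Override
  public void initialize(InputSplit genericSplit, TaskAttemptContext context) {
    this.split = (FileSplit) genericSplit;   // same cast as LineRecordReader
    this.conf = context.getConfiguration();
  }

  @Override
  public boolean nextKeyValue() throws IOException {
    if (processed) {
      return false;                          // only one record per split
    }
    byte[] contents = new byte[(int) split.getLength()];
    Path file = split.getPath();
    FileSystem fs = file.getFileSystem(conf);
    try (FSDataInputStream in = fs.open(file)) {
      IOUtils.readFully(in, contents, 0, contents.length);
    }
    value.set(contents, 0, contents.length);
    processed = true;
    return true;
  }

  @Override
  public NullWritable getCurrentKey() { return NullWritable.get(); }

  @Override
  public BytesWritable getCurrentValue() { return value; }

  @Override
  public float getProgress() { return processed ? 1.0f : 0.0f; }

  @Override
  public void close() { /* the stream is closed in nextKeyValue */ }
}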
3.1 TextInputFormat
Following the job's submit method down through the Hadoop source, you can see that the InputFormat object is obtained via reflection in the writeNewSplits method of the JobSubmitter class:

InputFormat<?, ?> input = ReflectionUtils.newInstance(job.getInputFormatClass(), conf);

This eventually leads to the getInputFormatClass method of JobContextImpl:

return (Class<? extends InputFormat<?,?>>) conf.getClass(INPUT_FORMAT_CLASS_ATTR, TextInputFormat.class);

INPUT_FORMAT_CLASS_ATTR corresponds to the parameter mapreduce.job.inputformat.class; when it is not set, the TextInputFormat subclass is used by default.
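Put differently, the default only applies when the driver never sets an input format itself. A minimal driver-side sketch (the input path and job name are illustrative):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class InputFormatDriverDemo {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance();
    // sets mapreduce.job.inputformat.class; omit this line and
    // getInputFormatClass() falls back to TextInputFormat.class
    job.setInputFormatClass(TextInputFormat.class);
    // illustrative input path
    FileInputFormat.addInputPath(job, new Path("/data/input"));
  }
}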
TextInputFormat overrides the createRecordReader and isSplitable methods.
- createRecordReader
This method mainly sets the record delimiter and then returns a LineRecordReader object.

public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
  String delimiter = context.getConfiguration().get("textinputformat.record.delimiter");
  byte[] recordDelimiterBytes = null;
  if (null != delimiter)
    recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
  return new LineRecordReader(recordDelimiterBytes);
}
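This also means the record delimiter can be changed purely through configuration, without writing a custom InputFormat. A small sketch, assuming blank-line-separated records as the example use case:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class RecordDelimiterDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // read back in createRecordReader above and passed to LineRecordReader
    conf.set("textinputformat.record.delimiter", "\n\n"); // e.g. blank-line separated records
    Job job = Job.getInstance(conf, "custom-delimiter-demo");
  }
}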
- isSplitable
An uncompressed file is always splittable. A compressed file is splittable only if its compression codec is a splittable one; otherwise the whole file has to be read as a single split.

protected boolean isSplitable(JobContext context, Path file) {
  final CompressionCodec codec =
      new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
  if (null == codec) {
    return true;
  }
  return codec instanceof SplittableCompressionCodec;
}
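As a concrete consequence, a gzip-compressed file always ends up as a single split (GzipCodec is not a SplittableCompressionCodec), while an uncompressed or bzip2-compressed file remains splittable. The same check can be reproduced outside a job (the file paths are illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.SplittableCompressionCodec;

public class SplittabilityCheck {
  // mirrors TextInputFormat.isSplitable
  static boolean isSplittable(Configuration conf, Path file) {
    CompressionCodec codec = new CompressionCodecFactory(conf).getCodec(file);
    return codec == null || codec instanceof SplittableCompressionCodec;
  }

  public static void main(String[] args) {
    Configuration conf = new Configuration();
    System.out.println(isSplittable(conf, new Path("/data/app.log")));     // true: no codec
    System.out.println(isSplittable(conf, new Path("/data/app.log.gz")));  // false: gzip
    System.out.println(isSplittable(conf, new Path("/data/app.log.bz2"))); // true: bzip2 is splittable
  }
}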
3.2 CombineTextInputFormat
To be analyzed in a later post.