Data input in Hadoop: InputFormat and its subclasses

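1. The InputFormat interface


  • getSplits: logically splits the input files
    InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
  • getRecordReader: returns the RecordReader, which determines how each record in the file is read
    RecordReader<K, V> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException;
    These are the signatures of the old API (org.apache.hadoop.mapred.InputFormat). In the new API (org.apache.hadoop.mapreduce.InputFormat) the type is an abstract class, and the corresponding methods are getSplits(JobContext) and createRecordReader(InputSplit, TaskAttemptContext). The rest of this article follows the new API.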

2. FileInputFormat and CombineFileInputFormat

2.1 FileInputFormat


  • getSplits
    public List<InputSplit> getSplits(JobContext job) throws IOException {
      // getFormatMinSplitSize() returns 1.
      // getMinSplitSize(job) returns the value of mapreduce.input.fileinputformat.split.minsize (1 if unset).
      long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
      // getMaxSplitSize(job) is context.getConfiguration().getLong(SPLIT_MAXSIZE, Long.MAX_VALUE),
      // i.e. the value of mapreduce.input.fileinputformat.split.maxsize (Long.MAX_VALUE if unset).
      long maxSize = getMaxSplitSize(job);
      // Generate the logical splits.
      List<InputSplit> splits = new ArrayList<InputSplit>();
      List<FileStatus> files = listStatus(job);
      for (FileStatus file: files) {
        Path path = file.getPath();
        long length = file.getLen();
        if (length != 0) {
          BlockLocation[] blkLocations;
          if (file instanceof LocatedFileStatus) {
            blkLocations = ((LocatedFileStatus) file).getBlockLocations();
          } else {
            FileSystem fs = path.getFileSystem(job.getConfiguration());
            // The base FileSystem implementation returns
            // new BlockLocation[] { new BlockLocation(name, host, 0, file.getLen()) }, i.e. offset = 0.
            blkLocations = fs.getFileBlockLocations(file, 0, length);
          }
          if (isSplitable(job, path)) {
            // Block size of the file (128 MB by default).
            long blockSize = file.getBlockSize();
            // computeSplitSize returns Math.max(minSize, Math.min(maxSize, blockSize)).
            long splitSize = computeSplitSize(blockSize, minSize, maxSize);
            // Number of bytes that still have to be assigned to a split.
            long bytesRemaining = length;
            // SPLIT_SLOP = 1.1: keep cutting only while the remaining bytes exceed 1.1 * splitSize.
            while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
              // length-bytesRemaining is the start offset of the data still to be split.
              int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
              // Every split added inside this loop has exactly splitSize bytes.
              splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                  blkLocations[blkIndex].getHosts(),
                  blkLocations[blkIndex].getCachedHosts()));
              bytesRemaining -= splitSize;
            }
            if (bytesRemaining != 0) {
              int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
              // Add the tail, which is at most 1.1 * splitSize, as the last split.
              splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                  blkLocations[blkIndex].getHosts(),
                  blkLocations[blkIndex].getCachedHosts()));
            }
          } else { // The file is not splittable: the whole file becomes one split.
            splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),blkLocations[0].getCachedHosts()));
          }
        } else {
          // An empty file yields one empty split.
          splits.add(makeSplit(path, 0, length, new String[0]));
        }
      }
      // Record the number of input files.
      job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
      return splits;
    }
    protected int getBlockIndex(BlockLocation[] blkLocations, long offset) {
      for (int i = 0 ; i < blkLocations.length; i++) {
        // Check whether offset falls inside this block.
        if ((blkLocations[i].getOffset() <= offset) &&
            (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())){
          return i;
        }
      }
      // No block contains the offset.
      BlockLocation last = blkLocations[blkLocations.length - 1];
      long fileLength = last.getOffset() + last.getLength() - 1;
      throw new IllegalArgumentException("Offset " + offset + " is outside of file (0.." + fileLength + ")");
    }
    protected FileSplit makeSplit(Path file, long start, long length, String[] hosts, String[] inMemoryHosts) {
      return new FileSplit(file, start, length, hosts, inMemoryHosts);
    }
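    (1) Each file is split on its own; the files in a directory are not concatenated and split together. For example, with two files of 160 MB and 150 MB and the default settings (128 MB split size), the result is 4 logical splits: 128 MB + 32 MB and 128 MB + 22 MB.
    (2) A file is split only when its size exceeds 1.1 times the split size. The 1.1 check is applied to the bytes remaining, so the last split of a file can be up to 1.1 times the split size.
    (3) Whether a file can be split depends on the implementation of isSplitable. The default implementation in FileInputFormat always returns true; subclasses can override it.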
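
The arithmetic behind notes (1) and (2) can be checked with a small stand-alone sketch. It is not the Hadoop code: the class name SplitSizeSketch and the method splitFile are made up for illustration, block locations are ignored, and a 128 MB block size is assumed.

```java
import java.util.ArrayList;
import java.util.List;

class SplitSizeSketch {
    static final double SPLIT_SLOP = 1.1; // same value as the constant in FileInputFormat

    // Same formula as computeSplitSize: clamp the block size between minSize and maxSize.
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    // Returns {offset, length} pairs for a single file (hypothetical helper, no block locations).
    static List<long[]> splitFile(long length, long splitSize) {
        List<long[]> splits = new ArrayList<>();
        if (length == 0) {
            splits.add(new long[] {0, 0}); // an empty file yields one empty split
            return splits;
        }
        long bytesRemaining = length;
        while ((double) bytesRemaining / splitSize > SPLIT_SLOP) {
            splits.add(new long[] {length - bytesRemaining, splitSize});
            bytesRemaining -= splitSize;
        }
        if (bytesRemaining != 0) {
            splits.add(new long[] {length - bytesRemaining, bytesRemaining});
        }
        return splits;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        long splitSize = computeSplitSize(128 * mb, 1L, Long.MAX_VALUE);
        for (long fileMb : new long[] {160, 150, 140}) {
            int n = splitFile(fileMb * mb, splitSize).size();
            System.out.println(fileMb + " MB -> " + n + " split(s)");
        }
    }
}
```

With these assumptions the 160 MB and 150 MB files each produce two splits, while a 140 MB file stays in one piece because 140 / 128 is below 1.1.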
2.2 CombineFileInputFormat (to be analyzed later)


public List<InputSplit> getSplits(JobContext job) throws IOException {
  long minSizeNode = 0;
  long minSizeRack = 0;
  long maxSize = 0;
  Configuration conf = job.getConfiguration();
  // A value set on the object takes precedence over the configuration.
  if (minSplitSizeNode != 0) {
    minSizeNode = minSplitSizeNode;
  } else {
    minSizeNode = conf.getLong(SPLIT_MINSIZE_PERNODE, 0);
  }
  if (minSplitSizeRack != 0) {
    minSizeRack = minSplitSizeRack;
  } else {
    minSizeRack = conf.getLong(SPLIT_MINSIZE_PERRACK, 0);
  }
  if (maxSplitSize != 0) {
    maxSize = maxSplitSize;
  } else {
    maxSize = conf.getLong("mapreduce.input.fileinputformat.split.maxsize", 0);
  }
  List<FileStatus> stats = listStatus(job);
  List<InputSplit> splits = new ArrayList<InputSplit>();
  if (stats.size() == 0) {
    return splits;
  }

  for (MultiPathFilter onepool : pools) {
    ArrayList<FileStatus> myPaths = new ArrayList<FileStatus>();
    for (Iterator<FileStatus> iter = stats.iterator(); iter.hasNext();) {
      FileStatus p = iter.next();
      if (onepool.accept(p.getPath())) {
        myPaths.add(p); // add it to my output set
        iter.remove();  // so that it is not processed again below
      }
    }
    // create splits for all files in this pool.
    getMoreSplits(job, myPaths, maxSize, minSizeNode, minSizeRack, splits);
  }

  // create splits for all files that are not in any pool.
  getMoreSplits(job, stats, maxSize, minSizeNode, minSizeRack, splits);

  // free up rackToNodes map
  rackToNodes.clear();
  return splits;
}
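
Until the full analysis, the pool loop above can be pictured with a toy model. This is only a sketch: PoolGroupingSketch, group and demo are hypothetical names, paths are plain strings, a java.util.function.Predicate stands in for MultiPathFilter, and it assumes that a file accepted by a pool is removed from the remaining list so that it is not grouped twice.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

class PoolGroupingSketch {
    // One group per pool, in pool order, followed by a final group with the leftover paths.
    static List<List<String>> group(List<String> paths, List<Predicate<String>> pools) {
        List<String> remaining = new ArrayList<>(paths);
        List<List<String>> groups = new ArrayList<>();
        for (Predicate<String> pool : pools) {
            List<String> mine = new ArrayList<>();
            for (Iterator<String> it = remaining.iterator(); it.hasNext();) {
                String p = it.next();
                if (pool.test(p)) {
                    mine.add(p);
                    it.remove(); // a path belongs to at most one pool
                }
            }
            groups.add(mine);
        }
        groups.add(remaining); // paths that matched no pool
        return groups;
    }

    // Example input: two pools keyed on a path prefix.
    static List<List<String>> demo() {
        List<Predicate<String>> pools = new ArrayList<>();
        pools.add(p -> p.startsWith("/logs/"));
        pools.add(p -> p.startsWith("/data/"));
        return group(Arrays.asList("/logs/a", "/data/b", "/logs/c", "/tmp/d"), pools);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Each group would then be handed to getMoreSplits separately, which is why files from different pools never end up in the same combined split.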

3. Main subclasses of FileInputFormat

3.0 Starting a MapTask
  • Start with the run method of the MapTask class, which dispatches to runNewMapper (abridged)
    public void run(final JobConf job, final TaskUmbilicalProtocol umbilical) {
      if (useNewApi) {
        runNewMapper(job, splitMetaInfo, umbilical, reporter);
      } else {
        runOldMapper(job, splitMetaInfo, umbilical, reporter);
      }
      done(umbilical, reporter);
    }
  • runNewMapper: obtains the Mapper, InputFormat, InputSplit, RecordReader and RecordWriter objects, then calls the run method of Mapper to execute the MapTask (abridged)
    void runNewMapper(final JobConf job, final TaskSplitIndex splitIndex,final TaskUmbilicalProtocol umbilical, TaskReporter reporter) {
      // Set up the task context.
      org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
        new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, getTaskID(),reporter);
      // Create the Mapper object by reflection.
      org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
          ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
      // Create the InputFormat object by reflection.
      org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
          ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
      // rebuild the input split
      org.apache.hadoop.mapreduce.InputSplit split = null;
      split = getSplitDetails(new Path(splitIndex.getSplitLocation()),splitIndex.getStartOffset());
      // Obtain the RecordReader (data input).
      org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
        new NewTrackingRecordReader<INKEY,INVALUE>(split, inputFormat, reporter, taskContext);
      job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
      org.apache.hadoop.mapreduce.RecordWriter output = null;
      // Obtain the RecordWriter (data output).
      if (job.getNumReduceTasks() == 0) {
        output = new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
      } else {
        output = new NewOutputCollector(taskContext, job, umbilical, reporter);
      }
      org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE>
          mapContext = new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(), input, output, committer, reporter, split);
      org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
          mapperContext = new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(mapContext);
      try {
        // Initialize the input.
        input.initialize(split, mapperContext);
        // Run the user's Mapper.
        mapper.run(mapperContext);
        input.close();
        input = null;
        output.close(mapperContext);
        output = null;
      } finally {
        closeQuietly(input);
        closeQuietly(output, mapperContext);
      }
    }
    The next two items look at how MapTask obtains the RecordReader and at what input.initialize(split, mapperContext) does.
  • NewTrackingRecordReader: obtains the RecordReader through the createRecordReader method of inputFormat; the default inputFormat returns a LineRecordReader object
    NewTrackingRecordReader(org.apache.hadoop.mapreduce.InputSplit split,
        org.apache.hadoop.mapreduce.InputFormat<K, V> inputFormat,
        TaskReporter reporter,
        org.apache.hadoop.mapreduce.TaskAttemptContext taskContext){
      this.reporter = reporter;
      this.inputRecordCounter = reporter.getCounter(TaskCounter.MAP_INPUT_RECORDS);
      this.fileInputByteCounter = reporter.getCounter(FileInputFormatCounter.BYTES_READ);
      List <Statistics> matchedStats = null;
      if (split instanceof org.apache.hadoop.mapreduce.lib.input.FileSplit) {
        matchedStats = getFsStatistics(((org.apache.hadoop.mapreduce.lib.input.FileSplit) split).getPath(), taskContext.getConfiguration());
      }
      fsStats = matchedStats;
      long bytesInPrev = getInputBytes(fsStats);
      // The key line: by default inputFormat is a TextInputFormat, whose createRecordReader returns a LineRecordReader.
      this.real = inputFormat.createRecordReader(split, taskContext);
      long bytesInCurr = getInputBytes(fsStats);
      fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
    }
  • input.initialize(split, mapperContext): input is a RecordReader, by default a LineRecordReader. Initialization opens the input stream and adjusts the start position of the split; reading one line past the end is handled later in nextKeyValue
    public void initialize(InputSplit genericSplit,TaskAttemptContext context) {
      FileSplit split = (FileSplit) genericSplit;
      Configuration job = context.getConfiguration();
      this.maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE);
      start = split.getStart();
      end = start + split.getLength();
      final Path file = split.getPath();
      // open the file and seek to the start of the split
      final FileSystem fs = file.getFileSystem(job);
      fileIn = fs.open(file);
      CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
      if (null!=codec) {  // compressed input
        isCompressedInput = true;
        decompressor = CodecPool.getDecompressor(codec);
        if (codec instanceof SplittableCompressionCodec) {
          final SplitCompressionInputStream cIn =
            ((SplittableCompressionCodec)codec).createInputStream(fileIn, decompressor, start, end,
                SplittableCompressionCodec.READ_MODE.BYBLOCK);
          in = new CompressedSplitLineReader(cIn, job,this.recordDelimiterBytes);
          start = cIn.getAdjustedStart();
          end = cIn.getAdjustedEnd();
          filePosition = cIn;
        } else {
          in = new SplitLineReader(codec.createInputStream(fileIn,decompressor), job, this.recordDelimiterBytes);
          filePosition = fileIn;
        }
      } else {
        fileIn.seek(start);
        in = new UncompressedSplitLineReader(fileIn, job, this.recordDelimiterBytes, split.getLength());
        filePosition = fileIn;
      }
      // For every split except the first, read one line and discard it, then move start to the
      // beginning of the next line. In return, each split reads one extra line past its end
      // (see nextKeyValue), so every line of the file is read exactly once.
      if (start != 0) {
        start += in.readLine(new Text(), 0, maxBytesToConsume(start));
      }
      this.pos = start;
    }
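    Once the RecordReader has been initialized, the run method of Mapper is executed.
  • The run method of Mapper
    public void run(Context context) {
      // Runs once before the task processes any record.
      setup(context);
      try {
        while (context.nextKeyValue()) {
          map(context.getCurrentKey(), context.getCurrentValue(), context);
        }
      } finally {
        // Runs once after the task has processed all records.
        cleanup(context);
      }
    }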
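    Here context is the Mapper.Context that WrappedMapper builds around the MapContextImpl object; its nextKeyValue / getCurrentKey / getCurrentValue methods delegate to the methods of the same name on the RecordReader.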
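
    The control flow of run (setup once, map once per record, cleanup once) can be tried out without Hadoop. The sketch below uses made-up stand-ins: MapperRunSketch, its Context and its Mapper are not the Hadoop classes, and the key is simply a record counter rather than a byte offset.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class MapperRunSketch {
    // Hypothetical stand-in for the context: it forwards record access to a reader (an Iterator here).
    static class Context {
        private final Iterator<String> reader;
        private long key = -1;
        private String value;
        final List<String> output = new ArrayList<>();

        Context(List<String> lines) { this.reader = lines.iterator(); }

        boolean nextKeyValue() {
            if (!reader.hasNext()) {
                return false;
            }
            key++;                 // record index, not a byte offset
            value = reader.next();
            return true;
        }
        long getCurrentKey() { return key; }
        String getCurrentValue() { return value; }
    }

    // Hypothetical stand-in for Mapper with the same run structure.
    static class Mapper {
        void setup(Context c) { c.output.add("setup"); }
        void map(long key, String value, Context c) { c.output.add(key + ":" + value); }
        void cleanup(Context c) { c.output.add("cleanup"); }

        void run(Context c) {
            setup(c);
            try {
                while (c.nextKeyValue()) {
                    map(c.getCurrentKey(), c.getCurrentValue(), c);
                }
            } finally {
                cleanup(c); // runs even if map throws
            }
        }
    }

    static List<String> demo() {
        Context c = new Context(Arrays.asList("a", "b"));
        new Mapper().run(c);
        return c.output;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```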
  • The nextKeyValue / getCurrentKey / getCurrentValue methods of LineRecordReader
    public boolean nextKeyValue() throws IOException {
      if (key == null) {
        key = new LongWritable();
      }
      // The key is the byte offset of the line within the file.
      key.set(pos);
      if (value == null) {
        value = new Text();
      }
      int newSize = 0;
      // One extra line beyond the end of the split is always read here (note the <=).
      while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
        if (pos == 0) {
          newSize = skipUtfByteOrderMark();
        } else {
          newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
          pos += newSize;
        }
        if ((newSize == 0) || (newSize < maxLineLength)) {
          break;
        }
        // Otherwise the line was too long: skip it and try the next one.
      }
      if (newSize == 0) {
        key = null;
        value = null;
        return false;
      } else {
        return true;
      }
    }
    public LongWritable getCurrentKey() {
      return key;
    }
    public Text getCurrentValue() {
      return value;
    }
    private long getFilePosition() throws IOException {
      long retVal;
      if (isCompressedInput && null != filePosition) {
        retVal = filePosition.getPos();
      } else {
        retVal = pos;
      }
      return retVal;
    }
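
Why "skip the first line unless start is 0" combined with "read while pos <= end" returns every line exactly once is easiest to see on a tiny input. The following sketch simulates the rule over an in-memory byte array; LineSplitSketch and its methods are hypothetical names, only '\n' delimiters and uncompressed data are handled, and the byte order mark and maximum line length logic are left out.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class LineSplitSketch {
    // Reads the line starting at pos. Returns the bytes consumed, newline included
    // (0 at end of data), and appends the line text to out when out is not null.
    static int readLine(byte[] data, int pos, StringBuilder out) {
        int i = pos;
        while (i < data.length && data[i] != '\n') {
            i++;
        }
        if (out != null) {
            out.append(new String(data, pos, i - pos, StandardCharsets.UTF_8));
        }
        int consumed = i - pos;
        if (i < data.length) {
            consumed++; // count the newline itself
        }
        return consumed;
    }

    // Records of one split, formatted as "offset:line".
    static List<String> readSplit(byte[] data, int start, int length) {
        int end = start + length;
        int pos = start;
        if (start != 0) {
            pos += readLine(data, pos, null); // discard the first (possibly partial) line
        }
        List<String> records = new ArrayList<>();
        while (pos <= end) { // <= lets this split read the line that starts at or before its end
            StringBuilder line = new StringBuilder();
            int consumed = readLine(data, pos, line);
            if (consumed == 0) {
                break;
            }
            records.add(pos + ":" + line);
            pos += consumed;
        }
        return records;
    }

    // Cuts the sample text into fixed-size splits and concatenates what each split reads.
    static List<String> demo(int splitSize) {
        byte[] data = "alpha\nbravo\ncharlie\ndelta\n".getBytes(StandardCharsets.UTF_8);
        List<String> all = new ArrayList<>();
        for (int start = 0; start < data.length; start += splitSize) {
            all.addAll(readSplit(data, start, Math.min(splitSize, data.length - start)));
        }
        return all;
    }

    public static void main(String[] args) {
        System.out.println(demo(8));
        System.out.println(demo(6));
    }
}
```

With a split size of 8 the boundaries fall in the middle of lines, and with 6 one boundary falls exactly on the start of a line; in both cases the four lines come back once each, keyed by their byte offsets.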
3.1 TextInputFormat

Following the submit method of Job down through the Hadoop source, the InputFormat object is created in the writeNewSplits method of the JobSubmitter class by the statement InputFormat<?, ?> input = ReflectionUtils.newInstance(job.getInputFormatClass(), conf); that is, it is instantiated by reflection.
The class itself comes from the getInputFormatClass method of JobContextImpl: return (Class<? extends InputFormat<?,?>>) conf.getClass(INPUT_FORMAT_CLASS_ATTR, TextInputFormat.class); INPUT_FORMAT_CLASS_ATTR corresponds to the parameter mapreduce.job.inputformat.class, so when that parameter is not set the TextInputFormat subclass is used by default.
TextInputFormat overrides the createRecordReader and isSplitable methods.
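
The "look up a class name in the configuration, fall back to a default, instantiate by reflection" pattern can be sketched in plain Java. Everything here is illustrative: DefaultFormatSketch, Format, TextFormat and OtherFormat are made-up types, and a Map stands in for Hadoop's Configuration; only the property name is taken from the text above.

```java
import java.util.HashMap;
import java.util.Map;

class DefaultFormatSketch {
    interface Format { String name(); }
    static class TextFormat implements Format { public String name() { return "text"; } }
    static class OtherFormat implements Format { public String name() { return "other"; } }

    static final String KEY = "mapreduce.job.inputformat.class";

    // Resolves the configured class (TextFormat when the key is absent) and instantiates it reflectively.
    static Format create(Map<String, String> conf) {
        try {
            String className = conf.get(KEY);
            Class<? extends Format> cls = (className == null)
                    ? TextFormat.class
                    : Class.forName(className).asSubclass(Format.class);
            return cls.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot instantiate input format", e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        System.out.println(create(conf).name());
        conf.put(KEY, OtherFormat.class.getName());
        System.out.println(create(conf).name());
    }
}
```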

  • createRecordReader
    This method reads the configured record delimiter, if any, and returns a LineRecordReader object.

    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
                         TaskAttemptContext context) {
      String delimiter = context.getConfiguration().get("textinputformat.record.delimiter");
      byte[] recordDelimiterBytes = null;
      // With no delimiter configured, LineRecordReader falls back to line endings.
      if (null != delimiter) recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
      return new LineRecordReader(recordDelimiterBytes);
    }
  • isSplitable

    protected boolean isSplitable(JobContext context, Path file) {
      final CompressionCodec codec =  new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
      // Uncompressed files can always be split.
      if (null == codec) {
        return true;
      }
      // Compressed files can be split only if the codec supports it.
      return codec instanceof SplittableCompressionCodec;
    }
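
The effect of this check can be illustrated with a small table lookup. This is a sketch, not the Hadoop implementation: SplittableSketch is a made-up class, codecs are matched purely by file extension, and it assumes only gzip and bzip2 are registered (GzipCodec does not implement SplittableCompressionCodec, BZip2Codec does).

```java
import java.util.HashMap;
import java.util.Map;

class SplittableSketch {
    // File extension -> whether the codec behind it supports splitting.
    // Assumption: only these two codecs are registered; a real cluster may have more.
    static final Map<String, Boolean> CODECS = new HashMap<>();
    static {
        CODECS.put(".gz", false);  // GzipCodec: not splittable
        CODECS.put(".bz2", true);  // BZip2Codec: splittable
    }

    static boolean isSplitable(String fileName) {
        for (Map.Entry<String, Boolean> e : CODECS.entrySet()) {
            if (fileName.endsWith(e.getKey())) {
                return e.getValue();
            }
        }
        return true; // no codec matched: treated as uncompressed text
    }

    public static void main(String[] args) {
        for (String f : new String[] {"part-0.txt", "part-0.gz", "part-0.bz2"}) {
            System.out.println(f + " splittable: " + isSplitable(f));
        }
    }
}
```

In practice this means a large .gz input is processed by a single map task regardless of its size, while the same data as plain text or .bz2 can be spread over several splits.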
3.2 CombineTextInputFormat


