Hadoop中的数据读入 — InputFormat及其子类

1. 接口 InputFormat

FileInputFormat实现了这个接口,接口中有两个方法。

  • getSplits: 对输入文件进行逻辑切分
    InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
    
  • createRecordReader: 获取RecordReader(决定了文件中每条记录的读取方式)
    RecordReader<K, V> createRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException;
    

2. FileInputFormat 和 CombineFileInputFormat

2.1 FileInputFormat

FileInputFormat是所有基于文件进行数据输入的基类,是一个抽象类,其中主要是对getSplits方法进行了实现,并未对createRecordReader方法进行实现。

  • getSplits
    public List<InputSplit> getSplits(JobContext job) throws IOException {
      .........
      // getFormatMinSplitSize() = 1, 
      // getMinSplitSize(job): 配置 mapreduce.input.fileinputformat.split.minsize的设定值,没设置默认为1
      long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
      // return context.getConfiguration().getLong(SPLIT_MAXSIZE, Long.MAX_VALUE);
      // mapreduce.input.fileinputformat.split.maxsize的设置值,没设置默认Long.MAX_VALUE
      long maxSize = getMaxSplitSize(job);
    
      // 生成逻辑切片
      List<InputSplit> splits = new ArrayList<InputSplit>();
      List<FileStatus> files = listStatus(job);
      for (FileStatus file: files) {
        Path path = file.getPath();
        long length = file.getLen();
        if (length != 0) {
          BlockLocation[] blkLocations;
          if (file instanceof LocatedFileStatus) {
            blkLocations = ((LocatedFileStatus) file).getBlockLocations();
          } else {
            FileSystem fs = path.getFileSystem(job.getConfiguration());
            // new BlockLocation[] { new BlockLocation(name, host, 0, file.getLen()) }  ----- offset = 0
            blkLocations = fs.getFileBlockLocations(file, 0, length);
          }
          if (isSplitable(job, path)) {
            // 配置的块大小,默认128M
            long blockSize = file.getBlockSize();
            // Math.max(minSize, Math.min(maxSize, blockSize));
            long splitSize = computeSplitSize(blockSize, minSize, maxSize);
            // 记录待逻辑切分的数据的长度
            long bytesRemaining = length;
            // SPLIT_SLOP = 1.1:即只有块的大小为 splitSize 的 1.1倍的时候才会进行切分
            while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
              // length-bytesRemaining:待逻辑切分的数据的起始偏移量位置
              int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
              // 将逻辑切片添加到splits列表中,这里添加进去的都是大小为splitSize的切片
              splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                          blkLocations[blkIndex].getHosts(),blkLocations[blkIndex].getCachedHosts()));
              bytesRemaining -= splitSize;
            }
            if (bytesRemaining != 0) {
              int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
              // 把剩下来不足 1.1*splitSize 的部分添加到 splits列表中
              splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                         blkLocations[blkIndex].getHosts(),blkLocations[blkIndex].getCachedHosts()));
            }
          } else { // 文件不可分割时则整个文件作为一个切片放入 splits 列表中
            splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),blkLocations[0].getCachedHosts()));
          }
        } else { 
          // 文件为空的时候放一个空切片
          splits.add(makeSplit(path, 0, length, new String[0]));
        }
      }
      // 记录输入文件的个数
      job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
      .........
      return splits;
     }
    
    protected int getBlockIndex(BlockLocation[] blkLocations, long offset) {
      for (int i = 0 ; i < blkLocations.length; i++) {
        // 判断offset是否在这个数据块中
        if ((blkLocations[i].getOffset() <= offset) &&(offset < blkLocations[i].getOffset() +   blkLocations[i].getLength())){
          return i;
        }
      }
      .........
    }
    
    protected FileSplit makeSplit(Path file, long start, long length, String[] hosts, String[] inMemoryHosts) {
      return new FileSplit(file, start, length, hosts, inMemoryHosts);
    }
    
    通过上述代码的分析可以得出以下几点:
    (1) 文件是分开来单独切分的,不是把目录下的所有文件都放到一起切分。即如果有两个文件,一个 160M,一个150M,默认设置下会被切成4个逻辑切片。
    (2) 文件大小超过我们设定的切片大小的 1.1 倍的情况下才会被切分。
    (3) 文件是否能切分,要看对 isSplitable 这个方法的实现。FileInputFormat 中这个方法永远返回 true
2.2 CombineFileInputFormat(后续再分析)

CombineFileInputFormat是一个抽象类,其中主要是对getSplits方法进行了实现,并未对createRecordReader方法进行实现。

public List<InputSplit> getSplits(JobContext job)  {
  long minSizeNode = 0;
  long minSizeRack = 0;
  long maxSize = 0;
  Configuration conf = job.getConfiguration();
  if (minSplitSizeNode != 0) {
    minSizeNode = minSplitSizeNode;
  } else {
    minSizeNode = conf.getLong(SPLIT_MINSIZE_PERNODE, 0);
  }
  if (minSplitSizeRack != 0) {
    minSizeRack = minSplitSizeRack;
  } else {
    minSizeRack = conf.getLong(SPLIT_MINSIZE_PERRACK, 0);
  }
  if (maxSplitSize != 0) {
    maxSize = maxSplitSize;
  } else {
    maxSize = conf.getLong("mapreduce.input.fileinputformat.split.maxsize", 0);
  }
  ............
  List<FileStatus> stats = listStatus(job);
  List<InputSplit> splits = new ArrayList<InputSplit>();
  if (stats.size() == 0) {
    return splits;    
  }

  for (MultiPathFilter onepool : pools) {
    ArrayList<FileStatus> myPaths = new ArrayList<FileStatus>();
    for (Iterator<FileStatus> iter = stats.iterator(); iter.hasNext();) {
      FileStatus p = iter.next();
      if (onepool.accept(p.getPath())) {
        myPaths.add(p); // add it to my output set
        iter.remove();
      }
    }
    // create splits for all files in this pool.
    getMoreSplits(job, myPaths, maxSize, minSizeNode, minSizeRack, splits);
  }

  // create splits for all files that are not in any pool.
  getMoreSplits(job, stats, maxSize, minSizeNode, minSizeRack, splits);

  // free up rackToNodes map
  rackToNodes.clear();
  return splits;    
}

3. FileInputFormat的主要子类

3.0 MapTask 的启动
  • 首先看 MapTask 类中的run方法,转到 runNewMapper 方法中
    public void run(final JobConf job, final TaskUmbilicalProtocol umbilical) {
      ......
      if (useNewApi) {
        runNewMapper(job, splitMetaInfo, umbilical, reporter);
      } else {
        runOldMapper(job, splitMetaInfo, umbilical, reporter);
      }
      done(umbilical, reporter);
    }
    
  • runNewMapper: 主要是获取 Mapper, InputFormat, InputSplit, RecordReader, RecordWriter 等对象,然后调用 Mapper中的run方法进行MapTask的运行
    void runNewMapper(final JobConf job, final TaskSplitIndex splitIndex,final TaskUmbilicalProtocol umbilical, TaskReporter reporter) {
      // 设置上下文
      org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
        new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, getTaskID(),reporter);
      // 通过反射获取mapper对象
      org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
        (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
          ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
      // 通过反射设置IputFormat对象
      org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
        (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
          ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
      // rebuild the input split
      org.apache.hadoop.mapreduce.InputSplit split = null;
      split = getSplitDetails(new Path(splitIndex.getSplitLocation()),splitIndex.getStartOffset());
      // 获取RecordReader对象(数据读入)
      org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
        new NewTrackingRecordReader<INKEY,INVALUE>(split, inputFormat, reporter, taskContext);
      
      job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
      org.apache.hadoop.mapreduce.RecordWriter output = null;
      
      // 获取RecordWriter(数据输出)
      if (job.getNumReduceTasks() == 0) {
        output = new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
      } else {
        output = new NewOutputCollector(taskContext, job, umbilical, reporter);
      }
    
      org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE> 
      mapContext = new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(), input, output, committer, reporter, split);
    
      org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context 
          mapperContext = new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(mapContext);
    
      try {
        // 初始化input
        input.initialize(split, mapperContext);
        mapper.run(mapperContext);
        mapPhase.complete();
        setPhase(TaskStatus.Phase.SORT);
        statusUpdate(umbilical);
        input.close();
        input = null;
        output.close(mapperContext);
        output = null;
      } finally {
        closeQuietly(input);
        closeQuietly(output, mapperContext);
      }
    }
    
    接下来主要看一下 MapTask 中 RecordReader 的获取 以及 input.initialize(split, mapperContext) 中做了什么
  • NewTrackingRecordReader: 通过 inputFormat 中的 createRecordReader 方法获取 RecordReader,默认 inputFormat 中返回 LineRecordReader 对象
    NewTrackingRecordReader(org.apache.hadoop.mapreduce.InputSplit split,
        org.apache.hadoop.mapreduce.InputFormat<K, V> inputFormat,
        TaskReporter reporter,
        org.apache.hadoop.mapreduce.TaskAttemptContext taskContext){
      this.reporter = reporter;
      this.inputRecordCounter = reporter.getCounter(TaskCounter.MAP_INPUT_RECORDS);
      this.fileInputByteCounter = reporter.getCounter(FileInputFormatCounter.BYTES_READ);
      List <Statistics> matchedStats = null;
      if (split instanceof org.apache.hadoop.mapreduce.lib.input.FileSplit) {
        matchedStats = getFsStatistics(((org.apache.hadoop.mapreduce.lib.input.FileSplit) split).getPath(), taskContext.getConfiguration());
      }
      fsStats = matchedStats;
      long bytesInPrev = getInputBytes(fsStats);
      // 关键是这一句,这里 inputFormat 默认情况下是 TextFileInputFormat,其中的 createRecordReader 返回的是 LineRecordReader 对象
      this.real = inputFormat.createRecordReader(split, taskContext);
      long bytesInCurr = getInputBytes(fsStats);
      fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
    }
    
  • input.initialize(split, mapperContext):input是RecordReader 对象,这里默认情况下是 LineRecordReader。初始化中主要是获取了输入流并对切片读取的末尾位置进行了调整
    public void initialize(InputSplit genericSplit,TaskAttemptContext context) {
      FileSplit split = (FileSplit) genericSplit;
      Configuration job = context.getConfiguration();
      this.maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE);
      start = split.getStart();
      end = start + split.getLength();
      final Path file = split.getPath();
    
      // open the file and seek to the start of the split
      final FileSystem fs = file.getFileSystem(job);
      fileIn = fs.open(file);
      CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
      if (null!=codec) {  // 使用压缩的处理方式
        isCompressedInput = true; 
        decompressor = CodecPool.getDecompressor(codec);
        if (codec instanceof SplittableCompressionCodec) {
          final SplitCompressionInputStream cIn =
            ((SplittableCompressionCodec)codec).createInputStream(fileIn, decompressor, start, end,
              SplittableCompressionCodec.READ_MODE.BYBLOCK);
          in = new CompressedSplitLineReader(cIn, job,this.recordDelimiterBytes);
          start = cIn.getAdjustedStart();
          end = cIn.getAdjustedEnd();
          filePosition = cIn;
        } else {
          in = new SplitLineReader(codec.createInputStream(fileIn,decompressor), job, this.recordDelimiterBytes);
          filePosition = fileIn;
        }
      } else {
        fileIn.seek(start);
        in = new UncompressedSplitLineReader(fileIn, job, this.recordDelimiterBytes, split.getLength());
        filePosition = fileIn;
      }
      // 对于非第一个切片,读一行放空,算出长度,然后更新起始位置为第二行。每一个切片处理完的时候再多处理一行,这样就可以还原原文件
      if (start != 0) {
        start += in.readLine(new Text(), 0, maxBytesToConsume(start));
      }
      this.pos = start;
    }
    
    对RecordReader进行初始化后就开始执行 Mapper 中的run方法
  • Mapper中的run方法
    public void run(Context context) {
      // 在task执行前执行一次
      setup(context);
      try {
        while (context.nextKeyValue()) {
          map(context.getCurrentKey(), context.getCurrentValue(), context);
        }
      } finally {
        // 在task完成之后执行一次
        cleanup(context);
      }
    }
    
    其中 context 为 MapContextImpl对象,其中方法 nextKeyValue / getCurrentKey / getCurrentValue 都是 RecordReader 中的方法
  • LineRecordReader 中 nextKeyValue / getCurrentKey / getCurrentValue 方法
    public boolean nextKeyValue() {
      if (key == null) {
        key = new LongWritable();
      }
      key.set(pos);
      if (value == null) {
        value = new Text();
      }
      int newSize = 0;
      // 这里会往后多读一行
      while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
        if (pos == 0) {
          newSize = skipUtfByteOrderMark();
        } else {
          newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
          pos += newSize;
        }
        if ((newSize == 0) || (newSize < maxLineLength)) {
          break;
        }
      }
      if (newSize == 0) {
        key = null;
        value = null;
        return false;
      } else {
        return true;
      }
    }
    
    @Override
    public LongWritable getCurrentKey() {
      return key;
    }
    
    @Override
    public Text getCurrentValue() {
      return value;
    }
    
    private long getFilePosition() throws IOException {
      long retVal;
      if (isCompressedInput && null != filePosition) {
        retVal = filePosition.getPos();
      } else {
        retVal = pos;
      }
      return retVal;
    }
    
3.1 TextFileInputFormat

从 Hadoop 源码中 job 的 submit 方法一路往下可以看到 InputFormat 对象是通过 JobSubmitter 类中 writeNewSplits 方法中代码 InputFormat<?, ?> input = ReflectionUtils.newInstance(job.getInputFormatClass(), conf); 获取,即通过反射获取。
最终转向 JobContextImpl 中的 getInputFormatClass 方法 return (Class<? extends InputFormat<?,?>>) conf.getClass(INPUT_FORMAT_CLASS_ATTR, TextInputFormat.class); INPUT_FORMAT_CLASS_ATTR对应参数名 mapreduce.job.inputformat.class,没有进行设置的情况下默认用的是 TextInputFormat 这个子类。
TextInputFormat 中对 createRecordReader 和 isSplitable 方法进行了重写。

  • createRecordReader
    这个方法主要是设定了一下分隔符,然后返回一个 LineRecordReader 对象。

    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
                         TaskAttemptContext context) {
      String delimiter = context.getConfiguration().get("textinputformat.record.delimiter");
      byte[] recordDelimiterBytes = null;
      if (null != delimiter) recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
      return new LineRecordReader(recordDelimiterBytes);
    }
    
  • isSplitable
    文件无压缩都可切片;文件有压缩格式的时候,如果压缩格式文件不可切片则不能切片,非否则可切片

    protected boolean isSplitable(JobContext context, Path file) {
      final CompressionCodec codec =  new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
      if (null == codec) {
        return true;
      }
      return codec instanceof SplittableCompressionCodec;
    }  
    
3.2 CombineTextInputFormat

后面再分析

3.3
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,843评论 6 502
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,538评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,187评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,264评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,289评论 6 390
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,231评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,116评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,945评论 0 275
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,367评论 1 313
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,581评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,754评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,458评论 5 344
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,068评论 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,692评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,842评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,797评论 2 369
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,654评论 2 354