Hadoop InputFormat介绍

Hadoop InputFormat介绍

1 概述

我们在编写MapReduce程序的时候,在设置输入格式的时候,会调用如下代码:

job.setInputFormatClass(KeyVakueTextInputFormat.class)

通过上面的代码来保证输入的文件是按照我们想要的格式被读取,所有的输入格式都继承于InputFormat,这是一个抽象类,其子类有专门用于读取普通文件的FileInputFormatt,用于读取数据库文件的DBInputFromat,用于读取HBase的TableInputFormat等等。如下图是InputFormat的图谱。

InputFormat类图

2 InputFormat方法

从类图中可以看出,InputFormat抽象类仅有两个抽象方法:

public abstract List<InputSplit> getSplits(JobContext context)
public abstract RecordReader<K,V> createRecordReader(InputSplit split,TaskAttemptContext context)

getSplits()方法是逻辑上拆分作业的输入文件集,然后将每个InputSplit分配给一个单独的Mapper进行处理

注意:拆分是按输入文件的逻辑分割,而输入文件不会被物理分割成块。每个切片都是一个<input-file-path,start,offset>的元组,InputFormat并创建相应的RecordReader读取这些切片。

createRecordReader()方法是为给定的切片创建一个记录阅读器。在切片被使用之前先调用RecordReader.initialize(InputSplit, TaskAttemptContext)方法。

通过InputFormat,MapReduce框架可以做到:

  1. 验证作业输入的正确性
  2. 将输入的文件切割成逻辑分片(InputSplit),一个InputSplit将会分配给一个独立的MapTask
  3. 提供RecordReader实现,读取InputSplit中的Kv对供Mapper使用。

不同的InputFormat会各自实现不同的文件读取方法以及分片方式,每个输入分片会被单独的MapTask作为数据源。下面将介绍InputSplit和RecordReader。

3 InputSplit介绍

MapTask的输入是一个输入切片,称为InputSplit。InputSplit也是一个抽象类,它在逻辑上包含给处理这个InputSplit的Mapper的所有KV对。不同类型的输入格式对应不同类型的切片,下图是InputSplit的类图。

InputSplit

3.1 InputSplit方法

// 获取切片大小,并且根据size对切片排序
public abstract long getLength()
// 获取存储该分片的数据所在的节点位置,其中的数据是本地的,位置信息不需要序列号
public abstract String[] getLocations()
// 获取有关切片在那个节点上的信息,以及它是如何存储在每个位置的
public SplitLocationInfo[] getLocationInfo()

4 RecordReader

RecorderReader将读入到Map的数据拆分成KV对。RecorderReader也是一个抽象类。下面是RecordReader的类图:

InputFormat类图

接下来看一下RecordReader的源代码:

public abstract class RecordReader<KEYIN, VALUEIN> implements Closeable {

  /**
   * 由一个InputSplit初始化
   */
  public abstract void initialize(InputSplit split,
                                  TaskAttemptContext context
                                  ) throws IOException, InterruptedException;

  /**
   * 读取分片下一个KV
   */
  public abstract
  boolean nextKeyValue() throws IOException, InterruptedException;

  /**
   * Get the current key
   */
  public abstract
  KEYIN getCurrentKey() throws IOException, InterruptedException;

  /**
   * Get the current value.
   */
  public abstract
  VALUEIN getCurrentValue() throws IOException, InterruptedException;

  /**
   * 跟踪读取分片的进度
   */
  public abstract float getProgress() throws IOException, InterruptedException;

  /**
   * Close the record reader.
   */
  public abstract void close() throws IOException;
}

参考博文

http://www.cnblogs.com/shitouer/archive/2013/02/28/hadoop-source-code-analyse-mapreduce-inputformat.html

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 先思考问题 我们处在一个大数据的时代已经是不争的事实,这主要表现在数据源多且大,如互联网数据,人们也认识到数据里往...
    墙角儿的花阅读 12,106评论 0 9
  • 摘自:http://staticor.io/post/hadoop/2016-01-23hadoop-defini...
    wangliang938阅读 3,707评论 0 1
  • 思考问题 Mapper类 Mapper类 四个泛型,分别是KEYIN、VALUEIN、KEYOUT、VALUEOU...
    Sakura_P阅读 4,348评论 0 3
  • 说到跑步,我终于有谈论的资本了.从去年5月开始接触跑步(学校那些被迫式的不算),到现在已经一年多了。最开始的时候一...
    独白社阅读 1,456评论 0 3
  • 周二的早晨,走路去上班的途中,遇到拾荒的老人, 佝偻着在垃圾桶里翻找易拉罐、矿泉水瓶。同时垃圾桶里传出阵阵恶臭...
    晓言说阅读 1,839评论 0 0