详解wordcount(TextInputFormat工作机制)

阅前注意事项

在看教程前,要先注意hadoop有新旧两版的api:

  1. 新版对应1.x版本,org.apache.hadoop.mapreduce.*。主要内容涉及新版本的API接口以及一些新特性(比如MapReduce安全)
  2. 旧版对应0.x版本,org.apache.hadoop.mapred.*。这里面主要包含旧的API接口以及MapReduce各个服务(JobTracker以及TaskTracker)的实现。一些hadoop1和2框架公用的部分也在此。

而百度上大部分教程都是用的hadoop0.x版本的api,容易误导新人,所以在看参考资料时要留意版本,学习合适的部分

问题引子

首先,在wordcount中,默认的InputFormatTextInputFormat,那么,TextInputFormat是如何把一个文本分为多行,再交给每个Mapper的呢?

如果一个行被切分到两个split里(这几乎是一定会发生的情况),TextInputFormat是如何处理的?如果是生硬地把一行切割到两个split里,是对数据的一种破坏,可能会影响数据分析的正确性(比如WordCount就是一个例子).

解释

首先,本文会提到的类有如下这些:
(以下的类都在org.apache.hadoop.mapreduce包内,别看到里去了)

(A->B代表A extends B)

  1. org.apache.hadoop.mapreduce包内
    1. FileSplit -> InputSplit
    2. TextInputFormat -> FileInputFormat -> InputFormat
    3. LineRecordReader -> RecordReader
    4. MapContextImpl -> MapContext,Mapper.Context -> MapContext
  2. org.apache.hadoop.mapred包内
    1. JvmTask
    2. MapTask

工作过程是如下几个步骤:

步骤0. 切割Split

在Job提交前,客户端就会调用FileInputFormatpublic List<InputSplit> getSplits(JobContext job)将一个文本按尽可能相等的字节长度,切割为一个个FileSplit
详情见下文博客
https://blog.csdn.net/ltliyue/article/details/51292312?utm_source=blogxgwz9

步骤1. 建立Mapper

  1. JvmTaskpublic void readFields(DataInput in)内的语句t = new MapTask();创建MapTask对象
  2. MapTaskpublic void run(final JobConf job, final TaskUmbilicalProtocol umbilical)内的语句中:
  if (useNewApi) {
      runNewMapper(job, splitMetaInfo, umbilical, reporter);
      } else {
      runOldMapper(job, splitMetaInfo, umbilical, reporter);
}

由于是新版本API,会调用runNewMapper
3.MapTaskvoid runNewMapper(...)中(请先阅读此方法的源代码,以便理解下文),就会创建各种Mapper要用到的参数,包括Mapper、InputFormat、InputSplit、RecordReader、MapContext,之后会运行:

  input.initialize(split, mapperContext); // input类型是RecordReader
  mapper.run(mapperContext);

InputFormat默认为TextInputFormat的情况下,input的实际类型是LineRecordReader,所以会调用相应的函数实现。
在这两句中,Mapper会初始化,并且准备运行。

步骤2. Mapper初始化

  1. 每个Mapper会被分配到一个Split,在runNewMapper中可以包装成RecordReader
    MapTask::runNewMapper中的语句:
    org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
      new NewTrackingRecordReader<INKEY,INVALUE>
        (split, inputFormat, reporter, taskContext);
    
    而NewTrackingRecordReader是MapTask的内部类,其构造函数为:
    NewTrackingRecordReader(...)
            throws InterruptedException, IOException {
          this.reporter = reporter;
          this.inputRecordCounter = reporter
              .getCounter(TaskCounter.MAP_INPUT_RECORDS);
          this.fileInputByteCounter = reporter
              .getCounter(FileInputFormatCounter.BYTES_READ);
    
         ...
          this.real = inputFormat.createRecordReader(split, taskContext); // 重要
         ...
    }
    
    在注释了"重要"的那行调用了InputFormat(此处为TextInputFormat)的
    createRecordReader方法,将一个FileSplit包装为一个
    LineRecordReader
  2. 还是在MapTask::runNewMapper函数里
    org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE> 
        mapContext = 
          new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(), 
              input, output, 
              committer, 
              reporter, split);
    
    org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context 
        mapperContext = 
          new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(
              mapContext);
    
    RecordReader会被包装进MapContextImpl实例,然后被作为拷贝模板传递给WrappedMapper.Context实例中(org.apache.hadoop.mapreduce.lib.map.WrappedMapper)。

之后,runNewMapper函数会调用mapper.run(mapperContext);开始运行。

步骤3. Mapper运行

Mapper::run(Context context)的代码如下:

  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
      while (context.nextKeyValue()) {
        map(context.getCurrentKey(), context.getCurrentValue(), context);
      }
    } finally {
      cleanup(context);
    }
  }

需要注意:
该函数会循环调用 context.nextKeyValue()来获取key-value对。

  • context.nextKeyValue调用的是WrappedMapper::nextKeyValue,后者返回return mapContext.nextKeyValue();。所以实际调用的是MapContextImpl::nextKeyValue(),而它又返回return reader.nextKeyValue();所以最终调用的是RecordReader::nextKeyValue()nextKeyValue的函数实现详见WrappedMapperMapContextImplLineRecordReader

所以Mapper的运行过程中,循环调用的是LineRecordReadernextKeyValue函数。我们看到LineRecordReader::initializeLineRecordReader::nextKeyValue

public void initialize(InputSplit genericSplit,
                         TaskAttemptContext context) throws IOException {
    ...
    // If this is not the first split, we always throw away first record
    // because we always (except the last split) read one extra line in
    // next() method.
    if (start != 0) {
      start += in.readLine(new Text(), 0, maxBytesToConsume(start));
    }
    ...
public boolean nextKeyValue() throws IOException {
    ...
    // We always read one extra line, which lies outside the upper
    // split limit i.e. (end - 1)
    while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
      if (pos == 0) {
        newSize = skipUtfByteOrderMark();
      } else {
        newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
        pos += newSize;
      }

      if ((newSize == 0) || (newSize < maxLineLength)) {
        break;
      }
    }
    ...
  }

看到两个函数调用的in.readLine我们知道,它用了巧妙的办法对付两个Split分割一个句子的情况。

  1. 对于非第一个Split,它首先在initialize里读取第一行,再在nextKeyValue里一直读取,直到结束位置在Split的边界之后。
  2. 对于第一个Split,就只是在nextKeyValue里一直读取,直到结束位置在Split的边界之后。

总结来说,对于每个Split,都会在最后多读一行,相应的,开头就略去一行。而第一个Split不需要略去开头(顶多略去utf-8的标记)

所以,总的来说:

  • 从宏观上,一个文本会以字节为单位,被分为多个Split.
  • 从微观上,对于每个Split,都会通过略去开头一句话,多读结尾一句话的方法,避免句子被Split边界给切割开。

就像下图所示:


正所谓"上有政策,下有对策"

附记

通过大量调用抽象类的方法(而不是具体类)可以将软件架构解耦,提高各个模块的独立性,易于修改和替换。
但缺点是不易于学习者用"顺藤摸瓜"的方式学习整个系统。需要大量地查阅网络资料。

参考

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,634评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,951评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,427评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,770评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,835评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,799评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,768评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,544评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,979评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,271评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,427评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,121评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,756评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,375评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,579评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,410评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,315评论 2 352

推荐阅读更多精彩内容