阅前注意事项
在看教程前,要先注意hadoop有新旧两版的api:
- 新版对应1.x版本,org.apache.hadoop.mapreduce.*。主要内容涉及新版本的API接口以及一些新特性(比如MapReduce安全)
- 旧版对应0.x版本,org.apache.hadoop.mapred.*。这里面主要包含旧的API接口以及MapReduce各个服务(JobTracker以及TaskTracker)的实现。一些hadoop1和2框架公用的部分也在此。
而百度上大部分教程都是用的hadoop0.x版本的api,容易误导新人,所以在看参考资料时要留意版本,学习合适的部分
问题引子
首先,在wordcount
中,默认的InputFormat
是TextInputFormat
,那么,TextInputFormat
是如何把一个文本分为多行,再交给每个Mapper
的呢?
如果一个行被切分到两个split里(这几乎是一定会发生的情况),
TextInputFormat
是如何处理的?如果是生硬地把一行切割到两个split里,是对数据的一种破坏,可能会影响数据分析的正确性(比如WordCount就是一个例子).
解释
首先,本文会提到的类有如下这些:
(以下的类都在org.apache.hadoop.mapreduce
包内,别看到里去了)
(A->B代表A extends B)
-
org.apache.hadoop.mapreduce
包内- FileSplit -> InputSplit
- TextInputFormat -> FileInputFormat -> InputFormat
- LineRecordReader -> RecordReader
- MapContextImpl -> MapContext,Mapper.Context -> MapContext
-
org.apache.hadoop.mapred
包内- JvmTask
- MapTask
工作过程是如下几个步骤:
步骤0. 切割Split
在Job提交前,客户端就会调用FileInputFormat
的public List<InputSplit> getSplits(JobContext job)
将一个文本按尽可能相等的字节长度,切割为一个个FileSplit
。
详情见下文博客
https://blog.csdn.net/ltliyue/article/details/51292312?utm_source=blogxgwz9
步骤1. 建立Mapper
-
JvmTask
的public void readFields(DataInput in)
内的语句t = new MapTask();
创建MapTask
对象 -
MapTask
的public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
内的语句中:
if (useNewApi) {
runNewMapper(job, splitMetaInfo, umbilical, reporter);
} else {
runOldMapper(job, splitMetaInfo, umbilical, reporter);
}
由于是新版本API,会调用runNewMapper
3.MapTask
的void runNewMapper(...)
中(请先阅读此方法的源代码,以便理解下文),就会创建各种Mapper
要用到的参数,包括Mapper、InputFormat、InputSplit、RecordReader、MapContext
,之后会运行:
input.initialize(split, mapperContext); // input类型是RecordReader
mapper.run(mapperContext);
在InputFormat
默认为TextInputFormat
的情况下,input
的实际类型是LineRecordReader
,所以会调用相应的函数实现。
在这两句中,Mapper
会初始化,并且准备运行。
步骤2. Mapper初始化
- 每个
Mapper
会被分配到一个Split
,在runNewMapper
中可以包装成RecordReader
。
看MapTask::runNewMapper
中的语句:
而NewTrackingRecordReader是MapTask的内部类,其构造函数为:org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input = new NewTrackingRecordReader<INKEY,INVALUE> (split, inputFormat, reporter, taskContext);
在注释了"重要"的那行调用了NewTrackingRecordReader(...) throws InterruptedException, IOException { this.reporter = reporter; this.inputRecordCounter = reporter .getCounter(TaskCounter.MAP_INPUT_RECORDS); this.fileInputByteCounter = reporter .getCounter(FileInputFormatCounter.BYTES_READ); ... this.real = inputFormat.createRecordReader(split, taskContext); // 重要 ... }
InputFormat
(此处为TextInputFormat
)的
createRecordReader
方法,将一个FileSplit
包装为一个
LineRecordReader
。 - 还是在
MapTask::runNewMapper
函数里org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE> mapContext = new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(), input, output, committer, reporter, split); org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context mapperContext = new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext( mapContext);
RecordReader
会被包装进MapContextImpl
实例,然后被作为拷贝模板传递给WrappedMapper.Context
实例中(org.apache.hadoop.mapreduce.lib.map.WrappedMapper
)。
之后,runNewMapper
函数会调用mapper.run(mapperContext);
开始运行。
步骤3. Mapper运行
Mapper::run(Context context)
的代码如下:
public void run(Context context) throws IOException, InterruptedException {
setup(context);
try {
while (context.nextKeyValue()) {
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
} finally {
cleanup(context);
}
}
需要注意:
该函数会循环调用 context.nextKeyValue()
来获取key-value对。
-
context.nextKeyValue
调用的是WrappedMapper::nextKeyValue
,后者返回return mapContext.nextKeyValue();
。所以实际调用的是MapContextImpl::nextKeyValue()
,而它又返回return reader.nextKeyValue();
。所以最终调用的是RecordReader::nextKeyValue()
。nextKeyValue
的函数实现详见WrappedMapper
,MapContextImpl
和LineRecordReader
。
所以Mapper
的运行过程中,循环调用的是LineRecordReader
的nextKeyValue
函数。我们看到LineRecordReader::initialize
和LineRecordReader::nextKeyValue
public void initialize(InputSplit genericSplit,
TaskAttemptContext context) throws IOException {
...
// If this is not the first split, we always throw away first record
// because we always (except the last split) read one extra line in
// next() method.
if (start != 0) {
start += in.readLine(new Text(), 0, maxBytesToConsume(start));
}
...
public boolean nextKeyValue() throws IOException {
...
// We always read one extra line, which lies outside the upper
// split limit i.e. (end - 1)
while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
if (pos == 0) {
newSize = skipUtfByteOrderMark();
} else {
newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
pos += newSize;
}
if ((newSize == 0) || (newSize < maxLineLength)) {
break;
}
}
...
}
看到两个函数调用的in.readLine
我们知道,它用了巧妙的办法对付两个Split分割一个句子的情况。
- 对于非第一个Split,它首先在
initialize
里读取第一行,再在nextKeyValue
里一直读取,直到结束位置在Split的边界之后。 - 对于第一个Split,就只是在
nextKeyValue
里一直读取,直到结束位置在Split的边界之后。
总结来说,对于每个Split,都会在最后多读一行,相应的,开头就略去一行。而第一个Split不需要略去开头(顶多略去utf-8
的标记)
所以,总的来说:
- 从宏观上,一个文本会以字节为单位,被分为多个Split.
- 从微观上,对于每个Split,都会通过略去开头一句话,多读结尾一句话的方法,避免句子被Split边界给切割开。
就像下图所示:
正所谓"上有政策,下有对策"啊
附记
通过大量调用抽象类的方法(而不是具体类)可以将软件架构解耦,提高各个模块的独立性,易于修改和替换。
但缺点是不易于学习者用"顺藤摸瓜"的方式学习整个系统。需要大量地查阅网络资料。
参考
- 理解yarn
一张图读懂yarn - map-reduce 默认配置
https://blog.csdn.net/knidly/article/details/80268230 - FileOutputFormat::getSplits调用详情
https://blog.csdn.net/ltliyue/article/details/51292312?utm_source=blogxgwz9 - 默认的inputformat
https://www.cnblogs.com/zhangyinhua/p/7740888.html - 理解InputFormat
- https://blog.csdn.net/u010521842/article/details/77800858
- hadoop自定义输入格式(InputFormat
- 详解FileInputFormat
- Hadoop 之 InputFormat
- Hadoop2.6.0的FileInputFormat的任务切分原理分析(即如何控制FileInputFormat的map任务数量)
- https://blog.csdn.net/posa88/article/details/7897963
- https://blog.csdn.net/u010521842/article/details/77800858
- 自定义format
- TextInputFormat如何处理跨split的行
此教程基于hadoop0.x的api
https://blog.csdn.net/bluishglc/article/details/9380087 - Mapper.Context与RecordReader的联系
https://stackoverflow.com/questions/38205848/where-to-find-details-for-the-api-of-context-in-hadoopMapContextImpl 和 ContextFactory 涉及到了内部类对象的深拷贝。