MapReduce介绍
在Hadoop
中计算模型使用的是MapReduce
。Hadoop MapReduce
计算编程模型可以用于处理大数据集,它的一般思路是将输入dataset
切分为多个split
,每个split
交由一个mapper
处理,mapper
处理之后再交由reduce
处理,MapReduce
作业通常由成百上千的机器组成的集群并行处理。
MapReduce
是基于<k,v>
的编程。
理解MapReduce
模型并不难,实际上MapReduce
是一个很全面的抽象,本质思想就是仅靠map和list加上sort排序就可完成所有的离线分析工作 。比如Hive
中的计算就是基于MapReduce
的,它可以向SQL
一样完成对大数据进行分析,其工作原理是通过Hive
词法分析语法分析等引擎将SQL
转化为MapReduce
作业。这也说明了MapReduce
编程模型的通用型,几乎所有的sql等离线分析都可以转化为MapReduce
虽然MapReduce
简单通用但是MapReduce
编程范式也存在一些问题,由于MapReduce
只是简单的KV
处理,对应大数据集或者复杂的算法比如机器学习,需要多个阶段的MapReduce
,多次MapReduce
迭代和写磁盘操作带来了时间性能上的问题。Spark
内存计算模型,Tez
引擎等计算框架的在大部分分析领域处理速度都要优于MapReduce
。所以MapReduce
引擎也逐渐被这些引擎所替换。但是还是有必要学习MapReduce
引擎,因为它的很多思想在随后诞生的计算框架中都有借鉴和继承。并且由于MapReduce
相对比较简单,比较适合大数据入门学习
输入输出
k1,v1, k2,v2,k3,v3代表的是数据类型
(input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output)
WorldCount
那么应该如何编写Hadoop Mapreduce
程序呢?Hadoop
官网给出了一个通过Hadoop MapReduce
统计单词数据的程序。
编程流程一般是实现Mapper
和Reducer
接口,并实现对应的map
和 reduce
方法,然后在main
函数中设置相关参数,提交作业。
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.StringUtils;
public class WordCount2 {
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
static enum CountersEnum { INPUT_WORDS }
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
private boolean caseSensitive;
private Set<String> patternsToSkip = new HashSet<String>();
private Configuration conf;
private BufferedReader fis;
@Override
public void setup(Context context) throws IOException,
InterruptedException {
conf = context.getConfiguration();
caseSensitive = conf.getBoolean("wordcount.case.sensitive", true);
if (conf.getBoolean("wordcount.skip.patterns", false)) {
URI[] patternsURIs = Job.getInstance(conf).getCacheFiles();
for (URI patternsURI : patternsURIs) {
Path patternsPath = new Path(patternsURI.getPath());
String patternsFileName = patternsPath.getName().toString();
parseSkipFile(patternsFileName);
}
}
}
private void parseSkipFile(String fileName) {
try {
fis = new BufferedReader(new FileReader(fileName));
String pattern = null;
while ((pattern = fis.readLine()) != null) {
patternsToSkip.add(pattern);
}
} catch (IOException ioe) {
System.err.println("Caught exception while parsing the cached file '"
+ StringUtils.stringifyException(ioe));
}
}
@Override
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
String line = (caseSensitive) ?
value.toString() : value.toString().toLowerCase();
for (String pattern : patternsToSkip) {
line = line.replaceAll(pattern, "");
}
StringTokenizer itr = new StringTokenizer(line);
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
Counter counter = context.getCounter(CountersEnum.class.getName(),
CountersEnum.INPUT_WORDS.toString());
counter.increment(1);
}
}
}
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);
String[] remainingArgs = optionParser.getRemainingArgs();
if ((remainingArgs.length != 2) && (remainingArgs.length != 4)) {
System.err.println("Usage: wordcount <in> <out> [-skip skipPatternFile]");
System.exit(2);
}
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount2.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
List<String> otherArgs = new ArrayList<String>();
for (int i=0; i < remainingArgs.length; ++i) {
if ("-skip".equals(remainingArgs[i])) {
job.addCacheFile(new Path(remainingArgs[++i]).toUri());
job.getConfiguration().setBoolean("wordcount.skip.patterns", true);
} else {
otherArgs.add(remainingArgs[i]);
}
}
FileInputFormat.addInputPath(job, new Path(otherArgs.get(0)));
FileOutputFormat.setOutputPath(job, new Path(otherArgs.get(1)));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
Mapper
@InterfaceAudience.Public
@InterfaceStability.Stable
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
/**
* The <code>Context</code> passed on to the {@link Mapper} implementations.
*/
public abstract class Context
implements MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
}
/**
* Called once at the beginning of the task.
*/
protected void setup(Context context
) throws IOException, InterruptedException {
// NOTHING
}
/**
* Called once for each key/value pair in the input split. Most applications
* should override this, but the default is the identity function.
*/
@SuppressWarnings("unchecked")
protected void map(KEYIN key, VALUEIN value,
Context context) throws IOException, InterruptedException {
context.write((KEYOUT) key, (VALUEOUT) value);
}
/**
* Called once at the end of the task.
*/
protected void cleanup(Context context
) throws IOException, InterruptedException {
// NOTHING
}
/**
* Expert users can override this method for more complete control over the
* execution of the Mapper.
* @param context
* @throws IOException
*/
public void run(Context context) throws IOException, InterruptedException {
setup(context);
try {
while (context.nextKeyValue()) {
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
} finally {
cleanup(context);
}
}
}
Mapper
用于将输入健值对(<k1,v1>
)转换为中间健值对(<k2,v2>
)
map数的确定
Hadoop MapReduce
框架为InputFormat
产生的每个InputSplit
生成一个map
mapper的实现通过Job.setMapperClass(Class) 传递给Job。
在框架内部调用的是Mapper
的run
方法,流程如下:
- 调用
setup(context)
- 对
InputSplit
的每个健值对调用map(context.getCurrentKey(), context.getCurrentValue(), context)
- 调用
cleanup(context)
map
和reduce
是多对多的关系,也有可能没有reduce
每个map
产生的数据发往reduce
之前都需要分区,分区数取决于reduce
的个数map
端产生的数据按照分区发送给Reduce
,Reduce
端对接受到的数据merge
为一个大文件,然后分组(grouping)传递给Reducer。用户可以使用job.setGroupingComparatorClass(Class)
自定义分组Mapper
的输出是排序并且按Reducer
个数分区的,排序分区在map
的环形内存中完成。用户可以自己实现Partitioner
控制key
到Reducer
的规则
在map
端可以使用combiner
对产生的中间结果本地聚合,减少shuffle
到reduce
的数据量
Reducer
Reducer
有三个主要阶段:shuffle
、sort
和reduce
。
作业的reduce
数量由用户通过Job.setNumReduceTasks(int)设置。
job
通过 Job.setReducerClass(Class)设置reducer
的实现类。然后框架对每一个已分组的 <key, (list of values)>
调用 reduce(WritableComparable, Iterable<Writable>, Context),应用程序可以覆盖cleanup(Context)方法来执行任何所需的清除
Shuffle
Reducer
的输入在map
端已经排好序了。在这个阶段框架通过HTTP
获取mapper输出中对应的分区数据
Sort
框架在这个阶段按key对Reducer输入进行分组(因为不同的mapper可能输出相同的key)
Shuffle 和Sort是同时发生的,在取到mapper数据时就会进行merge操作
Secondary Sort
当reduce的分组判断规则和map传过来的中间结果相等分组判断规则不同时,可以通过Job.setSortComparatorClass(Class)指定一个Comparator
。 Job.setGroupingComparatorClass(Class)可以控制map中间结果的分组,配合Job.setSortComparatorClass(Class)可以实现基于值的二次排序。
Reduce
在这个阶段对每一个已分组的 <key, (list of values)>调用
reduce(WritableComparable, Iterable<Writable>, Context)
。reduce任务的输出一般通过Context.write(WritableComparable, Writable)
写入FileSystem
应用程序可以使用Counter
报告其统计信息。
Reducer
的输出没有排序。
排序过程
在Hadoop MapReduce
中发往reduce
的数据默认是按照key
排序的,可以说排序一直贯穿于整个过程中
-
map
阶段数据首先在内存中分区排序(排序使用quicksort
),然后对多个溢写的文件进行归并排序(mergesort
),在归并过程中应用combiner
本地聚合,产生map
端最终输出 -
reduce
阶段向map
端拉取自己分区的数据,同时对从多个map
端拉取的数据同样使用归并排序合成一个文件,然后分组调用reduce
一些基本术语
-
application
这个可以简单理解为编写完成相关MapReduce
程序,它是集群提交job
的最小单位,一个application
里面可以有多个job
,一般一个application
有一个job
-
job
用于描述MapReduce
的配置组成 ,包括输入输出规范,指定mapper
、reducer
实现类,分区,分组以及提交job -
task
分为maptask
和reductask
,具体的执行任务
Job
Job
是用户向Hadoop
框架描述MapReduce
作业以供执行的主要接口,Job
用于配置和提交MapReduce
作业,但是:
- 一些配置参数可能已经被管理员标记为
final
(参见Final Parameters),不能更改
Job可以用于设置非常多的参数,如下是一些主要的配置:
-
InputFormat
输入规范,包括分片计算InputSplit
和面向记录的阅读器RecordReader
Mapper
-
Partitioner
Mapper中间结果的分区方式 -
combiner
(如果有的话) Mapper分区结果本地聚合 Reducer
-
OutputFormat
输出规范
job input
InputFormat
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class InputFormat<K, V> {
public abstract
List<InputSplit> getSplits(JobContext context
) throws IOException, InterruptedException;
public abstract
RecordReader<K,V> createRecordReader(InputSplit split,
TaskAttemptContext context
) throws IOException,
InterruptedException;
}
RecordReader
InputFormat
描述了MapReduce
作业的输入规范,MapReduce
框架依赖于作业的InputFormat
完成
1 . 验证作业的输入规范
2 . 将输入文件分割成逻辑InputSplit
实例,然后将每个实例分配给单个Mapper
- 提供
RecordReader
实现,用于从逻辑InputSplit
中收集输入记录,以供mapper
程序处理
基于文件的默认InputFormat
实现为FileInputFormat,它根据输入文件的总大小(以字节为单位)将输入拆分为逻辑InputSplit
实例。一般输入文件的FileSystem
的blocksize
作为分片的上界。可以通过mapreduce.input.fileinputformat.split.minsize
设置分片下界。
显然,对于许多应用程序来说,基于输入大小的逻辑分割是不够的,因为必须考虑记录边界。在这种情况下,应用程序应该实现RecordReader
,它会考虑记录边界,并向单个任务提供面向记录的逻辑InputSplit
视图
job.setInputFormatClass()
可以用于设置InputFormat
,如果没有设置默认为TextInputFormat
如果TextInputFormat
是给定作业的InputFormat
,框架将检测带有.gz
扩展名的输入文件,并使用适当的CompressionCodec
自动解压它们。但是,必须注意的是,不能分割具有上述扩展名的压缩文件,每个压缩文件都由单个mapper
完整地处理。
InputSplit
InputSplit表示由单个mapper
处理的数据
InputSplit
通常呈现一个面向字节的输入视图,RecordReader
负责处理并呈现一个面向记录的视图。
FileSplit
是默认的InputSplit
public interface InputSplit extends Writable {
long getLength() throws IOException;
String[] getLocations() throws IOException;
}
job output
OutputFormat
描述MapReduce
作业的输出规范。
MapReduce框架依赖于作业的OutputFormat完成:
- 验证作业的输出规范;例如,检查输出目录不存在。
- 提供用于写入作业输出文件的
RecordWriter
实现。输出文件存储在FileSystem
中
TextOutputFormat是默认的OutputFormat
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class OutputFormat<K, V> {
public abstract RecordWriter<K, V>
getRecordWriter(TaskAttemptContext context
) throws IOException, InterruptedException;
public abstract void checkOutputSpecs(JobContext context
) throws IOException,
InterruptedException;
public abstract
OutputCommitter getOutputCommitter(TaskAttemptContext context
) throws IOException, InterruptedException;
}
OutputCommitter
OutputCommitter描述MapReduce作业的任务输出的提交。
MapReduce框架依赖于作业的OutputCommitter完成:
- 在初始化期间设置作业。例如,在作业初始化期间为作业创建临时输出目录
- 在作业完成后清理作业。例如,在作业完成后删除临时输出目录。
- 设置任务临时输出。任务设置是在任务初始化期间作为同一任务的一部分完成的
- 检查任务是否需要提交。这是为了在任务不需要提交时避免提交过程
- 任务输出的提交。任务完成后,如果需要,任务将提交它的输出
- 丢弃任务提交。如果任务失败/终止,输出将被清理。如果任务无法清除(在异常块中),将使用相同的
attempt-id
启动一个单独的任务来执行清除
FileOutputCommitter
是默认的OutputCommitter
RecordWriter
RecordWriter
将输出<key,value>
对写入输出文件。
RecordWriter
实现将作业输出写入文件系统。
MapReduce - User Interfaces
应用程序通常实现Mapper
和Reducer
接口来提供map
和reduce
方法。这些构成了工作的核心
Mapper
Mapper将输入键/值对映射为一组中间键/值对。
Mapper类图
map是将输入记录转换为中间记录的单个任务。转换后的中间记录不需要与输入记录具有相同的类型。给定的输入对可以映射到零或多个输出对。
Hadoop MapReduce
通过InputFormat
生成InputSplit
,每个生成的InputSplit
由一个map
任务处理
job通过Job.setMapperClass(Class)来设置mapper的实现类。然后框架为该任务的InputSplit中的每个键/值对调用 map(WritableComparable, Writable, Context)完成相关的处理,应用程序可以覆盖cleanup(Context)
方法来执行任何所需的清除
mapper
处理输入键值对得到的中间键值对可以和输入键值对的类型不同,一个输入键值对可以产生0到多个中间键值对,中间结果通过context.write(WritableComparable, Writable)
保存
应用程序可以使用Counter
报告其统计信息。
mapper
产生的中间结果最终会按照key
分组然后传递给reducer
,然后reducer
处理mapper
产生的中间结果,最后产生结果。用户可以通过Job.setGroupingComparatorClass(Class)指定一个Comparator
来控制分组。
Mapper
的中间结果会被排序然后按reducer个数分区。分区数等于reducer的个数。用户可以实现自定义Partitioner
控制<key,value>
的发送规则
用户可以选择通过 Job.setCombinerClass(Class)指定一个combiner
来执行中间输出的本地聚合(mapper端reduce),这有助于减少从Mapper
传输到Reducer
的数据量
经过排序的中间输出总是以简单的(key-len、key、value-len、value)格式存储。应用程序通过配置使用CompressionCodec可以控制中间输出是否被压缩以及如何被压缩
Mapper InputSplits Records之间的关系
InputFormat 负责计算分片(InputSplits )并将它们划分为记录(Records)。分片是物理概念和记录是逻辑概念
关于Mapper 中setup,map、cleanup、partitioner、combiner的关系
前面已经知道一个split
由一个mapper
处理,hadoop
框架通过反射job configuration
中的Mapper
类创建mapper
实例,调用mapper
的setup
方法,然后在mapper
的map
中使用RecordReader
读取split获取<k,v>
迭代处理,并调用partitioner
,之后调用cleanup
方法,最后调用combiner
方法做map
端本地归并,减少数据量以减少到reduce
的网络IO
简而言之,一个mapper
实例对应一个partitioner
,对应多个combiner
,combiner
个数为reducer
个数
作业提交流程
childUGI.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
// use job-specified working directory
setEncryptedSpillKeyIfRequired(taskFinal);
FileSystem.get(job).setWorkingDirectory(job.getWorkingDirectory());
taskFinal.run(job, umbilical); // run the task
return null;
}
});
public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
throws IOException, ClassNotFoundException, InterruptedException {
this.umbilical = umbilical;
if (isMapTask()) {
// If there are no reducers then there won't be any sort. Hence the map
// phase will govern the entire attempt's progress.
if (conf.getNumReduceTasks() == 0) {
mapPhase = getProgress().addPhase("map", 1.0f);
} else {
// If there are reducers then the entire attempt's progress will be
// split between the map phase (67%) and the sort phase (33%).
mapPhase = getProgress().addPhase("map", 0.667f);
sortPhase = getProgress().addPhase("sort", 0.333f);
}
}
TaskReporter reporter = startReporter(umbilical);
boolean useNewApi = job.getUseNewMapper();
initialize(job, getJobID(), reporter, useNewApi);
// check if it is a cleanupJobTask
if (jobCleanup) {
runJobCleanupTask(umbilical, reporter);
return;
}
if (jobSetup) {
runJobSetupTask(umbilical, reporter);
return;
}
if (taskCleanup) {
runTaskCleanupTask(umbilical, reporter);
return;
}
if (useNewApi) {
runNewMapper(job, splitMetaInfo, umbilical, reporter);
} else {
runOldMapper(job, splitMetaInfo, umbilical, reporter);
}
done(umbilical, reporter);
}
private <INKEY,INVALUE,OUTKEY,OUTVALUE>
void runNewMapper(final JobConf job,
final TaskSplitIndex splitIndex,
final TaskUmbilicalProtocol umbilical,
TaskReporter reporter
) throws IOException, ClassNotFoundException,
InterruptedException {
// make a task context so we can get the classes
org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job,
getTaskID(),
reporter);
// make a mapper
org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
(org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
// make the input format
org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
(org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
// rebuild the input split
org.apache.hadoop.mapreduce.InputSplit split = null;
split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
splitIndex.getStartOffset());
LOG.info("Processing split: " + split);
org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
new NewTrackingRecordReader<INKEY,INVALUE>
(split, inputFormat, reporter, taskContext);
job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
org.apache.hadoop.mapreduce.RecordWriter output = null;
// get an output object
if (job.getNumReduceTasks() == 0) {
output =
new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
} else {
output = new NewOutputCollector(taskContext, job, umbilical, reporter);
}
org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE>
mapContext =
new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(),
input, output,
committer,
reporter, split);
org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
mapperContext =
new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(
mapContext);
try {
input.initialize(split, mapperContext);
mapper.run(mapperContext);
mapPhase.complete();
setPhase(TaskStatus.Phase.SORT);
statusUpdate(umbilical);
input.close();
input = null;
output.close(mapperContext);
output = null;
} finally {
closeQuietly(input);
closeQuietly(output, mapperContext);
}
}
编程步骤
- 配置Configuration
- 继承Mapper和Reducer
- 创建job 配置job
- 提交job
input 和output
mapreduce
数据都是转化为<key,value>
键值对
在key
和value
类必须是可序列化的,在hadoop
中需要实现Writable
接口。此外,key
类必须实现 WritableComparable接口,以便可以进行相关的排序操作
MapReduce提交运行整体流程
- job.submit()提交
- JobSubmitter.submitJobInternal
- YarnRunner.submitJob
- ResourceMgrDelegate.submitApplication
- YarnClient.submitApplication
- ApplicationClientProtocolPBClientImpl.submitApplication
JobSubmitter.submitJobInternal主要的代码逻辑
//通过反射实例化OutputFormat,并调用其checkOutputSpecs方法,比如FileOutputFormat实现的checkOutputSpecs需要判断目标路径不存在否则抛出FileAlreadyExistsException
checkSpecs(job);
Configuration conf = job.getConfiguration();
//主要是将一些archive文件解压到nodemanager
addMRFrameworkToDistributedCache(conf);
JobID jobId = submitClient.getNewJobID();
//获取并设置唯一jobId
job.setJobID(jobId);
Path submitJobDir = new Path(jobStagingArea, jobId.toString());
//拷贝配置文件到提交目录
copyAndConfigureFiles(job, submitJobDir);
//计算分片,分片数决定map数
int maps = writeSplits(job, submitJobDir);
//写配置 job.xml
writeConf(conf, submitJobFile);