预览
Hadoop MapReduce是一个软件框架,用于编写并行处理海量数据的应用程序,应用程序运行在一个通用硬件组成的,可靠的,容错的大型集群之上。
MapReduce作业通常将输入数据集分割成独立的chunk,这些chunk以完全并行的方式由map任务处理。框架对map任务的输出进行排序,然后发送给reduce任务。通常作业的输入和输出都存储在文件系统中。框架负责调度任务,监控状态并在任务失败时重新执行任务。
通常计算节点和存储节点是相同的,即MapReduce框架和HDFS运行在相同的节点上。这种配置允许框架在数据已存在的节点上调度任务,从而在集群上获得非常高的聚合带宽。
MapReduce框架由一个主ResourceManager
,每个节点一个的从NodeManager
,和每个应用程序一个的MRAppMaster
组成。
最简单的例子,应用程序指定输入/输出地址,在其上应用实现了特定接口的map和reduce函数,之后是其他参数,这些统称作业配置。
Hadoop作业客户端将作业(jar/可执行文件)以及配置提交给ResourceManager
,它负责将程序/配置分发到从节点,调度任务并监控任务,将任务状态和诊断信息返回给客户端。
Hadoop框架是使用Java实现的,但是MapReduce应用程序可以不使用Java。
输入和输出
MapReduce框架只处理<key, value>
序对,即框架将作业的输入视为一组<key, value>
序对,并生成一组<key, value>
序对作为输出。
key
和value
类必须能够被框架序列化,因此必须实现Writable
接口。此外,key
类还必须实现WritableComparable
接口以实现排序。
MapReduce作业基本流程:
(input)<k1, v1>
-> map -> <k2, v2>
-> combine -> <k2, v2>
-> reduce -> <k3, v3>
(output)
示例:WordCount v1.0
WordCount
是一个简单的应用程序,它统计给定输入中每个单词出现的次数。
源码
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.StringTokenizer;
public class WordCount {
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
@Override
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreElements()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
用法
假设环境变量已经设置好了:
export JAVA_HOME=/usr/java/default
export PATH=${JAVA_HOME}/bin:${PATH}
export HADOOP_CLASSPATH=${JAVA_HOME}/lib/tools.jar
编译WordCount.java
并打包:
$ bin/hadoop com.sun.tools.javac.Main WordCount.java
$ jar cf wc.jar WordCount*.class
假设有如下目录:
-
/user/joe/wordcount/input
:HDFS中的输入目录 -
/user/joe/wordcount/output
:HDFS中的输出目录
作为输入的样本文件:
$ bin/hadoop fs -ls /user/joe/wordcount/input/
/user/joe/wordcount/input/file01
/user/joe/wordcount/input/file02
$ bin/hadoop fs -cat /user/joe/wordcount/input/file01
Hello World Bye World
$ bin/hadoop fs -cat /user/joe/wordcount/input/file02
Hello Hadoop Goodbye Hadoop
运行应用程序:
$ bin/hadoop jar wc.jar WordCount /user/joe/wordcount/input /user/joe/wordcount/output
输出的内容为:
$ bin/hadoop fs -cat /user/joe/wordcount/output/part-r-00000
Bye 1
Goodbye 1
Hadoop 2
Hello 2
World 2
应用程序可以使用-files
选项指定当前工作目录下的路径。-libjars
选项可以将jar包添加到应用程序的类路径中。-archives
选项允许传递压缩文件。
使用-files
,-libjars
和-archives
运行wordcount
示例:
bin/hadoop jar hadoop-mapreduce-examples-<ver>.jar wordcount -files cachefile.txt -libjars mylib.jar -archives myarchive.zip input output
这里,myarchive.zip会被加压到一个名为“myarchive.zip”的目录中。
用户可以使用#号为文件指定不同的符号名:
bin/hadoop jar hadoop-mapreduce-examples-<ver>.jar wordcount -files dir1/dict.txt#dict1,dir2/dict.txt#dict2 -archives mytar.tgz#tgzdir input output
代码说明
WordCount
应用程序很简单明了:
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreElements()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
Mapper
实现使用map
方法每次处理一行,数据来自TextInputFormat
指定的路径。然后使用StringTokenizer
将每行分割成单词,生成序对<<word>, 1>
。
在示例中,第一个map会生成:
< Hello, 1>
< World, 1>
< Bye, 1>
< World, 1>
第二个map生成:
< Hello, 1>
< Hadoop, 1>
< Goodbye, 1>
< Hadoop, 1>
WordCount
也指定了combiner
:
job.setCombinerClass(IntSumReducer.class);
这样,在按key排序之后,每个map的输出传递给本地的combiner做本地聚合。
第一个聚合的输出:
< Bye, 1>
< Hello, 1>
< World, 2>
第二个聚合的输出:
< Goodbye, 1>
< Hadoop, 2>
< Hello, 1>
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
Reducer
实现使用reducer
方法对结果求和,最终作业的输出为:
< Bye, 1>
< Goodbye, 1>
< Hadoop, 2>
< Hello, 2>
< World, 2>
main
方法指定了作业的各种配置,例如输入/输出路径,key/value类型,输入/输出格式等。然后job.waitForCompletion
提交作业。
MapReduce - 用户接口
负载
应用程序通常实现Mapper
和Reducer
接口来提供map
和reducer
功能。这些是作业的核心。
Mapper
Mapper
将输入的key/value序对映射成一组中间key/value序对。
Map都是独立的任务,将输入记录转换成中间记录。中间记录和输入记录的类型不必相同。给定输入序对可能映射成零个或多个输出序对。
MapReduce框架为InputFormat
生成的每个InputSplit
都创建一个map任务。
总体来说,Mapper
实现通过Job.setMapperClass(Class)
方法传递给作业。之后框架为每个key/value序对调用map(WritableComparable, Writable, Context)
方法。应用程序可以覆盖cleanup(Context)
方法来执行必要的清理工作。
输出序对的类型可以不同于输入序对。给定输入序对可以映射成零个或多个输出序对。输出序对通过调用context.write(WritableComparable, Writable)
方法收集起来。
应用程序可以使用Counter
报告统计结果。
所有关联到给定输出key的中间结果随后由框架分组,然后传递给Reducer(s)
。用户可以通过Job.setGroupingComparatorClass(Class)
指定Comparator
控制分组。
Mapper
的输出先排序,然后按照Reducer
的数量分区。分区数就是作业的reduce任务数。用户可以实现自定义Partitioner
来控制分区情况。
用户可以使用Job.setCombinerClass(Class)
来指定一个可选的combiner
,它可以用来执行中间结果的本地聚合,有助于减少Mapper
到Reducer
之间的数据传输。
排好序的中间输出总是以(key-len, key, value-len, value)格式存储。应用程序可以控制是否压缩中间输出。
多少个map?
map任务的数量通常由输入的规模决定,即输入文件的block总量。
正常map任务的并行级别是每个节点10-100个map任务,任务设置需要一点时间,所以最好将map任务执行控制在一分钟之内。
这样,如果输入数据有10TB,blocksize为128MB,那么一共需要82000个map任务。除非使用Configuration.set(MRJobConfig.NUM_MAPS)
设置。
Reducer
Reducer
将中间结果归约成一个更小的集合。
Reducer
任务的数量可以通过Job.setNumReducerTask(int)
方法设置。
Reducer
实现通过Job.setReducerClass(Class)
传递给作业。之后应用程序调用reducer(WritableComparable, Iterable<Writable>, Context)
。应用程序也可以覆盖cleanup(Context)
方法。
Reducer
任务有三个阶段:shuffle,sort和reduce。
Shuffle
Reducer
任务的输入是Mapper
任务的排好序的输出,在这个阶段,框架将map任务输出的相关分区通过HTTP组织到一起。
Sort
框架按照key值为Reducer
的输入分组(不同的map任务可能输出相同的key值)。
shuffle和sort两个阶段同时执行。
Secondary Sort
如果有特殊的排序需求,可以使用Job.setSortComparatorClass(Class)
指定一个Comparator
来控制中间结果的key值如何分组。可以用来模拟二次排序。
Reduce
这个阶段会在每个分好组的输入(<key, (list of values)>
)上调用reduce(WritableComparable, Iterable<Writable>, Context)
方法。
reduce任务的输出通常通过context.write(WritableComparable, Writable)
写入文件系统。应用程序可以使用Counter
报告统计信息。
多少个Reduce?
正常的reduce任务数量应该是0.95
或1.75
乘以(<no. of nodes> * <no. of maximum containers per node>)
。
使用0.95
系数可以让所有reduce任务在map任务结束后立即开始执行。使用1.75
系数可以让速度快的节点执行完第一轮reduce任务后,为了负载平衡再执行第二轮reduce任务。
增加reduce任务的数量会增加框架的开销,但会增加负载平衡并降低故障成本。
缩放因子要略小于整数,以便在框架中为失败任务保留一些位置。
Reduce NONE
如果没有reduce阶段,可以将reduce任务设为0。
这种情况下,map任务的输出直接存储到FileSystem
,存储路径由FileOutputFormat.setOutputPath(Job, Path)
设置。
Partitioner
Partitioner按key值分区。
Partitioner控制map输出的key值分区情况。Key值通常根据哈希函数分区。分区数等于reduce任务数。HashPartitioner
是默认的partitioner。
Counter
Counter是应用程序用来报告统计结果的工具。
作业配置
Job
类表示MapReduce作业的配置。
Job
是用户描述MapReduce作业的主要接口。框架会按照Job
的描述执行作业,然而:
- 有些配置参数会被标记为
final
,从而无法更改。 - 有些配置参数可以直接设置,有些则稍显复杂。
Job
通常需要指定Mapper,Combiner(如有必要),Partitioner,Reducer,InputFormat,OutputFormat的具体实现。FileInputFormat
表示输入文件的集合。输出文件应当写入到(FileOutputFormat.setOutputPath(Path)
)。
Job
还可以设置一些可选的组件,比如Combiner,是否压缩中间结果等。
用户可以使用Configuration.set(String, String)
/Configuration.get(String)
设置/获取任意参数。不过大量只读数据推荐使用DistributedCache
。
任务执行和环境
MRAppMaster
会在独立的JVM中以子进程的形式执行Mapper/Reducer
任务。
子任务继承了MRAppMaster
的环境,用户可以使用mapreduce.(map|reduce).java.opts
指定额外的属性和配置参数。如果mapreduce.(map|reduce).java.opts
参数包含@taskid@
这样的符号,它会把任务的taskid
插入到配置中。
下面是一个多参数示例:
<property>
<name>mapreduce.map.java.opts</name>
<value>
-Xmx512M -Djava.library.path=/home/mycompany/lib -verbose:gc -Xloggc:/tmp/@taskid@.gc
-Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false
</value>
</property>
<property>
<name>mapreduce.reduce.java.opts</name>
<value>
-Xmx1024M -Djava.library.path=/home/mycompany/lib -verbose:gc -Xloggc:/tmp/@taskid@.gc
-Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false
</value>
</property>
内存管理
用户/管理员可以使用mapreduce.(map|reduce).memory.mb
指定可以使用的最大虚拟内存,这个值以MB为单位,按进程分配。这个值必须大于等于传递给JVM的-Xmx参数的值,否则JVM可能无法启动。
框架某些部分的内存也是可配置的。在map和reduce任务中,调整并行操作参数和磁盘写入频率可能会影响性能。监控作业的文件系统计数器对性能调优是很有帮助的。
Map参数
Map任务发出的记录首先会被序列化进buffer,它的元数据存储在accounting buffer中。只要序列化buffer或者元数据buffer达到阈值,buffer中的内容就会在排序后写入磁盘,这一切都在后台执行。如果执行过程中buffer被填满,map线程会被阻塞。map任务执行完后,所有记录会写入磁盘并合并成一个文件。尽量减少数据溢出次数可以减少map任务执行时间,较大的buffer也会减少map任务可用的内存。
Name | Type | Description |
---|---|---|
mapreduce.task.io.sort.mb |
int | 序列化buffer和accounting buffer大小的总和 |
mapreduce.map.sort.spill.percent |
float | 序列化buffer的使用限制,达到这个值后,线程会将数据写入磁盘 |
Shuffle/Reduce参数
如前所述,每个reduce任务获取分配给它们的分区数据,并周期性的将输出合并,然后存储到磁盘上。如果启用了中间数据压缩功能,还需要对数据解压缩。
Name | Type | Description |
---|---|---|
mapreduce.task.io.soft.factor |
int | 指定可以同时合并的分段数 |
mapreduce.reduce.merge.inmem.thresholds |
int | 在合并到磁盘之前,可以一次读取到磁盘的map输出数据的数量 |
mapreduce.reduce.shuffle.merge.percent |
float | 在内存合并之前读取map输出数据的内存阈值 |
mapreduce.reduce.shuffle.input.buffer.percent |
float | 相对于最大堆内存的百分比 |
mapreduce.reduce.input.buffer.percent |
float | 相对于最大堆内存的百分比 |
可配置参数
Name | Type | Description |
---|---|---|
mapreduce.job.id |
stirng | 作业ID |
mapreduce.job.jar |
string | jar包地址 |
mapreduce.job.local.dir |
string | 作业共享空间 |
mapreduce.task.id |
string | 任务ID |
mapreduce.task.attempt.id |
string | 任务尝试ID |
mapreduce.task.is.map |
boolean | 是否是map任务 |
mapreduce.task.partition |
int | 任务分区数 |
mapreduce.map.input.file |
string | map任务处理数据的文件名 |
mapreduce.map.input.start |
long | 输入数据的偏移量 |
mapreduce.map.input.length |
long | 输入数据的字节数 |
mapreduce.task.output.dir |
string | 任务的临时输出目录 |
任务日志
日志默认输出到${HADOOP_LOG_DIR}/userlogs
。
分布式类库
(略)
作业提交和监控
作业是用户与ResourceManager
交互的主接口。
Job
可以提交作业,追踪进程状态,访问任务日志,读取集群节点状态信息。
作业提交包括以下步骤:
- 检查作业的输入和输出
- 计算作业的
InputSplit
值 - 如有必要,设置必要的accounting信息
- 拷贝jar包和配置信息到系统目录
- 提交作业到
ResourceManager
作业历史文件输出目录可以使用mapreduce.jobhistory.intermediate-done-dir
和mapreduce.jobhistory.done-dir
指定。
用户可以使用$ mapred job -history output.jhist
命令查看历史日志简报。
作业控制
用户可能需要将作业链接起来以完成无法使用单个作业完成的任务。
作业输入
InputFormat
说明了输入数据的格式。
MapReduce框架使用InputFormat
来:
- 校验作业的输入数据
- 将输入文件分割成本地
InputSplit
实例,每个实例分配给一个独立的Mapper
- 使用
RecordReader
的具体实现从输入中读取记录
基于文件的InputFormat
的实现(比如FileInputFormat
的子类)的默认行为是将输入按照大小分割成逻辑上的InputSplit
实例。输入文件的块大小指定过了文件分割的上限,mapreduce.input.fileinputformat.split.minsize
参数可以指定文件分割的下限。
显然,基于输入大小的逻辑分割对于很多记录边界不甚明朗的应用来说是不够的。这是,应用应当实现一个RecordReader
。
TextInputFormat
是默认的InputFormat
。
InputSplit
InputSplit表示被单个Mapper
处理的数据单元。
通常InputSplit
表示面向字节的视图,而RecordReader
负责处理和呈现面向记录的视图。
FileSplit
是默认InputSplit。
RecordReader
RecordReader从InputSplit
中读取<key, value>
序对。
通常RecordReader
将面向字节的视图转换成面向记录的视图供map任务处理。
作业输出
OutputFormat
表示作业的输出格式。
MapReduce框架需要OutputFormat
:
- 校验作业的输出格式
- 提供
RecordWriter
实现写入输出文件
TextOutputFormat
是默认的OutputFormat
。
OutputCommitter
OutputCommitter
表示任务输出的提交过程。
MapReduce框架将OutputCommitter
用于:
- 在初始化阶段设置作业。例如创建临时输出目录。作业设置阶段是在作业状态为PREP时使用一个独立的任务完成的。一旦设置完成,作业编程RUNNING状态。
- 作业执行完毕后清理作业。例如删除临时输出目录。
- 设置任务临时输出。
- 检查任务是否需要提交。
- 提交任务输出。一旦任务执行完毕,如有必要任务会提交它的输出。
- 废弃任务提交。
FileOutputCommitter
是默认的OutputCommitter
。
任务副作用文件
某些应用的任务除了输出文件,还需要创建一种副文件。
这种情况下,如果有多个相同的Mapper
或Reducer
实例并行操作同一个文件就可能出问题。因此应用写入的时候必须能确定唯一的attempt任务(使用attemptid)。
使用FileOutputFormat
时,为了避免这个问题,框架会为attempt任务维护一个${mapreduce.output.fileoutputformat.outputdir}/_temporary/_${taskid}
子目录结构。
RecordWriter
RecordWriter
将数据以<key, value>
格式写入输出文件。
其他特性
将作业提交到队列
队列是作业的集合,允许系统提供特定的功能。例如控制权限。
Hadoop自带有一个默认的“default”队列。
作业可以使用mapreduce.job.queuename
配置队列名字。
计数器
Counter
表示全局计数器。
DistributedCache
DistributedCache
可以有效的分发只读文件。
示例:WordCount v2.0
下面是一个更复杂的WordCount
示例。
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.*;
public class WordCount2 {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
GenericOptionsParser parser = new GenericOptionsParser(conf, args);
String[] remainingArgs = parser.getRemainingArgs();
if (remainingArgs.length != 2 && remainingArgs.length != 4) {
System.err.println("Usage: wordcount <in> <out> [-skip skipPatternFile]");
System.exit(2);
}
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount2.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
List<String> otherArgs = new ArrayList<>();
for (int i = 0; i < remainingArgs.length; ++i) {
if ("-skip".equals(remainingArgs[i])) {
job.addCacheFile(new Path(remainingArgs[++i]).toUri());
job.getConfiguration().setBoolean("wordcount.skip.patterns", true);
} else {
otherArgs.add(remainingArgs[i]);
}
}
FileInputFormat.addInputPath(job, new Path(otherArgs.get(0)));
FileOutputFormat.setOutputPath(job, new Path(otherArgs.get(1)));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
enum CountersEnum {INPUT_WORDS}
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
private boolean caseSensitive;
private Set<String> patternsToSkip = new HashSet<>();
private Configuration conf;
private BufferedReader fis;
@Override
protected void setup(Context context) throws IOException, InterruptedException {
conf = context.getConfiguration();
caseSensitive = conf.getBoolean("wordcount.case.sensitive", true);
if (conf.getBoolean("wordcount.skip.patterns", false)) {
URI[] patternsURIs = Job.getInstance(conf).getCacheFiles();
for (URI patternsURI : patternsURIs) {
Path patternsPath = new Path(patternsURI.getPath());
String fileName = patternsPath.getName().toString();
parseSkipFile(fileName);
}
}
}
private void parseSkipFile(String fileName) {
try {
fis = new BufferedReader(new FileReader(fileName));
String pattern = null;
while ((pattern = fis.readLine()) != null) {
patternsToSkip.add(pattern);
}
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String line = caseSensitive ? value.toString() : value.toString().toLowerCase();
for (String pattern : patternsToSkip) {
line = line.replaceAll(pattern, "");
}
StringTokenizer itr = new StringTokenizer(line);
while (itr.hasMoreElements()) {
word.set(itr.nextToken());
context.write(word, one);
Counter counter = context.getCounter(CountersEnum.class.getName(), CountersEnum.INPUT_WORDS.toString());
counter.increment(1);
}
}
}
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
}