简单的 MapReduce 作业,需要一个 map 函数,一个 reduce 函数和一些用来运行作业的代码
// Mapper
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class MaxTemperatureMapper
extends Mapper<LongWritable, Text, Text, IntWritable> {
private static final int MISSING = 9999;
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String year = line.substring(15, 19);
int airTemperature;
if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs
airTemperature = Integer.parseInt(line.substring(88, 92));
} else {
airTemperature = Integer.parseInt(line.substring(87, 92));
}
String quality = line.substring(92, 93);
if (airTemperature != MISSING && quality.matches("[01459]")) {
context.write(new Text(year), new IntWritable(airTemperature));
}
}
}
// Reducer
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class MaxTemperatureReducer
extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
public void reduce(Text key, Iterable<IntWritable> values,
Context context)
throws IOException, InterruptedException {
int maxValue = Integer.MIN_VALUE;
for (IntWritable value : values) {
maxValue = Math.max(maxValue, value.get());
}
context.write(key, new IntWritable(maxValue));
}
}
横向扩展(Scaling out)
需要把数据存储在分布式文件系统中,通过使用 Hadoop 资源管理系统 YARN,Hadoop 可以将 MapReduce 计算转移到存储有部分数据的各机器上
相关概念
MapReduce 作业
MapReduce 作业 == 输入数据 + MapReduce程序 + 配置信息
任务分类
Hadoop 将作业分成若干个任务(task)来执行,其中包括两类任务:map 任务和 reduce 任务,这些任务运行在集群几点上,并通过 YARN 进行调度。如果一个任务失败,它将在另一个不同的节点上自动重新调度运行
分片(input split)
Hadoop 将 MapReduce 的输入数据划分成等长的小数据块,成为输入分片(input split)或简称“分片”
Hadoop 为每个分片构建一个 map 任务,并由该任务来运行用户自定义的 map 函数从而处理分片中的每条记录
分片切分的粒度
相对来说,分片被切分的越细,作业的负载平衡质量会更高。但是如果分片切分的太细,那么管理分片的总时间和构建 map 任务的总时间将决定作业的整个执行时间
对于大多数作业来说,一个合理的分片大小趋向于 HDFS 的一个块的大小(128MB)
数据本地化优化(data locality optimization)
Hadoop 在存储输入数据的节点上运行 map 任务,可以获得最佳性能,而无需使用宝贵的集群带宽资源
跨机架的 map 任务
有时对于一个 map 任务的输入分片来说,存储该分片的 HDFS 数据块副本的所有节点可能正在运行其他的 map 任务,此时作业调度需要从某一个数据块所在的机架中的一个节点上寻找一个空闲的 map 槽(slot)来运行该 map 任务,这将导致机架与机架之间的网络传输
为何最佳分片的大小应该与块大小相同?
如果分片跨越两个数据块,那么对于任何一个 HDFS 节点,基本上都不可能同时存储这两个数据块,因此分片中的部分数据需要通过网络传输到 map 任务运行的节点。这与使用本地数据运行整个 map 任务相比,显然效率更低
reduce 任务并不具备数据本地化的优势,单个 reduce 任务的输入通常来自于所有的 mapper 的输出;多个 reduce 任务,每个 map 任务针对输出进行分区
reduce 的输出通常存储在 HDFS 中以实现可靠存储。第一个副本存储在本地节点上,其他的副本处于可靠性考虑存储在其他机架的节点上
reduce 任务的数量并非由输入数据的大小决定,反而是独立指定的
combiner 函数
combiner 函数能够帮助减少 mapper 和 reducer 之间的数据传输量
// 通过如下方式调用来启用 combiner 函数
job.setComiberClass(XXXReducer.class)
Hadoop Streaming
Hadoop Streaming 使用 Unix 标准流作为 Hadoop 和应用程序之间的接口,所以可以使用任何编程语言通过标准输入/输出来写 MapReduce 程序
Streaming 天生适合用于文本处理。map 的输入数据通过标准输入流传递给 map 函数,并且是一行一行地传输,最后将结果行写到标准输出。map 输出的键-值对以一个制表符分隔的行,reduce 函数的输入格式与之相同并通过标准输入流进行传输。reduce 函数从标准输入流中读取输入行,该输入已由 Hadoop 框架根据键排过序,最后将结果写入标准输出