一、背景
MapReduce 是谷歌大数据 “三驾马车” 的第二篇论文,它是一个分布式计算编程模型,主要是为了解决 “Page Rank” 问题,即网页排名的问题。我们搜索网页时,输入关键字搜索得到一个网页的列表,对每个搜索结果网页需要计算出一个权重,再根据权重决定哪个网页排在前、哪个网页排在后(当然百度那臭名昭著的竞价排名相信大家也有所耳闻,商家花钱提高权重,所以搜资料还是得谷歌)
那么这个权重如何计算出来的?通常网页里还会有超链接指向别的网页,如果一个网页热度比较高那么从总体上就会有更多的从其他网页指向它的超链接,假设现在有4个网页,它们之间的超链接指向的关系如下:
网页两两之间用0表示不存在链接关系,用1表示存在链接关系,那么可以为这4个网页建立一个4x4的向量矩阵。
我们把每一列的数字加起来,就得到该网页的一个分数,网页1为0,网页2为1,网页3为3,网页4为2,将得到的分数作为权重进行 rank,就得到了 page 的的一个先后关系优先级:网页3 > 网页4 > 网页2 > 网页1。
而谷歌的爬虫每天会从全世界网络上抓取海量的网页,假设有1亿个网页,存储在分布式文件系统中,而网页之间又存在超链跳转的关系,这时建立的 Google 向量矩阵将会是 1亿 x 1亿 的庞大矩阵。单台计算机(哪怕是性能超强的机器)也无法完整大矩阵的计算。基于这样的问题背景,Google 提出了 MapReduce 的计算模型来计算这样的大矩阵。
二、MapReduce 编程模型
MapReduce 的核心思想是 “先拆分,再合并”。针对 “Page Rank” 的大矩阵计算的问题,可以将大矩阵拆分成很多个小矩阵,将小矩阵的计算分配给不同的计算机来完成,这个过程称为 “Map”;每个小矩阵的结果计算出来后,再汇总合并,最终得到大矩阵的计算结果,这个过程称为 “Reduce”。通过这样的方式,无论要计算的向量矩阵有多大,都可以进行计算。
比如我们将上述的矩阵,拆分成4个小矩阵分配给4台不同的机器执行。
Map 阶段的计算结果(格式:【节点名:网页-分数】)如下:
node1:网页1-0,网页2-1
node2:网页3-2,网页4-2
node3:网页1-0,网页2-0
-
node4:网页3-1,网页4-0
Reduce 阶段的计算结果如下:
网页1:node1-0 + node3-0 = 0
网页2:node1-1 + node3-0 = 1
网页3:node2-2 + node3-1 = 3
-
网页4:node2-2 + node4-0 = 2
最终网页排名为:网页3、网页4、网页2、网页1。
1、概念
MapReduce 是 Hadoop 的数据处理组件。MapReduce 程序把输入数据转换成特定格式的输出数据。一个 MapReduce 程序主要就做下面这两步:
- Map
- Reduce
在 Map 和 Reduce 中间还有一个处理阶段,叫做 Shuffle 和 排序操作。
什么是 MapReduce 作业?
一个 MapReduce Job 过程分成两个阶段:Map 阶段和 Reduce 阶段。每个阶段都用 key/value 作为输入和输出;每个阶段都需要定义函数,也就是 map 函数和 reduce 函数;可以简单认为 map 函数是对原始数据提出出有用的部分,而 reduce 函数则是对提取出来的数据进行处理。
什么是 MapReduce Task?
MapReduce 里面的 task 可以分两种,即 Map task 和 Reduce task,即处理分片数据的 Mapper 和 Reducer 任务,这里的 Mapper 和 Reducer 的业务逻辑由开发者定义。
什么是 Task Attempt?
Task Attempt,即任务尝试。集群的机器在任何时间都可能发生故障,比如,正在处理数据的机器挂了,MapReduce 把任务重新调度到其他机器节点。当然这里的重新调度次数并非不受限制的,它是有上限的,默认是 4 次,如果一个任务(Mapper 任务或者 Reducer 任务)失败 4 次,那么整个 Job 就被认为失败了。对于高优先级的作业或者大型作业,这个值可以调高一点。
(1)Map
map 函数以键值对作为输入数据,不管数据是结构化还是非结构化,框架都会把数据转换成键值对形式。键是输入值的引用,而值就是要操作的数据集。
用户可以根据业务需求开发特定的业务逻辑来实现 MapReduce 框架的 map 函数。map 函数会对每个输入键值对的值部分处理。处理之后会生成输出结果,map 的输出叫做中间输出,它的类型可能与输入键值不同。map的输出结果是存储在本地磁盘的。
(2)Reduce
Reduce 以 Map 的输出结果作为输入数据,并对这些数据进行处理。通常,在 reducer 我们会做聚合或求和计算。另外,MapReduce 给 reduce 的输入数据按键做排序操作。
用户可以根据业务需求开发特定的业务逻辑来实现 MapReduce 框架的 reduce 函数,reduce 函数对输入值做聚合操作,并输出最终结果写入到 HDFS。
2、与 Yarn 的关系
MapReduce 是一个构建在 Yarn 资源管理系统之上的一个分布式计算框架,通过 Yarn 实现资源的调度,Yarn 的主节点是 ResourceManager,从节点是 NodeManager,主节点通常跟 HDFS 的 NameNode 运行在同一台机器上,而 HDFS 的 DataNode 所在的机器上通常也会有一个 NodeManager 的进程。
当客户端提交执行一个 MapReduce 程序的请求时,Yarn 会为其生成一个 job,并分配一个对应的 job id,然后在 NodeManager 上创建 Container,应用程序就在上面运行,一个 job 会分为多个的 map task 和 reduce task,每个task就在 container 中运行,通常 map 的输出要作为 reduce 的输入,这就需要不同的容器之间的通讯和数据传输,而这是通过 MapReduce 来实现的,Yarn 不关心应用容器之间是如何通讯和交互,而只专注于资源的调度分配。
3、demo
为了更加透彻理解 MapReduce 的编程模型,通过一个求和的例子来进行说明。
现在假设有一个求和的作业,输入数据是:1+2+3+4+5+6+7+8+9+10+...+n
假设 n 非常大,所以按照 “先拆分,后合并” 的思想,需要对它先进行拆分,为了方便说明,假设n就是10。将其拆分成3个小任务,分别执行求和,得到10、18、27,map结束;将所有map的结果汇总合并,得到55,reduce结束,将结果保存到 HDFS 中,整个 job 执行完成。
总结:
1、任务 Job = Map(s) + [Reduce(s)] // (s) map task、reduce task 都可能有多个,[] 有些作业可能没有reduce阶段
2、Map 的输出是 Reduce 的输入
3、所有的输入输出都是<key value>,一共有4对
<k1 v1> map的输入 <k2 v2> map的输出 <k3 v3> reduce的输入 <k4 v4> reduce的输出
4、k2=k3,v3是一个集合,该集合中的每个元素是v2
5、输入输出都是 HDFS(本地模式除外)
6、MapReduce 运行在 Yarn 的容器之上(本地模式除外)
三、数据流
MapReduce 作业(job)是执行的工作单元。Hadoop 将作业分成若干个任务执行,包括两类任务:map 任务和 reduce 任务。这些任务运行在集群的节点上,通过 Yarn 进行调度,如果其中一个任务失败,它将在另一个节点上自动重新调度执行。
1、Map 数据流
Hadoop 将 MapReduce 的输入数据划分成等长的小数据块,称为输入分片(input split),简称 “分片”。Hadoop 为每个分片构建一个 map 任务,并由该任务来运行用户自定义的 map 函数从而处理分片中的每条记录。map 任务还会对输出的数据进行排序。
数据分片的大小会如何影响程序运行效率?最佳分片大小应该是多大?为什么?
① 将数据划分为分片,可以通过多台机器并行处理每个分片获得更快的处理效率,不同的机器性能和负载可能不同,如果分片较细,较快或负载较低的机器可以处理更多的数据分片,获得更好的负载均衡。
② 跟数据块的大小保持一致(128M)。如果分片大小,管理分片的开销会提高;如果分片太大,超过一个数据块的大小,导致可能跨越两个数据块,那么有可能不同数据块存储在不同 HDFS 节点上,分片中的部分数据需要网络传输到 map 任务运行的节点,与使用本地数据运行整个 map 任务相比,效率更低。
为什么 map 任务将输出写入本地磁盘而非 HDFS?
因为 map 输出是中间结果,经过 reduce 任务处理后才产生最终输出结果,一旦作业完成,map 的输出结果就可以删除。
Map 任务具有 “数据本地化优势”,即 Hadoop 会在尽量把计算移动到离数据最近的地方执行,当 map 任务运行在存储输入数据的节点上运行时,可以获得最佳性能,因为它无需使用宝贵的集群带宽资源。
关于数据本地化的概念
移动计算:即将运行计算的地方移动到数据所在的地方,这样可以避免数据在网络上的传输。
移动数据:将数据移动到运行计算的地方,当数据量较大时,会产生比较多的网络IO开销。
代码在离它运算的数据最近的地方执行更加高效,特别在数据量大的情况下代码执行效率提升更加明显。因为移动代码消耗的网络带宽,要远远比移动大量数据消耗的带宽资源小很多。因此,HDFS 给 MapReduce 提供了一个接口,用于把代码移动到离数据最近的地方。
HDFS 的数据本地化策略如下图所示:
- 如果存储数据的节点处于空闲状态,则 map 任务在数据节点上运行,此时可获得最佳性能。(如 a 所示)
- 如果存储数据的所有节点忙于运行其他 map 任务,则作业会调度从某个数据块副本所在机架上的一个相对空闲节点运行 map 任务,这时数据只会在机架内节点间传输。(如 b 所示)
- 如果存储数据的所有节点所在机架上的所有节点都很忙碌,则会使用其他机架中的节点来运行该 map 任务,这将导致机架之间的网络传输。(如 c 所示)
2、Reduce 数据流
数据本地化的特性,执行 map 任务的节点跟数据存储的节点有了关联,但是对 reduce 任务来说,可以运行在集群中任意一个节点上。reduce 任务的输入来自所有 map 任务的输出,因此 reduce 任务要等到所有的 map 任务完成后才开始处理数据,map 任务将排序后的输出通过网络传输到运行 reduce 任务的节点,因此 reduce 任务不具备数据本地化的优势。
reduce 任务会将来自多个 map 任务拷贝过来的数据进行合并,再正式进行数据的处理,并在处理完后将输出存在在 HDFS 中,由于 HDFS 对数据块存储有副本数的要求,所以 reduce 会在 HDFS 中写入多个数据块,第一个副本存储在本地,其他副本存储在其他机架上的节点。
reduce 任务的完整数据流
存在多个 reduce 任务的数据流
如果有多个 reduce 任务,每个 map 任务就会针对输出进行分区(partition),即为每个 reduce 任务建一个分区。默认情况下 partitioner 为 map 输出数据的 key 通过哈希函数计算出一个值,再对分区数求模得到数据的分区,也可以自定义分区函数来改变分区的行为。
map 任务和 reduce 任务之间的数据流称为 shuffle(混洗)。 因为每个 reduce 任务的输入都来自多个 map 任务。
无 reduce 任务的数据流
当数据处理可以完全并行时,可能会出现无 reduce 任务的情况。在这种情况下,唯一的非本地节点数据传输是 map 任务将结果写入 HDFS。
四、第一个小程序 wordcount
1、演示
Hadoop 自带了很多 example 程序,下面演示一个对单词进行计数的小程序,首先将 data.txt 上传到 HDFS 上,内容如下:
I love Beijing
I love China
Beijing is the capital of China
上传数据
hdfs dfs -put data.txt /input
在 $HADOOP_HOME/share/hadoop/mapreduce
目录下,有一个 hadoop-mapreduce-examples-2.7.3.jar 文件,自带了很多小程序,其中有一个就是 wordcount。
指定程序,会提示用法
[root@bigdata111 mapreduce]# hadoop jar hadoop-mapreduce-examples-2.7.3.jar wordcount
Usage: wordcount <in> [<in>...] <out>
指定输入数据和输出目录,程序执行:
hadoop jar hadoop-mapreduce-examples-2.7.3.jar wordcount /input/data.txt /output/wc
可以看到程序开始执行,系统生成一个作业,并用 ID 标识,执行的过程分为 map 和 reduce 两阶段,map 执行到 100% 后,reduce 才开始执行。
程序生成的结果会输出到 HDFS 的 /output/wc
目录中,生成了两个文件,_SUCCESS 文件没有任何内容,只是用来标识该作业执行成功,part-r-00000 存储作业执行的结果,cat 该文件的内容,可以看到输入数据中都有哪些单词,每个单词出现了多少次,并且还对结果根据单词进行了排序。
2、分析
下面来分析 wordcount 小程序在 MapReduce 中经过了怎样的数据处理流程。
输入数据是一个文本文件,map 在读取处理数据时,是以每一行为单位的,对 map 来说每行数据对应一个输入的 k1-v1 对,其中 k1 是该行数据的偏移量,v1 是该行文本数据。Hadoop 要求 MapReduce 中所有输入输出都应该是一个序列化的数据类型,对 map 来说长整型 k1 对应 LongWritable 类型,字符串 v1 对应 Text 类型。
在 map 阶段,拿到数据后将 v1 转化为 String 类型并根据空格进行分词,输出的 k2-V2 对,k2 对应分词中的每个单词(类型为 Text),v2 对应单词出现的次数(类型为 IntWritable)。
在 reduce 阶段,reduce 的输入 k3-v3 来自 map 的输出 k2-v2,其中 k2 = k3,而 v3 是由多个相同的 k2 对应的所有 v2 组合的集合。对应到 wc 小程序中,k3 就是分词中出现的每一个单词,v3 则是该单词所有出现次数的集合,因此 reduce 程序处理中要将所有的 v3 累加起来,进而得到每个单词在文件中出现的总的次数。
实际上借助 map 到 reduce 数据的 shuffle,如果 reduce 忽略 v3,则可以轻松实现 SQL 中 distinct 去重的功能;而 reduce 拿到的数据,实际上是针对每个单词的分组,因此实现 group by 分组也很方便,并且很容易实现 max、min、avg 等分组运算;最后,reduce 的数据可能来源于多个 map,映射到数据库中,可以将每个 map 的输出看成是一张数据表的数据源,不同 map 对应不同数据表,就为实现多表查询之类的操作奠定了基础。
3、开发
以新版本的 API 为准,新版本的 API 报名中带 mapreduce,而旧版本包名带缩写 mapred,这里使用的是 hadoop2.7.3。
(1)mapper
定义一个 mapper,需要继承 Mapper
类,并重写 map()
方法,代码如下:
package wc;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
// k1 v1 k2 v2
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
@Override
protected void map(LongWritable k1, Text v1, Context context)
throws IOException, InterruptedException {
/**
* context代表map的上下文
*
* 上文:HDFS的输入
* 下文:Reduce
*/
// 获取数据: I love Beijing
String data = v1.toString();
// 分词
String[] words = data.split(" ");
// 输出
for (String word : words) {
// k2单词 v2记一次数
context.write(new Text(word), new IntWritable(1));
}
}
}
(2)reducer
定义一个 reducer,需要继承 Reducer
类,并重写 reduce
方法,代码如下:
package wc;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
// k3 v3 k4 v4
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text k3, Iterable<IntWritable> v3, Context context) throws IOException, InterruptedException {
/**
* context代表reduce的上下文
*
* 上文:Map
* 下文:输出HDFS
*
*/
// 对v3求和
int total = 0;
for (IntWritable v : v3) {
total += v.get();
}
// 输出k4是单词k3,输出v4是累计计数
context.write(k3, new IntWritable(total));
}
}
(3)job
最后还需要一个总控启动的作业程序,代码如下:
package wc;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCountMain {
public static void main(String[] args) throws Exception {
// 1、创建任务Job,并且指定任务的入口
Job job = Job.getInstance(new Configuration());
job.setJarByClass(WordCountMain.class);
// 2、指定任务的Map,Map的输出类型
job.setMapperClass(WordCountMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 3、指定任务的Reduce,Reduce的输出类型
job.setReducerClass(WordCountReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 4、指定任务的输入路径和输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 5、执行任务
job.waitForCompletion(true); // 打印日志
}
}
4、打包
要在集群上运行,必须先将程序打成一个 jar 包。
- 可以通过 IDE 直接 export,选中工程,右键 -> "export",选中导出为 jar 文件 -> "Next"
选中要导出保存的位置 -> "Next"
选择主类 -> Finish
- 使用 Maven 打包,在 pom 中添加以下配置
<build>
<pluginManagement>
<plugins>
<!-- 打包配置,配置了哪些内容需要打包到jar里,配置了manifest文件的内容 -->
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<version>3.0.2</version>
<configuration>
<archive>
<manifest>
<!-- 创建一个classpath清单条目,告诉虚拟机,哪些类和包需要加入classpath -->
<addClasspath>true</addClasspath>
<mainClass>wc.WordCountMain</mainClass> <!-- 指定mainclass --> <!-- 指定classpath条目前缀,默认是"" -->
<classpathPrefix>lib/</classpathPrefix>
</manifest>
</archive>
</configuration>
</plugin>
// others plugins
<plugins>
<pluginManagement>
<build>
执行以下命令打包
mvn clean package dependency:copy-dependencies -DoutputDirectory=target/lib -DincludeScope=runtime -DskipTests
打包后在 target 目录下产生工程 jar 包和所有的依赖库
5、执行
将打包的 jar 复制到 hadoop 环境中,执行以下命令
export HADOOP_CLASSPATH=/root/temp/HadoopGuide/PkgMapReduceDemo/PkgMapReduceDemo-0.0.1-SNAPSHOT.jar:/root/temp/HadoopGuide/PkgMapReduceDemo/lib/*.jar
hadoop jar PkgMapReduceDemo-0.0.1-SNAPSHOT.jar /input/data.txt /output/wc
五、测试
不管是简单还是复杂的程序都应该在本地经过单元测试,MRUnit 正是这样一个测试库,便于将已知的输入传递给 mapper 或者检查 reducer 的输出是否符合预期。MRUnit 与标准的测试执行框架一起使用,可以在正常的开发环境中运行 MapReduce 作业的测试。
1、mapper 测试
@Test
public void testMapper() throws Exception {
// 创建一个WordCountMapper的测试对象
WordCountMapper mapper = new WordCountMapper();
// 创建一个Driver进行单元测试
MapDriver<LongWritable, Text, Text, IntWritable> driver = new MapDriver<>(mapper);
// 指定Map输入的数据
driver.withInput(new LongWritable(1), new Text("I love Beijing"));
// 指定Map的输出(是我们希望得到的结果)
driver.withOutput(new Text("I"), new IntWritable(1))
.withOutput(new Text("love"), new IntWritable(1))
.withOutput(new Text("Beijing"), new IntWritable(1));
// 执行单元测试,对比:expected result 和 actual result
driver.runTest();
}
2、reducer 测试
@Test
public void testReducer() throws Exception {
// 创建一个WordCountReducer的测试对象
WordCountReducer reducer = new WordCountReducer();
// 创建一个Driver进行单元测试
ReduceDriver<Text, IntWritable, Text, IntWritable> driver = new ReduceDriver<>(reducer);
// 构造Reduce的输入 List
List<IntWritable> value3 = new ArrayList<IntWritable>();
value3.add(new IntWritable(1));
value3.add(new IntWritable(1));
value3.add(new IntWritable(1));
driver.withInput(new Text("Beijing"), value3);
// 指定reduce的输出:是我们希望得到的结果
driver.withOutput(new Text("Beijing"), new IntWritable(3));
driver.runTest();
}
3、job 测试
@Test
public void testJob() throws Exception{
//创建测试对象
WordCountMapper mapper = new WordCountMapper();
WordCountReducer reducer = new WordCountReducer();
//创建Driver
//MapReduceDriver<K1, V1, K2, V2, K4, V4>
MapReduceDriver<LongWritable,Text,Text,IntWritable,Text,IntWritable>
driver = new MapReduceDriver(mapper, reducer);
//指定Map输入的数据
driver.withInput(new LongWritable(1), new Text("I love Beijing"))
.withInput(new LongWritable(2), new Text("I love China"))
.withInput(new LongWritable(3), new Text("Beijing is the capital of China"));
//指定reduce的输出:是我们希望得到的结果
//考虑排序规则
driver.withOutput(new Text("Beijing"), new IntWritable(2))
.withOutput(new Text("China"), new IntWritable(2))
.withOutput(new Text("I"), new IntWritable(2))
.withOutput(new Text("capital"), new IntWritable(1))
.withOutput(new Text("is"), new IntWritable(1))
.withOutput(new Text("love"), new IntWritable(2))
.withOutput(new Text("of"), new IntWritable(1))
.withOutput(new Text("the"), new IntWritable(1));
driver.runTest();
}
六、高级特性
1、序列化
序列化(serialization)是指将结构化对象转化为字节流以便在网络上传输或写到磁盘进行永久存储的过程。
反序列化(deserialization)是指将字节流转回结构化对象的逆过程。
在 Hadoop 中,系统中多个节点上进程间的通信是通过 “远程过程调用” (RPC)实现的。RPC 协议将消息序列化成二进制流后发送到远程节点,远程节点接着讲二进制流反序列化为原始消息。
对于 MapReduce 的两阶段的 k-v 类型,Hadoop 使用的是自己的序列化格式 Writable,具有绝对紧凑、速度快的特点,但不太容易用 Java 语言之外的语言进行扩展或使用。要提高可扩展性,可以使用一些序列化框架,如 Avro、Thrift、Protocol Buffers 等。
可以自定义序列化对象,并在程序中使用。
下面的表格是一些数据,表示员工表的数据,从左到右的字段属性分别为:
员工号、员工姓名、职位、员工上级、出生日期、工资、奖金、所属部门。
7369 | SMITH | CLERK | 7902 | 1980/12/17 | 800 | 0 | 20 |
---|---|---|---|---|---|---|---|
7499 | ALLEN | SALESMAN | 7698 | 1981/2/20 | 1600 | 300 | 30 |
7521 | WARD | SALESMAN | 7698 | 1981/2/22 | 1250 | 500 | 30 |
7566 | JONES | MANAGER | 7839 | 1981/4/2 | 2975 | 0 | 20 |
7654 | MARTIN | SALESMAN | 7698 | 1981/9/28 | 1250 | 1400 | 30 |
7698 | BLAKE | MANAGER | 7839 | 1981/5/1 | 2850 | 0 | 30 |
7782 | CLARK | MANAGER | 7839 | 1981/6/9 | 2450 | 0 | 10 |
7788 | SCOTT | ANALYST | 7566 | 1987/4/19 | 3000 | 0 | 20 |
7839 | KING | PRESIDENT | -1 | 1981/11/17 | 5000 | 0 | 10 |
7844 | TURNER | SALESMAN | 7698 | 1981/9/8 | 1500 | 0 | 30 |
7876 | ADAMS | CLERK | 7788 | 1987/5/23 | 1100 | 0 | 20 |
7900 | JAMES | CLERK | 7698 | 1981/12/3 | 950 | 0 | 30 |
7902 | FORD | ANALYST | 7566 | 1981/12/3 | 3000 | 0 | 20 |
7934 | MILLER | CLERK | 7782 | 1982/1/23 | 1300 | 0 | 10 |
为员工创建一个序列化对象 Employee,Writable 是一个接口,定义了两个方法,write 方法是将对象序列化写入到输出流中,readFields 方法是从输入流中读取二进制数据并反序列化为对象。
//数据 7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30
public class Employee implements Writable {
private int empno; // 员工号
private String ename; // 姓名
private String job; // 职位
private int mgr; // 老板的员工号
private String hiredate;// 入职日期
private int sal; // 薪水
private int comm; // 奖金
private int deptno; // 部门号
@Override
public void readFields(DataInput input) throws IOException {
// 实现反序列化
this.empno = input.readInt();
this.ename = input.readUTF();
this.job = input.readUTF();
this.mgr = input.readInt();
this.hiredate = input.readUTF();
this.sal = input.readInt();
this.comm = input.readInt();
this.deptno = input.readInt();
}
//一定注意:反序列化和序列化的顺序一定要一致!!!!!!
@Override
public void write(DataOutput output) throws IOException {
// 实现序列化的过程
//7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30
output.writeInt(this.empno);
output.writeUTF(this.ename);
output.writeUTF(this.job);
output.writeInt(this.mgr);
output.writeUTF(this.hiredate);
output.writeInt(this.sal);
output.writeInt(this.comm);
output.writeInt(this.deptno);
}
@Override
public String toString() {
return "Employee [empno=" + empno + ", ename=" + ename + ", sal=" + sal + ", deptno=" + deptno + "]";
}
// getter and setter
}
现在要实现一个简单的功能,输出所有员工信息,按照 toString 的格式化输出并按照员工号排序。
mapper 程序如下:
// k1 v1 k2 部门号 v2 员工对象
public class EmployeeMapper extends Mapper<LongWritable, Text, IntWritable, Employee> {
@Override
protected void map(LongWritable key1, Text value1, Context context)
throws IOException, InterruptedException {
//数据: 7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30
String data = value1.toString();
//分词
String[] words = data.split(",");
//创建员工对象
Employee e = new Employee();
//设置员工号
e.setEmpno(Integer.parseInt(words[0]));
//姓名
e.setEname(words[1]);
//职位
e.setJob(words[2]);
//老板
e.setMgr(Integer.parseInt(words[3]));
//入职日期
e.setHiredate(words[4]);
//薪水
e.setSal(Integer.parseInt(words[5]));
//奖金
e.setComm(Integer.parseInt(words[6]));
//部门号
e.setDeptno(Integer.parseInt(words[7]));
//输出 k2 员工号 空值
context.write(new IntWritable(e.getDeptno()), e);
}
}
reducer 程序如下:
// k3部门号 v3员工对象 k4部门号 k4部门薪水
public class EmployeeReduce extends Reducer<IntWritable, Employee, IntWritable, Employee> {
@Override
protected void reduce(IntWritable key3, Iterable<Employee> value3, Context context)
throws IOException, InterruptedException {
for (Employee e: value3) {
// 部门号 员工对象
context.write(key3, e);
}
}
}
因为 IntWritable 是一个实现了 Comparable 接口的对象,所以具有对数字进行排序的功能。
最后写一个作业程序:
// 使用MapReduce程序计算每个部门的工资总额(使用自定义的序列化对象Employee)
public class EmployeeMain {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set(Job.QUEUE_NAME, "MR");
// 1、创建任务Job,并且指定任务的入口
Job job = Job.getInstance(conf);
job.setJarByClass(EmployeeMain.class);
// 2、指定任务的Map,Map的输出类型
job.setMapperClass(EmployeeMapper.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(Employee.class);
// 3、指定任务的Reduce,Reduce的输出类型
job.setReducerClass(EmployeeReduce.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(Employee.class);
// 4、指定任务的输入路径和输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 5、执行任务
job.waitForCompletion(true);
}
}
将程序打包并执行
hadoop jar p3.jar /scott/emp.csv /output/p3
执行结果如下
[root@bigdata111 program]# hdfs dfs -cat /output/p3/part-r-00000
7369 Employee [empno=7369, ename=SMITH, sal=800, deptno=20]
7499 Employee [empno=7499, ename=ALLEN, sal=1600, deptno=30]
7521 Employee [empno=7521, ename=WARD, sal=1250, deptno=30]
7566 Employee [empno=7566, ename=JONES, sal=2975, deptno=20]
7654 Employee [empno=7654, ename=MARTIN, sal=1250, deptno=30]
7698 Employee [empno=7698, ename=BLAKE, sal=2850, deptno=30]
7782 Employee [empno=7782, ename=CLARK, sal=2450, deptno=10]
7788 Employee [empno=7788, ename=SCOTT, sal=3000, deptno=20]
7839 Employee [empno=7839, ename=KING, sal=5000, deptno=10]
7844 Employee [empno=7844, ename=TURNER, sal=1500, deptno=30]
7876 Employee [empno=7876, ename=ADAMS, sal=1100, deptno=20]
7900 Employee [empno=7900, ename=JAMES, sal=950, deptno=30]
7902 Employee [empno=7902, ename=FORD, sal=3000, deptno=20]
7934 Employee [empno=7934, ename=MILLER, sal=1300, deptno=10]
2、排序
Hadoop 自带的 Writable 类型都实现了 Comparable 接口,所以自带排序功能,所以自定义对象为了实现序列化和排序功能,需要实现 WritableComparable 接口。
现在利用排序功能实现一个需求,先按照员工部门号排序,如果部门号相同,则按照员工薪水排序,则 Employee 代码如下:
//数据 7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30
public class Employee implements WritableComparable<Employee> {
private int empno; // 员工号
private String ename; // 姓名
private String job; // 职位
private int mgr; // 老板的员工号
private String hiredate;// 入职日期
private int sal; // 薪水
private int comm; // 奖金
private int deptno; // 部门号
// 按照多个列进行排序
@Override
public int compareTo(Employee o) {
// 按照员工部门号排序
if (this.deptno > o.getDeptno()) {
return 1;
} else if (this.deptno < o.getDeptno()) {
return -1;
}
// 按照员工薪水进行排序
if (this.sal >= o.getSal()) {
return 1;
} else {
return -1;
}
}
@Override
public void readFields(DataInput input) throws IOException {
// 实现反序列化
this.empno = input.readInt();
this.ename = input.readUTF();
this.job = input.readUTF();
this.mgr = input.readInt();
this.hiredate = input.readUTF();
this.sal = input.readInt();
this.comm = input.readInt();
this.deptno = input.readInt();
}
//一定注意:反序列化和序列化的顺序一定要一致!!!!!!
@Override
public void write(DataOutput output) throws IOException {
// 实现序列化的过程
//7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30
output.writeInt(this.empno);
output.writeUTF(this.ename);
output.writeUTF(this.job);
output.writeInt(this.mgr);
output.writeUTF(this.hiredate);
output.writeInt(this.sal);
output.writeInt(this.comm);
output.writeInt(this.deptno);
}
@Override
public String toString() {
return "Employee [empno=" + empno + ", ename=" + ename + ", sal=" + sal + ", deptno=" + deptno + "]";
}
// getter and setter
}
mapper 输出只需要输出 key:Employee ,value 为空,代码如下:
// k1 v1 k2 员工号 v2 员工对象
public class EmployeeMapper extends Mapper<LongWritable, Text, Employee, NullWritable> {
@Override
protected void map(LongWritable key1, Text value1, Context context)
throws IOException, InterruptedException {
//数据: 7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30
String data = value1.toString();
//分词
String[] words = data.split(",");
//创建员工对象
Employee e = new Employee();
//设置员工号
e.setEmpno(Integer.parseInt(words[0]));
//姓名
e.setEname(words[1]);
//职位
e.setJob(words[2]);
//老板
e.setMgr(Integer.parseInt(words[3]));
//入职日期
e.setHiredate(words[4]);
//薪水
e.setSal(Integer.parseInt(words[5]));
//奖金
e.setComm(Integer.parseInt(words[6]));
//部门号
e.setDeptno(Integer.parseInt(words[7]));
//输出 k2 员工号 空值
context.write(e, NullWritable.get());
}
}
job 启动程序如下:
// 在 MapReduce 程序中使用自定义的序列化对象:得到每个员工号、对应员工信息的输出结果
public class EmployeeMain {
public static void main(String[] args) throws Exception {
//1、创建任务Job,并且指定任务的入口
Job job = Job.getInstance(new Configuration());
job.setJarByClass(EmployeeMain.class);
//2、指定任务的Map,Map的输出类型
job.setMapperClass(EmployeeMapper.class);
job.setMapOutputKeyClass(Employee.class); // k2 员工
job.setMapOutputValueClass(NullWritable.class); // v2 员工对象
// 因为员工对象实现了WritableComparable接口,所以无需显式指定
// job.setSortComparatorClass(Employee.class)
job.setOutputKeyClass(Employee.class); // k4
job.setOutputValueClass(NullWritable.class); // v4
//4、指定任务的输入路径和输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//5、执行任务
job.waitForCompletion(true);
}
}
打包执行程序,输入数据还是 /scott/emp.csv
,结果如下:
[root@bigdata111 program]# hdfs dfs -cat /output/p8/part-r-00000
Employee [empno=7934, ename=MILLER, sal=1300, deptno=10]
Employee [empno=7782, ename=CLARK, sal=2450, deptno=10]
Employee [empno=7839, ename=KING, sal=5000, deptno=10]
Employee [empno=7369, ename=SMITH, sal=800, deptno=20]
Employee [empno=7876, ename=ADAMS, sal=1100, deptno=20]
Employee [empno=7566, ename=JONES, sal=2975, deptno=20]
Employee [empno=7902, ename=FORD, sal=3000, deptno=20]
Employee [empno=7788, ename=SCOTT, sal=3000, deptno=20]
Employee [empno=7900, ename=JAMES, sal=950, deptno=30]
Employee [empno=7521, ename=WARD, sal=1250, deptno=30]
Employee [empno=7654, ename=MARTIN, sal=1250, deptno=30]
Employee [empno=7844, ename=TURNER, sal=1500, deptno=30]
Employee [empno=7499, ename=ALLEN, sal=1600, deptno=30]
Employee [empno=7698, ename=BLAKE, sal=2850, deptno=30]
3、分区
在 MapReduce 中,会将同一个分区的数据发送到同一个 reduce 当中进行处理。
比如为了数据的统计,可以将一批类似的数据发送到同一个 reduce 中,在同一个 reduce 当中统计类型相同的数据,就可以实现类似的数据分区和统计等。其实就是相同类型的数据,有共性的数据,送到一起去处理。
默认情况下 MapReduce只有一个分区(一个分区是一个输出文件)。而自定义分区器的创建需要继承 org.apache.hadoop.mapreduce.Partitioner
。
在序列化的程序中,程序执行结果输出的目录为 /output/p3
。
我们对程序进行改造,加入自定义分区器,根据部门号分为3个分区,代码如下:
// 分区器:根据Map的输出<k2 v2> k2部门号 v2 员工对象
public class MyPartitioner extends Partitioner<IntWritable, Employee> {
@Override
public int getPartition(IntWritable k2, Employee v2, int numTask) {
// 建立对应的分区
// numTask: 分区的个数
// 得到部门号
int deptno = v2.getDeptno();
if (deptno == 10) {
// 放入一号分区
return 1%numTask;
} else if (deptno == 20) {
// 放入二号分区
return 2%numTask;
} else {
// 放入三号分区
return 3%numTask;
}
}
}
部门号为 10、20、30 的数据,计算后得到的分区号分别为 1、2、0,一个分区对应一个输出文件,对应到程序输出文件分别为 part-r-00001、part-r-00002、part-r-00000。
mapper、reducer 的代码不变动,修改下作业主程序:
// 使用MapReduce程序计算每个部门的工资总额(使用自定义的序列化对象Employee)
public class EmployeeMain {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set(Job.QUEUE_NAME, "MR");
// 1、创建任务Job,并且指定任务的入口
Job job = Job.getInstance(conf);
job.setJarByClass(EmployeeMain.class);
// 2、指定任务的Map,Map的输出类型
job.setMapperClass(EmployeeMapper.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(Employee.class);
// 指定分区规则
job.setPartitionerClass(MyPartitioner.class);
// 分区的个数
job.setNumReduceTasks(3);
// 3、指定任务的Reduce,Reduce的输出类型
job.setReducerClass(EmployeeReduce.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(Employee.class);
// 4、指定任务的输入路径和输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 5、执行任务
job.waitForCompletion(true);
}
}
将修改后的程序打包为 p9.jar,通过以下命令执行:
hadoop jar p9.jar /scott/emp.csv /output/p9
执行输出结果如下:
4、Combiner
集群上的可用带宽限制了 MapReduce 作业的数量,因此尽量避免 map 和 reduce 任务之间的数据传输是有利的。Hadoop 允许用户针对 map 任务的输出指定一个 combiner(就像 mapper 和 reducer 一样),combiner 的输出作为 reduce 函数的输入。由于 combiner 属于优化方案,所以 Hadoop 无法确定要对一个指定的 map 任务输出记录调用多少次 combiner(如果需要)。
不管调用多少次 combiner,reducer 的输出结果都应该是一样的。
以 WordCount 小程序为例,mapper 会对输入数据行进行分词,并输出每个得到的单词的一次计数,如果能在 map 端统计得到所有单词的总计数,再将输出作为 reducer 的输入,可以极大降低网络数据传输的开销。
如上图所示,粉色块即为 mapper 通过网络传输给 reducer 的数据,使用 combiner 之后明显变少,有利于提高性能。
Combiner 是一种特殊的 Reducer,运行在 map 端,对 mapper 的输出结果在本地进行聚合,所以一般情况下,Reducer 可以直接作为 Combiner 使用,对 WordCount 的作业启动类进行修改:
public class WordCountMain {
public static void main(String[] args) throws Exception {
// 1、创建任务Job,并且指定任务的入口
Job job = Job.getInstance(new Configuration());
job.setJarByClass(WordCountMain.class);
// 2、指定任务的Map,Map的输出类型
job.setMapperClass(WordCountMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 设置Text比较规则:这里按字母倒序
job.setSortComparatorClass(MyTextComparator.class);
// 加入一个Combiner
job.setCombinerClass(WordCountReducer.class);
// 3、指定任务的Reduce,Reduce的输出类型
job.setReducerClass(WordCountReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 4、指定任务的输入路径和输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 5、执行任务
job.waitForCompletion(true); // 打印日志
}
}
由于 WordCount 小程序没有大量数据作为程序输入,无法直观地看到使用 Combiner 后的性能提升,这里使用《Hadoop权威指南》中的求每年最高气温的小程序 MaxTemperature
作为演示程序。
先根据 NCDD气象大数据处理 指引下载预处理好数据。
接着使用以下脚本处理数据,将压缩文件解压到 txt 文件中,并上传到 HDFS。
#!/bin/bash
hdfsDir="/hadoop/data/ncdc/noaa_all"
hdfsTxtDir="/hadoop/data/ncdc/noaa_txt"
hdfs dfs -rm -r $hdfsTxtDir
hdfs dfs -mkdir -p $hdfsTxtDir
for i in $(seq $1 $2)
do
hdfs dfs -get $hdfsDir/$i.gz .
gunzip -c $i.gz >> $i.txt
hdfs dfs -put $i.txt $hdfsTxtDir
done
执行脚本
sh getall.sh 1901 1950
这样就在 /hadoop/data/ncdc/noaa_txt
目录下得到每个年份的所有气象数据。
气象数据中,每行数据就是某年某月某个气象站在某个时间点观测到的一条气象数据,1901.txt 包含了 1901 年的所有气象数据。每条数据第1518位表示年份,第8791位表示气温(第87位表示气温是零上还是零下),第92位表示空气质量代码。
现在有一个需求,获得每一年的最高气温,实现代码如下。
Mapepr 过滤掉缺失的气象数据和空气质量代码不正常的气象数据,然后输出 kv 为年份和气温,代码如下:
public class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
private static final int MISSING = 9999;
/**
* 示例处理数据:
* "0029029070999991901010106004+64333+023450FM-12+000599999V0202701N015919999999N0000001N9-00781+99999102001ADDGF108991999999999999999999"
* ^ ^ ^ ^
* [年份:15~18位] [气温:] [空气质量代码]
*
* @param key
* @param value
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String year = line.substring(15, 19);
int airTemperature;
// 解析气温数据,如果是+表示气温为0°以上
if (line.charAt(87) == '+') {
airTemperature = Integer.parseInt(line.substring(88, 92));
} else {
airTemperature = Integer.parseInt(line.substring(87, 92));
}
// 空气质量代码
String quality = line.substring(92, 93);
// 如果气温数据为9999,这是个缺省的数据,表示该数据实际上缺失
if (airTemperature != MISSING && quality.matches("[01459]")) {
context.write(new Text(year), new IntWritable(airTemperature));
}
}
}
Reducer 函数会拿到同一年的所有气象数据,并从中获取最大值,代码如下:
public class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int maxValue = Integer.MIN_VALUE;
for (IntWritable value : values) {
maxValue = Math.max(maxValue, value.get());
}
context.write(key, new IntWritable(maxValue));
}
}
作业配置代码如下:
public class MaxTemperature {
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: MaxTemperature <input path> <output path>");
System.exit(-1);
}
// 1、创建任务Job,并且指定任务的入口和任务名
Job job = new Job();
job.setJarByClass(MaxTemperature.class);
job.setJobName("Max temperature");
// 2、指定任务的Map,Map的输出类型(若跟Reduce的输入类型一样则无需指定)
job.setMapperClass(MaxTemperatureMapper.class);
// 3、指定任务的Reduce,Reduce的输出类型
job.setReducerClass(MaxTemperatureReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 4、指定任务的输入路径和输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 5、执行任务并打印日志
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
执行以下命令执行
# 打包 jar 包时,如果不指定主类,可在执行时再指定主类
hadoop jar nocombiner.jar MaxTemperature.MaxTemperature /hadoop/data/ncdc/noaa_txt /output/nocombiner
程序执行成功之后,结果如下:
[root@bigdata111 program]# hdfs dfs -cat /output/nocombiner/part-r-00000
1901 317
1902 244
1903 289
1904 256
1905 283
1906 294
1907 283
1908 289
1909 278
1910 294
1911 306
1912 322
1913 300
1914 333
1915 294
1916 278
1917 317
1918 322
1919 378
1920 294
1921 283
1922 278
1923 294
1924 294
1925 317
1926 489
1927 489
1928 378
1929 328
1930 400
1931 461
1932 489
1933 489
1934 478
1935 478
1936 550
1937 489
1938 489
1939 489
1940 489
1941 462
1942 479
1943 485
1944 507
1945 496
1946 494
1947 490
1948 601
1949 511
1950 494
查看 Yarn 控制台,可以看到执行时间花了 4 分多钟。
点击 History,可以看到一共有 124 个 map 任务,而 reduce 只有一个。
接下来修改一下作业代码,添加 combiner :
public class MaxTemperature {
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: MaxTemperature <input path> <output path>");
System.exit(-1);
}
// 1、创建任务Job,并且指定任务的入口和任务名
Job job = new Job();
job.setJarByClass(MaxTemperature.class);
job.setJobName("Max temperature");
// 2、指定任务的Map,Map的输出类型(若跟Reduce的输入类型一样则无需指定)
job.setMapperClass(MaxTemperatureMapper.class);
job.setCombinerClass(MaxTemperatureReducer.class);
// 3、指定任务的Reduce,Reduce的输出类型
job.setReducerClass(MaxTemperatureReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 4、指定任务的输入路径和输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 5、执行任务并打印日志
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
执行程序
hadoop jar usecombiner.jar MaxTemperature.MaxTemperature /hadoop/data/ncdc/noaa_txt /output/usecombiner
程序执行完后,输出结果与不适用 combiner 是一致的,执行时间却没有达到预期的比不使用 combiner 少囧。。。我理解是实验虚拟机之间的虚拟网络处理这点数据带宽开销完全够用,需要在更大的集群环境、更海量的数据运算才能验证到性能的提升。
5、洗牌
MapReduce 确保每个 reducer 的输入都是按键排序的。系统执行排序,将 map 输出作为输入传给 reducer 的过程称为 shuffle。
shuffle 的本意是洗牌、混洗的意思,把一组有规则的数据尽量打乱成无规则的数据。而在 MapReduce 中,shuffle 更像是洗牌的逆过程,指的是将 map 端的无规则数据按指定的顺序 “打乱” 成具有一定规则的数据,以便 reduce 端接收处理。其在 MapReduce 中所处的工作阶段是 map 输出后到 reduce 接收前,具体可以分为 map 端和 reduce 端前后两个部分。
shuffle 的完整过程如下图所示:
(1)map 阶段
在 map 阶段,MapReduce 会对要处理的数据进行分片(split)操作,为每个分片分配一个 MapTask 任务,默认情况下一个分片大小等同于一个 dfs 数据块大小。
map 阶段处理的步骤如下:
① 数据分片
map 函数对分片中的每一行数据进行处理得到键值对(key-value),其中 key 为偏移量,value 为一行内容,接着 map 函数处理输入键值对产生输出。
② 写内存缓冲区
为了提高性能和效率,输出并非直接写到磁盘中,而是缓冲写到内存中并进行预排序,每个 map 任务都有一个环形内存缓冲区用于存储任务输出,并且缓冲区使用比例达到阈值后,后台线程便把内容溢出(spill)到磁盘。
在溢出写到磁盘过程中,map 输出继续写到缓冲区,如果在此期间缓冲区被填满,map 会被阻塞直到写磁盘过程完成。溢出写过程按轮询方式将缓冲区写到在作业指定目录下指定的目录中。
③ 分区
在写磁盘前,线程首先根据数据最终要传的 reducer 将数据划分成响应的分区(partition)。
④ 排序
在每个分区中,后台线程按键进行内存中排序,如果有 combiner 函数,就在排序后的输出上运行,使 map 输出结果更紧凑,减少写到磁盘的数据和传递给 reducer 的数据。
⑤ 落盘
每次内存缓冲区达到溢出阈值,就会新建一个溢出文件(spill file),因此在 map 任务写完其最后一个输出记录之后,会有几个溢出文件。在任务完成之前,溢出文件被合并成一个已分区且已排序的输出文件。
如果溢出文件比较多,combiner 会在输出文件写到磁盘前再运行,同时写到磁盘前可以进行压缩节约磁盘空间和传输给 reducer 的数据量。
相关的参数:
变量名 | 默认值 | 含义 |
---|---|---|
mapreduce.task.io.sort.mb | 100 | 内存缓冲区大小,单位:MB |
mapreduce.map.sort.spill.percent | 0.80 | 缓冲区内容溢出到磁盘的使用比例阈值 |
mapreduce.cluster.local.dir | ${hadoop.tmp.dir}/mapred/local | 存储溢出文件的本地目录 |
mapreduce.task.io.sort.factor | 10 | 溢出文件合并为输出文件时一次最多能合并多少流 |
mapreduce.map.combine.minspills | 3 | 至少存在多少个溢出文件时,combiner 才会在生成输出文件前再次执行 |
mapreduce.map.output.compress | false | map 输出到磁盘前是否进行压缩 |
mapreduce.map.output.compress.codec | org.apache.hadoop.io.<br />compress.DefaultCodec | 启用 map 输出压缩功能时使用的压缩格式 |
(2)reduce 阶段
map 输出文件位于运行 map 任务的 tasktracker 的本地磁盘,而 reduce 任务需要集群上的若干个 map 任务的输出作为其特殊的分区文件(即不同 map 的输出文件中相同分区数据发送给同一个 reducer 处理)。
reduce 阶段的处理分为复制、排序合并、正式执行 reduce 函数三大阶段。
① 复制
在每个 map 任务完成时,reduce 任务就开始复制其输出,并且这个过程是多个线程并行的。
reducer 如何知道从哪台机器获取 map 输出?
map 任务成功后,会通过心跳机制通知 application master,因此 am 知道 map 输出保存在哪台机器上。而 reducer 会有线程定期询问 am map 输出的机器位置,直到获得所有输出位置。
如果 map 输出比较小,会被复制到 reducer 的 JVM 内存中,否则复制到磁盘。reducer 中也有内存缓冲区,一旦使用达到一定阈值,或者 map 输出达到阈值,则触发合并操作,并且溢出写到磁盘中。
② 合并
随着磁盘上 map 输入文件的增多,后台线程会将它们合并为更大的、排好序的文件。根据合并因子,对每一定数量的文件执行一次合并操作,产生一个合并中间文件。
最后,多个合并中间文件会再次执行一次合并,并将合并后数据直接作为 reduce 函数的输入,减少一次磁盘往返形程。
③ 执行 reduce()
对已排序输出中的每个键调用 reduce 函数,输出直接写到输出文件系统,一般为 HDFS。node manager 和 data node 运行在同一个节点上,输出文件的第一个副本会被写到本地磁盘。
相关参数如下:
变量名 | 默认值 | 含义 |
---|---|---|
mapreduce.reduce.shuffle.parallelcopies | 5 | reducer 并行复制 map 输出线程数 |
mapreduce.reduce.shuffle.input.buffer.percent | 0.70 | 用于存储复制到 reducer 的 map 输出占 JVM 堆空间的最大百分比,超过该阈值,则 map 输出复制到磁盘 |
mapreduce.reduce.shuffle.merge.percent | 0.66 | 使用内存用于合并的使用阈值,超过该阈值,合并后结果溢出写到磁盘 |
mapreduce.reduce.merge.inmem.threshold | 1000 | 仅只用内存来合并的 map 输出阈值,假如内存中的文件数量超过该阈值,则合并后结果溢出写到磁盘 |
mapreduce.task.io.sort.factor | 10 | 合并因子,每多少个 map 输出就执行一次合并操作 |
七、工作机制
1、作业运行机制
(1)作业的提交
作业的提交包含了:步骤1~步骤4。
作业通过 org.apache.hadoop.mapreduce.Job
对象的 submit()
或 waitForCompletion()
方法提交,通过 submit() 提交的作业属于异步提交,而通过 waitForCompletion() 提交的作业会一直等待直到作业执行完成(不管成功还是失败,waitForCompletion() 内部也调用到了submit())。
submit() 方法会创建一个 JobSummiter
对象,并调用该对象的 submitJobInternal()
方法正式提交作业,提交作业的过程如下:
① 向 resource manager 请求生成一个 job id;
② 检查作业的输出说明,如果没有指定输出目录或者目录已存在,则不提交作业返回报错;
③ 计算作业的输入分片,如果分片无法计算(比如输入路径不存在),则不提交作业返回报错;
④ 将运行作业所需资源复制到一个以 job id 命名的目录下的共享文件系统中;
PS. 共享资源包括作业 JAR、配置文件和计算所得的输入分片。
⑤ 调用 RM 的 submitApplication() 方法提交作业。
(2)作业的初始化
① RM 收到提交作业的请求后,就会通过 Yarn Scheduler 分配一个容器,并在容器中启动 application master 进程;(步骤5)
② 该进程是一个 MapReduce 应用程序,会通过创建多个对象来保持对作业进度的跟踪;(步骤6)
③ 接受来自共享文件系统的、在客户端计算的输入分片,针对每个分片创建一个 map 任务对象和由 mapreduce.job.reduces 属性(通过作业的 setNumReduceTask() 方法设置)确定的多个 reduce 任务对象,并会为每个任务分配一个 task id;(步骤7)
如果作业太小,处理的数据也很小,则 application master 会选择和自己在同一个 JVM 上运行任务,这种作业称为 uberized,相关的 mapred-site.xml 中的参数如下:
变量名 | 默认值 | 含义 |
---|---|---|
mapreduce.job.ubertask.maxmaps | 9 | 认定为 ubertask 的 map 最大作业数阈值 |
mapreduce.job.ubertask.maxreduces | 1 | 认定为 ubertask 的 reduces 最大作业数阈值 |
mapreduce.job.ubertask.maxbytes | 最大输入字节数的阈值,默认为一个数据块大小,即 dfs.block.size | |
mapreduce.job.ubertask.enable | false | 是否开启uberized特性 |
(3)任务的分配
如果任务不是 uber 任务,则 application master 会为作业中所有的 map 任务和 reduce 任务向 RM 申请容器。先为 map 申请,并知道有 5% 的 map 任务完成时,才为 reduce 任务申请。
reduce 任务可以在集群中任何位置运行,而 map 任务的运行具有 “数据本地化” 特性(性能:数据本地化 > 机架本地化 -> 移动数据运算)。
对每个作业,可以配置分配到的内存和CPU,默认情况下会为每个任务分配1G内存和一个虚拟内核,相关的参数如下:
变量名 | 默认值 | 含义 |
---|---|---|
mapreduce.map.momery.mb | 1024 | 从调度器为每个 map 任务申请的内存 |
mapreduce.map.cpu.vcores | 1 | 从调度器为每个 map 任务申请的虚拟内核数 |
mapreduce.reduce.memory.mb | 1024 | 从调度器为每个 task 任务申请的内存 |
mapreduce.reduce.cpu.vcores | 1 | 从调度器为每个 task 任务申请的虚拟内核数 |
(4)任务的执行
① 当 RM 为任务分配在一个 node manager 上的容器时,application master 通过与 NM 通信来启动容器;(步骤9)
② 任务会由主类为 YarnChild 的一个 Java 应用程序执行,执行之前会将任务需要的资源本地化;(步骤10)
③ 最后,执行 map 任务或 reduce 任务。(步骤11)
(5)进度和状态更新
每个作业和任务都有一个状态:包括任务或作业的状态、map 和 reduce 的进度、作业计数器的值、状态消息或描述。
对 map 任务,任务进度是已处理输入所占的比例。
对 reduce 任务,整个处理的过程分为复制、排序、reduce三阶段(每个占1/3),如果任务已完成 reduce 一半的输入,则任务的进度为 5/6。
mapreduce 任务与 application master 的通信:
map 任务和 reduce 任务每隔 3 秒回向自己的 application master 报告进度和状态,所以 application master 会形成一个作业的汇聚视图;通过 Yarn web console 可以看到每个应用程序作业执行的情况,以及每个作业下所有任务的状态进度。
客户端每秒钟会轮询一次 application master 以接收最新状态。
变量名 | 默认值 | 含义 |
---|---|---|
mapreduce.client.progressmonitor.pollinterval | 1000(单位:ms) | 客户端轮询 application master 接收作业状态的时间间隔 |
(6)作业的完成
当 application master 收到作业最后一个任务完成的通知,就将 Job 的状态设置为 “成功”。
2、失败处理机制
(1)任务运行失败
MapReduce 任务在运行时可能会失败,原因有:
① 任务用户代码抛出异常;
② JVM 退出异常,比如 OOM;
③ 任务没有在超时时间间隔内向 application master 汇报进度和状态(默认3秒汇报一次);
④ 任务的尝试失败次数达到限制,整个作业会被认定为失败;
⑤ 整个作业中失败任务占所有任务的比例达到限制,整个作业会被认定为失败。
相关的参数设置:
变量名 | 默认值 | 含义 |
---|---|---|
mapreduce.task.timeout | 60000(单位:ms) | 如果任务既不读取输入、写输出,也不更新状态字符串,则终止任务前的毫秒数。设置为0时则禁用超时。 |
mapreduce.map.maxattempts | 4 | map 任务失败的最大尝试次数 |
mapreduce.reduce.maxattempts | 4 | reduce 任务失败的最大尝试次数 |
mapreduce.map.failures.maxpercent | 0 | 在不触发作业失败的情况下允许 map 任务失败占所有 map 任务的最大百分比 |
mapreduce.reduce.failures.maxpercent | 0 | 在不触发作业失败的情况下允许 reduce 任务失败占所有 reduce 任务的最大百分比 |
(2)application master 运行失败
由于硬件和网络原因,application master 也可能会失败,Yarn 和 MapReduce 都允许配置 am 的失败尝试次数,如果两个参数都设置了,则以 Yarn 的参数为准,一般设置为一致。
application master 被认定为失败后,会在一个新的容器启动一个新的 master 实例,并使用作业历史来恢复失败的应用程序所运行任务的状态,使其不必重新运行
相关参数如下:
变量名 | 默认值 | 含义 |
---|---|---|
mapreduce.am.max-attempts | 2 | 最大 application 尝试次数,若比 Yarn 设置的参数大则会被覆盖 |
yarn.resourcemanager.am.max-attempts | 2 | Yarn 允许的 application 最大尝试次 |
yarn.app.mapreduce.am.job.recovery.enable | true | 在 am 失败后是否自动恢复 |
(3)节点管理器运行失败
NM 也要定期向 RM 发送心跳,如果在超时时间范围内没发送心跳,则 RM 认定该节点失败,并将其从自己的节点池中移除。
对在节点上运行的所有任务和 am 进行故障转移恢复。am 的恢复如上;如果是未完成作业的 map 任务,am 会安排重新执行,因为 map 任务执行完的输出是保存在本地的,如果是网络故障,则数据无法被 reduce 任务访问到。
如果某个 NM 上任务运行失败次数过高,am 会将其拉入黑名单,并且尽量减少调度任务在该 NM 上,失败次数有参数控制。
相关的参数如下:
变量名 | 默认值 | 含义 |
---|---|---|
yarn.resourcemanager.nm.liveness-monitor.expiry-interval-ms | 60000 | NM 被 RM 认定为故障的最大心跳发送超时时间 |
mapreduce.job.maxtaskfailures.per.tracker | 3 | NM 最大任务失败阈值,超过该值,则作业 am 会尽量把任务调度到其他 NM 上;该值必须比 mapreduce.map.maxattempts、mapreduce.reduce.maxattempts 小,否则永远不会在其他节点上尝试失败的任务 |
(4)资源管理器运行失败
RM 运行失败故障的后果很严重,会导致作业和容器无法启动,为了避免单点故障,要开启 HA 功能。Yarn RM 通常跟 HDFS 的 NM 运行在同一台机器上,相关 HA 的配置详见 HDFS 的 HA 模式。
在 HA 的架构中,所有运行中的应用程序的信息存储在一个高可用的状态存储区中(由 zk 或 HDFS 备份),这样备用 RM 可以恢复出失败的主 RM 的关键状态。当新的 RM 启动后,它从状态存储区中读取应用程序的信息,然后为集群中所有的应用程序重启 application master。
八、编程案例
使用 MapReduce 能够实现 SQL 中一些功能,最终可以基于 MapReduce 实现用一条 SQL 来查询大数据的效果,而 hive 就是这样一个基于 MapReduce 作业的组件。
1、数据去重
在 SQL 中常用 distinct 关键字实现数据去重的效果,现在想要查询员工表(emp)中所有的职位,用 SQL 表示为:
select distinct job from emp;
现在写一个 MapReduce 程序实现去重的功能。
对于 mapper,需要从或取到的一行数据进行分词,再找到职位名称,所以输出 key 为 job,无需输出 value,所以为 NullWritable,代码如下:
// 把job作为k2 v2为 null
public class DistinctMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
@Override
protected void map(LongWritable key1, Text value1, Context context)
throws IOException, InterruptedException {
// 数据: 7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30
String data = value1.toString();
// 分词
String[] words = data.split(",");
// 输出 把job作为k2 v2为 null
context.write(new Text(words[2]), NullWritable.get());
}
}
数据从 map 端 shuffle 到 reduce 端的过程,自动实现了分组,相同 map 输出的 key 会被同一个 reduce 处理,这里忽略 value 中的空数据,直接得到一个想要的 key,并作为输出,直接实现了去重的效果,reducer 的代码如下:
public class DistinctReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
@Override
protected void reduce(Text k3, Iterable<NullWritable> v3, Context context) throws IOException, InterruptedException {
// v3 对程序来说没有意义,直接不处理
context.write(k3, NullWritable.get());
}
}
作业配置代码:
// 测试按照职业进行去重
// 相当于: select distinct job from emp;
public class DistinctMain {
public static void main(String[] args) throws Exception {
// 1、创建任务Job,并且指定任务的入口
Job job = Job.getInstance(new Configuration());
job.setJarByClass(DistinctMain.class);
// 2、指定任务的Map,Map的输出类型
job.setMapperClass(DistinctMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
// 3、指定任务的Reduce,Reduce的输出类型
job.setReducerClass(DistinctReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
// 4、指定任务的输入路径和输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 5、执行任务
job.waitForCompletion(true); // 打印日志
}
}
代码打包成 distinct.jar,提交到 hadoop 环境执行。
hadoop jar distinct.jar /scott/emp.csv /output/distinct
程序执行结果如下:
<>br
2、多表连接
在 SQL 中,使用 join 关键字可实现多表连接,如果没有连接条件,就会形成一个列数相加,行数相乘的一个笛卡尔积的数据集;通常笛卡尔积中有很多数据是错误的,需要有连接条件,在该数据集上进一步筛选过滤,得到最终想要的争取的数据。
员工表(emp)结构如下:
emp_no | ename | job | mgr_no | hiredate | sal | comm | dept_no |
---|---|---|---|---|---|---|---|
7369 | SMITH | CLERK | 7902 | 1980/12/17 | 800 | 0 | 20 |
... | ... | ... | ... | ... | ... | ... | ... |
部门表(dept)结构如下:
dept_no | dname | city |
---|---|---|
10 | ACCOUNTING | NEW YORK |
... | ... | ... |
现在要实现一个需求,求员工姓名和部门名称的对应关系的数据,使用 SQL 表示为:
select e.ename, d.dname
from emp e, dept d
where e.dept_no = d.dept_no;
现在两张表的数据分别存在 HDFS 的 /scott/emp.csv
、/scott/dept.csv
,用 MapReduce 实现如下:
mapper
public class EqualJoinMapper extends Mapper<LongWritable, Text, IntWritable, Text> {
@Override
protected void map(LongWritable key1, Text value1, Context context)
throws IOException, InterruptedException {
// 部门信息:10,ACCOUNTING,NEW YORK
// 员工信息:7782,CLARK,MANAGER,7839,1981/6/9,2450,0,10
String data = value1.toString();
// 分词
String[] words = data.split(",");
// 根据数组长度判断是部门信息还是员工信息
if (words.length == 3) {
// 部门表:部门号,部门名称
context.write(new IntWritable(Integer.valueOf(words[0])), new Text("*" + words[1]));
} else {
// 员工表:部门号,员工姓名
context.write(new IntWritable(Integer.valueOf(words[7])), new Text(words[1]));
}
}
}
reducer
public class EqualJoinReducer extends Reducer<IntWritable, Text, Text, Text> {
@Override
protected void reduce(IntWritable k3, Iterable<Text> v3, Context context) throws IOException, InterruptedException {
// 定义变量,分别保存部门名称和员工姓名(s)
String dname = "";
List<String> empNameList = new ArrayList<String>();
// 判断是否存在*号
for (Text t : v3) {
String s = t.toString();
// 是部门名称
if (s.indexOf("*") >= 0) {
dname = s.substring(1);
} else {
// 是员工姓名
empNameList.add(s);
}
}
// 输出
for (String ename : empNameList) {
context.write(new Text(dname), new Text(ename));
}
}
}
作业配置
// 等值连接
// 相当于执行:select e.ename,d.dname
// from emp e,dept d
// where e.deptno=d.deptno;
public class EqualJoinMain {
public static void main(String[] args) throws Exception {
// 1、创建任务Job,并且指定任务的入口
Job job = Job.getInstance(new Configuration());
job.setJarByClass(EqualJoinMain.class);
// 2、指定任务的Map,Map的输出类型
job.setMapperClass(EqualJoinMapper.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(Text.class);
// 3、指定任务的Reduce,Reduce的输出类型
job.setReducerClass(EqualJoinReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// 4、指定任务的输入路径和输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 5、执行任务
job.waitForCompletion(true); // 打印日志
}
}
打包执行
hadoop jar join.jar /scott /output/join
结果如下
上述的多表连接连接条件是判断两个表在某个相同含义的字段是否相同,也称为等值连接。在多表连接中,自连接也是一种非常常见的多表连接。
现在要实现查找经理和经理管理的员工的数据,因为经理也是员工,所以使用 SQL 表示为:
select m.ename, e.ename
from emp e, emp m
where e.mgr = m.empno;
使用 MapReduce 程序实现如下。
mapper 对输入的每个员工数据进行处理,每个员工都有可能是经理,管理着其他员工,同时可能还被其他经理管理,前者的角色是管理者,后者的角色是被管理者,所以代码如下:
public class SelfJoinMapper extends Mapper<LongWritable, Text, IntWritable, Text> {
@Override
protected void map(LongWritable key1, Text value1, Context context)
throws IOException, InterruptedException {
// 员工信息:7782,CLARK,MANAGER,7839,1981/6/9,2450,0,10
String data = value1.toString();
// 分词
String[] words = data.split(",");
//输出
try {
// 1、作为老板表: 员工号 姓名
context.write(new IntWritable(Integer.parseInt(words[0])), new Text("*"+words[1]));
// 2、作为员工表:老板号 姓名
context.write(new IntWritable(Integer.parseInt(words[3])), new Text(words[1]));
}catch(Exception ex) {
// 表示是大老板
context.write(new IntWritable(-1), new Text(words[1]));
}
}
}
reducer 对 map 输出的数据进行处理,输入的 key 为经理名字,输入的 value 中带 “*” 表示这是个经理,不带表示这是被经理管理的员工姓名,所以代码如下:
public class SelfJoinReducer extends Reducer<IntWritable, Text, Text, Text> {
@Override
protected void reduce(IntWritable k3, Iterable<Text> v3, Context context)
throws IOException, InterruptedException {
// 定义两个变量:老板姓名、老板管理员工列表
String bossName = "";
List<String> empList = new ArrayList<String>();
for (Text t : v3) {
String name = t.toString();
// 判断是否存在*号
if (name.indexOf("*") >= 0) {
// 表示是老板
bossName = name.substring(1);
} else {
// 表示是员工
empList.add(name);
}
}
// 如果是大老板,上头没有更大的老板,跳过处理
if (bossName.length() > 0) {
for (String empName : empList) {
// 普通员工名下没有更低一级的员工,跳过处理
if (empName.length() > 0) {
context.write(new Text(bossName), new Text(empName));
}
}
}
}
}
作业配置
/**
* 使用MapReducer实现自连接
*
* 相当于执行:
* select b.ename,e.ename
* from emp e,emp b
* where e.mgr=b.empno;
*/
public class SelfJoinMain {
public static void main(String[] args) throws Exception {
// 1、创建任务Job,并且指定任务的入口
Job job = Job.getInstance(new Configuration());
job.setJarByClass(SelfJoinMain.class);
// 2、指定任务的Map,Map的输出类型
job.setMapperClass(SelfJoinMapper.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(Text.class);
// 3、指定任务的Reduce,Reduce的输出类型
job.setReducerClass(SelfJoinReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// 4、指定任务的输入路径和输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 5、执行任务
job.waitForCompletion(true); // 打印日志
}
}
提交执行作业
hadoop jar selfjoin.jar /scott/emp.csv /output/selfjoin
结果如下
3、倒排索引
跟一般索引根据位置信息找数据不同,倒排索引是根据搜索的关键字找到位置信息(比如所在的文件等)。
下面演示一个通过 MapReduce 程序实现倒排索引的例子。
首先分析以下需求,对文本文件中的每个单词,找到它在哪个文件中出现过和出现的次数。
对于 Map 阶段,所要做的就是分词,并且把文件信息跟关键字放在一起作为 key,value 为出现一次的计数 1。
public class RevertedIndexMapper extends Mapper<LongWritable, Text, Text, Text> {
@Override
protected void map(LongWritable key1, Text value1, Context context)
throws IOException, InterruptedException {
//数据: data01.txt I love Beijing and love Shanghai
//获取输入数据的路径: /indexdata/data01.txt
String path = ((FileSplit)context.getInputSplit()).getPath().toString();
//查询最后一个斜线
int index = path.lastIndexOf("/");
//得到文件名
String fileName = path.substring(index + 1);
String data = value1.toString();
// 分词
String[] words = data.split(" ");
for (String word : words) {
context.write(new Text(word + ":" + fileName), new Text("1"));
}
}
}
在 combiner 阶段,同个单词出现在同个文件内的 Map 输出会被汇聚在一起,将 Map 输出的 key 进行拆分,将输出的 value 数组进行累加,这样就得到了某个单词在某个文件内出现的次数作为 combiner 输出的 value。
public class RevertedIndexCombiner extends Reducer<Text, Text, Text, Text> {
@Override
protected void reduce(Text k21, Iterable<Text> v21, Context context)
throws IOException, InterruptedException {
// love:data01.txt (1,1,1)
// 得到单词在文件中出现的总次数
int total = 0;
for (Text t : v21) {
total += Integer.parseInt(t.toString());
}
String s = k21.toString();
int index = s.indexOf(":");
String word = s.substring(0, index);
String fileName = s.substring(index+1);
// love date01.txt:2
context.write(new Text(word), new Text(fileName + ":" + total));
}
}
在 Reduce 阶段,Combiner 的输出 key 是单词,所以同个单词在不同文件中出现次数的数据会在这里汇聚,reducer 将这些信息拼接起来,最终就得到某个单词在所有文件中出现次数的倒排索引。
public class RevertedIndexReducer extends Reducer<Text, Text, Text, Text> {
@Override
protected void reduce(Text k3, Iterable<Text> v3, Context context)
throws IOException, InterruptedException {
// 对combiner输出的value进行拼加
String value = "";
for (Text t : v3) {
value = "(" + t.toString() + ")" + value;
}
context.write(k3, new Text(value));
}
}
作业配置
/**
* 实现倒排索引
*/
public class RevertedIndexMain {
public static void main(String[] args) throws Exception {
// 1、创建任务Job,并且指定任务的入口
Job job = Job.getInstance(new Configuration());
job.setJarByClass(RevertedIndexMain.class);
// 2、指定任务的Map,Map的输出类型
job.setMapperClass(RevertedIndexMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//引入Combiner
job.setCombinerClass(RevertedIndexCombiner.class);
// 3、指定任务的Reduce,Reduce的输出类型
job.setReducerClass(RevertedIndexReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// 4、指定任务的输入路径和输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 5、执行任务
job.waitForCompletion(true); // 打印日志
}
}
执行作业
hadoop jar revertedindex.jar /indexdata /output/revertedindex
结果如下
通过上述的编程案例可知,通过 MapReduce 可以实现类似 SQL 查询的效果,如果把这种功能做成通用化一点,就可以基于 MapReduce 开发一个数据仓库工具,将结构化的数据文件映射为一张数据库表,并通过 SQL 查询。