1 MapReduce是什么
MapReduce是一种分布式计算模型,由Google提出,主要用于搜索领域,解决海量数据的计算问题.这个可以跟排序中的归并排序类比下思想。归并采用的是分治法,也就是将一个问题分解成多个小问题,求出每个小问题的结果,然后再合并小结果成最终的结果。MapReduce亦然。
MapReduce由两个阶段组成:Map和Reduce,Map阶段进行分组计算。Reduce阶段进行合并结果。用户只需要实现map()和reduce()两个函数,即可实现分布式计算,非常简单。
这两个函数的形参是key、value对,表示函数的输入信息。
2 MapReduce工作原理
MapReduce的整个流程,需要用户进行实现的也只有两部分内容。就是Map阶段的处理和Reduce阶段的处理。总的流程,如图:

MapReduce总流程
2.1 Map
- 读取输入文件内容,解析成key、value对。对输入文件的每一行,解析成key、value对。每一个键值对调用一次map函数。这里生成的键key是根据每一行开始字符的下标位置,解析示例如下:
123 abc// 解析之后为0,'123 abc'
321 cbd// 解析之后为8,'321 cbd'
tomy hello// 解析之后为16,'tomy hello'
- Map具体的逻辑部分,对输入的key,value(就是上一步解析出来的key,value)进行处理。解析完成之后输出新的key,value。这一部分是用户自己实现
2.2 Shuffle
这一部分是系统自动完成,只是需要进行说明下,不然在进行编程的时候发现Map阶段的输出和Reduce阶段的输入不对称,但是也可以进行变成干预,如后面介绍的Partitioner编程、自定义排序编程、Combiner编程
- 对Map输出的key,value进行分区,对不同分区的数据,按照key进行排序、分组。相同key的value放到一个集合中。然后将分组后的数据进行归约
2.3 Reduce
- 对多个map任务的输出,按照不同的分区,通过网络copy到不同的reduce节点。
- Reduce过程,对输入的key、value处理,转换成新的key、value输出。这一部分是用户自己实现
- 把reduce的输出保存到文件中。
3 编码实践
- 实现功能:对文件中的单词进行统计,如:
// 统计内容
how are you
// 统计结果
how 1
are 1
you 1
- 实现的主要包含三个java类,WordCount、WordCountMapper、WordCountReduce
- WordCount
package com.jiyx.test.mapred.job;
import com.jiyx.test.mapred.mapper.WordCountMapper;
import com.jiyx.test.mapred.reducer.WordCountReduce;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ByteWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
* 主类
* @author jiyx
* @create 2018-10-13-16:01
*/
public class WordCount {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 构建job对象
Job job = Job.getInstance();
// 需要设置main方法所在的类
job.setJarByClass(WordCount.class);
// 设置mapper
job.setMapperClass(WordCountMapper.class);
// Map的输出类型设置可以省略,不过只能在reducer的输入(key,value)和输出(key,value)类型相等时才行
// 也就是map的输出类型和reducer的输出类型一致的时候才能省略
// 这里不相同,不能省略
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(ByteWritable.class);
// 需要统计的文件
FileInputFormat.setInputPaths(job, new Path("/wc.txt"));
// 设置reducer
job.setReducerClass(WordCountReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
// 统计结果输出
FileOutputFormat.setOutputPath(job, new Path("/wcResult"));
// 提交任务,并打印过程信息
job.waitForCompletion(true);
}
}
- WordCountMapper
package com.jiyx.test.mapred.mapper;
import org.apache.hadoop.io.ByteWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* Map
* @author jiyx
* @create 2018-10-13-15:42
*/
public class WordCountMapper extends Mapper<LongWritable, Text, Text, ByteWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 对输入的value进行拆分后直接输出,如{18:"hello java hello"},输出为"hello",1和"java",1和"hello",1
for (String newKey : value.toString().split(" ")) {
context.write(new Text(newKey), new ByteWritable((byte) 1));
}
}
}
- WordCountReduce
package com.jiyx.test.mapred.reducer;
import org.apache.hadoop.io.ByteWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.Iterator;
/**
* Reduce
* @author jiyx
* @create 2018-10-13-15:53
*/
public class WordCountReduce extends Reducer<Text, ByteWritable, Text, LongWritable> {
@Override
protected void reduce(Text key, Iterable<ByteWritable> values, Context context) throws IOException, InterruptedException {
// 因为输入的是类似于{"hello": 1, 1, 1, 1}这种模式,所以直接++
long count = 0;
Iterator<ByteWritable> iterator = values.iterator();
for (; iterator.hasNext(); iterator.next(), count++) {
}
context.write(key, new LongWritable(count));
}
}
- 将文件打包成以WordCount为主mainClass的jar包,如果是使用idea且打出的jar包不能执行的话,可以转到idea生成可执型jar包进行查看。
这里使用的是hadoop三种运行模式中的伪分布模式,感觉会比较麻烦一点,后期采用本地模式进行 - 然后将jar包上传到hadoop服务器,使用hadoop jar xxx.jar 执行jar包就可以了,至此第一个入门程序完成。
4 MapReduce的执行流程
- 客户端提交一个mr的jar包给JobClient(提交方式:hadoop jar ...)
- JobClient通过RPC和ResourceManager进行通信,返回一个存放jar包的地址(HDFS)和jobId
- client将jar包写入到HDFS当中(path = hdfs上的地址 + jobId,默认是写十份,mr结束删除文件)
- 开始提交任务(任务的描述信息,不是具体jar包,而是包括jobid、jar存放位置、配置等等的信息)
- ResourceManager进行初始化任务
- 读取HDFS上的要处理的文件,开始计算输入分片,每一个分片对应一个MapperTask
- NodeManager通过心跳机制领取任务(任务的描述信息)
- 下载所需的jar,配置文件等
- NodeManager启动一个java child子进程,用来执行具体的任务(MapperTask或ReducerTask)
- 将结果写入到HDFS当中
4 为什么hadoop适合小文件
- 在执行MapReduce之前,原始数据被切割成若干个split,每个split作为一个map任务的输入,在map执行过程中split会被分解成一个个记录(key-value对),map会依次处理每一个记录。
- FileInputFormat只划分比HDFS Block大的文件,所以FileInputFormat划分的结果就是这个文件或者是这个文件中的一部分。
- 如果一个文件的大小比Block小,将不会被划分,这也就是Hadoop处理大文件的效率要比处理很多小文件的效率高的原因。
- 当Hadoop处理很多小文件(文件大小小于HDFS Block大小)的时候,由于FileInputFormat不会对小文件进行划分,所以每一个小文件都会被当做一个split并分配一个map任务,导致效率低下。
例如:一个G的文件,会被划分成8个128MB的split,并分配8个map任务处理,而10000个100KB的文件会被10000个map任务处理。