1. MapReduce概述
MapReduce是一个分布式计算
的编程框架,是用户开发“基于Hadoop的数据分析应用”的核心框架。
MapReduce核心功能是将用户编写
的业务逻辑代码并发地运行在一个Hadoop集群上
。
1.1 MapReduce进程
一个完整的MapReduce程序在运行时有3种进程:
- MrAppMaster:负责整个MapReduce程序的调度和状态协调;
- MapTask:负责Map阶段的数据处理流程;
- ReduceTask:负责Reduce阶段的数据处理流程。
2. MapReduce编程
用户需要按照一定的规范来进行MapReduce程序的开发。
2.1编程规范
用户编写的MapReduce程序由三个部分组成:Mapper、Reducer和Driver.
Mapper编写
- 用户自定义的Mapper继承特定的Mapper类;
- 定义Mapper的输入数据类型(K-V对形式的两个参数,都需要指定);
- 定义Mapper的输出数据类型(K-V对形式的两个参数,都需要指定);
- 重写map()方法,将业务逻辑写在其中。
对于每个输入的<K,V>参数,map()方法均会执行一次。
Reduce编写
- 用户自定义Reducer继承特定的Reducer父类;
- 定义Reducer的输入数据类型,其也是K-V对,并对应Mapper的输出数据类型;
- 重写reduce()方法,将业务逻辑写入其中。
ReduceTask进程会对每一组相同k的<k,v>输入参数只调用一次reduce()方法。
Driver编写
相当于YARN集群的客户端,用于提交MapReduce程序到YARN集群,提交的是一个job对象,该job对象封装了MapReduce程序相关的运行参数。
2.1 常用数据类型
Hadoop的数据类型都要实现Writable接口,以便用这些类型定义的数据可以被序列化进行网络传输和文件存储。
Hadoop类型 | Java类型 |
---|---|
BooleanWritable | boolean |
ByteWritable | byte |
IntWritable | int |
FloatWritable | float |
LongWritable | long |
DoubleWritable | double |
Text | String |
MapWritable | map |
ArrayWritable | array |
自定义MapReduce数据类型
自定义数据类型有两种方式:
- 实现Writable接口
重写 write()和readFields()方法 - 实现WritableComparable接口
重写 write(),readFields()和compareTo()方法。
3. MapReduce的优缺点
优点
-
MapReduce编程简单
。只需要实现一些接口,就可以完成一个分布式计算程序,并分不到大量廉价的PC机器上运行; -
良好的扩展性
。当计算资源不足时,可以通过简单的增加机器来扩展计算能力; -
高容错性
。MapReduce程序运行在廉价的PC机器上,当其中一台机器宕机了,它可以自动将计算任务转移到另外一个节点上运行,而不需要人工参与; -
PB级以上海量数据的计算
。
缺点
-
不擅长实时计算
。MapReduce一般用来做数据的离线处理,它没法像MySQL一样,在毫秒或者秒级别内返回结果; -
不擅长流式计算
。流式计算的输入数据是动态的,而MapReduce的输入数据是静态的,不能动态变化; -
不擅长DAG(有向图)计算
。DAG计算指的是,多个应用存在依赖关系,后一个应用的输入为前一个应用的输出。使用MapReduce进行此种计算,每个MapReduce作业的输出结果都会写入到磁盘,会造成大量的磁盘IO,导致性能非常低下。
4. MapReduce工作流程
1)默认HDFS中一个存储块的大小是128M,所以需要将200M拆成128+72;
2)maptask并行运行,互不影响;
3)reducetask也是并行运行,互不影响,但是reducetask的开始要依赖于所以的maptask都运行结束。
5. WordCount案例
这里实现一个和Hadoop官方提供的wordcount类似功能,需求如下:
输入:文本文件,文件中包含一些单词。
输出:各个单词输出的次数。
对照“2.1编程规范”,我们需要编写Mapper、Reducer、Driver。
5.1 Mapper
public class WordcountMapper extends Mapper<LongWritable,Text, Text, IntWritable> {
Text outKey = new Text();
IntWritable outValue = new IntWritable(1);
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] words = line.split(" ");
for (String word : words) {
outKey.set(word);
context.write(outKey,outValue);
}
}
}
5.2 Reducer
public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
IntWritable outValue = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
outValue.set(sum);
context.write(key, outValue);
}
}
5.3 Driver
public class WordcountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 获取配置信息以及封装任务
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
// 设置jar加载路径
job.setJarByClass(WordcountDriver.class);
// 设置map和reduce类
job.setMapperClass(WordcountMapper.class);
job.setReducerClass(WordcountReducer.class);
// 设置map输出kv类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 设置最终输出kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 设置输入和输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 提交
job.submit();
}
}
需要注意一点:导包的时候很多类在org.apache.hadoop.mapreduce
包和org.apache.hadoop.mapred
中存在同名的情况,一般导入org.apache.hadoop.mapreduce
包。
5.4 打包运行
在pom.xml文件中配置maven-assembly-plugin
,然后通过mvn install
指令对应用进行打包, 最后在target目录中可以看到打好的包
其中with-dependencies.jar中多了依赖,如果是在集群中运行的话,集群中包含mapreduce运行所需的jar包,所以使用不带依赖的jar包即可。
将wordcount-1.0-SNAPSHOT.jar上传到hadoop服务器,然后运行hadoop jar
即可。
[hadoop@hadoop01 software]$ hadoop jar wordcount-1.0-SNAPSHOT.jar com.zgc.mapreduce.wordcount.WordcountDriver /usr/hadoop/input /usr/hadoop/output
其中,/usr/hadoop/input
是一个hdfs的目录,它下面含有需要统计单词次数的文件。
执行完成之后,可以通过hdfs dfs -cat /usr/hadoop/output/part-r-00000
指令查看统计结果。