本篇文章是总结官方文档给出的MapReduce编程模型
Input and Output types of a MapReduce job:
(input)<k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3>(output)
MapReduce 讲解
新的MapReduce使用 mapreduce包下的类进行mapreduce job的编写
-
Mapper
应用通过使用 Counter来报告统计数据与给定输出键相关的所有中间值由框架分组,并传递到 Reducer来确定最终的输出。用户可以通过指定Comparator 来控制分组,
设置如下: job.setGroupingComparatorClass(Class)Mapper的输出经过排好序后分区到每一个Reducer。总共的分区数量是与reducer的个数是相同的。用户可以控制哪一个key去哪一个Reducer通过实现一个通用的 Partitioner
用户可以选择一个特定的 combiner,通过如下设置: Job.setCombinerClass(Class), 通过执行中间输出的本地聚合,将会有效的降低从Mapper到Reducer的数据输出。
这些中间值,输出排序总是被简单的格式化。程序可以控制,这些中间值的输出可以被 compressed 并且这个编码格式可以在Configuration中控制。
Maps数量的控制,基本上一个hdfs的block分配一个mapper, 但是可以控制Mapper的数量,通过如下配置: Configuration.set(MRJobConfig.NUM_MAPS, int)
2 Reducer
在Job中设置 Reducer的实现类,通过如下设置:Job.setReducerClass(Class)
设置Reducer的数量,通过如下设置: Job.setNumReduceTasks(int)
Reducer有三个主要的阶段: shuffle, sort 和 reduce
shuffle
reducer的输入是mapper中排好序的输出,在这个阶段,框架抓取所有mapper的输出的相关分区,通过HTTP
sort
在此阶段,框架将key进行分组(不同的mapper可能输出相同的key)
shuffle和sort同时发生,当mapper输出被获取时,他们被合并为 key, list<>
reduce
在这个阶段,调用reduce方法。 典型的会输出到文件系统,通过 context.write(WritableComparable, Writable).
应用将会使用 Counter(计数器) 进行统计
输出的数据是不排序的。
可以将 Reducer的数量设置为0
在这种情况下,mapper的task将会直接将结果写入到文件系统中。
Partitioner
Partitioner 控制这些key(map-outputs)是如何进行分区的, 一般来说,会对key调用hash函数来进行分区。。分区的数量是和reduce的数量是相同的。
HashPartitioner是默认的分区类
Job Configuration
Job 代表了一个 MapReduce任务的配置
Job 一般用来配置 Mapper类,combiner,Partitioner, Reducer, InputFormat, OutputFormat.
用户也可以使用 Configuration.set(String, String)/Configuration.get(String)来设置/获取属性参数在应用中需要用的到的。
当有大量的数据需要设置/获取时,通过DistributeCache来进行设置大量的只读数据。
JOb Input
InputFormat 描述了输入的规范在一个Mapreduce Job中
TextInputFormat是默认的InputFormat
InputSplit
InputSplit 表示这个一个mapper的被处理的数据。
RecordReader
RecordReader 用来在InputSplit中读<key, value>
Job Output
OutputFormat 描述了在一个Mapreduce中输出的规范
TextOutputFormat是默认的OutputFormat
OutputCommitter
OutputCommitter 描述了在MapReduce 任务中如何提交 task的输出
RecordWriter
RecordWrite将输出<key, value> 写入到输出文件中