MapRduce是hadoop中的一个分布式计算工具,分为map阶段和reduce阶段其采用了一个分而治之的思想
以下一个例子作为演示,假设有一个涉及300M的文件(1.txt200m 2.txt 100m)
进行计算,求每个单词所占的个数
Map阶段
1.首先进行逻辑切片,切片个数就是maptask启动的个数
2.maptask通过textinputformat按行读取分区当中的数据,结果是一个键值对<k1,v1> (k1是偏移量地址,value是具体的数据)
3,textinputformat将读取的键值对结果传个业务代码进行处理,处理的结果是一个新的键值对<k2,v2>
4,处理结果经过partition分区(默认只要一个)传送给缓冲区
5,缓冲区(默认大小100M)内的数据达到百分之八十,会产生一次数据溢出.溢出的数据经过排序存储到磁盘当中的一个临时文件
6,当区中数据处理完成后,会把所有的临时文件merge合并到一个最终结果文件
reduce阶段
1,reducetask拉取(copy)maptask最终的结果集
2,合并:把拉取的数据Merge合并成为一个文件
3,排序:按照对应key值进行排序
4,分组:key一样的分为一组
5,每个组都调用业务代码处理一次,处理完的数据是一个新的键值对
6,处理结果通过TextOutputformat 存储到磁盘指定位置
思考:maptask 个数与文件大小,文件个数,逻辑分区大小有关,分区大小默认为hdfs分块大小,可以改
shuffle过程
shuffle:指的是map产生输出到reduce取得数据输入之前的过程
map阶段:
collect收集:把数据写入缓冲区阶段写入的是partition分区信息和key/value
partition分区: 跟reducetask的个数有关,reduce task又跟业务需要有关可以自定义
spill 溢出:当缓存区的数据达到80%会溢出,溢出的数据从内存中写入磁盘,如果配置了combiner规 约会按照分区和规约进行排序
merge 合并:把磁盘当中的这些临时文件合并成一个总的结果文件
reduce阶段:
copy拉取数据
merge 合并
sort 排序
geouping 分组
拓展- partition分区的个数影响因素
默认情况下分区只有一个,reducetask只有一个意味着最终处理结果输出在一个文件上,此时没有分区的概念.
用户也可以根据需求去设置MR程序中reducetask的个数 job.setNumarenducetask(n>2)
此时对于maptask来说,处理完的数据到底要去哪里,这个问题就是partition分区问题
分区的默认规则:
哈希取余<key,value>
key.hash % reducetask 个数 -->余数(分区编号)
默认规则不能保证数据平均分配也无法保证是否符合业务需求,但是能保证key一样 分区一样
自定义规则:
根据具体需求定义
在Map任务和Reduce任务的过程中,一共发生了3次排序
1)当map函数产生输出时,会首先写入内存的环形缓冲区,当达到设定的阀值,在刷写磁盘之前,后台线程会将缓冲区的数据划分成相应的分区。在每个分区中,后台线程按键进行内排序
2)在Map任务完成之前,磁盘上存在多个已经分好区,并排好序的,大小和缓冲区一样的溢写文件,这时溢写文件将被合并成一个已分区且已排序的输出文件。由于溢写文件已经经过第一次排序,所有合并文件只需要再做一次排序即可使输出文件整体有序。
3)在reduce阶段,需要将多个Map任务的输出文件copy到ReduceTask中后合并,由于经过第二次排序,所以合并文件时只需再做一次排序即可使输出文件整体有序
在这3次排序中第一次是内存缓冲区做的内排序,使用的算法使快速排序,第二次排序和第三次排序都是在文件合并阶段发生的,使用的是归并排序。