注意:
上面的流程是整个MapReduce最全工作流程,但是Shuffle过程只是从第7步开始到第16步结束,具体过程详解如下:
1)准备一个文件,例如ss.txt大小为200M。
2)上节学到,客户算submit()前,获取待处理数据的信息,然后根据参数配置,形成一个个任务分配的规划。
3)客户端会将job.xml、xxx.jar、job.xml提交到Yarn。
4)Yarn开启Mrappmaster,读取job.xml文件,计算出MapTask数量,并开启对应的MapTask。
5)默认用TextInputFormat读取切片文件,RecorderReader按行读取文件。
6)InputFormat读取完数据,把数据交给自定义mapper,执行逻辑运算。
7)MapTask收集我们的map()方法输出的kv对,放到环形缓冲区中(大小默认100M)。
8)当缓冲区写到80%后,从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件。在环形缓冲区中,数据一开始写入到不同分区,分区内文件根据索引进行快排,使得分区内数据有序。如果剩余内存写速度大于溢写速度,那么写内存操作将等待。
9)环形缓冲区数据溢写到磁盘(分区且分区内有序)
10)多个溢出文件会被合并成大的溢出文件,将多个文件的同一个分区进行归并排序
11)可以在此 过程进行COmbiner合并
10)所有Maptask任务完成后,启动相应数量的ReduceTask,MrAppMaster并告知ReduceTask处理数据范围(数据分区)。这里并不是所有任务完成启动ReduceTask。
13)Reduce根据自己的分区号,去各个MapTask机器上取相应的结果分区数据
14)将多个MapTask机器上结果分区数据进行合并并进行归并排序
15)将数据进行分组,数据向后查,如果后一个与当前key不一致,则当前key和之前数据为一组发送到reducer
16)reducer处理完后,默认通过TextOutputFormat写出结果数据。
小结:
Shuffle的缓冲区大小会影响到MapReduce程序的执行效率,原则上说,缓冲区越大,磁盘IO次数越少,执行速度就越快
缓冲区的大小可以通过参数调筝,参数:mapreduce.task.io.sort.mb默认100M。
环形缓冲区80%后反向写,同时将远80%内存的文件写到磁盘。