前言
相对来说,MapReduce是一个款比较 “古老” 的大数据离线计算框架,但该框架对批量数据离线计算的思想仍值得借鉴!
在处理过程中需要把mapper阶段的数据传递给reducer阶段,这个过程可以广义地称为Shuffle,是 MapReduce 框架中最关键的一个流程。
采用图解的方式进行表达可以降低理解难度
Shuffle
使用自顶向下的方式进行理解Shuffle流程。
过程总览
Shuffle流程横跨了mapper阶段和reducer阶段,在mapper阶段包括Spill过程,在reducer阶段包括Copy过程和Sort过程,如图所示:
mapper阶段的Spill
这个过程包括输出(collect)、排序(sort)、溢写(spill)、合并(merge)collect
Map任务不断地以<k,v>对的形式把数据输出到一个存在于内存中的环形数据结构中。使用环形数据结构是为了更有效地使用内存空间。这个数据结构其实就是个字节数组,叫kvbuffer。这里不仅用来存放数据,还有了一些索引数据,放置索引数据的区域叫kvmeta。数据区域和索引数据区域在kvbuffer中是相邻不重叠的两个区域,用一个分界点来划分两者,分界点是会动态变化的,每次溢写(spill)之后都会变化一次。初始的分界点是0,数据的存储方向是向上增长,索引数据的存储方向是向下增长。
sort
把kvbuffer中的数据按照partition值和key两个关键字升序排序,移动的只是索引数据,排序结果是Kvmeta中数据按照partition为单位聚集在一起,同一partition内的按照key有序。
spill
Spill线程为这次spill过程创建一个磁盘文件: 创建一个类似于“spill13.out”的文件。Spill线程根据排过序的kvmeta逐个把partition中的数据刷写到这个文件中,一个partition对应的数据刷写完之后顺序地刷写下个partition,直到把所有的partition遍历完。一个partition在文件中对应的数据也叫段(segment)。
但问题来了
所有的partition对应的数据都放在这个文件里,虽然是顺序存放的,但是怎么直接知道某个partition的数据在这个文件中存放的起始位置呢?
答案:利用索引
有一个三元组记录某个partition对应的数据在这个文件中的索引:起始位置、原始数据长度、压缩之后的数据长度。一个partition对应一个三元组。然后把这些索引信息存放在内存中,如果内存中放不下了,后续的索引信息就需要写到磁盘文件中了:创建一个类似于“spill13.index”的文件,存储了索引数据,(不一定在磁盘上创建,如果内存(默认1M空间)中能放得下就放在内存中,即使在磁盘上创建了,和spill13.out文件也不一定在同一个目录下。)
每一次Spill过程就会最少生成一个 *.out文件,有时还会生成 *.index文件。
索引文件和数据文件的对应关系如下图所示:
在Spill线程进行SortAndSpill工作的同时,Map任务会继续进行数据的输出。Map还是把数据写到kvbuffer中,在两个指针即将重合时,在kvbuffer中剩余空间的中间位置,用这个位置设置为新的分界点,bufindex指针移动到这个分界点,kvindex移动到这个分界点的-16位置,然后两者就可以和谐地按照自己既定的轨迹放置数据了,当溢写完成后,空间腾出之后,不需要做任何改动继续前进。分界点的转换如下图所示:
变换方向,继续~
merge
Map任务如果输出数据量很大,可能会进行好几次溢写,out文件和Index文件会产生很多,分布在不同的磁盘上。最后是merge过程把这些文件合并。merge过程创建一个叫file.out的文件和一个叫file.out.Index的文件用来存储最终的输出和索引。
逐个partition进行合并输出。对于某个partition来说,从spillXX.index索引列表中查询这个partition对应的所有索引信息,每个对应一个段插入到段列表中。也就是这个partition对应一个段列表,记录所有的Spill文件中对应的这个partition那段数据的文件名、起始位置、长度等等。
然后对这个partition对应的所有的段进行合并,目标是合并成一个segment列表。当这个partition对应很多个segment时,会分批地进行合并:先从segment列表中把第一批取出来,以key为关键字放置成最小堆,然后从最小堆中每次取出最小的输出到一个临时文件中,这样就把这一批段合并成一个临时的段,把它加回segment列表中;再从segment列表中把第二批取出来合并输出到一个临时segment,把其加入到列表中;这样往复执行,直到剩下的段是一批,输出到最终的文件中。
最终的索引数据仍然输出到Index文件中。
Map端的Shuffle过程到此结束。
reducer阶段的Copy和Sort
copy
Reduce任务拖取某个Map对应的数据,如果在内存中能放得下这次数据的话就直接把数据写到内存中。Reduce要向每个Map去拖取数据,在内存中每个Map对应一块数据,当内存中存储的Map数据占用空间达到一定程度的时候,开始启动内存中merge,把内存中的数据merge输出到磁盘上一个文件中。
有些Map的数据较小是可以放在内存中的,有些Map的数据较大需要放在磁盘上,这样最后Reduce任务拖过来的数据有些放在内存中了有些放在磁盘上,最后会对这些来一个全局合并。
merge sort
这里使用的Merge和Map端使用的Merge过程一样。Map的输出数据已经是有序的,Merge进行一次合并排序,所谓Reduce端的sort过程就是这个合并的过程。一般Reduce是一边copy一边sort,即copy和sort两个阶段是重叠而不是完全分开的,迭代进行的。
Reduce端的Shuffle过程至此结束。