目前,spark shuffle write有三种方法:hash shuffle、sort shuffle和tungsten-sort shuffle。从1.2版本开始默认为sort shuffle, 之前采用hash shuffle。在1.4版本以后可以通过(spark.shuffle.manager = tungsten-sort)开启tungsten-sort shuffle。
Hash shuffle
Hash shuffle 经历了有两个阶段,前一阶段的过程如图:
- 每一个Mapper创建出和Reducer数目相同的bucket,bucket实际上是一个buffer,其大小为spark.shuffle.file.buffer.kb(默认32KB)。
- Mapper产生的结果会根据设置的partition算法填充到每个bucket中去,然后再写入到磁盘文件。
- Reducer从远端或是本地的block manager中找到相应的文件读取数据。
这一阶段的问题:
- 当Mapper数量和Reducer数量比较大时,产生输出大量文件(M * R),这对文件系统是一个非常大的负担。同时在shuffle数据量不大而shuffle文件又非常多的情况下,随机写也会严重降低IO的性能。
- 缓存空间占用比较大,一个 worker node 上同时存在的 bucket 个数可以达到 cores*R 个(一般 worker 同时可以运行 cores 个 ShuffleMapTask)。
第二阶段改善了第一阶段出现的中间shuffle文件数量多的问题。
与上一阶段相比,在同一个 core 上连续运行的 ShuffleMapTasks 共用一个输出文件,这样产生shuffle文件的数量是cores*R,比上一阶段减少。
Sort shuffle
与hash shuffle相比,sort shuffle中每个Mapper只产生一个数据文件和一个索引文件,数据文件中的数据按照Reducer排序,但属于同一个Reducer的数据不排序。Mapper产生的数据先放到AppendOnlyMap这个数据结构中,如果内存不够,数据则会spill到磁盘,最后合并成一个文件。
与Hash shuffle相比,shuffle文件数量减少,内存使用更加可控。但排序会影响速度。In case you use SSD drives for the temporary data of Spark shuffles, hash shuffle might work better for you(来自于https://0x0fff.com/spark-architecture-shuffle/)。
Tungsten-sort shuffle
与Sort shuffle相比,Tungsten最大不同在于内存管理机制。Tungsten采用独特的内存模型来存储数据,而Sort shuffle采用Java的数据结构AppendOnlyMap来存储数据,并且存储的数据是序列化的。这种独特的内存模型叫做page。序列化后的数据放在page中,当page满后,spill到磁盘文件,然后从新allocate一个新的page(如果spark.unsafe.offHeap=true,会从off-heap分配内存,否则,从in-heap分配内存)。最后将page里数据和spilled磁盘文件merge到一个文件里。注意merge的时候不需要反序列化(sort shuffle需要)。
为了数据record在page中寻址,定义了PackedRecordPointer对象用一个64bit的long型变量来记录如下信息:
[24 bit partition number][13 bit memory page number][27 bit offset in page]。
注意这些信息是用来将数据按照partition进行排序。从这些信息中,我们得到如下的约束。
一是partition 的数量(Reducer的数目)最多为2^24=16777216。
二是单条记录不能大于 2^27=128 MB,加上page数目限制,一个task 能管理到的内存最多是 2^13 * 128M 也就是1TB左右。
数据是序列化后放在内存,所以占据的内存空间小,减少了spill的次数。sort是在序列化的数据上进行,效率更高。merge时不需要反序列数据。