在shuffle过程中会进行多次的溢出到磁盘的操作,那么条件是什么呢?这个方法maybeSpill就是判断是否进行溢出操作的。
1.不是每加载一个数据进行一次判断的,而是每隔32个数据判断一次。
if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
2.如果当前的内存大于阈值;(当前内存大小指的是当前加载到内存中的数据的大小,即PartitionedAppendOnlyMap或者PartitionedPairBuffer的大小;阈值的初始大小为5M,或者通过spark.shuffle.spill.initialMemoryThreshold进行设置)。
private[this] val initialMemoryThreshold: Long =
SparkEnv.get.conf.getLong("spark.shuffle.spill.initialMemoryThreshold", 5 * 1024 * 1024)
3.向任务内存管理器申请内存,大小为当前内存的2倍减去阈值。
4.把申请到的内存加到阈值上。如果当前内存还是大于等于阈值,那么说明现在的内存不够用了,就需要进行溢出到磁盘里面。
5.或者当前读取的记录个数大于spark.shuffle.spill.numElementsForceSpillThreshold的值,也会进行溢出。spark.shuffle.spill.numElementsForceSpillThreshold的默认值是整型的最大值。