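The two operating mechanisms of SortShuffleManager
SortShuffleManager has two main operating mechanisms: the normal mechanism and the bypass mechanism.
Normal mechanism:
Bypass mechanism:
The normal mechanism is the default (before Spark 1.2 the default shuffle manager was HashShuffleManager). The bypass mechanism is used only when the number of shuffle output (reduce-side) partitions is less than or equal to the value of spark.shuffle.sort.bypassMergeThreshold (default 200) and mapSideCombine, which depends on the aggregation operator in use, is not true. The check in the Spark source is: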
private[spark] object SortShuffleWriter {
  def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
    // We cannot bypass sorting if we need to do map-side aggregation.
    if (dep.mapSideCombine) {
      require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
      false
    } else {
      val bypassMergeThreshold: Int = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
      dep.partitioner.numPartitions <= bypassMergeThreshold
    }
  }
}
// Create the ShuffleHandle
override def registerShuffle[K, V, C](
    shuffleId: Int,
    numMaps: Int,
    dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
  if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
    // If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't
    // need map-side aggregation, then write numPartitions files directly and just concatenate
    // them at the end. This avoids doing serialization and deserialization twice to merge
    // together the spilled files, which would happen with the normal code path. The downside is
    // having multiple files open at a time and thus more memory allocated to buffers.
    new BypassMergeSortShuffleHandle[K, V](
      shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
  } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
    // Otherwise, try to buffer map outputs in a serialized form, since this is more efficient:
    // canUseSerializedShuffle is a helper that decides whether the shuffle should use the
    // optimized serialized path or fall back to the original path that operates on
    // deserialized objects.
    // SerializedShuffleHandle is used only if all of the following hold: the serializer supports
    // relocation of its serialized objects, no aggregator is defined, and numPartitions does not
    // exceed 2^24.
    new SerializedShuffleHandle[K, V](
      shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
  } else {
    // Otherwise, buffer map outputs in a deserialized form:
    new BaseShuffleHandle(shuffleId, numMaps, dependency)
  }
}
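The three-way choice made by registerShuffle can be condensed into a small standalone sketch. This is only an illustration under stated assumptions: DepSketch, HandleChoiceSketch and chooseHandle are hypothetical names that do not exist in Spark, the dependency is reduced to four plain fields, and the handle is returned as a string rather than a real ShuffleHandle.

```scala
// Hypothetical, simplified stand-in for ShuffleDependency; not Spark's real class.
final case class DepSketch(
    numPartitions: Int,
    mapSideCombine: Boolean,
    hasAggregator: Boolean,
    serializerSupportsRelocation: Boolean)

object HandleChoiceSketch {
  // 2^24, the partition limit mentioned for the serialized path.
  val MaxSerializedPartitions: Int = 1 << 24

  // Bypass requires no map-side combine and a partition count within the threshold.
  def shouldBypassMergeSort(dep: DepSketch, bypassMergeThreshold: Int): Boolean =
    !dep.mapSideCombine && dep.numPartitions <= bypassMergeThreshold

  // The serialized path requires a relocatable serializer, no aggregator,
  // and a partition count within the limit.
  def canUseSerializedShuffle(dep: DepSketch): Boolean =
    dep.serializerSupportsRelocation &&
      !dep.hasAggregator &&
      dep.numPartitions <= MaxSerializedPartitions

  // Returns the name of the handle the checks would select, tried in order.
  def chooseHandle(dep: DepSketch, bypassMergeThreshold: Int = 200): String =
    if (shouldBypassMergeSort(dep, bypassMergeThreshold)) "BypassMergeSortShuffleHandle"
    else if (canUseSerializedShuffle(dep)) "SerializedShuffleHandle"
    else "BaseShuffleHandle"
}
```

For example, under these assumptions a dependency with 500 partitions, no aggregation and a serializer without relocation support falls through to BaseShuffleHandle at the default threshold of 200, while the same dependency is routed to the bypass handle once the threshold is raised to 500.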
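Effects of spark.shuffle.sort.bypassMergeThreshold
As the process above shows, raising spark.shuffle.sort.bypassMergeThreshold so that it is greater than or equal to the number of partitions lets shuffle operators that do not need map-side combine skip the sort step. It also reduces how often rows have to be copied before the shuffle, as the following source shows: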
private def needToCopyObjectsBeforeShuffle(
    partitioner: Partitioner,
    serializer: Serializer): Boolean = {
  val conf = SparkEnv.get.conf
  val shuffleManager = SparkEnv.get.shuffleManager
  val sortBasedShuffleOn = shuffleManager.isInstanceOf[SortShuffleManager]
  val bypassMergeThreshold = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
  if (sortBasedShuffleOn) {
    val bypassIsSupported = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager]
    if (bypassIsSupported && partitioner.numPartitions <= bypassMergeThreshold) {
      false
    } else if (serializer.supportsRelocationOfSerializedObjects) {
      // The serializer supports relocation of its serialized objects
      false
    } else {
      true
    }
  } else {
    true
  }
}
val rddWithPartitionIds: RDD[Product2[Int, InternalRow]] = {
  if (needToCopyObjectsBeforeShuffle(part, serializer)) {
    rdd.mapPartitionsInternal { iter =>
      val getPartitionKey = getPartitionKeyExtractor()
      iter.map { row => (part.getPartition(getPartitionKey(row)), row.copy()) }
    }
  } else {
    rdd.mapPartitionsInternal { iter =>
      val getPartitionKey = getPartitionKeyExtractor()
      val mutablePair = new MutablePair[Int, InternalRow]()
      iter.map { row => mutablePair.update(part.getPartition(getPartitionKey(row)), row) }
    }
  }
}
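In the source above, with the sort-based shuffle, rows are copied only when neither shortcut applies: the partition count exceeds the threshold and the serializer does not support relocation of serialized objects. Of the serializers bundled with Spark, JavaSerializer does not support relocation, while KryoSerializer does (provided Kryo auto-reset is enabled, which is the default).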
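Why a copy can be necessary at all is easiest to see with a toy example. The sketch below is not Spark code: MutableRow and RowReuseSketch are hypothetical names, and the assumption is simply that the upstream iterator reuses one mutable row object while the shuffle path buffers object references instead of serializing each record immediately.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical mutable record standing in for a reused row object.
final class MutableRow(var value: Int) {
  def copy(): MutableRow = new MutableRow(value)
}

object RowReuseSketch {
  // Produces rows by overwriting one shared instance, as a row-reusing iterator would.
  def reusedRows(values: Seq[Int]): Iterator[MutableRow] = {
    val shared = new MutableRow(0)
    values.iterator.map { v => shared.value = v; shared }
  }

  // Buffers references without copying: every entry aliases the same object,
  // so all entries end up showing the last value written.
  def bufferWithoutCopy(values: Seq[Int]): Seq[Int] = {
    val buf = ArrayBuffer.empty[MutableRow]
    reusedRows(values).foreach(r => buf += r)
    buf.map(_.value).toSeq
  }

  // Copies each row before buffering, so every entry keeps its own value.
  def bufferWithCopy(values: Seq[Int]): Seq[Int] = {
    val buf = ArrayBuffer.empty[MutableRow]
    reusedRows(values).foreach(r => buf += r.copy())
    buf.map(_.value).toSeq
  }
}
```

In this sketch, buffering the values 1, 2, 3 without a copy yields 3, 3, 3, while copying first yields 1, 2, 3. A path that writes or serializes each record as soon as it arrives never holds on to the shared object, which is consistent with the bypass and relocation branches above returning false.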