1. 概述
SortShuffleManager
是Spark默认的混合shuffle之一, 是spark env的一部分,
在生成结果的parition超过200时会使用, 相对于HashShuffle来说, 避免了生成太多中间文件, 相对于钨丝计划来说更加稳定,经过了生产环境的检验。
截止到2.4.0版本, 钨丝计划里的BUG表还是没有清空, 相关的坑可以在Apache Jira上查到。
确保
spark.shuffle.spill
是enable的装填, 否则会报一个warning
WARN SortShuffleManager: spark.shuffle.spill was set to false, but this configuration is ignored as of Spark 1.6+. Shuffle will continue to spill to disk when necessary.
2. 写过程涉及的策略
在Sort Based Shuffle中, 各种map过程生成的records会按照target parition排序, 然后被连续的刷到一个文件中. 如果map task的结果过大, 会下刷到磁盘上, 然后这些磁盘上的文件merge到一起, 生成最终的输出文件.
SortShuffleManager
有两中写方法, 来管理shuffle文件的写过程
2.1 Serialized Sorting Model
满足以下三个条件时使用这种序列化的写方法
- Shuffle Dependency不涉及aggragation和排序过程
- 生成结果的parition, 或者说key小于 16777216
- serialized value是支持重排(relocation)的, 社区推荐的KryoSerializer, 以及Twitter对它的加强版都支持这个特性
在这个模式下, map端生成的record在到达ShuffleWriter后会被立即序列化(KryoSerializer), 放进缓存等待后续按key sort的过程(所以序列化组件要支持relocation)
作为钨丝计划 SPARK-4550
的一部分, 这种方法带来几种好处
- 它使用一个特殊的sort方式
ShuffleExternalSorter
来对cache(缓存)进行sort, 这里需要注意的是cache中放的是指向数据的压缩后的指针, 这样每个指针只需要8个bytes, 就可以在cache里放更多的数据 - 在对spill到硬盘的文件merge的过程中不需要deserialize, 因为它们都已经被放到了逻辑上的一个block里
- 如果开启了compress特性, 并且使用的compress算法支持直接后缀(concatenation), 那么合并过程就是简单的执行一个concatenation就可以了, 不需要解压, 也不需要内存内复制, 非常类似NIO API中的
transferTo
方法 (这个方法看Kafka源码的时候可以看到, 专门用来复制大文件)
2.2 Normal Model
对于不满足三个条件的, 就会走普通的方法. 普通的写方法是非常传统的, 把数据进行排序, 写入. 如果开启了压缩特性, 那么在merge时可能涉及到一部分解压后合并再压缩操作.
规避这些overhead的一个方法就是尽可能用KryoSerializer, 社区认为这是优化Spark Performance最重要的一步
3. 内部方法和结构
3.1 内部结构
/**
* A mapping from shuffle ids to the number of mappers producing output for those shuffles.
从shuffleId到这个shuffle涉及到的map数的映射表
*/
private[this] val numMapsForShuffle = new ConcurrentHashMap[Int, Int]()
// 封装用于shuffle过程中数据块相关的方法
override val shuffleBlockResolver = new IndexShuffleBlockResolver(conf)
3.2 registerShuffle
向ShuffleManager注册一个shuffle过程
/**
* 注册一个shuffle过程到shufflemanager, 并生成一个句柄, 交付到后续的task使用
*/
override def registerShuffle[K, V, C](
shuffleId: Int,
numMaps: Int,
dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
if (SortShuffleWriter.shouldBypassMergeSort(SparkEnv.get.conf, dependency)) {
/**
如果生成文件的parition数小于 spark.shuffle.sort.bypassMergeThreshold 配置项
并且我们不需要在Map端做aggragation的话
那么就直接写partition个文件, 然后把它们cat到一起.
这个操作避免了反复的序列化和反序列化, 属于Normal Mode的一种
这个操作没有进行merge过程, 所以叫byPassMerge
*/
new BypassMergeSortShuffleHandle[K, V](
shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
} else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
// 如果满足上面提到的三个条件, 会走Serialize Model
new SerializedShuffleHandle[K, V](
shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
} else {
// 不满足就走Normal Model
new BaseShuffleHandle(shuffleId, numMaps, dependency)
}
}
3.3 getReader
在执行reduce端的executor上回执行这个操作以获得读取shuffle结果的句柄对象, 然后后续把需要的数据拉取到本地.
/**
* Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive).
* Called on executors by reduce tasks.
*/
override def getReader[K, C](
handle: ShuffleHandle,
startPartition: Int,
endPartition: Int,
context: TaskContext): ShuffleReader[K, C] = {
new BlockStoreShuffleReader(
handle.asInstanceOf[BaseShuffleHandle[K, _, C]], startPartition, endPartition, context)
}
3.4 getWriter
根据前面不同的shuffle策略, 这里使用封装好的方法来具体执行
/** Get a writer for a given partition. Called on executors by map tasks. */
override def getWriter[K, V](
handle: ShuffleHandle,
mapId: Int,
context: TaskContext): ShuffleWriter[K, V] = {
numMapsForShuffle.putIfAbsent(
handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps)
val env = SparkEnv.get
handle match {
case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
new UnsafeShuffleWriter(
env.blockManager,
shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
context.taskMemoryManager(),
unsafeShuffleHandle,
mapId,
context,
env.conf)
case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
new BypassMergeSortShuffleWriter(
env.blockManager,
shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
bypassMergeSortHandle,
mapId,
context,
env.conf)
case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)
}
}
3.5 unregisterShuffle
/** Remove a shuffle's metadata from the ShuffleManager. */
override def unregisterShuffle(shuffleId: Int): Boolean = {
Option(numMapsForShuffle.remove(shuffleId)).foreach { numMaps =>
(0 until numMaps).foreach { mapId =>
shuffleBlockResolver.removeDataByMap(shuffleId, mapId)
}
}
true
}