What is Dependency in Spark?
A dependency describes which partitions of the parent RDD a given partition of the child RDD depends on.
Types of Dependencies
Narrow Dependency
A narrow dependency is one in which each partition of the parent RDD is used by at most one partition of the child RDD.
Shuffle Dependency
A shuffle dependency (sometimes called a wide dependency) is one in which a partition of the parent RDD is used by multiple partitions of the child RDD.
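To make the two definitions concrete, here is a minimal sketch in plain Scala that does not use Spark at all. It assumes a hypothetical representation in which `deps(i)` lists the parent partition indices read by child partition `i`. The name `DependencyKind` and the sample layouts are illustrative only, and the check simply applies the "at most one child per parent partition" definition given above.

```scala
object DependencyKind {
  // deps(i) = indices of the parent partitions that child partition i reads.
  // Narrow (per the definition above): every parent partition is used by
  // at most one child partition.
  def isNarrow(deps: Seq[Seq[Int]]): Boolean =
    deps.flatMap(_.distinct).groupBy(identity).values.forall(_.size <= 1)

  def main(args: Array[String]): Unit = {
    // One-to-one layout, as produced by transformations such as map or filter.
    val mapLike = Seq(Seq(0), Seq(1), Seq(2))
    // Several parent partitions merged into one child partition.
    val mergeLike = Seq(Seq(0, 1), Seq(2, 3))
    // Every child partition reads from every parent partition.
    val shuffleLike = Seq(Seq(0, 1, 2), Seq(0, 1, 2))

    println(isNarrow(mapLike))     // true
    println(isNarrow(mergeLike))   // true
    println(isNarrow(shuffleLike)) // false
  }
}
```

The third layout fails the check because parent partition 0 (like 1 and 2) is read by both child partitions, which is exactly the situation the shuffle dependency definition describes.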
Code Snippets
Dependency
abstract class Dependency[T] extends Serializable {
  def rdd: RDD[T]
}
NarrowDependency
Each partition of the child RDD depends on a small number of partitions of the parent RDD.
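In a narrow dependency, each partition of the child RDD depends on only a subset of the parent RDD's partitions; which ones depends on the transformation applied. The NarrowDependency class therefore requires every concrete subclass to implement getParents(partitionId), which returns the parent partitions that the given child partition depends on.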
/**
 * :: DeveloperApi ::
 * Base class for dependencies where each partition of the child RDD depends on a small number
 * of partitions of the parent RDD. Narrow dependencies allow for pipelined execution.
 */
@DeveloperApi
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
  /**
   * Get the parent partitions for a child partition.
   * @param partitionId a partition of the child RDD
   * @return the partitions of the parent RDD that the child partition depends upon
   */
  def getParents(partitionId: Int): Seq[Int]

  override def rdd: RDD[T] = _rdd
}
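As an illustration of what a getParents implementation can look like, the sketch below models two mappings after Spark's OneToOneDependency and RangeDependency. It is a simplified stand-in rather than Spark's code: the trait `NarrowDep` and the classes `OneToOneDep` and `RangeDep` are hypothetical names, and the parent RDD is left out so that the example runs without Spark on the classpath.

```scala
// Simplified stand-in for NarrowDependency: only the partition mapping is kept.
trait NarrowDep {
  def getParents(partitionId: Int): Seq[Int]
}

// Child partition i reads only parent partition i (as with map or filter).
class OneToOneDep extends NarrowDep {
  override def getParents(partitionId: Int): Seq[Int] = List(partitionId)
}

// Child partitions [outStart, outStart + length) map one-to-one onto parent
// partitions [inStart, inStart + length); any other child partition has no
// parent in this particular RDD.
class RangeDep(inStart: Int, outStart: Int, length: Int) extends NarrowDep {
  override def getParents(partitionId: Int): Seq[Int] =
    if (partitionId >= outStart && partitionId < outStart + length)
      List(partitionId - outStart + inStart)
    else
      Nil
}

object NarrowDepDemo {
  def main(args: Array[String]): Unit = {
    val oneToOne = new OneToOneDep
    println(oneToOne.getParents(3)) // List(3)

    // A parent with 3 partitions placed at child partitions 4, 5 and 6.
    val range = new RangeDep(inStart = 0, outStart = 4, length = 3)
    println(range.getParents(5)) // List(1)
    println(range.getParents(2)) // List()
  }
}
```

A range mapping of this kind is what a union-style transformation needs: each input RDD occupies a contiguous block of the output's partitions, so a child partition still reads from a single parent partition.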
ShuffleDependency
The ShuffleDependency class represents a dependency on the output of a shuffle stage.
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    @transient private val _rdd: RDD[_ <: Product2[K, V]],
    val partitioner: Partitioner,
    val serializer: Serializer = SparkEnv.get.serializer,
    val keyOrdering: Option[Ordering[K]] = None,
    val aggregator: Option[Aggregator[K, V, C]] = None,
    val mapSideCombine: Boolean = false)
  extends Dependency[Product2[K, V]] {

  override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]

  private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getName
  private[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName
  // Note: It's possible that the combiner class tag is null, if the combineByKey
  // methods in PairRDDFunctions are used instead of combineByKeyWithClassTag.
  private[spark] val combinerClassName: Option[String] =
    Option(reflect.classTag[C]).map(_.runtimeClass.getName)

  val shuffleId: Int = _rdd.context.newShuffleId()

  val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
    shuffleId, _rdd.partitions.length, this)

  _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
}
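The key parameters of ShuffleDependency are:
- _rdd: the parent RDD
- partitioner: the partitioner used on the shuffle write side to partition the output
- serializer: the serializer to use for shuffle data; defaults to SparkEnv.get.serializer
- keyOrdering: the optional ordering on keys for the shuffle; None means no key ordering is requested
- aggregator: the optional aggregator used to combine values for the same key on the shuffle read side, and also on the map side when mapSideCombine is true
- mapSideCombine: whether to perform partial aggregation on the map side (map-side combine)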
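The roles of partitioner, aggregator and mapSideCombine can be sketched with a toy shuffle in plain Scala. This is an assumption-laden model, not Spark's shuffle implementation: `ShuffleSketch` and all of its methods are hypothetical names, the aggregator is hard-coded to summing Int values, and the partitioner is a non-negative modulus of the key's hash code, which is the same idea hash partitioning uses.

```scala
object ShuffleSketch {
  type Record = (String, Int)

  // Partitioner stand-in: non-negative modulus of the key's hash code.
  def partitionFor(key: Any, numPartitions: Int): Int = {
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }

  // Aggregator stand-in: sum the values that share a key.
  def combine(records: Seq[Record]): Map[String, Int] =
    records.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).sum }

  // Shuffle write for ONE map partition: optionally pre-aggregate, then bucket
  // the records by the reduce partition chosen by the partitioner.
  def shuffleWrite(records: Seq[Record], numPartitions: Int,
                   mapSideCombine: Boolean): Map[Int, Seq[Record]] = {
    val toWrite = if (mapSideCombine) combine(records).toSeq else records
    toWrite.groupBy { case (k, _) => partitionFor(k, numPartitions) }
  }

  // Shuffle read for ONE reduce partition: collect its bucket from every map
  // output and run the aggregator over the collected records.
  def shuffleRead(mapOutputs: Seq[Map[Int, Seq[Record]]], reduceId: Int): Map[String, Int] =
    combine(mapOutputs.flatMap(_.getOrElse(reduceId, Seq.empty)))

  // Runs the whole toy shuffle and merges the reduce partitions for inspection.
  def run(mapPartitions: Seq[Seq[Record]], numPartitions: Int,
          mapSideCombine: Boolean): Map[String, Int] = {
    val outputs = mapPartitions.map(shuffleWrite(_, numPartitions, mapSideCombine))
    (0 until numPartitions).map(shuffleRead(outputs, _)).reduce(_ ++ _)
  }

  def main(args: Array[String]): Unit = {
    val mapPartitions = Seq(
      Seq("a" -> 1, "b" -> 1, "a" -> 1),
      Seq("b" -> 1, "c" -> 1))
    println(run(mapPartitions, numPartitions = 2, mapSideCombine = true))
  }
}
```

In this model, turning mapSideCombine on or off does not change the final counts; it only changes how many records each map partition writes. Every reduce partition reads a bucket from every map output, which is why a parent partition ends up being used by multiple child partitions.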