


A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable, partitioned collection of elements that can be operated on in parallel.


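Internally, each RDD is characterized by five main properties:

  • a compute function, which defines how the elements of a partition are obtained and computed;
  • a list of partitions, each of which is the smallest unit of parallelism;
  • a list of dependencies, i.e., the parent RDDs from which this RDD is derived;
  • optionally, a list of preferred locations for each partition, telling the scheduler where a partition can be accessed fastest; this serves as an optimization for data-local computation;
  • optionally, for key-value RDDs, a partitioner that defines how the RDD is partitioned.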


In the source code, these properties correspond to the following members of the abstract class RDD:

  /**
   * :: DeveloperApi ::
   * Implemented by subclasses to compute a given partition.
   */
  def compute(split: Partition, context: TaskContext): Iterator[T]

  /**
   * Implemented by subclasses to return the set of partitions in this RDD. This method will only
   * be called once, so it is safe to implement a time-consuming computation in it.
   * The partitions in this array must satisfy the following property:
   *   `rdd.partitions.zipWithIndex.forall { case (partition, index) => partition.index == index }`
   */
  protected def getPartitions: Array[Partition]

  /**
   * Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only
   * be called once, so it is safe to implement a time-consuming computation in it.
   */
  protected def getDependencies: Seq[Dependency[_]] = deps

  /**
   * Optionally overridden by subclasses to specify placement preferences.
   */
  protected def getPreferredLocations(split: Partition): Seq[String] = Nil

  /** Optionally overridden by subclasses to specify how they are partitioned. */
  @transient val partitioner: Option[Partitioner] = None
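To make the five properties concrete, here is a minimal Spark-free sketch in plain Scala. `MiniPartition`, `MiniDependency`, `MiniRDD`, `SlicePartition` and `SeqRDD` are hypothetical names invented for illustration; they only mirror the shape of the members above and are not part of Spark's API.

```scala
// Hypothetical, Spark-free model of the five RDD properties (not Spark's API).
trait MiniPartition {
  def index: Int
}

// In this sketch a dependency just points at its parent RDD.
final case class MiniDependency(parent: MiniRDD[_])

abstract class MiniRDD[T] {
  // 1. How to compute the elements of one partition.
  def compute(split: MiniPartition): Iterator[T]
  // 2. The partitions; partitions(i).index must equal i.
  def partitions: Array[MiniPartition]
  // 3. The parent RDDs this RDD is derived from.
  def dependencies: Seq[MiniDependency] = Nil
  // 4. Optional placement preferences for a partition.
  def preferredLocations(split: MiniPartition): Seq[String] = Nil
  // 5. Optional partitioner for key-value RDDs (modelled as key => partition id).
  def partitioner: Option[Any => Int] = None

  // Compute every partition in order (sequentially here; Spark runs them as parallel tasks).
  def collect(): Seq[T] = partitions.toSeq.flatMap(p => compute(p).toSeq)
}

// A hypothetical partition that carries an in-memory slice of the data.
final case class SlicePartition[T](index: Int, data: Seq[T]) extends MiniPartition

// A source RDD backed by pre-split in-memory slices and optional host preferences.
class SeqRDD[T](slices: Seq[Seq[T]], hosts: Map[Int, Seq[String]] = Map.empty) extends MiniRDD[T] {
  override val partitions: Array[MiniPartition] =
    slices.zipWithIndex.map { case (s, i) => SlicePartition(i, s): MiniPartition }.toArray

  override def compute(split: MiniPartition): Iterator[T] =
    split.asInstanceOf[SlicePartition[T]].data.iterator

  override def preferredLocations(split: MiniPartition): Seq[String] =
    hosts.getOrElse(split.index, Nil)
}
```

For example, `new SeqRDD(Seq(Seq(1, 2), Seq(3, 4, 5)), Map(0 -> Seq("host-a")))` has two partitions, no dependencies, and a preferred location only for partition 0; its `collect()` returns the elements 1 to 5 in order.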



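So what computations can an RDD perform? Ultimately, an RDD is logically a collection, so we can run a range of collection operations on it, including map, filter, reduce, sort and so on. Unlike an ordinary collection, however, an RDD can be computed in parallel across multiple physical machines; how this is done is simply hidden from the user.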


Partitions of an RDD


trait Partition extends Serializable {
  /**
   * Get the partition's index within its parent RDD
   */
  def index: Int

  // A better default implementation of HashCode
  override def hashCode(): Int = index

  override def equals(other: Any): Boolean = super.equals(other)
}

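Partition defines an index that uniquely identifies the partition within its RDD. A partition is more like a pointer to the actual data than the data itself. There are many concrete implementations of Partition, including HadoopPartition, JdbcPartition, ParallelCollectionPartition and others.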
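A Spark-free sketch of the same contract, assuming a hypothetical `MyPartition` trait that copies the `hashCode`/`equals` choices above, and a hypothetical `OffsetPartition` that, like a real partition, holds only metadata (offsets into external data) rather than the data itself. `PartitionCheck.indicesConsistent` checks the index invariant that getPartitions must satisfy.

```scala
// Hypothetical stand-in for Spark's Partition trait, with the same hashCode/equals choices.
trait MyPartition extends Serializable {
  def index: Int
  // Hash by index: cheap, and distinct for every partition of one RDD.
  override def hashCode(): Int = index
  // Keep reference equality inherited from AnyRef.
  override def equals(other: Any): Boolean = super.equals(other)
}

// Hypothetical partition that only points at the range [start, end) of some external data.
class OffsetPartition(val index: Int, val start: Int, val end: Int) extends MyPartition

object PartitionCheck {
  // The invariant required of getPartitions: partitions(i).index == i.
  def indicesConsistent(parts: Array[MyPartition]): Boolean =
    parts.zipWithIndex.forall { case (p, i) => p.index == i }
}
```

Equality stays reference-based, so two distinct partition objects with the same index share a hash code but are not equal.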





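Dependencies of an RDD

An RDD can be produced in two ways:

  • by loading it from a data source;
  • by transforming another RDD. The RDD produced by the transformation is called the child RDD, and the RDD it is derived from is called the parent RDD.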

A Dependency describes how the partitions of a child RDD depend on the partitions of its parent RDD. Spark divides these dependencies into two kinds:

  • Narrow Dependency: each partition of the child RDD depends on only a small number of partitions of the parent RDD;
  • Wide Dependency, or Shuffle Dependency: each partition of the child RDD depends on many, typically all, partitions of the parent RDD.

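The figure below illustrates the difference. The reason for the distinction is that partitions connected by narrow dependencies can be processed in a pipeline (pipelined execution), whereas a wide dependency requires a shuffle: a child partition can only be computed once all the parent partitions it depends on are ready. Dependencies are also what Spark uses to split a job into stages, and thus into tasks.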

[Figure: Narrow Dependency and Wide Dependency]

Narrow Dependency

NarrowDependency is defined in the source as follows:

abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
  /**
   * Get the parent partitions for a child partition.
   * @param partitionId a partition of the child RDD
   * @return the partitions of the parent RDD that the child partition depends upon
   */
  def getParents(partitionId: Int): Seq[Int]

  override def rdd: RDD[T] = _rdd
}

It declares an abstract method that, given a partition of the child RDD, returns the partitions of the parent RDD that it depends on.

Spark provides two concrete implementations of NarrowDependency, OneToOneDependency and RangeDependency:

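  • In OneToOneDependency, partitions of the child RDD and the parent RDD correspond one to one: child partition i depends only on parent partition i.
  • In RangeDependency, a contiguous range of child partitions corresponds one to one with a range of parent partitions of the same length; UnionRDD uses it to place each input RDD's partitions in the union.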
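The `getParents` logic of the two implementations can be sketched as plain functions. To our understanding this mirrors Spark's OneToOneDependency and RangeDependency, but `NarrowDeps` and its methods are hypothetical stand-ins that work on partition indices only.

```scala
object NarrowDeps {
  // OneToOneDependency: child partition i reads parent partition i.
  def oneToOneParents(partitionId: Int): List[Int] = List(partitionId)

  // RangeDependency: child partitions [outStart, outStart + length) map one to one
  // onto parent partitions [inStart, inStart + length); other child partitions
  // do not depend on this parent at all.
  def rangeParents(inStart: Int, outStart: Int, length: Int): Int => List[Int] =
    partitionId =>
      if (partitionId >= outStart && partitionId < outStart + length)
        List(partitionId - outStart + inStart)
      else Nil
}
```

For a union of an RDD A with 3 partitions and an RDD B with 2 partitions, the child has 5 partitions: its dependency on A behaves like `rangeParents(0, 0, 3)` and its dependency on B like `rangeParents(0, 3, 2)`, so child partition 4 reads partition 1 of B.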

Wide Dependency

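The definition of a wide dependency is somewhat more involved. It must carry all the information needed to turn the partitions of the parent RDD into the partitions of the child RDD: how the parent RDD is partitioned for the shuffle (the shuffle itself will be covered in depth in a later chapter), the serializer, the aggregator (combiner), and so on. Together, this information is sufficient to compute the child RDD from the parent RDD.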

class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    @transient private val _rdd: RDD[_ <: Product2[K, V]],
    val partitioner: Partitioner,
    val serializer: Serializer = SparkEnv.get.serializer,
    val keyOrdering: Option[Ordering[K]] = None,
    val aggregator: Option[Aggregator[K, V, C]] = None,
    val mapSideCombine: Boolean = false)
  extends Dependency[Product2[K, V]] {

  override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]

  private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getName
  private[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName
  // Note: It's possible that the combiner class tag is null, if the combineByKey
  // methods in PairRDDFunctions are used instead of combineByKeyWithClassTag.
  private[spark] val combinerClassName: Option[String] =
    Option(reflect.classTag[C]).map(_.runtimeClass.getName)

  val shuffleId: Int = _rdd.context.newShuffleId()

  val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
    shuffleId, _rdd.partitions.length, this)
}
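The sketch below illustrates, in plain Scala, what the partitioner and aggregator fields are used for on the map side of a shuffle: every record is routed to a reduce partition by a hash partitioner and, when map-side combine is on, values with the same key are merged locally before being written. `hashPartition` imitates the non-negative modulo used by Spark's HashPartitioner; `ShuffleSketch` and its methods are hypothetical and ignore serialization, sorting and disk I/O.

```scala
object ShuffleSketch {
  // Non-negative hash modulo, in the spirit of Spark's HashPartitioner.
  def hashPartition(key: Any, numPartitions: Int): Int = {
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }

  // Map side: split one parent partition's records into numPartitions buckets.
  // If combine is given, values with the same key are merged locally first (map-side combine).
  def writeMapOutput[K, V](records: Seq[(K, V)], numPartitions: Int,
                           combine: Option[(V, V) => V]): Map[Int, Seq[(K, V)]] = {
    val prepared: Seq[(K, V)] = combine match {
      case Some(f) =>
        records.groupBy(_._1).map { case (k, kvs) => (k, kvs.map(_._2).reduce(f)) }.toSeq
      case None => records
    }
    prepared.groupBy { case (k, _) => hashPartition(k, numPartitions) }
  }

  // Reduce side: child partition `id` pulls its bucket from every map output,
  // which is why it depends on all partitions of the parent RDD.
  def readReduceInput[K, V](mapOutputs: Seq[Map[Int, Seq[(K, V)]]], id: Int): Seq[(K, V)] =
    mapOutputs.flatMap(_.getOrElse(id, Nil))
}
```

A word-count-style usage: write each parent partition with `writeMapOutput(p, 2, Some(sum))`, then for each reduce partition call `readReduceInput` and sum the values per key.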




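The following example computes the sum of the squares of 1 to 10. It first constructs a SparkContext, then uses SparkContext's parallelize method to create a ParallelCollectionRDD, applies map to that ParallelCollectionRDD to square each element, and finally calls reduce to add the squares together.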

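   import org.apache.spark.{SparkConf, SparkContext}
   val sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
   val seq1 = (1 to 10)
   val result = sc.parallelize(seq1).map(num => num * num).reduce((num1, num2) => num1 + num2)
   // result == 385 (1 + 4 + 9 + ... + 100)
   sc.stop()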

ParallelCollectionRDD is an RDD that uses a local collection as its data source. SparkContext creates it through the parallelize method:

  def parallelize[T: ClassTag](
      seq: Seq[T],
      numSlices: Int = defaultParallelism): RDD[T] = withScope {
    new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
  }


ParallelCollectionRDD itself is defined as follows:

private[spark] class ParallelCollectionRDD[T: ClassTag](
    sc: SparkContext,
    @transient private val data: Seq[T],
    numSlices: Int,
    locationPrefs: Map[Int, Seq[String]])
    extends RDD[T](sc, Nil) {

  override def getPartitions: Array[Partition] = {
    val slices = ParallelCollectionRDD.slice(data, numSlices).toArray
    slices.indices.map(i => new ParallelCollectionPartition(id, i, slices(i))).toArray
  }

  override def compute(s: Partition, context: TaskContext): Iterator[T] = {
    new InterruptibleIterator(context, s.asInstanceOf[ParallelCollectionPartition[T]].iterator)
  }

  override def getPreferredLocations(s: Partition): Seq[String] = {
    locationPrefs.getOrElse(s.index, Nil)
  }
}
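The partitioning itself happens in `ParallelCollectionRDD.slice`, which is not shown above. For a plain Seq of length n, slice i covers the positions `[i * n / numSlices, (i + 1) * n / numSlices)`. The sketch below reproduces only that arithmetic (to our understanding, the real method additionally special-cases Range and NumericRange), together with hypothetical `SeqPartition`, `getPartitions` and `compute` stand-ins for the members above.

```scala
object SliceSketch {
  // Boundaries of slice i: [i * n / numSlices, (i + 1) * n / numSlices), computed in Long to avoid overflow.
  def positions(length: Long, numSlices: Int): Seq[(Int, Int)] =
    (0 until numSlices).map { i =>
      val start = ((i * length) / numSlices).toInt
      val end = (((i + 1) * length) / numSlices).toInt
      (start, end)
    }

  def slice[T](data: Seq[T], numSlices: Int): Seq[Seq[T]] = {
    require(numSlices >= 1, "Positive number of slices required")
    positions(data.length.toLong, numSlices).map { case (start, end) => data.slice(start, end) }
  }

  // Hypothetical partition carrying its own slice, like ParallelCollectionPartition.
  final case class SeqPartition[T](index: Int, values: Seq[T]) {
    def iterator: Iterator[T] = values.iterator
  }

  // Counterparts of getPartitions and compute above.
  def getPartitions[T](data: Seq[T], numSlices: Int): Array[SeqPartition[T]] = {
    val slices = slice(data, numSlices).toArray
    slices.indices.map(i => SeqPartition(i, slices(i))).toArray
  }

  def compute[T](p: SeqPartition[T]): Iterator[T] = p.iterator
}
```

With 10 elements and 3 slices the boundaries are (0, 3), (3, 6) and (6, 10), so the slices hold 3, 3 and 4 elements: the remainder goes to the later slices.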



Next, map is applied to the ParallelCollectionRDD. It is defined in RDD as:

  /**
   * Return a new RDD by applying a function to all elements of this RDD.
   */
  def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
  }

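map works in two steps:

  • first, it calls clean, which cleans the closure f and checks that it can be serialized and shipped to executors as part of a task;
  • second, it wraps the current RDD in a MapPartitionsRDD.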
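The serializability part of clean can be approximated with plain Java serialization: an object that can be written to an ObjectOutputStream can be shipped inside a task. `ClosureCheck.isSerializable` is a hypothetical helper; Spark's ClosureCleaner also inspects and prunes references to enclosing objects, which this sketch does not attempt.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object ClosureCheck {
  // Returns true if obj can be Java-serialized, which shipping it to an executor requires.
  def isSerializable(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    } finally {
      out.close()
    }
  }
}
```

A closure that captures something like an open database connection would fail such a check; in Spark this situation surfaces as a "Task not serializable" error when the job is submitted.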


MapPartitionsRDD is defined as follows:

private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
    var prev: RDD[T],
    f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
    preservesPartitioning: Boolean = false)
  extends RDD[U](prev) {

  override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None

  override def getPartitions: Array[Partition] = firstParent[T].partitions

  override def compute(split: Partition, context: TaskContext): Iterator[U] =
    f(context, split.index, firstParent[T].iterator(split, context))

  override def clearDependencies() {
    super.clearDependencies()
    prev = null
  }
}
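As the code shows, this RDD reuses the partitions of its parent RDD, and also its partitioner when preservesPartitioning is true. Its constructor calls the constructor of the superclass RDD, which is defined as: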

  /** Construct an RDD with just a one-to-one dependency on one parent */
  def this(@transient oneParent: RDD[_]) =
    this(oneParent.context, List(new OneToOneDependency(oneParent)))



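Because MapPartitionsRDD calls this constructor, it has a OneToOneDependency on its parent. Its compute method therefore simply applies f to the iterator of the parent partition with the same index:

  override def compute(split: Partition, context: TaskContext): Iterator[U] =
    f(context, split.index, firstParent[T].iterator(split, context))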
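The effect of this design is that a chain of narrow transformations is pipelined: each MapPartitionsRDD pulls lazily from its parent's iterator, so one element flows through every function before the next element is read, and no intermediate collection is materialized. A minimal plain-Scala sketch, with hypothetical `Node`, `Source` and `Mapped` classes standing in for RDD, ParallelCollectionRDD and MapPartitionsRDD:

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for an RDD whose partitions are computed by index.
abstract class Node[T] {
  def numPartitions: Int
  def compute(partition: Int): Iterator[T]
}

// Source with in-memory slices, playing the role of ParallelCollectionRDD.
class Source[T](slices: Seq[Seq[T]]) extends Node[T] {
  def numPartitions: Int = slices.size
  def compute(partition: Int): Iterator[T] = slices(partition).iterator
}

// Like MapPartitionsRDD: same partitions as the parent, computed by applying f
// to the parent's iterator for the same partition index.
class Mapped[T, U](parent: Node[T], f: (Int, Iterator[T]) => Iterator[U]) extends Node[U] {
  def numPartitions: Int = parent.numPartitions
  def compute(partition: Int): Iterator[U] = f(partition, parent.compute(partition))
}

object PipelineDemo {
  // Returns the results and the order in which elements passed through each step.
  def run(): (Seq[Int], Seq[String]) = {
    val log = ArrayBuffer.empty[String]
    val src = new Source(Seq(Seq(1, 2), Seq(3)))
    val squared = new Mapped[Int, Int](src, (_, it) => it.map { x => log += s"square $x"; x * x })
    val plusOne = new Mapped[Int, Int](squared, (_, it) => it.map { x => log += s"inc $x"; x + 1 })
    val result = (0 until plusOne.numPartitions).flatMap(p => plusOne.compute(p).toList)
    (result, log.toList)
  }
}
```

Running `PipelineDemo.run()` yields the results 2, 5 and 10, and a log in which "square" and "inc" alternate element by element, which is exactly the pipelining that narrow dependencies allow.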



Finally, reduce is called:

  def reduce(f: (T, T) => T): T = withScope {
    val cleanF = sc.clean(f)
    val reducePartition: Iterator[T] => Option[T] = iter => {
      if (iter.hasNext) {
        Some(iter.reduceLeft(cleanF))
      } else {
        None
      }
    }
    var jobResult: Option[T] = None
    val mergeResult = (index: Int, taskResult: Option[T]) => {
      if (taskResult.isDefined) {
        jobResult = jobResult match {
          case Some(value) => Some(f(value, taskResult.get))
          case None => taskResult
        }
      }
    }
    sc.runJob(this, reducePartition, mergeResult)
    // Get the final result out of our Option, or throw an exception if the RDD was empty
    jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
  }


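reduce proceeds as follows:

  • The f passed in defines how two elements of the RDD are combined (in this article's example, by adding them); it is wrapped by clean;
  • The function value reducePartition defines the work done on each partition, namely running reduceLeft over the partition's elements;
  • The function value mergeResult defines how the per-partition results are merged: each partition's result taskResult is folded into jobResult using f. This resembles a two-level merge: reducePartition merges the elements within a partition, mergeResult merges the per-partition results, and f defines how two values are merged;
  • The job is submitted through SparkContext's runJob;
  • The result is returned, or an UnsupportedOperationException is thrown if the RDD is empty.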
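The two-level merge can be reproduced without Spark. In the sketch below, `reducePartition` and `mergeResult` follow the code above, the partitions are a plain `Seq[Seq[T]]`, and the hypothetical `LocalReduce.reduce` stands in for runJob by feeding the task results to mergeResult in a random order, to mimic tasks finishing out of order.

```scala
import scala.util.Random

object LocalReduce {
  def reduce[T](partitions: Seq[Seq[T]], f: (T, T) => T): T = {
    // Per-partition step: reduceLeft over the partition, None if it is empty.
    val reducePartition: Iterator[T] => Option[T] = iter =>
      if (iter.hasNext) Some(iter.reduceLeft(f)) else None

    // Driver-side step: fold each finished task's result into jobResult.
    var jobResult: Option[T] = None
    val mergeResult = (index: Int, taskResult: Option[T]) => {
      if (taskResult.isDefined) {
        jobResult = jobResult match {
          case Some(value) => Some(f(value, taskResult.get))
          case None => taskResult
        }
      }
    }

    // Hypothetical stand-in for runJob: compute every "task", then merge in random completion order.
    val taskResults = partitions.indices.map(i => (i, reducePartition(partitions(i).iterator)))
    Random.shuffle(taskResults).foreach { case (i, r) => mergeResult(i, r) }

    jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
  }
}
```

Because the merge order is not fixed, f must be commutative and associative, which is also what Spark's documentation for reduce requires. Splitting the squares of 1 to 10 into partitions (including an empty one) and reducing with `_ + _` gives 385, matching the example above.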


reduce calls the runJob overload that takes a per-partition function and a result handler; that overload wraps the function as (TaskContext, Iterator[T]) => U and forwards it, together with all partition indices, to the following method:

  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      resultHandler: (Int, U) => Unit): Unit = {
    if (stopped.get()) {
      throw new IllegalStateException("SparkContext has been shutdown")
    }
    val callSite = getCallSite
    val cleanedFunc = clean(func)
    logInfo("Starting job: " + callSite.shortForm)
    if (conf.getBoolean("spark.logLineage", false)) {
      logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
    }
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
  }


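The last call, dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get), submits the job through DAGScheduler; this method is the main entry point for submitting every job. DAGScheduler splits the job into stages according to the dependencies between RDDs. The stages form a directed acyclic graph (DAG), and each stage contains multiple tasks, one per partition to be computed. TaskScheduler submits these tasks to the backend execution engine, and the results that come back are passed to resultHandler for further processing.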
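How dependencies cut a job into stages can be illustrated with a toy, linear lineage: narrow dependencies keep an RDD in the current stage (its computation is pipelined), while a shuffle dependency starts a new stage. `StageSketch`, `Op`, `Narrow` and `Shuffle` are hypothetical; the real DAGScheduler also walks lineages with several parents, reuses stages whose shuffle output already exists, and distinguishes ShuffleMapStage from ResultStage.

```scala
object StageSketch {
  sealed trait Dep
  case object Narrow extends Dep
  case object Shuffle extends Dep

  // A linear lineage: each RDD after the source records how it depends on its predecessor.
  final case class Op(name: String, depOnPrevious: Option[Dep])

  // Group a linear lineage into stages: narrow dependencies extend the current stage,
  // a shuffle dependency (or the source) starts a new one.
  def stages(lineage: Seq[Op]): Seq[Seq[String]] =
    lineage.foldLeft(Vector.empty[Vector[String]]) { (acc, op) =>
      op.depOnPrevious match {
        case Some(Narrow) if acc.nonEmpty => acc.init :+ (acc.last :+ op.name)
        case _                            => acc :+ Vector(op.name)
      }
    }
}
```

In this article's example, map is a narrow dependency and reduce is an action rather than a transformation, so the lineage parallelize, map forms a single stage that runs one task per partition.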


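In summary, the most fundamental building blocks of an RDD are compute, partitions and dependencies. compute defines how each partition of the RDD is computed; partitions embody the distributed nature of the RDD and set the granularity of parallel computation; dependencies define the relationships between RDDs and are the basis for dividing the work into parallel tasks. In some cases, getPreferredLocations and partitioner can be used to optimize RDD computation by reducing network transfer during execution.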

