定义
RDD是Spark中最基本的抽象,官方的定义如下:
A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable, partitioned collection of elements that can be operated on in parallel.
通常,一个RDD由以下几部分定义:
- compute函数,提供在计算过程中Partition元素的获取与计算方式;
- 一个partition的列表,每一个partition代表一个并行的最小划分单元;
- dependencies列表,即这个RDD依赖哪些父的RDD生成;
- 一个partition的位置列表,定义如何最快速的获取partition的位置,加快计算,这个是可选的,可作为本地化计算的优化选项;
- 如果一个RDD是key-value类型的,则可以有一个partitioner方法,定义如何对这个RDD进行分区。
在RDD中,通过以下方法对上述组成进行了定义:
/**
* :: DeveloperApi ::
* Implemented by subclasses to compute a given partition.
*/
@DeveloperApi
def compute(split: Partition, context: TaskContext): Iterator[T]
/**
* Implemented by subclasses to return the set of partitions in this RDD. This method will only
* be called once, so it is safe to implement a time-consuming computation in it.
*
* The partitions in this array must satisfy the following property:
* `rdd.partitions.zipWithIndex.forall { case (partition, index) => partition.index == index }`
*/
protected def getPartitions: Array[Partition]
/**
* Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only
* be called once, so it is safe to implement a time-consuming computation in it.
*/
protected def getDependencies: Seq[Dependency[_]] = deps
/**
* Optionally overridden by subclasses to specify placement preferences.
*/
protected def getPreferredLocations(split: Partition): Seq[String] = Nil
/** Optionally overridden by subclasses to specify how they are partitioned. */
@transient val partitioner: Option[Partitioner] = None
以上五个方法分别对应上述五个组成部分,每一个具体的RDD都要重写上述方法。
在RDD上的所有计算都依赖于以上抽象的定义,换句话说,我们可以根据实际情况自由定义RDD,并利用Spark统一的任务调度引擎和执行引擎完成并行计算。
那么RDD可以进行哪些计算呢?归根结底,RDD在逻辑上也是一个集合,因此我们可以在RDD上执行一系列的集合操作, 包括map, filter, reduce,sort等等,与普通的集合不同的是,它可以物理上在多个机器上实现并行计算,只是具体的实现对用户不可见而已。
本文接下来先从代码层面对RDD的组成部分进行解读,然后以一个简单的例子看看RDD到底是怎么运作的。
RDD 的Partition
Partition是RDD并行的划分单元,其在Spark中的抽象定义十分简单:
trait Partition extends Serializable {
/**
* Get the partition's index within its parent RDD
*/
def index: Int
// A better default implementation of HashCode
override def hashCode(): Int = index
override def equals(other: Any): Boolean = super.equals(other)
}
它定义了一个index用于唯一表示这个partition,它更像一个指针指向实体数据,Partition的具体实现有很多,包括HadoopPartition, JdbcPartition, ParallelCollectionPartition等。
Compute
该方法的入参是一个partition以及TaskContext,TaskContext定义了计算上下文环境。这个方法定义了以何种形式处理这个partition中的元素.
Dependency
在Spark中,RDD有两种途径生成:
- 通过数据源加载;
- 通过其他RDD转换而来,转换后的RDD这里称之为child RDD,而转换的RDD称为parent RDD;
Dependency定义了child RDD的partition对parent RDD的依赖关系。Spark将这种依赖分成两种:
- Narrow Dependency, 即child RDD的partition仅仅依赖一小部分parent RDD的partition;
- Wide Dependency or Shuffle Dependency,即child RDD的partition依赖parent RDD的所有partition。
这两者的区别可通过下图直观的看出,这样划分的原因是Narrow Dependency的partition可以进行流水线(pipelined execution)的处理,而Wide Dependency必须经过shuffle,等待所有依赖的parent RDD的partition计算就绪后才能执行。Dependency也是Spark划分Task的依据。
Narrow Dependency
Narrow Dependency的源码定义如下:
@DeveloperApi
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
/**
* Get the parent partitions for a child partition.
* @param partitionId a partition of the child RDD
* @return the partitions of the parent RDD that the child partition depends upon
*/
def getParents(partitionId: Int): Seq[Int]
override def rdd: RDD[T] = _rdd
}
定义了通过parent RDD的partition找到child RDD对应partition的抽象方法。
Spark中还有两个对Narrow Dependency的具体实现:OneToOneDependency和RangeDependency:
- OneToOneDependency中,child RDD的partition和parent RDD的partition是一一对应的关系。
- RangeDependency中,一个child RDD的partition对应一个区间范围内的parent RDD的partition
Wide Dependency
Wide Dependency的定义稍显复杂,它必须包含从parent RDD的partition转换成child RDD的partition的所有信息,包括parent RDD进行shuffle的partition方式(shuffle我们会在后续章节深入讨论),序列化方式,结合器等,总之通过这些信息足够完成parent RDD到child RDD的转换计算。
@DeveloperApi
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
@transient private val _rdd: RDD[_ <: Product2[K, V]],
val partitioner: Partitioner,
val serializer: Serializer = SparkEnv.get.serializer,
val keyOrdering: Option[Ordering[K]] = None,
val aggregator: Option[Aggregator[K, V, C]] = None,
val mapSideCombine: Boolean = false)
extends Dependency[Product2[K, V]] {
override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]
private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getName
private[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName
// Note: It's possible that the combiner class tag is null, if the combineByKey
// methods in PairRDDFunctions are used instead of combineByKeyWithClassTag.
private[spark] val combinerClassName: Option[String] =
Option(reflect.classTag[C]).map(_.runtimeClass.getName)
val shuffleId: Int = _rdd.context.newShuffleId()
val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
shuffleId, _rdd.partitions.length, this)
_rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
}
一个简单的例子
Partition、Dependency、Compute是定义RDD必要的组成部分,前面对这三者进行了比较抽象的介绍,下面以ParallelCollectionRDD为例具体的介绍RDD是怎么工作的。
下面这个例子是求1到10的平方和,首先构造一个Spark Context对象,然后通过SparkContext的parallelize方法生成ParallelCollectionRDD,在ParallelCollectionRDD上执行了map操作,计算数组中每一个元素的平方,然后执行reduce操作将元素的平方相加。
val sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
val seq1 = (1 to 10)
val result = sc.parallelize(seq1).map(num => num * num).reduce((num1, num2) => num1 + num2)
生成ParallelCollectionRDD
ParallelCollectionRDD是以集合作为数据源转换而来的RDD,它在Spark Context中可以通过parallelize方法生成:
def parallelize[T: ClassTag](
seq: Seq[T],
numSlices: Int = defaultParallelism): RDD[T] = withScope {
assertNotStopped()
new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}
该方法以一个可变的Seq对象以及并行数作为参数构造ParallelCollectionRDD,且看ParallelCollectionRDD的定义:
private[spark] class ParallelCollectionRDD[T: ClassTag](
sc: SparkContext,
@transient private val data: Seq[T],
numSlices: Int,
locationPrefs: Map[Int, Seq[String]])
extends RDD[T](sc, Nil) {
override def getPartitions: Array[Partition] = {
val slices = ParallelCollectionRDD.slice(data, numSlices).toArray
slices.indices.map(i => new ParallelCollectionPartition(id, i, slices(i))).toArray
}
override def compute(s: Partition, context: TaskContext): Iterator[T] = {
new InterruptibleIterator(context, s.asInstanceOf[ParallelCollectionPartition[T]].iterator)
}
override def getPreferredLocations(s: Partition): Seq[String] = {
locationPrefs.getOrElse(s.index, Nil)
}
}
其中getPartitions方法将传入的集合data等分成了numSlices份,每一份作为一个Partition。
compute定义了每一个partition的元素的获取方式,以Iterator的方式迭代获取即可。
getPreferredLocations是可选的,根据传入的locationPrefs来获取数据的最优位置,这里暂时没有用到。
执行map转换
map方法直接调用RDD抽象类中的方法:
/**
* Return a new RDD by applying a function to all elements of this RDD.
*/
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}
- 第一步是调用clean方法,该方法确保传入的f这个closure是可以被序列化并作为Task发送的;
- 第二步是将RDD转换成了一个MapPartitionsRDD;
MapParttionsRDD的源码如下:
private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
var prev: RDD[T],
f: (TaskContext, Int, Iterator[T]) => Iterator[U], // (TaskContext, partition index, iterator)
preservesPartitioning: Boolean = false)
extends RDD[U](prev) {
override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None
override def getPartitions: Array[Partition] = firstParent[T].partitions
override def compute(split: Partition, context: TaskContext): Iterator[U] =
f(context, split.index, firstParent[T].iterator(split, context))
override def clearDependencies() {
super.clearDependencies()
prev = null
}
}
由代码可以看出,该RDD与其parent RDD共用partition与partitioner,在其构造函数中初始化了超类的RDD,超类RDD的构造函数如下:
/** Construct an RDD with just a one-to-one dependency on one parent */
def this(@transient oneParent: RDD[_]) =
this(oneParent.context, List(new OneToOneDependency(oneParent)))
可见,转换后的MapPartitionsRDD与最初的ParallelCollectionRDD是OneToOneDependency的关系。
MapPartitionsRDD的compute方法在每一个传入的partition上执行f函数:
override def compute(split: Partition, context: TaskContext): Iterator[U] =
f(context, split.index, firstParent[T].iterator(split, context))
这里的f在本文的例子中就是计算每一个partition中元素的平方。
执行reduce
RDD中reduce方法的源码如下:
def reduce(f: (T, T) => T): T = withScope {
val cleanF = sc.clean(f)
val reducePartition: Iterator[T] => Option[T] = iter => {
if (iter.hasNext) {
Some(iter.reduceLeft(cleanF))
} else {
None
}
}
var jobResult: Option[T] = None
val mergeResult = (index: Int, taskResult: Option[T]) => {
if (taskResult.isDefined) {
jobResult = jobResult match {
case Some(value) => Some(f(value, taskResult.get))
case None => taskResult
}
}
}
sc.runJob(this, reducePartition, mergeResult)
// Get the final result out of our Option, or throw an exception if the RDD was empty
jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
}
可以看出主要干了这么几件事:
- 传入的f定义了如何对RDD的元素进行计算,本文的例子中是将元素累加,通过clean函数对f进行了包装;
- 定义方法变量reducePartition,该方法变量中的方法定义了对每一个Partition的具体操作,即对每一个Partition执行reduceLeft;
- 定义方法变量mergeResult,该方法变量中的方法定义了对Partition的计算结果急性合并的操作,即对每一个Partition的计算结果taskResult执行f函数。这里有点像归并计算,reducePartition对Partition中的元素进行归并,mergeResult对每个Partition的归并结果进行归并,f定义了归并方式;
- 通过执行SparkContext 的runJob提交执行任务;
- 返回计算结果。
提交Job
在reduce方法中,最重要的一个步骤是将任务提交执行,下面具体看看这一块的源码:
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
resultHandler: (Int, U) => Unit): Unit = {
if (stopped.get()) {
throw new IllegalStateException("SparkContext has been shutdown")
}
val callSite = getCallSite
val cleanedFunc = clean(func)
logInfo("Starting job: " + callSite.shortForm)
if (conf.getBoolean("spark.logLineage", false)) {
logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
}
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
progressBar.foreach(_.finishAll())
rdd.doCheckpoint()
}
在该方法中最重要的语句是:
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
通过DAGScheduler提交Job,该方法也是所有执行作业提交的主入口。DAGScheduler负责根据RDD的Dependency 关系将Job划分称多个Stage,Stage的执行流程是一个有向无环图,每一个Stage中包含多个Task,这些Task由TaskScheduler提交到后端的执行引擎进行计算,并返回计算结果,再由resultHandler进一步处理后返回。
关于DAGScheduler如何根据Dependency生成Stage将在DAGScheduler的源码解读章节详细讲解。
TaskScheduler如何提交Task将在TaskScheduler详细讲解。
总之,RDD最基本的组成部分是compute,partition和dependencies,compute定义了每个RDD中partition的计算方式, partition是RDD分布式的体现,也是RDD并行计算的粒度,dependencies决定了RDD之间的依赖关系,是并行任务划分的依据。在某些场合中,可以通过getPreferredLocations和partitioner来对RDD的计算进行优化,以减少计算过程中的网络传输。