Spark内部有两大类操作,Transformation和Action;
Transformation又分窄依赖操作和宽依赖操作,区分这两种操作的很简单,RDD之间转化过程中没有shuffle的就是窄依赖,有shuffle的就是宽依赖:
- RDD中数据是分partition的,每个partition分布在特定节点,shuffle就是指在RDD在转化过程中,一个partition中的数据需要被split成多个分片,传入到下游RDD中的多个partition中去,比如reduceByKey这类的操作,实际生产中,下游partition中的数据往往依赖于上游多个partition的数据,这样就是会产生一个问题,如果下游某个partition中的数据缺失,需要重新计算上游多个partition的数据,而重新计算的这些上游partition中又会同时含有下游缺失的数据partition和正常的partition,这就会造成计算的冗余;
- 与之相对的是窄依赖计算,上游一个或多个partition只对应下游一个partition,所以下游某个节点故障后,某个partition缺失数据,上游需要计算的所有partition不含有冗余计算,比如map,filter,union等等;
Spark数据集是RDD,如果数据类型是Tuple2,还提供PariRDDFunctions的一些方法(是通过object RDD addToPariFuncitons隐式包含进来的):
- 如果是窄依赖,比如map操作,生成
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF));
- 如果是宽依赖,比如reduceByKey操作,生成
new ShuffledRDD[K, V, C](self, partitioner)
.setSerializer(serializer)
.setAggregator(aggregator)
.setMapSideCombine(mapSideCombine);
其中aggregator定义了数据merge的规则,这个merge包括在map端(类似hadoop中的combiner,也就是partition内部数据merge的规则)和reduce端(partition之间数据merge的规则),这个merge可以是多个value组成更大的集合,例如groupByKey,也可以是多个value合并计算出新的value,比如word count作业中的reduceByKey,根据业务逻辑而定; mapSideCombine是一个boolean,表示是否需要在map端进行merge操作,比如reduceByKey是true,groupByKey是false;
首先来看一下MapPartitionRDD:
private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
var prev: RDD[T],
f: (TaskContext, Int, Iterator[T]) => Iterator[U], // (TaskContext, partition index, iterator)
preservesPartitioning: Boolean = false)
extends RDD[U](prev) {
override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None
override def getPartitions: Array[Partition] = firstParent[T].partitions
override def compute(split: Partition, context: TaskContext): Iterator[U] =
f(context, split.index, firstParent[T].iterator(split, context))
override def clearDependencies() {
super.clearDependencies()
prev = null
}
}
其中partitioner(默认HashPartitioner)定义如果存在shuffle,不同的key被shuffle到下游的那一个分片(partition)中去,对于MapPartitionsRDD,不存在这样的情况;每个RDD都会维护自己的dependencies,是一个Seq,这里的dependency可能是OneToOneDependency(一对一,例如map),可能是RangeDependency(例如union,两个RDD合并成一个,但是partition不会发生merge,上游RDD多个partition会变成下游RDD的partition range),也可能是ShuffleDependency;
firstParent这里就是dependencies中的第一个,map操作,上游只有一个RDD,也就是只有一个partition,compute操作很简单,通过split指定上游partition,对其执行f函数,返回的也是一个Iterator,其中firstParent[T].iterator,如果没有cache或checkpoint则也是一个compute实现;
上面的结果就是RDD内部层层套RDD,最后计算(compute)的时候,由里到外,不断的遍历iterator,完成计算,这里直观感觉上是遍历多次,但基于scala内部的实现,减少不必要的遍历;
下面再看ShuffledRDD:
compute内部通过shuffleManager获取上游shuffle-write产生的数据,根据split,返回iterator,并不包涵其他的函数计算:
override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context).read().asInstanceOf[Iterator[(K, C)]]
}
下面来看一下调用rdd.collect(Spark的Action操作)之后stage的划分,最后直接的处理函数是handleJobSubmitted,
finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite);
Stage这里面涉及到两种ResultStage和ShuffleMapStage,每一个Stage都包含若干parent stages,对于这种一对一的RDD DAG 作业,parent stages这个集合中,保存的都只是上游1个stage,是一个单链条;划分Stage的方法很简单,就是通过ShuffleDependency来判断;
stage都确定好之后,将stage转化为task进行计算,计算的条件就是一个stage的所有parent stges都已经计算完成,stage到task的逻辑是:submitMissingTasks;
val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
首先得到需要计算哪些partition,然后根据ShuffleMapStage和ResultStage分别生成ShuffleMapTask和ResultTask,然后提交task:
taskScheduler.submitTasks(new TaskSet(tasks.toArray, [stage.id](http://stage.id), stage.latestInfo.attemptId, jobId, properties))
CoarseGrainedExecutorBackend收到LaunchTask之后:
case LaunchTask(data) => if (executor == null) {
logError("Received LaunchTask command but executor was null")
System.exit(1)
} else {
val taskDesc = ser.deserialize[TaskDescription](data.value)
logInfo("Got assigned task " + taskDesc.taskId)
executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber, taskDesc.name, taskDesc.serializedTask)
}
通过executor执行task。
补充说明一点,整个RDD的DAG图根据ShuffleDependency划分出若干stage之后,或者是一个ShuffleMapStage,或者是一个ResultStage,对于ResultStage很简单,从上游读取数据,聚合之后就可以返回了,对于ShuffleMapStage,他的目标就是完成Shuffle-Write操作,这个实在ShuffleMapTask中的runTask完成的,而需要读取的数据由上游的RDD的iterator提供,上游如果是普通的RDD,比如MapPartitionsRDD,直接读取,内部的compute函数完成两个事情,第一提供iterator,第二完成上游RDD的计算逻辑,即用函数对iterator操作,而这个提供数据iterator又回继续递归调用更上游的RDD的compute,但是如果是ShuffledRDD,则不会递归到更上游,而是去reader,读取上游数据,返回iterator,仅此而已,reader的实现中是到driver拿到mapstatuses,里面包括block的executor位置信息,然后连接executor进行读取。