-
概念
A stage is a set of parallel tasks ① all computing the same function that need to run as part of a Spark job, where all the tasks have the same shuffle dependencies. Each DAG of tasks run by the scheduler is split up into stages at the boundaries where shuffle occurs, and then the DAGScheduler runs these stages in topological order.
Each Stage can ② either be a shuffle map stage, in which case its tasks' results are input for other stage(s), or a result stage, in which case its tasks directly compute a Spark action (e.g. count(), save(), etc) by running a function on an RDD. ③For shuffle map stages, we also track the nodes that each output partition is on.
Each Stage also has a firstJobId, identifying the job that first submitted the stage. When FIFO scheduling is used, this ④ allows Stages from earlier jobs to be computed first or recoveredfaster on failure.Finally, a single stage can be re-executed in multiple attempts due to fault recovery. In thatcase, the Stage object will track multiple StageInfo objects to pass to listeners or the web UI.The latest one will be accessible through latestInfo.
-
代码解读
- [DAGScheduler]->private[scheduler] def handleJobSubmitted
{
var finalStage: ResultStage = null
try {
/**
②stage 的类型只有两种,一种是shuffle map stage 另一种是result
stage,并且result stage 一定是调用action操作的RDD所在
的stage,参数含义:func-对每个分区进行的操作根据action的不同
而不同,例如action为count的时候那么func就是计算每个分区的大小,
最终结果由jobwaiter(在SubmitJob方法中有涉及)搜集并计算将func
的结果进行相加返回。
**/
finalStage = newResultStage(finalRDD, func, partitions, jobId,
callSite)
} catch {
case e: Exception => logWarning("Creating new stage failed due to
exception - job: " + jobId, e) listener.jobFailed(e) return
}
. . .
/**
[1]首次提交的一定是finalStage即resultStage,然后会递归
寻找该Stage的依赖直到找到一个没有依赖的Stage才会生
成taskSet进行提交
submitStage(finalStage)
[2]在递归寻找依赖stage的过程中如果发现当前stage有依
赖则将当前stage放入等待队列中以便后续调度
**/
submitWaitingStages()
}
- [DAGScheduler]->private def submitStage(stage: Stage)
{
...
//[1]
val missing = **getMissingParentStages(stage)**.sortBy(_.id)
logDebug("missing: " + missing)
if (missing.isEmpty) {
logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents”)
//[1]
submitMissingTasks(stage, jobId.get)
} else {
for (parent <- missing) {
submitStage(parent)
}
//[2]
waitingStages += stage
}
...
}
- [DAGScheduler]->getMissingParentStages(stage: Stage): List[Stage]
根据dependency是否是shuffle dependency(wild or narrow)来进行stage划分
{
. . .
for (dep <- rdd.dependencies) {
dep match {
case shufDep: ShuffleDependency[_, _, _] =>
val mapStage = getShuffleMapStage(shufDep, stage.firstJobId)
if (!mapStage.isAvailable) {
missing += mapStage
}
case narrowDep: NarrowDependency[_] =>
[2]
waitingForVisit.push(narrowDep.rdd)
}
}
. . .
}
ShuffleMapStage
-
概念
ShuffleMapStages are intermediate stages in the execution DAG that produce data for a shuffle.⑤They occur right before each shuffle operation, and might contain multiple pipelined operations before that (e.g. map and filter). When executed, ⑥they save map output files that can later be fetched by reduce tasks. ⑦The
shuffleDep
field describes the shuffle each stage is part of,and ⑧variables likeoutputLocs
andnumAvailableOutputs
track how many map outputs are ready.ShuffleMapStages can also be submitted independently as jobs with DAGScheduler.submitMapStage. For such stages, the ActiveJobs that submitted them are tracked inmapStageJobs
. ⑨Note that there can be multiple ActiveJobs trying to compute the same shuffle map stage.
-
代码解读
⑤-在对stage进行划分时,shuffle map stage 包含前个shuffle之后的所有非shuffle操作,如map、filter等。
⑥ 对每个partition的output 信息进行维护
/**
List of [[MapStatus]] for each partition. The index of the array
is the map partition id,and each value in the array is the list of
possible [[MapStatus]] for a partition(a single task might run
multiple times).
③⑧当前rdd的位置及状态信息及每个partiton会在哪个executor
上执行并产生输出。该信息将用于DAG对task的调度.
**/
private[this] val outputLocs = Array.fill[List[MapStatus]](numPartitions)(Nil)
[DAGScheduler]->submitMissingTasks(stage: Stage, jobId: Int)
...
val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
...
val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
stage match {
case s: ShuffleMapStage =>
partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
case s: ResultStage =>
val job = s.activeJob.get
partitionsToCompute.map { id =>
val p = s.partitions(id)
(id, getPreferredLocs(stage.rdd, p))
}.toMap
}
}
...
⑦shuffleDep定义了整个shuffle的信息,每个stage的shuffleDep变量则标识该stage属于哪个shuffle应该执行怎么样的操作,
在提交执行stage时需要用到该信息。
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
@transient private val _rdd: RDD[_ <: Product2[K, V]],
val partitioner: Partitioner,
val serializer: Serializer = SparkEnv.get.serializer,
val keyOrdering: Option[Ordering[K]] = None,
val aggregator: Option[Aggregator[K, V, C]] = None,
val mapSideCombine: Boolean = false){
...
}