DAGScheduler
DAGScheduler 的主要工作包括:创建 Job、划分 Stage、最后将 Stage 封装成 TaskSet 提交提交给 TaskScheduler 去处理这三个阶段。
DAGScheduler 在 Spark-Core 的 org.apache.spark.scheduler 包下。
实例化
DAGScheduler 是在 SparkContext 中进行的实例化,在 SparkContext 概览中提到过:
_dagScheduler = new DAGScheduler(this)
我们再了解下 DAGScheduler 中的一个比较重要的成员变量及其启动,后面会用到:
// 事件处理队列
private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
// 这条语句在 DAGScheduler 的最后面
// 后文会用到
eventProcessLoop.start()
Stage 划分
在 Spark 中有 ShuffleMapStage 和 ResultStage 两种 Stage,ShuffleMapStage 为中间阶段的 Stage,ResultStage 结果应用的 Stage。
在 Spark 程序中,都是遇到 Action 操作时才会提交任务。所以,我们需要在 RDD 中找个 Action 算子作为切入点, 我这里以 RDD.collect() 方法为例:
// RDD.collect()
def collect(): Array[T] = withScope {
// 调用 SparkContext.runJob() 方法
// 注意:这里将 Final RDD 作为参数传递给了 SparkContext
val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
Array.concat(results: _*)
}
SparkContext.runJob() 的实现细节:
// 通过层层调用,最终会调用这个 runJob() 方法
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
resultHandler: (Int, U) => Unit): Unit = {
// 其它操作
// 这里调用了 DAGScheduler.runJob() 方法
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
// 其它操作
}
从任意 Action 操作入手都可以找到 DAGScheduler 的入口,接下来让我们看看 DAGScheduler.runJob() 方法的实现细节:
def runJob[T, U](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: CallSite,
resultHandler: (Int, U) => Unit,
properties: Properties): Unit = {
// 记录启动时间
val start = System.nanoTime
// 调用了自身的 submitJob() 方法
val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
// 其它操作
}
DAGScheduler.submitJob() 的实现细节:
def submitJob[T, U](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: CallSite,
resultHandler: (Int, U) => Unit,
properties: Properties): JobWaiter[U] = {
// 其它操作
// 将任务放入到事件队列中
// eventProcessLoop 内部的线程会在合适(有空)的时候进行处理
// 注意:发送的是 JobSubmitted 消息
eventProcessLoop.post(JobSubmitted(
jobId, rdd, func2, partitions.toArray, callSite, waiter,
SerializationUtils.clone(properties)))
waiter
}
eventProcessLoop.post() 的实现细节:
def post(event: E): Unit = {
eventQueue.put(event)
}
从 eventQueue 的名字可以看出,将 JobSubmitted 消息加入到了队列中。
接下来,我们需要进入到 eventProcessLoop (是 DAGSchedulerEventProcessLoop 的实例化对象) 中,看看它是如何处理 Job 的了。
DAGSchedulerEventProcessLoop 继承自 EventLoop,我们先看看 EventLoop 的成员变量:
// 事件队列
// eventProcessLoop.post() 就是向这个队列中添加的事件
private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()
// 线程
private val eventThread = new Thread(name) {
setDaemon(true)
override def run(): Unit = {
// 这是精简后的代码
while (!stopped.get) {
// 从事件队列中取出并处理
val event = eventQueue.take()
onReceive(event)
}
}
}
DAGSchedulerEventProcessLoop 继承自 EventLoop,在 DAGScheduler 的最后执行 eventProcessLoop.start() 方法,这个方法在 EventLoop 中也有实现:
def start(): Unit = {
onStart()
// 把线程启动了
eventThread.start()
}
线程启动后 eventThread 就会不断的从 eventQueue 中取出事件,然后交给 onReceive() 方法去处理,onReceive() 在 DAGSchedulerEventProcessLoop 中有实现:
override def onReceive(event: DAGSchedulerEvent): Unit = {
val timerContext = timer.time()
try {
doOnReceive(event)
} finally {
timerContext.stop()
}
}
doOnReceive(event) 的实现细节:
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
// 处理提交过来的 Job
case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)
case StageCancelled(stageId, reason) =>
dagScheduler.handleStageCancellation(stageId, reason)
// ...
}
任务最终会交给 DAGScheduler.handleJobSubmitted() 来执行:
private[scheduler] def handleJobSubmitted(jobId: Int,
finalRDD: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
callSite: CallSite,
listener: JobListener,
properties: Properties) {
var finalStage: ResultStage = null
try {
// 通过 FinalRDD 来构建 ResultStage
// 通过 RDD 的依赖关系,找到父依赖(到 ShuffleRDD 为止)来创建 ResultStage
finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
} catch {
// ...
}
// 创建 Activer Job
val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
// 加入到 Activer Job 集合中
activeJobs += job
finalStage.setActiveJob(job)
// 提交 FInal Stage
submitStage(finalStage)
}
createResultStage() 根据 Final RDD 可以推断出 Result Stage。
接下来,让我们看看 DAGScheduler 是如何推断缺失的 Stage 的,DAGScheduler.submitStage() 的实现细节:
private def submitStage(stage: Stage) {
val jobId = activeJobForStage(stage)
if (jobId.isDefined) {
if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
// 通过当前的 stage 去推断前面丢失的 stages
val missing = getMissingParentStages(stage).sortBy(_.id)
if (missing.isEmpty) {
// 如果当前 stage 无依赖的 stage
// 提交任务
submitMissingTasks(stage, jobId.get)
} else {
// 将最开始的 stage 先提交
// 考虑到 stage 多依赖的情况
for (parent <- missing) {
// 这里相当于有个递归调用
// 将最开始的 stage 先提交
submitStage(parent)
}
waitingStages += stage
}
}
} else {
abortStage(stage, "No active job for stage " + stage.id, None)
}
}
DAGScheduler 通过 ResultStage 来向前推断父 Stage ,接下来我们看看 DAGScheduler.getMissingParentStages() 是如何进行推断的:
private def getMissingParentStages(stage: Stage): List[Stage] = {
val missing = new HashSet[Stage]
val visited = new HashSet[RDD[_]]
// 等待被遍历的 RDD 栈
val waitingForVisit = new Stack[RDD[_]]
// 这里定义了一个内部方法,下面的 while 语句循环调用这个方法
def visit(rdd: RDD[_]) {
if (!visited(rdd)) {
visited += rdd
val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
if (rddHasUncachedPartitions) {
// 遍历当前 RDD 的依赖列表
for (dep <- rdd.dependencies) {
dep match {
// 如果是宽依赖
case shufDep: ShuffleDependency[_, _, _] =>
// 创建 ShuffleMapStage
val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)
if (!mapStage.isAvailable) {
// 将这个 Stage 添加到 Missing stages 中
missing += mapStage
}
// 如果是窄依赖
case narrowDep: NarrowDependency[_] =>
// 将这个依赖的 RDD 添加到等待遍历的集合
waitingForVisit.push(narrowDep.rdd)
}
}
}
}
}
waitingForVisit.push(stage.rdd)
// 遍历等待被遍历的 RDD 栈
while (waitingForVisit.nonEmpty) {
// 对待遍历的 RDD 进行 visit(),现在看看 visit() 内部方法做了哪些操作
visit(waitingForVisit.pop())
}
missing.toList
}
DAGScheduler 通过判断 RDD 是否为宽依赖作为 Stage 的划分标准,进而将一个任务划分成一个或多个 Stage。
最后我们 DAGScheduler.submitMissingTasks() 是如何将 Stage 提交的:
private def submitMissingTasks(stage: Stage, jobId: Int) {
// 根据 Stage 的不同,选择不同的启动方式
stage match {
case s: ShuffleMapStage =>
outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
case s: ResultStage =>
outputCommitCoordinator.stageStart(
stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
}
// ...
// 根据 Stage 的不同,创建不同的 tasks
val tasks: Seq[Task[_]] = try {
val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
stage match {
case stage: ShuffleMapStage =>
stage.pendingPartitions.clear()
partitionsToCompute.map { id =>
val locs = taskIdToLocations(id)
val part = partitions(id)
stage.pendingPartitions += id
// 创建成 ShuffleMapTask
new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
Option(sc.applicationId), sc.applicationAttemptId)
}
case stage: ResultStage =>
partitionsToCompute.map { id =>
val p: Int = stage.partitions(id)
val part = partitions(p)
val locs = taskIdToLocations(id)
// 创建成 ResultTask
new ResultTask(stage.id, stage.latestInfo.attemptId,
taskBinary, part, locs, id, properties, serializedTaskMetrics,
Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
}
}
} catch {
// 略略略..
}
// 其它操作
if (tasks.size > 0) {
// 略略略..
// 这里调用了 TaskScheduler.submitTasks()
// 将 TaskSet 交给 TaskScheduler 去执行
taskScheduler.submitTasks(new TaskSet(
tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
} else {
// 略略略..
}
}
简单的总结一下,DAGScheduler 对提交过来的任务进行了 Stage 的划分,并对每个 Stage 创建一个 TaskSet,最后通过 TaskScheduler.submitTasks() 方法,将各个TaskSet 交给 TaskScheduler 去处理。