DAGScheduler 源码浅析

DAGScheduler

DAGScheduler 的主要工作包括:创建 Job、划分 Stage、最后将 Stage 封装成 TaskSet 提交提交给 TaskScheduler 去处理这三个阶段。

image

DAGScheduler 在 Spark-Core 的 org.apache.spark.scheduler 包下。

实例化

DAGScheduler 是在 SparkContext 中进行的实例化,在 SparkContext 概览中提到过:

_dagScheduler = new DAGScheduler(this)

我们再了解下 DAGScheduler 中的一个比较重要的成员变量及其启动,后面会用到:

// 事件处理队列
private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)

// 这条语句在 DAGScheduler 的最后面
// 后文会用到
eventProcessLoop.start()

Stage 划分

在 Spark 中有 ShuffleMapStage 和 ResultStage 两种 Stage,ShuffleMapStage 为中间阶段的 Stage,ResultStage 结果应用的 Stage。

类关系

在 Spark 程序中,都是遇到 Action 操作时才会提交任务。所以,我们需要在 RDD 中找个 Action 算子作为切入点, 我这里以 RDD.collect() 方法为例:

// RDD.collect()
def collect(): Array[T] = withScope {
  // 调用 SparkContext.runJob() 方法
  // 注意:这里将 Final RDD 作为参数传递给了 SparkContext
  val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  Array.concat(results: _*)
}

SparkContext.runJob() 的实现细节:

// 通过层层调用,最终会调用这个 runJob() 方法
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit = {

  // 其它操作

  // 这里调用了 DAGScheduler.runJob() 方法
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)

  // 其它操作
}

从任意 Action 操作入手都可以找到 DAGScheduler 的入口,接下来让我们看看 DAGScheduler.runJob() 方法的实现细节:

def runJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): Unit = {

  // 记录启动时间
  val start = System.nanoTime
  // 调用了自身的 submitJob() 方法
  val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)

  // 其它操作
}

DAGScheduler.submitJob() 的实现细节:

def submitJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): JobWaiter[U] = {

  // 其它操作

  // 将任务放入到事件队列中
  // eventProcessLoop 内部的线程会在合适(有空)的时候进行处理
  // 注意:发送的是 JobSubmitted 消息
  eventProcessLoop.post(JobSubmitted(
    jobId, rdd, func2, partitions.toArray, callSite, waiter,
    SerializationUtils.clone(properties)))

  waiter
}

eventProcessLoop.post() 的实现细节:

def post(event: E): Unit = {
  eventQueue.put(event)
}

从 eventQueue 的名字可以看出,将 JobSubmitted 消息加入到了队列中。

接下来,我们需要进入到 eventProcessLoop (是 DAGSchedulerEventProcessLoop 的实例化对象) 中,看看它是如何处理 Job 的了。

DAGSchedulerEventProcessLoop 继承自 EventLoop,我们先看看 EventLoop 的成员变量:

// 事件队列
// eventProcessLoop.post() 就是向这个队列中添加的事件
private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()

// 线程
private val eventThread = new Thread(name) {
  setDaemon(true)

  override def run(): Unit = {
    // 这是精简后的代码
    while (!stopped.get) {
      // 从事件队列中取出并处理
      val event = eventQueue.take()
      onReceive(event)
    }
    
  }

}

DAGSchedulerEventProcessLoop 继承自 EventLoop,在 DAGScheduler 的最后执行 eventProcessLoop.start() 方法,这个方法在 EventLoop 中也有实现:

def start(): Unit = {
  onStart()
  // 把线程启动了
  eventThread.start()
}

线程启动后 eventThread 就会不断的从 eventQueue 中取出事件,然后交给 onReceive() 方法去处理,onReceive() 在 DAGSchedulerEventProcessLoop 中有实现:

override def onReceive(event: DAGSchedulerEvent): Unit = {
  val timerContext = timer.time()
  try {
    doOnReceive(event)
  } finally {
    timerContext.stop()
  }
}

doOnReceive(event) 的实现细节:

private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
   
   // 处理提交过来的 Job
  case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)

  case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
    dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)

  case StageCancelled(stageId, reason) =>
    dagScheduler.handleStageCancellation(stageId, reason)
    
    // ...
    
}

任务最终会交给 DAGScheduler.handleJobSubmitted() 来执行:

private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties) {
  var finalStage: ResultStage = null
  try {
    // 通过 FinalRDD 来构建 ResultStage
    // 通过 RDD 的依赖关系,找到父依赖(到 ShuffleRDD 为止)来创建 ResultStage
    finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
  } catch {
     // ...
  }

  // 创建 Activer Job
  val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)

  // 加入到 Activer Job 集合中
  activeJobs += job
  finalStage.setActiveJob(job)
  
  // 提交 FInal Stage
  submitStage(finalStage)
}

createResultStage() 根据 Final RDD 可以推断出 Result Stage。

接下来,让我们看看 DAGScheduler 是如何推断缺失的 Stage 的,DAGScheduler.submitStage() 的实现细节:

private def submitStage(stage: Stage) {
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      
      // 通过当前的 stage 去推断前面丢失的 stages
      val missing = getMissingParentStages(stage).sortBy(_.id)
        
      if (missing.isEmpty) {
        // 如果当前 stage 无依赖的 stage
        // 提交任务
        submitMissingTasks(stage, jobId.get)
      } else {
        // 将最开始的 stage 先提交
        // 考虑到 stage 多依赖的情况
        for (parent <- missing) {
          // 这里相当于有个递归调用
          // 将最开始的 stage 先提交
          submitStage(parent)
        }
        waitingStages += stage
      }
    }
  } else {
    abortStage(stage, "No active job for stage " + stage.id, None)
  }
}

DAGScheduler 通过 ResultStage 来向前推断父 Stage ,接下来我们看看 DAGScheduler.getMissingParentStages() 是如何进行推断的:

private def getMissingParentStages(stage: Stage): List[Stage] = {
  val missing = new HashSet[Stage]
  val visited = new HashSet[RDD[_]]

  // 等待被遍历的 RDD 栈
  val waitingForVisit = new Stack[RDD[_]]

  // 这里定义了一个内部方法,下面的 while 语句循环调用这个方法
  def visit(rdd: RDD[_]) {
    if (!visited(rdd)) {
      visited += rdd
      val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
      if (rddHasUncachedPartitions) {
        // 遍历当前 RDD 的依赖列表
        for (dep <- rdd.dependencies) {
          dep match {
            // 如果是宽依赖
            case shufDep: ShuffleDependency[_, _, _] =>
              // 创建 ShuffleMapStage
              val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)
              if (!mapStage.isAvailable) {
                // 将这个 Stage 添加到 Missing stages 中
                missing += mapStage
              }
            // 如果是窄依赖
            case narrowDep: NarrowDependency[_] =>
              // 将这个依赖的 RDD 添加到等待遍历的集合
              waitingForVisit.push(narrowDep.rdd)
          }
        }
      }
    }
  }
  waitingForVisit.push(stage.rdd)
  // 遍历等待被遍历的 RDD 栈
  while (waitingForVisit.nonEmpty) {
    // 对待遍历的 RDD 进行 visit(),现在看看 visit() 内部方法做了哪些操作
    visit(waitingForVisit.pop())
  }

  missing.toList
}

DAGScheduler 通过判断 RDD 是否为宽依赖作为 Stage 的划分标准,进而将一个任务划分成一个或多个 Stage。

最后我们 DAGScheduler.submitMissingTasks() 是如何将 Stage 提交的:

private def submitMissingTasks(stage: Stage, jobId: Int) {
    
  // 根据 Stage 的不同,选择不同的启动方式
  stage match {
    case s: ShuffleMapStage =>
      outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
    case s: ResultStage =>
      outputCommitCoordinator.stageStart(
        stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
  }

  // ...

  // 根据 Stage 的不同,创建不同的 tasks
  val tasks: Seq[Task[_]] = try {
    val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
    stage match {
      case stage: ShuffleMapStage =>
        stage.pendingPartitions.clear()
        partitionsToCompute.map { id =>
          val locs = taskIdToLocations(id)
          val part = partitions(id)
          stage.pendingPartitions += id

          // 创建成 ShuffleMapTask
          new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
            taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
            Option(sc.applicationId), sc.applicationAttemptId)
        }

      case stage: ResultStage =>
        partitionsToCompute.map { id =>
          val p: Int = stage.partitions(id)
          val part = partitions(p)
          val locs = taskIdToLocations(id)

          // 创建成 ResultTask
          new ResultTask(stage.id, stage.latestInfo.attemptId,
            taskBinary, part, locs, id, properties, serializedTaskMetrics,
            Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
        }
    }
  } catch {
     // 略略略..
  }

    // 其它操作

  if (tasks.size > 0) {
    // 略略略..

    // 这里调用了 TaskScheduler.submitTasks()
    // 将 TaskSet 交给 TaskScheduler 去执行
    taskScheduler.submitTasks(new TaskSet(
      tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
  } else {
    // 略略略..
  }
}

简单的总结一下,DAGScheduler 对提交过来的任务进行了 Stage 的划分,并对每个 Stage 创建一个 TaskSet,最后通过 TaskScheduler.submitTasks() 方法,将各个TaskSet 交给 TaskScheduler 去处理。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,530评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 86,403评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,120评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,770评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,758评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,649评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,021评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,675评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,931评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,659评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,751评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,410评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,004评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,969评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,042评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,493评论 2 343

推荐阅读更多精彩内容