[Spark源码剖析] DAGScheduler提交stage

DAGScheduler通过调用submitStage来提交stage,实现如下:

  private def submitStage(stage: Stage) {
    val jobId = activeJobForStage(stage)
    if (jobId.isDefined) {
      logDebug("submitStage(" + stage + ")")
      if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
        //< 获取该stage未提交的父stages,并按stage id从小到大排序
        val missing = getMissingParentStages(stage).sortBy(_.id)
        logDebug("missing: " + missing)
        if (missing.isEmpty) {
          logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
          //< 若无未提交的父stage, 则提交该stage对应的tasks
          submitMissingTasks(stage, jobId.get)
        } else {
          //< 若存在未提交的父stage, 依次提交所有父stage (若父stage也存在未提交的父stage, 则提交之, 依次类推); 并把该stage添加到等待stage队列中
          for (parent <- missing) {
            submitStage(parent)
          }
          waitingStages += stage
        }
      }
    } else {
      abortStage(stage, "No active job for stage " + stage.id)
    }
  }

submitStage先调用getMissingParentStages来获取参数stageX(这里为了区分,取名为stageX)是否有未提交的父stages,若有,则依次递归(按stage id从小到大排列,也就是stage是从后往前提交的)提交父stages,并将stageX加入到waitingStages: HashSet[Stage]中。对于要依次提交的父stage,也是如此。

getMissingParentStagesDAGScheduler划分stage中介绍的getParentStages有点像,但不同的是不再需要划分stage,并对每个stage的状态做了判断,源码及注释如下:

//< 以参数stage为起点,向前遍历所有stage,判断stage是否为未提交,若使则加入missing中
  private def getMissingParentStages(stage: Stage): List[Stage] = {
    //< 未提交的stage
    val missing = new HashSet[Stage]
    //< 存储已经被访问到得RDD
    val visited = new HashSet[RDD[_]]

    val waitingForVisit = new Stack[RDD[_]]
    def visit(rdd: RDD[_]) {
      if (!visited(rdd)) {
        visited += rdd
        if (getCacheLocs(rdd).contains(Nil)) {
          for (dep <- rdd.dependencies) {
            dep match {
              //< 若为宽依赖,生成新的stage
              case shufDep: ShuffleDependency[_, _, _] =>
                //< 这里调用getShuffleMapStage不像在getParentStages时需要划分stage,而是直接根据shufDep.shuffleId获取对应的ShuffleMapStage
                val mapStage = getShuffleMapStage(shufDep, stage.jobId)
                if (!mapStage.isAvailable) {
                  // 若stage得状态为available则为未提交stage
                  missing += mapStage
                }
              //< 若为窄依赖,那就属于同一个stage。并将依赖的RDD放入waitingForVisit中,以能够在下面的while中继续向上visit,直至遍历了整个DAG图
              case narrowDep: NarrowDependency[_] =>
                waitingForVisit.push(narrowDep.rdd)
            }
          }
        }
      }
    }
    waitingForVisit.push(stage.rdd)
    while (waitingForVisit.nonEmpty) {
      visit(waitingForVisit.pop())
    }
    missing.toList
  }

上面提到,若stageX存在未提交的父stages,则先提交父stages;那么,如果stageX没有未提交的父stage呢(比如,包含从HDFS读取数据生成HadoopRDD的那个stage是没有父stage的)?

这时会调用submitMissingTasks(stage, jobId.get),参数就是stageX及其对应的jobId.get。这个函数便是我们时常在其他文章或书籍中看到的将stage与taskSet对应起来,然后DAGScheduler将taskSet提交给TaskScheduler去执行的实施者。这个函数的实现比较长,下面分段说明。

Step1: 得到RDD中需要计算的partition

对于Shuffle类型的stage,需要判断stage中是否缓存了该结果;对于Result类型的Final Stage,则判断计算Job中该partition是否已经计算完成。这么做(没有直接提交全部tasks)的原因是,stage中某个task执行失败其他执行成功的时候就需要找出这个失败的task对应要计算的partition而不是要计算所有partition

  private def submitMissingTasks(stage: Stage, jobId: Int) {
    stage.pendingTasks.clear()

    //< 首先得到RDD中需要计算的partition
    //< 对于Shuffle类型的stage,需要判断stage中是否缓存了该结果;
    //< 对于Result类型的Final Stage,则判断计算Job中该partition是否已经计算完成
    //< 这么做的原因是,stage中某个task执行失败其他执行成功地时候就需要找出这个失败的task对应要计算的partition而不是要计算所有partition
    val partitionsToCompute: Seq[Int] = {
      stage match {
        case stage: ShuffleMapStage =>
          (0 until stage.numPartitions).filter(id => stage.outputLocs(id).isEmpty)
        case stage: ResultStage =>
          val job = stage.resultOfJob.get
          (0 until job.numPartitions).filter(id => !job.finished(id))
      }
    }

Step2: 序列化task的binary

Executor可以通过广播变量得到它。每个task运行的时候首先会反序列化

var taskBinary: Broadcast[Array[Byte]] = null
    try {
      // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
      // For ResultTask, serialize and broadcast (rdd, func).
      val taskBinaryBytes: Array[Byte] = stage match {
        case stage: ShuffleMapStage =>
          //< 对于ShuffleMapTask,将rdd及其依赖关系序列化;在Executor执行task之前会反序列化
          closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef).array()
          //< 对于ResultTask,对rdd及要在每个partition上执行的func
        case stage: ResultStage =>
          closureSerializer.serialize((stage.rdd, stage.resultOfJob.get.func): AnyRef).array()
      }

      //< 将序列化好的信息广播给所有的executor
      taskBinary = sc.broadcast(taskBinaryBytes)
    } catch {
      // In the case of a failure during serialization, abort the stage.
      case e: NotSerializableException =>
        abortStage(stage, "Task not serializable: " + e.toString)
        runningStages -= stage

        // Abort execution
        return
      case NonFatal(e) =>
        abortStage(stage, s"Task serialization failed: $e\n${e.getStackTraceString}")
        runningStages -= stage
        return
    }

Step3: 为每个需要计算的partiton生成一个task

ShuffleMapStage对应的task全是ShuffleMapTask; ResultStage对应的全是ResultTask。task继承Serializable,要确保task是可序列化的。

val tasks: Seq[Task[_]] = stage match {
      case stage: ShuffleMapStage =>
        partitionsToCompute.map { id =>
          val locs = getPreferredLocs(stage.rdd, id)
          //< RDD对应的partition
          val part = stage.rdd.partitions(id)
          new ShuffleMapTask(stage.id, taskBinary, part, locs)
        }

      case stage: ResultStage =>
        val job = stage.resultOfJob.get
        //< id为输出分区索引,表示reducerID
        partitionsToCompute.map { id =>
          val p: Int = job.partitions(id)
          val part = stage.rdd.partitions(p)
          val locs = getPreferredLocs(stage.rdd, p)
          new ResultTask(stage.id, taskBinary, part, locs, id)
        }
    }

Step4: 提交tasks

先用tasks来初始化一个TaskSet对象,再调用TaskScheduler.submitTasks提交

stage.pendingTasks ++= tasks
      logDebug("New pending tasks: " + stage.pendingTasks)
      //< 提交TaskSet至TaskScheduler
      taskScheduler.submitTasks(
        new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
      //< 记录stage提交task的时间
      stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
    } else {

以上,介绍了提交stage和提交tasks的实现。本文若有纰漏,请批评指正。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,284评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,115评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,614评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,671评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,699评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,562评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,309评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,223评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,668评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,859评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,981评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,705评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,310评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,904评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,023评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,146评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,933评论 2 355

推荐阅读更多精彩内容