Spark 源码分析(七): DAGScheduler 源码分析1(stage 划分算法)

前面几篇文章已经说清楚了从 spark 任务提交到 driver 启动,然后执行 main 方法,初始化 SparkContext 对象。

在初始化 SparkContext 对象的过程中创建了两个重要组件:

一个是 TaskScheduler(实际上是他的实现类 TaskSchedulerImpl 对象),这个对象内部会持有一个 SchedulerBackend 对象,SchedulerBackend 内部会又会持有一个 DriverEndpoint 对象(实际上就是一个 RpcEndpoint)。这样 TaskScheduler 就可以通过 SchedulerBackend 和集群资源管理器或者 Executor 对应 worker 节点进行通信做一些事情。比如向 master 节点去注册 application,master 在注册 application 的过程中会分配 worker 去启动 Executor,当 Executor 启动后又会和 TaskScheduler 进行注册。

另一个是 DAGScheduler,关于这个对象的创建过程前面没有详细讲,主要是因为 DAGScheduler 是在 SparkContext 初始化结束后,执行到 RDD 的 Action 操作的时候才会开始工作,下面就从 RDD 的 action 操作说起,看看 DAGScheduler 是怎么工作的。

还是以 wordcount 程序为例:

        val conf = new SparkConf()
      .setAppName("WordCount")
      .setMaster("local")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("./file/localfile")
    val words = lines.flatMap(line => line.split(" "))
    val wordPairs = words.map(word => (word, 1))
    val wordCounts = wordPairs.reduceByKey(_ + _)
    wordCounts.foreach(wordCount => println(wordCount._1 + "  " + wordCount._2))

当代码执行到 wordCounts.foreach 时候会调用到 RDD 的 foreach 方法,RDD 的 foreach 方法会去调用 SparkContext 的 runjob 方法。

SparkContext 中会有多个 runjob 方法,最后都会走到一个 runjob 那里去,这个 runjob 方法最终会调用 DAGScheduler 的 runJob 的方法,具体可以先看下这个 SparkContext 的 runjob 方法。

def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      resultHandler: (Int, U) => Unit): Unit = {
    if (stopped.get()) {
      throw new IllegalStateException("SparkContext has been shutdown")
    }
    val callSite = getCallSite
    val cleanedFunc = clean(func)
    logInfo("Starting job: " + callSite.shortForm)
    if (conf.getBoolean("spark.logLineage", false)) {
      logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
    }
    // 去调用 DAGScheduler 的 runjob 方法
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
    progressBar.foreach(_.finishAll())
    rdd.doCheckpoint()
  }

最主要的还是 DAGScheduler 中的 runjob 方法。

这个 runjob 方法内部实际上调用了 submitJob 方法,用于提交 job。该方法返回一个 JobWaiter,用于等待 DAGScheduler 任务的完成。

def runJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      resultHandler: (Int, U) => Unit,
      properties: Properties): Unit = {
    val start = System.nanoTime
    // 调用 submitJob 方法
    val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)

submitJob 方法是调用 eventProcessLoop 的 post 方法将 JobSubmitted 事件添加到 DAGScheduler 的事件队列中去。

eventProcessLoop.post(JobSubmitted(
      jobId, rdd, func2, partitions.toArray, callSite, waiter,
      SerializationUtils.clone(properties)))

这里的 eventProcessLoop 是 DAGSchedulerEventProcessLoop 对象,在 DAGScheduler 的初始化代码中可以看到。DAGSchedulerEventProcessLoop 实际上内部有一个线程,用来处理事件队列。

事件队列的处理最后会走到 DAGSchedulerEventProcessLoop 的 onReceive 的回调方法里面去。

/**
   * The main event loop of the DAG scheduler.
   */
  override def onReceive(event: DAGSchedulerEvent): Unit = {
    val timerContext = timer.time()
    try {
      // 调用 doOnReceive 方法
      doOnReceive(event)
    } finally {
      timerContext.stop()
    }
  }

后面会去调用 doOnReceive 方法,根据 event 进行模式匹配,匹配到 JobSubmitted 的 event 后实际上是去调用 DAGScheduler 的 handleJobSubmitted 这个方法。

private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
    // 模式匹配
    case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
      // 调用 handleJobSubmitted 方法
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)

下面来看 handleJobSubmitted 这个方法做了哪些操作:

1,使用触发 job 的最后一个 rdd,创建 finalStage;

注: Stage 是一个抽象类,一共有两个实现,一个是 ResultStage,是用 action 中的函数计算结果的 stage;另一个是 ShuffleMapStage,是为 shuffle 准备数据的 stage。

2,构造一个 Job 对象,将上面创建的 finalStage 封装进去,这个 Job 的最后一个 stage 也就是这个 finalStage;

3,将 Job 的相关信息保存到内存的数据结构中;

4,调用 submitStage 方法提交 finalStage。

private[scheduler] def handleJobSubmitted(jobId: Int,
      finalRDD: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      callSite: CallSite,
      listener: JobListener,
      properties: Properties) {
    var finalStage: ResultStage = null
    try {
      // 使用触发 job 的最后一个 RDD 创建一个 ResultStage
      finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
    } catch {
      case e: Exception =>
        logWarning("Creating new stage failed due to exception - job: " + jobId, e)
        listener.jobFailed(e)
        return
    }

    // 使用前面创建好的 ResultStage 去创建一个 job
    // 这个 job 的最后一个 stage 就是 finalStage
    val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
    clearCacheLocs()
    logInfo("Got job %s (%s) with %d output partitions".format(
      job.jobId, callSite.shortForm, partitions.length))
    logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
    logInfo("Parents of final stage: " + finalStage.parents)
    logInfo("Missing parents: " + getMissingParentStages(finalStage))

    // 将 job 的相关信息存储到内存中
    val jobSubmissionTime = clock.getTimeMillis()
    jobIdToActiveJob(jobId) = job
    activeJobs += job
    finalStage.setActiveJob(job)
    val stageIds = jobIdToStageIds(jobId).toArray
    val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
    listenerBus.post(
      SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
    // 提交 finalStage
    submitStage(finalStage)
  }

下面就会走进 submitStage 方法,这个方法是用来提交 stage 的,具体做了这些操作:

1,首先会验证 stage 对应的 job id 进行校验,存在才会继续执行;

2,在提交这个 stage 之前会判断当前 stage 的状态。

如果是 running、waiting、failed 的话就不做任何操作。

如果不是这三个状态则会根据当前 stage 去往前推前面的 stage,如果能找到前面的 stage 则继续递归调用 submitStage 方法,直到当前 stage 找不到前面的 stage 为止,这时候的 stage 就相当于当前 job 的第一个 stage,然后回去调用 submitMissingTasks 方法去分配 task。

private def submitStage(stage: Stage) {
    val jobId = activeJobForStage(stage)
    // 看看当前的 job 是否存在
    if (jobId.isDefined) {
      logDebug("submitStage(" + stage + ")")
       // 判断当前 stage 的状态
      if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
        // 根据当前的 stage 去推倒前面的 stage
        val missing = getMissingParentStages(stage).sortBy(_.id)
        logDebug("missing: " + missing)
        // 如果前面已经没有 stage 了,那么久将当前 stage 去执行 submitMissingTasks 方法
        // 如果前面还有 stage 的话那么递归调用 submitStage
        if (missing.isEmpty) {
          logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
          submitMissingTasks(stage, jobId.get)
        } else {
          for (parent <- missing) {
            submitStage(parent)
          }
          // 将当前 stage 加入等待队列
          waitingStages += stage
        }
      }
    } else {
      // abortStage 终止提交当前 stage
      abortStage(stage, "No active job for stage " + stage.id, None)
    }
  }

上面最重要的一个地方就是使用当前 stage 向前推,找到前面的 stage,也是 stage 的划分算法。下面就看看 getMissingParentStages 这个划分算法做了哪些操作:

1,创建 missing 和 visited 两个 HashSet,分别用来存储根据当前 stage 向前找到的所有 stage 数据和已经调用过 visit 方法的 RDD;

2,创建一个存放 RDD 的栈,然后将传进来的 stage 中的 rdd 也就是 finalStage 中的那个 job 触发的最后一个 RDD 放入栈中;

3,然后将栈中的 RDD 拿出来调用 visit 方法,这个 visit 方法内部会根据当前 RDD 的依赖链逐个遍历所有 RDD,并且会根据相邻两个 RDD 的依赖关系来决定下面的操作:

如果是宽依赖,即 ShuffleDependency ,那么会调用 getOrCreateShuffleMapStage 创建一个新的 stage,默认每个 job 的最后一个 stage 是 ResultStage,剩余的 job 中的其它 stage 均为 ShuffleMapStage。然后会将创建的这个 stage 加入前面创建的 missing 的 HashSet 中;

如果是窄依赖,即 NarrowDependency,那么会将该 RDD 加入到前面创建的 RDD 栈中,继续遍历调用 visit 方法。

直到所有的 RDD 都遍历结束后返回前面创建的 missing 的集合。

private def getMissingParentStages(stage: Stage): List[Stage] = {
    // 存放下面找到的所有 stage
    val missing = new HashSet[Stage]
    // 存放已经遍历过的 rdd
    val visited = new HashSet[RDD[_]]
    // We are manually maintaining a stack here to prevent StackOverflowError
    // caused by recursively visiting
    // 创建一个维护 RDD 的栈
    val waitingForVisit = new Stack[RDD[_]]
    // visit 方法
    def visit(rdd: RDD[_]) {
      // 判断当前 rdd 是否 visit 过
      if (!visited(rdd)) {
        visited += rdd
        val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
        if (rddHasUncachedPartitions) {
          // 遍历当前 RDD 的依赖链
          for (dep <- rdd.dependencies) {
            dep match {
              // 如果是宽依赖
              case shufDep: ShuffleDependency[_, _, _] =>
                // 创建 ShuffleMapStage 
                val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)
                if (!mapStage.isAvailable) {
                  // 加入 missing 集合
                  missing += mapStage
                }
              // 如果是窄依赖
              case narrowDep: NarrowDependency[_] =>
                // 加入等待 visit 的集合中,准备下一次遍历
                waitingForVisit.push(narrowDep.rdd)
            }
          }
        }
      }
    }
    // 将传入的 stage 中的 rdd 拿出来压入 waitingForVisit 的栈中
    waitingForVisit.push(stage.rdd)
    // 遍历栈里的所有 RDD 
    while (waitingForVisit.nonEmpty) {
      // 调用 visit 方法
      visit(waitingForVisit.pop())
    }
    // 返回 missing 这个 stage 集合
    missing.toList
  }

至此,所有的 stage 都已经划分结束了。可以看出每个 Spark Application 执行代码的时候,每当碰到一个 Action 操作就会划分出一个 Job,然后每个 Job 里会根据宽窄依赖去划分出多个 stage。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,293评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,604评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,958评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,729评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,719评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,630评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,000评论 3 397
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,665评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,909评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,646评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,726评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,400评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,986评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,959评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,996评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,481评论 2 342