Spark Job执行流程源码解析

上一篇文章讲解了RDD的基本概念, 这篇文章尝试分析当Spark拿到一个RDD之后是如何处理它的. 文中会涉及到Spark内部的实现细节, 希望通过本篇文章让大家对Spark有一个更深层次的了解.

背景知识

DAG

DAG(Directed Acyclic Graph) 中文名是有向无环图. DAG是有向无环图(Directed Acyclic Graph)的简称. 在大数据处理领域,
DAG计算模型是指将计算任务在内部分解为若干个子任务, 这些子任务之间由逻辑关系或运行先后顺序等因素被构建成有向无环图. Spark是实现了DAG计算模型的计算框架.

Spark运行时架构

首先, 先来熟悉一下Spark的运行时架构.

驱动器

Spark驱动器就是执行程序main()函数的进程. 驱动器有一下职责:

  1. 负责把用户程序转化为多个物理执行单元(task). Task是Spark中最小的工作单元. 具体的步骤是首先它会把用户程序转化成DAG, 然后在把DAG转化成task.
  2. 为执行器(Executor)调度任务. 驱动器启动成功后会向Driver程序注册自己, Driver程序保存了所有可用的Executor. 当物理执行计划生成之后它要负责协调哪些任务在哪些Executor执行. 这个过程Driver根据任务基于的数据所在的位置给其分配执行器.

执行器

执行器是最"基层", "干活"的进程. Spark应用启动的时候, Executor进程就被同时启动, 知道整个Spark应用关闭, Executor被关闭. 它的具体职责是:

  1. 执行Driver交给的任务, 并返回结果.
  2. 通过自身的Block Manager为用户程序中要求缓存的RDD提供基于内存的缓存.

Job执行流程

在Spark中一个Job抽象的执行流程大概就是这样的:Job提交 -> Driver把RDD转化为DAG -> 根据DAG转化为Task -> Task提交给Executor -> Result.

+-------+          +----------------+            +-------------+
|  RDD  | --DAG--> |  DAGScheduler  | --Tasks--> |  Executors  |
+-------+          +----------------+            +-------------+

简单解释一下涉及到的名词都是什么意思:
Job: 在用户程序中, 每次调用Action函数都会产生一个新的job, 也就是说一个Action都会生成一个job.
Task: Task是Spark中最小的工作单元, Spark中的程序最终都要分解成一个个Task提交到Scheduler.
Stage: Stage对应DAG中的任务单元.

RDD依赖和DAG的构建

在第一篇文章中, 我提到过执行transformation函数的一个作用是构建RDD之间的依赖关系. 具体来说依赖有宽窄之分, 如果子RDD中的每个分区依赖常数个父RDD中的分区, 我们把这种依赖叫做窄依赖; 如果子RDD中的每个数据分片依赖父RDD的所有分片, 我们把这种依赖叫做宽依赖.

在这儿我们在引入一个新的词汇lineage, 在spark中每个RDD都携带自己的lineage. 而lineage就是通过RDD之间的依赖来表示的.

wide-narrow-dependency

我们通过这幅图可以大概看一下宽窄依赖到底是这么回事. 图中矩形框围住的部分是RDD, 实心小矩形是Partition.

接下来我们看一下Spark是如何构建DAG的. 当用户调用Action函数时, 调度器会逆向的遍历该RDD的lineage, 每个stage会尝试尽可能多包含那些连续的窄依赖. 如果当前的Stage向上回溯的过程中遇到了宽依赖, 则当前Stage结束, 一个新的Stage被构建. 第二个Stage是第一个Stage的parent. 还有一种情况也会结束当前Stage, 那就是那个partition已经被计算出来, 换存在内存中, 这种情况下我们就不必作多余的计算了.

内部实现

我们依据上边抽象的Job执行流程为依据, 从代码入手看一下Spark内部代码实现.

第一个阶段: Job -> Stage.

这个阶段主战场在DAGScheduler, 假设我们调用了reduce函数, reduce内部会调用SparkContext.runJob函数. 在SparkContext内部, 经过一系列函数的调用, 最终通过调用DAGScheduler.runJob函数把Job提交给DAGScheduler.

我们看一下DAGScheduler runjob函数:

val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
waiter.awaitResult() match {
  //...
}

首先它先将job提交, 然后创建JobWaiter以阻塞的方式等待job执行结果.
我们接下来看一下DAGScheduler submitJob的过程?

它向DAGSchedulerEventProcessLoop post了一个JobSubmitted事件. DAGSchedulerEventProcessLoop接到JobSubmitted事件之后会调用DAGScheduler的handleJobSubmitted函数. 正是这个函数触发了RDD到DAG的转化. 我们重点来看一下这个函数的实现(删掉了一些我们不关心的代码):

private[scheduler] def handleJobSubmitted(...) {
    var finalStage: ResultStage = null
    try {
      finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)
    } catch {
      case e: Exception =>
        logWarning("Creating new stage failed due to exception - job: " + jobId, e)
        listener.jobFailed(e)
        return
    }
    submitStage(finalStage)
    submitWaitingStages()
  }

通过newResultStage我们拿到了最DAG的最后一个Stage(finalStage), 最有一个Stage都是ResultStage. 如果我们反向的遍历就能够知道整个DAG, 这个稍后我们会具体分析newResultStage的实现. 接着看handleJobSubmitted函数, 在拿到DAG的最后一个Stage后, 通过submitStage把它提交, 不出意外submitStage肯定实在向Executor提交Task. 我们按顺序先看newResultStage是如何生成DAG的.

private def getParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
    val parents = new HashSet[Stage]
    val visited = new HashSet[RDD[_]]
    val waitingForVisit = new Stack[RDD[_]]
    def visit(r: RDD[_]) {
      if (!visited(r)) {
        visited += r
        for (dep <- r.dependencies) {
          dep match {
            case shufDep: ShuffleDependency[_, _, _] =>
              parents += getShuffleMapStage(shufDep, firstJobId)
            case _ =>
              waitingForVisit.push(dep.rdd)
          }
        }
      }
    }
    waitingForVisit.push(rdd)
    while (waitingForVisit.nonEmpty) {
      visit(waitingForVisit.pop())
    }
    parents.toList
  }

上边背景知识部分我们已经大概知道了Spark是如何划分Stage的. 简单的说就是遇到宽依赖, 就生成新的Stage. 宽依赖会触发shuffle. 我们来看上边代码的visit函数: 拿到RDD的所有的dependency, 如果是窄依赖那么继续查找依赖的RDD的parent; 如果是宽依赖, 则调用getShuffleMapStage把生成的Stage加到当前stage的parents中. 该函数执行完毕, 则整个DAG就构建完成.

看完DAG的构建过程, 我们继续沿着submitStage那条线看下去(以下源码做了部分删减).

private def submitStage(stage: Stage) {
  ...
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      val missing = getMissingParentStages(stage).sortBy(_.id)
      logDebug("missing: " + missing)
      if (missing.isEmpty) {
        logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
        submitMissingTasks(stage, jobId.get)
      } else {
        for (parent <- missing) {
          submitStage(parent)
        }
        waitingStages += stage
      }
    }
}

这个函数非常简单, 先把当前stage的parents提交, 完事儿后在提交自己. 我们重点关注一下: submitMissingTasks.

/** Called when stage's parents are available and we can now do its task. */
private def submitMissingTasks(stage: Stage, jobId: Int) {
  ...
  val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
  val tasks: Seq[Task[_]] = try {
    stage match {
      case stage: ShuffleMapStage =>
        partitionsToCompute.map { id =>
          val locs = taskIdToLocations(id)
          val part = stage.rdd.partitions(id)
          new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
            taskBinary, part, locs, stage.internalAccumulators)
        }

      case stage: ResultStage =>
        val job = stage.activeJob.get
        partitionsToCompute.map { id =>
          val p: Int = stage.partitions(id)
          val part = stage.rdd.partitions(p)
          val locs = taskIdToLocations(id)

          /**
            * !! 一个ResultTask包含了task的定义(这个task要干什么), 以及在那个partition(part)执行该task
            */
          new ResultTask(stage.id, stage.latestInfo.attemptId,
            taskBinary, part, locs, id, stage.internalAccumulators)
        }
    }
  } catch {
    case NonFatal(e) =>
      abortStage(stage, s"Task creation failed: $e\n${e.getStackTraceString}", Some(e))
      runningStages -= stage
      return
  }

  if (tasks.size > 0) {
    taskScheduler.submitTasks(new TaskSet(
      tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
    stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
  } else {
    ...
  }
}

首先先找出该stage中所有未执行过的partition, 然后把序列化后的task(taskBinary), partition(part)等信息封装成task. ShuffleMapStage转化成ShuffleMapTask, ResultStage转化成ResultStage. ShuffleMapTask的处理过程会比较复杂一下, 因为会涉及到shuffle的过程. 这个我们后续分析Executor如何执行是再详说. 函数最后我们看到新生成的tasks封装成TaskSet提交给TaskScheduler.
到此我们分析了RDD如何转化成DAG, DAG是如何生成Task并提交的. 接下来分析Executor如何处理Task.

Executor如何处理Tasks

Task提交成功之后, 我们来看一下都有哪些类参与到了Task执行的过程中? 第一个类就是TaskScheduler, 它主要负责Task的调度工作. 第二个类是SchedulerBackend, SchedulerBackend的作用是向TaskScheduler申请任务, 并分配给Executor去执行. SchedulerBackend可以有不同的实现. 支持本地单机运行的是LocalBackend, 支持Mesos集群运行的是MesosSchedulerBackend, etc. 下面我们分析以本地单机运行为例解释任务执行的过程.
文章上一部分分析到DAGScheduler调用TaskScheduler的submitTasks方法提交Task, 那我们接着来看submitTasks的实现:

override def submitTasks(taskSet: TaskSet) {
  ...
  backend.reviveOffers()
}

submitTasks最后一步调用了LocalBackend的reviveOffers函数, 这个函数是提醒LocalBackend可以开始执行任务了. 我们继续看reviveOffers的实现:

def reviveOffers() {
  val offers = Seq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores))
  /** 向TaskSchedulerImpl申请Task */
  for (task <- scheduler.resourceOffers(offers).flatten) {
    freeCores -= scheduler.CPUS_PER_TASK
    executor.launchTask(executorBackend, taskId = task.taskId, attemptNumber = task.attemptNumber,
      task.name, task.serializedTask)
  }
}

首先, 它先向scheduler申请task. 然后把tasks提交给Executor. 在申请Task的时候, 把自己空闲的cpu个数发送给Scheduler, 以便让Scheduler按资源分配任务. 我们来看Scheduler是如何分配Task的.

def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
  val shuffledOffers = Random.shuffle(offers)
  val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
  val availableCpus = shuffledOffers.map(o => o.cores).toArray
  val sortedTaskSets = rootPool.getSortedTaskSetQueue
  var launchedTask = false

  for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
    do {
      launchedTask = resourceOfferSingleTaskSet(
          taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
    } while (launchedTask)
  }

  if (tasks.size > 0) {
    hasLaunchedTask = true
  }
  return tasks
}

private def resourceOfferSingleTaskSet(
    taskSet: TaskSetManager,
    maxLocality: TaskLocality,
    shuffledOffers: Seq[WorkerOffer],
    availableCpus: Array[Int],
    tasks: Seq[ArrayBuffer[TaskDescription]]) : Boolean = {
  var launchedTask = false
  for (i <- 0 until shuffledOffers.size) {
    val execId = shuffledOffers(i).executorId
    val host = shuffledOffers(i).host
    if (availableCpus(i) >= CPUS_PER_TASK) {
      try {
        for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
          tasks(i) += task
          val tid = task.taskId
          taskIdToTaskSetManager(tid) = taskSet
          taskIdToExecutorId(tid) = execId
          executorIdToTaskCount(execId) += 1
          executorsByHost(host) += execId
          availableCpus(i) -= CPUS_PER_TASK
          assert(availableCpus(i) >= 0)
          launchedTask = true
        }
      } catch {
        case e: TaskNotSerializableException =>
          logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
          // Do not offer resources for this task, but don't throw an error to allow other
          // task sets to be submitted.
          return launchedTask
      }
    }
  }
  return launchedTask
}

TaskScheduler每次把所有的TaskSet都取出来, 这些TaskSet都按照一定算法进行了了排序, 排在前边的TaskSet会被优先分配Executor.
在resourceOfferSingleTaskSet函数中, 我们可以知道只有workoffer中Executor的空闲cpu个数大于设定的每个Task需要的cpu数量时, 才把当前的Task添加到tasks列表里.
总得来说任务调度的过程是: Backend向Scheduler发出work offer, worker offer中记录着自己的基本信息, 自己的空闲资源, Scheduler根据worker offer中executor的空闲资源为其分配合适的任务.

当Backend拿到Task之后, 依次把Task提交给Executor. TaskRunner继承了Runnable, 所以每个Task都是单独的线程去执行.

def launchTask(
    context: ExecutorBackend,
    taskId: Long,
    attemptNumber: Int,
    taskName: String,
    serializedTask: ByteBuffer): Unit = {
  val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
    serializedTask)
  runningTasks.put(taskId, tr)
  threadPool.execute(tr)
}

代码执行到这里, Task开始真正执行.

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,001评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,210评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,874评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,001评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,022评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,005评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,929评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,742评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,193评论 1 309
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,427评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,583评论 1 346
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,305评论 5 342
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,911评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,564评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,731评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,581评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,478评论 2 352

推荐阅读更多精彩内容