Spark Job Execution Flow in Detail (Part 2)

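At a high level, a Spark job runs as follows. After the user submits a job (the application), a SparkContext object is created. The SparkContext requests Executor resources from the Cluster Manager (the Spark Master in Standalone mode), breaks the job into a series of tasks that can be processed in parallel, and dispatches those tasks to different Executors. When an Executor finishes a task, it returns the result to the SparkContext.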

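The previous article described in detail how Spark requests Executor resources. This article covers what happens next: how a job is split into a series of tasks, and how those tasks are dispatched to Executors and run. The figure below shows the whole process.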

Figure: Job execution flow
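  1. DAGScheduler receives the job submitted by the user
    After the user submits a job, SparkContext's runJob() calls DAGScheduler's runJob(). Inside runJob(), submitJob() is called to submit the job, and the caller then blocks until the job's result is available.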
def runJob[T, U](  
                        rdd: RDD[T],  
                        func: (TaskContext, Iterator[T]) => U,  
                        partitions: Seq[Int],  
                        callSite: CallSite,  
                        allowLocal: Boolean,  
                        resultHandler: (Int, U) => Unit,  
                        properties: Properties): Unit = {  
    val start = System.nanoTime  
    val waiter = submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler, properties)  
    waiter.awaitResult() match {  
        case JobSucceeded =>  
            logInfo("Job %d finished: %s, took %f s".format  
            (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))  
        case JobFailed(exception: Exception) =>  
            logInfo("Job %d failed: %s, took %f s".format  
            (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))  
            throw exception  
    }  
}  

submitJob() posts a JobSubmitted event to eventProcessLoop, which hands the job to handleJobSubmitted() for processing.

def submitJob[T, U](  
                           rdd: RDD[T],  
                           func: (TaskContext, Iterator[T]) => U,  
                           partitions: Seq[Int],  
                           callSite: CallSite,  
                           allowLocal: Boolean,  
                           resultHandler: (Int, U) => Unit,  
                           properties: Properties): JobWaiter[U] = {  
    // Check to make sure we are not launching a task on a partition that does not exist.  
    val maxPartitions = rdd.partitions.length  
    partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>  
        throw new IllegalArgumentException(  
            "Attempting to access a non-existent partition: " + p + ". " +  
                    "Total number of partitions: " + maxPartitions)  
    }  
  
    val jobId = nextJobId.getAndIncrement()  
    if (partitions.size == 0) {  
        return new JobWaiter[U](this, jobId, 0, resultHandler)  
    }  
  
    assert(partitions.size > 0)  
    val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]  
    val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)  
    eventProcessLoop.post(JobSubmitted(  
        jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter,  
        SerializationUtils.clone(properties)))  
    waiter  
}  
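
The submit-then-wait pattern used by runJob() and submitJob() can be sketched in isolation. The following is a minimal toy model, not Spark's implementation: MiniScheduler, Waiter, and the "square the partition index" workload are all hypothetical names invented for illustration. It only shows the shape of the idea: validate the partitions, post an event to a queue drained by a single loop thread, return a waiter immediately, and let the blocking caller wait on it.

```scala
import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue}
import scala.collection.mutable

// Toy model of the submit-then-wait pattern. Every name below is hypothetical;
// none of this is Spark's real implementation.
object SubmitAndWaitSketch {

  // Collects one result per partition and lets a caller block until all have arrived.
  final class Waiter(val jobId: Int, totalTasks: Int) {
    private val remaining = new CountDownLatch(totalTasks)
    private val results = mutable.Map.empty[Int, Int]

    def taskSucceeded(partition: Int, value: Int): Unit = {
      results.synchronized { results(partition) = value }
      remaining.countDown()
    }

    def awaitResult(): Map[Int, Int] = {
      remaining.await()
      results.synchronized { results.toMap }
    }
  }

  final case class JobSubmitted(jobId: Int, partitions: Seq[Int], waiter: Waiter)

  final class MiniScheduler {
    private val events = new LinkedBlockingQueue[JobSubmitted]()

    // Single daemon thread standing in for the scheduler's event loop.
    private val loop = new Thread(new Runnable {
      def run(): Unit = while (true) {
        val event = events.take()
        // "Run" each partition by squaring its index, then notify the waiter.
        event.partitions.foreach(p => event.waiter.taskSucceeded(p, p * p))
      }
    })
    loop.setDaemon(true)
    loop.start()

    // Validate the partitions, enqueue the event, and return without blocking.
    def submitJob(jobId: Int, partitions: Seq[Int], maxPartitions: Int): Waiter = {
      partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
        throw new IllegalArgumentException(s"Non-existent partition: $p")
      }
      val waiter = new Waiter(jobId, partitions.size)
      if (partitions.nonEmpty) events.put(JobSubmitted(jobId, partitions, waiter))
      waiter
    }

    // Blocking wrapper: submit, then wait for every partition's result.
    def runJob(jobId: Int, partitions: Seq[Int], maxPartitions: Int): Map[Int, Int] =
      submitJob(jobId, partitions, maxPartitions).awaitResult()
  }
}
```

Calling `new SubmitAndWaitSketch.MiniScheduler().runJob(0, Seq(0, 1, 2), 4)` blocks until the loop thread has reported all three partitions. A job with zero partitions returns at once, mirroring the early return in submitJob() above.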
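  2. DAGScheduler splits the job into stages
    Each job first gets a finalStage created for it automatically; the complete stage DAG is then obtained recursively from that stage.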
private[scheduler] def handleJobSubmitted(jobId: Int,  
                                            finalRDD: RDD[_],  
                                            func: (TaskContext, Iterator[_]) => _,  
                                            partitions: Array[Int],  
                                            allowLocal: Boolean,  
                                            callSite: CallSite,  
                                            listener: JobListener,  
                                            properties: Properties) {  
      var finalStage: ResultStage = null  
      try {  
          // New stage creation may throw an exception if, for example, jobs are run on a  
          // HadoopRDD whose underlying HDFS files have been deleted.  
          finalStage = newResultStage(finalRDD, partitions.size, jobId, callSite)  
      } catch {  
          case e: Exception =>  
              logWarning("Creating new stage failed due to exception - job: " + jobId, e)  
              listener.jobFailed(e)  
              return  
      }  
      if (finalStage != null) {  
          val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)  
          clearCacheLocs()  
          logInfo("Got job %s (%s) with %d output partitions (allowLocal=%s)".format(  
              job.jobId, callSite.shortForm, partitions.length, allowLocal))  
          logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")")  
          logInfo("Parents of final stage: " + finalStage.parents)  
          logInfo("Missing parents: " + getMissingParentStages(finalStage))  
          val shouldRunLocally =  
              localExecutionEnabled && allowLocal && finalStage.parents.isEmpty && partitions.length == 1  
          val jobSubmissionTime = clock.getTimeMillis()  
          if (shouldRunLocally) {  
              // Compute very short actions like first() or take() with no parent stages locally.  
              listenerBus.post(  
                  SparkListenerJobStart(job.jobId, jobSubmissionTime, Seq.empty, properties))  
              runLocally(job)  
          } else {  
              jobIdToActiveJob(jobId) = job  
              activeJobs += job  
              finalStage.resultOfJob = Some(job)  
              val stageIds = jobIdToStageIds(jobId).toArray  
              val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))  
              listenerBus.post(  
                  SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))  
              submitStage(finalStage)  
          }  
      }  
      submitWaitingStages()  
  }  

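submitStage() walks the stage DAG: it recursively submits any missing parent stages first, and once a stage has no missing parents it calls submitMissingTasks() to split that stage into runnable tasks.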

private def submitStage(stage: Stage) {  
    val jobId = activeJobForStage(stage)  
    if (jobId.isDefined) {  
        logDebug("submitStage(" + stage + ")")  
        if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {  
            val missing = getMissingParentStages(stage).sortBy(_.id)  
            logDebug("missing: " + missing)  
            if (missing.isEmpty) {  
                logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")  
                submitMissingTasks(stage, jobId.get)  
            } else {  
                for (parent <- missing) {  
                    submitStage(parent)  
                }  
                waitingStages += stage  
            }  
        }  
    } else {  
        abortStage(stage, "No active job for stage " + stage.id)  
    }  
}  
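
The recursion in submitStage() is easier to follow on a toy DAG. The sketch below is an illustration under simplifying assumptions: Stage is a hypothetical case class that lists its parents directly, and "launching" a stage just records its id instead of calling submitMissingTasks(). Spark's real getMissingParentStages() does more work than the simple filter used here.

```scala
import scala.collection.mutable

// Toy model of recursive stage submission. Stage and Scheduler are hypothetical
// stand-ins, not Spark's classes.
object SubmitStageSketch {
  final case class Stage(id: Int, parents: List[Stage])

  final class Scheduler {
    val finished = mutable.Set.empty[Int]          // ids of stages whose output is available
    val waiting = mutable.Set.empty[Int]           // ids of stages blocked on a parent
    val launched = mutable.ArrayBuffer.empty[Int]  // ids in the order their tasks were submitted

    // Parents that have not finished yet, lowest id first.
    private def missingParents(stage: Stage): List[Stage] =
      stage.parents.filterNot(p => finished.contains(p.id)).sortBy(_.id)

    def submitStage(stage: Stage): Unit = {
      if (!waiting.contains(stage.id) && !launched.contains(stage.id)) {
        val missing = missingParents(stage)
        if (missing.isEmpty) {
          launched += stage.id          // stands in for submitMissingTasks
        } else {
          missing.foreach(submitStage)  // walk up the DAG first
          waiting += stage.id           // revisit once the parents are done
        }
      }
    }
  }
}
```

For a DAG in which stage 3 depends on stage 2, and stage 2 depends on stages 0 and 1, submitting stage 3 launches only stages 0 and 1 and leaves 2 and 3 in the waiting set.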

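Note that stages depend on one another, so Spark runs them in dependency order: a stage can start only after its parent stages have finished. Running stages are kept in runningStages, and stages waiting to run are kept in waitingStages. When a stage completes successfully, DAGScheduler submits the next runnable stages from handleTaskCompletion().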

private[scheduler] def handleTaskCompletion(event: CompletionEvent) {  
        val stage = stageIdToStage(task.stageId)  
        event.reason match {  
            case Success =>  
                listenerBus.post(SparkListenerTaskEnd(stageId, stage.latestInfo.attemptId, taskType,  
                    event.reason, event.taskInfo, event.taskMetrics))  
                stage.pendingTasks -= task  
                task match {  
                    ......  
                    case smt: ShuffleMapTask =>  
                        val shuffleStage = stage.asInstanceOf[ShuffleMapStage]  
                        updateAccumulators(event)  
                        val status = event.result.asInstanceOf[MapStatus]  
                        val execId = status.location.executorId  
                        logDebug("ShuffleMapTask finished on " + execId)  
                        if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {  
                            logInfo("Ignoring possibly bogus ShuffleMapTask completion from " + execId)  
                        } else {  
                            shuffleStage.addOutputLoc(smt.partitionId, status)  
                        }  
                        if (runningStages.contains(shuffleStage) && shuffleStage.pendingTasks.isEmpty) {  
                            markStageAsFinished(shuffleStage)  
                            mapOutputTracker.registerMapOutputs(  
                                shuffleStage.shuffleDep.shuffleId,  
                                shuffleStage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray,  
                                changeEpoch = true)  
  
                            clearCacheLocs()  
                            if (shuffleStage.outputLocs.contains(Nil)) {  
                                logInfo("Resubmitting " + shuffleStage + " (" + shuffleStage.name +  
                                        ") because some of its tasks had failed: " +  
                                        shuffleStage.outputLocs.zipWithIndex.filter(_._1.isEmpty)  
                                                .map(_._2).mkString(", "))  
                                submitStage(shuffleStage)  
                            } else {  
                                val newlyRunnable = new ArrayBuffer[Stage]  
                                for (shuffleStage <- waitingStages) {  
                                    logInfo("Missing parents for " + shuffleStage + ": " +  
                                            getMissingParentStages(shuffleStage))  
                                }  
                                for (shuffleStage <- waitingStages if getMissingParentStages(shuffleStage).isEmpty) {  
                                    newlyRunnable += shuffleStage  
                                }  
                                waitingStages --= newlyRunnable  
                                runningStages ++= newlyRunnable  
                                for {  
                                    shuffleStage <- newlyRunnable.sortBy(_.id)  
                                    jobId <- activeJobForStage(shuffleStage)  
                                } {  
                                    logInfo("Submitting " + shuffleStage + " (" +  
                                            shuffleStage.rdd + "), which is now runnable")  
                                    submitMissingTasks(shuffleStage, jobId)  
                                }  
                            }  
                        }  
                }  
        }  
        submitWaitingStages()  
        ......  
    }  
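
The "promote waiting stages once their parents are done" step in the excerpt above can be modelled as a small pure function. This is a simplified sketch with hypothetical names (Stage, newlyRunnable, runAll): it advances in whole waves, whereas the real scheduler reacts to each stage completion individually.

```scala
import scala.annotation.tailrec

// Toy model of moving stages from "waiting" to "running". All names are hypothetical.
object StagePromotionSketch {
  // A stage is an id plus the ids of its parent stages.
  final case class Stage(id: Int, parentIds: Set[Int])

  // Waiting stages whose parents have all finished, in ascending id order.
  def newlyRunnable(waiting: Set[Stage], finished: Set[Int]): List[Stage] =
    waiting.filter(_.parentIds.subsetOf(finished)).toList.sortBy(_.id)

  // Runs a whole DAG wave by wave and returns the stage ids launched in each wave.
  def runAll(stages: Set[Stage]): List[List[Int]] = {
    @tailrec
    def loop(waiting: Set[Stage], finished: Set[Int], acc: List[List[Int]]): List[List[Int]] = {
      val ready = newlyRunnable(waiting, finished)
      if (ready.isEmpty) acc.reverse
      else loop(waiting -- ready, finished ++ ready.map(_.id), ready.map(_.id) :: acc)
    }
    loop(stages, Set.empty, Nil)
  }
}
```

In this model, stages with no dependency between them land in the same wave, while a child stage always lands in a later wave than its parents.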
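  3. DAGScheduler splits each stage into tasks that can be computed in parallel and submits all of them to TaskSchedulerImpl
    submitMissingTasks() creates one task per partition that still has to be computed, wraps the tasks in a TaskSet, and submits it to TaskSchedulerImpl.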
private def submitMissingTasks(stage: Stage, jobId: Int) {  
......  
if (tasks.size > 0) {  
            logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")  
            stage.pendingTasks ++= tasks  
            logDebug("New pending tasks: " + stage.pendingTasks)  
            taskScheduler.submitTasks(  
                new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))  
            stage.latestInfo.submissionTime = Some(clock.getTimeMillis())  
        } else {  
            // Because we posted SparkListenerStageSubmitted earlier, we should mark  
            // the stage as completed here in case there are no tasks to run  
            markStageAsFinished(stage, None)  
  
            val debugString = stage match {  
                case stage: ShuffleMapStage =>  
                    s"Stage ${stage} is actually done; " +  
                            s"(available: ${stage.isAvailable}," +  
                            s"available outputs: ${stage.numAvailableOutputs}," +  
                            s"partitions: ${stage.numPartitions})"  
                case stage: ResultStage =>  
                    s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"  
            }  
            logDebug(debugString)  
        }  
    }  
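
As a rough illustration of the "one task per partition, bundled into a TaskSet" idea, here is a minimal sketch. Task, TaskSet, and missingTasks are hypothetical, heavily simplified stand-ins; the `done` parameter (partitions that already have output) and the id format are assumptions of this sketch rather than Spark's API.

```scala
// Toy model of building a TaskSet for a stage. All names are hypothetical.
object TaskSetSketch {
  final case class Task(stageId: Int, partitionId: Int)

  final case class TaskSet(tasks: Vector[Task], stageId: Int, attempt: Int) {
    // Identifier convention chosen for this sketch: "<stageId>.<attempt>".
    def id: String = s"$stageId.$attempt"
  }

  // One task per partition that has no output yet; None means nothing is left to run.
  def missingTasks(stageId: Int, numPartitions: Int, done: Set[Int], attempt: Int): Option[TaskSet] = {
    val tasks = (0 until numPartitions)
      .filterNot(p => done.contains(p))
      .map(p => Task(stageId, p))
      .toVector
    if (tasks.isEmpty) None else Some(TaskSet(tasks, stageId, attempt))
  }
}
```

The `None` case corresponds to the else branch in the excerpt above, where a stage with no tasks to run is marked as finished right away.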

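TaskSchedulerImpl's submitTasks() wraps the TaskSet in a TaskSetManager and adds it to the scheduler (schedulableBuilder) to wait for scheduling. Spark has two scheduling modes, FIFO and Fair; note that scheduling applies only to tasks within the same SparkContext. It then calls reviveOffers() on SparkDeploySchedulerBackend. A TaskSetManager is mainly responsible for scheduling the tasks within one TaskSet, for example picking a task for a given executor.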

override def submitTasks(taskSet: TaskSet) {  
  val tasks = taskSet.tasks  
  logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")  
  this.synchronized {  
    val manager = createTaskSetManager(taskSet, maxTaskFailures)  
    activeTaskSets(taskSet.id) = manager  
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)  
  
    if (!isLocal && !hasReceivedTask) {  
      starvationTimer.scheduleAtFixedRate(new TimerTask() {  
        override def run() {  
          if (!hasLaunchedTask) {  
            logWarning("Initial job has not accepted any resources; " +  
              "check your cluster UI to ensure that workers are registered " +  
              "and have sufficient resources")  
          } else {  
            this.cancel()  
          }  
        }  
      }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)  
    }  
    hasReceivedTask = true  
  }  
  backend.reviveOffers()  
}  
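
To make the FIFO mode concrete, the sketch below orders task-set managers the way a FIFO policy plausibly would: earlier job first and, within a job, lower stage id first. That ordering rule is an assumption of this sketch, and Manager is a hypothetical stand-in that keeps only the two fields needed for the comparison.

```scala
// Toy model of FIFO ordering between task sets. All names are hypothetical.
object FifoOrderSketch {
  // Only the fields used for ordering.
  final case class Manager(jobId: Int, stageId: Int)

  // Earlier job first; within the same job, lower stage id first.
  val fifo: Ordering[Manager] = Ordering.by((m: Manager) => (m.jobId, m.stageId))

  def schedulingOrder(managers: Seq[Manager]): Seq[Manager] = managers.sorted(fifo)
}
```

A Fair policy would replace `fifo` with an ordering based on how many tasks each pool is currently running relative to its share, which this sketch does not attempt to model.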

SparkDeploySchedulerBackend's reviveOffers() sends a ReviveOffers message to the driver. On receiving ReviveOffers, the driver calls makeOffers().

case ReviveOffers =>  
        makeOffers()  
  4. SparkDeploySchedulerBackend has the Executors run the tasks
    resourceOffers() first decides which task runs on which Executor; launchTasks() is then called to send the tasks to the Executors.
private def makeOffers() {  
  // Filter out executors under killing  
  val activeExecutors = executorDataMap.filterKeys(!executorsPendingToRemove.contains(_))  
  val workOffers = activeExecutors.map { case (id, executorData) =>  
    new WorkerOffer(id, executorData.executorHost, executorData.freeCores)  
  }.toSeq  
  launchTasks(scheduler.resourceOffers(workOffers))  
}  
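
The matching of pending tasks to free cores can be pictured with a deliberately naive sketch. It hands out tasks round-robin, one core per task, until either the tasks or the cores run out. The real resourceOffers() also takes data locality and scheduling order into account, so treat the WorkerOffer and Assignment classes below, and the round-robin rule itself, as assumptions made for illustration.

```scala
import scala.collection.mutable

// Toy model of matching tasks to executor cores. All names are hypothetical.
object ResourceOfferSketch {
  final case class WorkerOffer(executorId: String, host: String, freeCores: Int)
  final case class Assignment(taskId: Int, executorId: String)

  // Round-robin over executors, one core per task, until tasks or cores run out.
  def resourceOffers(offers: Seq[WorkerOffer], pendingTaskIds: Seq[Int]): List[Assignment] = {
    val freeCores = mutable.LinkedHashMap.empty[String, Int]
    offers.foreach(o => freeCores.update(o.executorId, o.freeCores))
    val assigned = mutable.ArrayBuffer.empty[Assignment]
    val tasks = pendingTaskIds.iterator
    var progressed = true
    while (tasks.hasNext && progressed) {
      progressed = false
      for (executorId <- freeCores.keys.toList) {
        if (tasks.hasNext && freeCores(executorId) > 0) {
          assigned += Assignment(tasks.next(), executorId)
          freeCores(executorId) -= 1
          progressed = true
        }
      }
    }
    assigned.toList
  }
}
```

Tasks that do not fit stay pending; in the flow described here they would be picked up by a later round of offers.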
  5. Executor runs the task
    When CoarseGrainedExecutorBackend receives a LaunchTask message, it calls Executor's launchTask() to run the task.
override def receive: PartialFunction[Any, Unit] = {  
  
    case LaunchTask(data) =>  
      if (executor == null) {  
        logError("Received LaunchTask command but executor was null")  
        System.exit(1)  
      } else {  
        val taskDesc = ser.deserialize[TaskDescription](data.value)  
        logInfo("Got assigned task " + taskDesc.taskId)  
        executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,  
          taskDesc.name, taskDesc.serializedTask)  
      }  

Internally, an Executor is a thread pool: every submitted task is wrapped in a TaskRunner and handed to the threadPool for execution.

def launchTask(  
    context: ExecutorBackend,  
    taskId: Long,  
    attemptNumber: Int,  
    taskName: String,  
    serializedTask: ByteBuffer): Unit = {  
  val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,  
    serializedTask)  
  runningTasks.put(taskId, tr)  
  threadPool.execute(tr)  
}  
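
The "wrap the task in a Runnable, register it, hand it to a pool" pattern can be reproduced with plain java.util.concurrent. In this sketch MiniExecutor, its `results` map, and the choice of a cached thread pool are assumptions for illustration; only the overall shape follows launchTask() above.

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}

// Toy model of an executor backed by a thread pool. All names are hypothetical.
object ExecutorPoolSketch {
  final class MiniExecutor {
    private val threadPool = Executors.newCachedThreadPool()
    val runningTasks = new ConcurrentHashMap[Long, Runnable]()
    val results = new ConcurrentHashMap[Long, Int]()

    // Wrap the task body in a Runnable, record it as running, and hand it to the pool.
    def launchTask(taskId: Long, body: () => Int): Unit = {
      val runner = new Runnable {
        def run(): Unit = {
          try {
            results.put(taskId, body())
          } finally {
            runningTasks.remove(taskId)  // the task is no longer running, success or not
          }
        }
      }
      runningTasks.put(taskId, runner)
      threadPool.execute(runner)
    }

    // Stop accepting tasks and wait up to ten seconds for running ones to finish.
    def shutdownAndWait(): Boolean = {
      threadPool.shutdown()
      threadPool.awaitTermination(10, TimeUnit.SECONDS)
    }
  }
}
```

Keeping a map of running tasks keyed by task id is what lets an executor later look up a specific task, for example to report on it or to cancel it.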

Inside TaskRunner, task.run() is what actually executes each task's work.

class TaskRunner(  
      execBackend: ExecutorBackend,  
      val taskId: Long,  
      val attemptNumber: Int,  
      taskName: String,  
      serializedTask: ByteBuffer)  
    extends Runnable {  
    ...... 
    override def run(): Unit = {  
      val taskMemoryManager = new TaskMemoryManager(env.executorMemoryManager)  
      val deserializeStartTime = System.currentTimeMillis()  
      Thread.currentThread.setContextClassLoader(replClassLoader)  
      val ser = env.closureSerializer.newInstance()  
      logInfo(s"Running $taskName (TID $taskId)")  
      execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)  
      var taskStart: Long = 0  
      startGCTime = computeTotalGcTime()  
  
      try {  
        val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)  
        updateDependencies(taskFiles, taskJars)  
        task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)  
        task.setTaskMemoryManager(taskMemoryManager)  
        ......
        env.mapOutputTracker.updateEpoch(task.epoch)  
  
        // Run the actual task and measure its runtime.  
        taskStart = System.currentTimeMillis()  
        val value = try {  
          task.run(taskAttemptId = taskId, attemptNumber = attemptNumber)  
        } finally {  
          ......
        }  
        ......
    }  
  }  

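Ultimately, every task calls iterator() to compute its RDD recursively. Taking ShuffleMapTask as an example, rdd.iterator(partition, context) computes the task's output partition by working back to the root partitions it depends on.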

private[spark] class ShuffleMapTask(  
    stageId: Int,  
    taskBinary: Broadcast[Array[Byte]],  
    partition: Partition,  
    @transient private var locs: Seq[TaskLocation])  
  extends Task[MapStatus](stageId, partition.index) with Logging {  
  
  override def runTask(context: TaskContext): MapStatus = {  
    // Deserialize the RDD using the broadcast variable.  
    val deserializeStartTime = System.currentTimeMillis()  
    val ser = SparkEnv.get.closureSerializer.newInstance()  
    val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](  
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)  
    _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime  
  
    metrics = Some(context.taskMetrics)  
    var writer: ShuffleWriter[Any, Any] = null  
    try {  
      val manager = SparkEnv.get.shuffleManager  
      writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)  
      writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])  
      return writer.stop(success = true).get  
    } catch {  
      case e: Exception =>  
        try {  
          if (writer != null) {  
            writer.stop(success = false)  
          }  
        } catch {  
          case e: Exception =>  
            log.debug("Could not stop writer", e)  
        }  
        throw e  
    }  
  }  
}  
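
The recursive pull behind rdd.iterator() can be shown with a miniature lineage. MiniRDD and its subclasses below are hypothetical and ignore caching, checkpointing, and shuffles; the sketch only demonstrates that asking the last RDD for a partition's iterator pulls data through each parent, back to the source.

```scala
// Toy model of recursive partition computation. All names are hypothetical.
object RddIteratorSketch {
  abstract class MiniRDD[T] {
    def numPartitions: Int
    // Compute one partition; derived RDDs pull from their parent's iterator.
    def compute(partition: Int): Iterator[T]
    def iterator(partition: Int): Iterator[T] = compute(partition)
    def map[U](f: T => U): MiniRDD[U] = new Mapped(this, f)
    def filter(p: T => Boolean): MiniRDD[T] = new Filtered(this, p)
  }

  // Root of the lineage: data that is already split into partitions.
  final class Source[T](parts: Vector[Vector[T]]) extends MiniRDD[T] {
    def numPartitions: Int = parts.size
    def compute(partition: Int): Iterator[T] = parts(partition).iterator
  }

  final class Mapped[T, U](parent: MiniRDD[T], f: T => U) extends MiniRDD[U] {
    def numPartitions: Int = parent.numPartitions
    def compute(partition: Int): Iterator[U] = parent.iterator(partition).map(f)
  }

  final class Filtered[T](parent: MiniRDD[T], p: T => Boolean) extends MiniRDD[T] {
    def numPartitions: Int = parent.numPartitions
    def compute(partition: Int): Iterator[T] = parent.iterator(partition).filter(p)
  }
}
```

Because each level returns a lazy Iterator, nothing is computed until the consumer (in ShuffleMapTask, the shuffle writer) starts reading, and each element then flows through the whole chain one at a time.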

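This completes the execution flow for one stage's TaskSet. Once all tasks in the TaskSet have finished, control returns to step 3 to run the next stage, and this repeats until finalStage completes. :)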
