Spark Job Execution (Part 2)

The overall execution flow of a Spark job is roughly as follows: after the user submits a job, a SparkContext object is created. SparkContext requests Executor resources from the Cluster Manager (the Spark Master in Standalone mode), splits the job into a series of tasks that can run in parallel, and dispatches those tasks to different Executors. When a task finishes, its Executor returns the result to SparkContext.

Note: the previous article described how Spark requests Executor resources. This article covers the process from splitting a job into a series of tasks to dispatching those tasks to Executors for execution, as shown in the figure below.


[Figure: flow from job submission to task execution (original image: 1729304-532e8afb9c730b28.png)]
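
To make the flow concrete, here is a minimal driver program (an illustrative sketch, not taken from the original article; the app name and input path are assumptions). Calling an action such as count() invokes SparkContext.runJob(), which hands the job to the DAGScheduler described in the steps below.

import org.apache.spark.{SparkConf, SparkContext}

object WordCountJob {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("WordCountJob")     // hypothetical app name
    val sc = new SparkContext(conf)

    val distinctWords = sc.textFile("hdfs:///tmp/input.txt")  // hypothetical input path
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)   // wide (shuffle) dependency: creates a stage boundary
      .count()              // action: triggers SparkContext.runJob -> DAGScheduler.runJob

    println(s"distinct words: $distinctWords")
    sc.stop()
  }
}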

1. DAGScheduler receives the job submitted by the user
After the user submits a job, SparkContext.runJob() calls DAGScheduler.runJob(). Inside runJob(), submitJob() is called to submit the job, and the method then waits for the job's result.

def runJob[T, U](  
                     rdd: RDD[T],  
                     func: (TaskContext, Iterator[T]) => U,  
                     partitions: Seq[Int],  
                     callSite: CallSite,  
                     allowLocal: Boolean,  
                     resultHandler: (Int, U) => Unit,  
                     properties: Properties): Unit = {  
 val start = System.nanoTime  
 val waiter = submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler, properties)  
 waiter.awaitResult() match {  
     case JobSucceeded =>  
         logInfo("Job %d finished: %s, took %f s".format  
         (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))  
     case JobFailed(exception: Exception) =>  
         logInfo("Job %d failed: %s, took %f s".format  
         (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))  
         throw exception  
 }  
}

submitJob() hands the job over to handleJobSubmitted() by posting a JobSubmitted event on eventProcessLoop (a simplified sketch of this event loop follows the code below).

def submitJob[T, U](  
                        rdd: RDD[T],  
                        func: (TaskContext, Iterator[T]) => U,  
                        partitions: Seq[Int],  
                        callSite: CallSite,  
                        allowLocal: Boolean,  
                        resultHandler: (Int, U) => Unit,  
                        properties: Properties): JobWaiter[U] = {  
 // Check to make sure we are not launching a task on a partition that does not exist.  
 val maxPartitions = rdd.partitions.length  
 partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>  
     throw new IllegalArgumentException(  
         "Attempting to access a non-existent partition: " + p + ". " +  
                 "Total number of partitions: " + maxPartitions)  
 }  

 val jobId = nextJobId.getAndIncrement()  
 if (partitions.size == 0) {  
     return new JobWaiter[U](this, jobId, 0, resultHandler)  
 }  

 assert(partitions.size > 0)  
 val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]  
 val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)  
 eventProcessLoop.post(JobSubmitted(  
     jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter,  
     SerializationUtils.clone(properties)))  
 waiter  
}
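
submitJob() only posts the event; the event is handled on DAGScheduler's single event-loop thread, which is what eventually invokes handleJobSubmitted(). Below is a minimal sketch of that event-loop pattern, with simplified names; it is not Spark's actual EventLoop / DAGSchedulerEventProcessLoop implementation.

import java.util.concurrent.LinkedBlockingDeque

// Minimal sketch of a single-threaded event loop: events may be posted from any
// thread but are handled one at a time, so handlers never race with each other.
abstract class SimpleEventLoop[E](name: String) {
  private val eventQueue = new LinkedBlockingDeque[E]()

  private val eventThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      try {
        while (true) {
          onReceive(eventQueue.take())   // blocks until an event is available
        }
      } catch {
        case _: InterruptedException =>  // stop() interrupts the thread to end the loop
      }
    }
  }

  def start(): Unit = eventThread.start()
  def stop(): Unit = eventThread.interrupt()
  def post(event: E): Unit = eventQueue.put(event)

  // e.g. dispatch a JobSubmitted-like event to the matching handler
  protected def onReceive(event: E): Unit
}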

2. DAGScheduler splits the job into stages
Each job first produces a finalStage, and from it the complete stage DAG is derived recursively (a simplified sketch of this recursive walk follows the handleJobSubmitted code below).
private[scheduler] def handleJobSubmitted(jobId: Int,  
                                         finalRDD: RDD[_],  
                                         func: (TaskContext, Iterator[_]) => _,  
                                         partitions: Array[Int],  
                                         allowLocal: Boolean,  
                                         callSite: CallSite,  
                                         listener: JobListener,  
                                         properties: Properties) {  
   var finalStage: ResultStage = null  
   try {  
       // New stage creation may throw an exception if, for example, jobs are run on a  
       // HadoopRDD whose underlying HDFS files have been deleted.  
       finalStage = newResultStage(finalRDD, partitions.size, jobId, callSite)  
   } catch {  
       case e: Exception =>  
           logWarning("Creating new stage failed due to exception - job: " + jobId, e)  
           listener.jobFailed(e)  
           return  
   }  
   if (finalStage != null) {  
       val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)  
       clearCacheLocs()  
       logInfo("Got job %s (%s) with %d output partitions (allowLocal=%s)".format(  
           job.jobId, callSite.shortForm, partitions.length, allowLocal))  
       logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")")  
       logInfo("Parents of final stage: " + finalStage.parents)  
       logInfo("Missing parents: " + getMissingParentStages(finalStage))  
       val shouldRunLocally =  
           localExecutionEnabled && allowLocal && finalStage.parents.isEmpty && partitions.length == 1  
       val jobSubmissionTime = clock.getTimeMillis()  
       if (shouldRunLocally) {  
           // Compute very short actions like first() or take() with no parent stages locally.  
           listenerBus.post(  
               SparkListenerJobStart(job.jobId, jobSubmissionTime, Seq.empty, properties))  
           runLocally(job)  
       } else {  
           jobIdToActiveJob(jobId) = job  
           activeJobs += job  
           finalStage.resultOfJob = Some(job)  
           val stageIds = jobIdToStageIds(jobId).toArray  
           val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))  
           listenerBus.post(  
               SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))  
           submitStage(finalStage)  
       }  
   }  
   submitWaitingStages()  
}
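
newResultStage() builds the parent stages by walking the RDD dependency graph and cutting it at every ShuffleDependency; each shuffle boundary becomes a ShuffleMapStage. The sketch below only illustrates that idea; Spark's real getParentStages() is iterative and tracks visited RDDs to avoid revisiting shared lineage.

import org.apache.spark.ShuffleDependency
import org.apache.spark.rdd.RDD

// Illustrative only: collect the shuffle dependencies sitting directly "above"
// an RDD; each of them marks the boundary of a parent stage.
def directShuffleDependencies(rdd: RDD[_]): Seq[ShuffleDependency[_, _, _]] =
  rdd.dependencies.flatMap {
    case shuffleDep: ShuffleDependency[_, _, _] => Seq(shuffleDep)   // stage boundary
    case narrowDep => directShuffleDependencies(narrowDep.rdd)       // keep walking narrow deps
  }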

submitStage() is responsible for building the complete stage DAG and calls submitMissingTasks() to split each stage into runnable tasks.

private def submitStage(stage: Stage) {  
 val jobId = activeJobForStage(stage)  
 if (jobId.isDefined) {  
     logDebug("submitStage(" + stage + ")")  
     if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {  
         val missing = getMissingParentStages(stage).sortBy(_.id)  
         logDebug("missing: " + missing)  
         if (missing.isEmpty) {  
             logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")  
             submitMissingTasks(stage, jobId.get)  
         } else {  
             for (parent <- missing) {  
                 submitStage(parent)  
             }  
             waitingStages += stage  
         }  
     }  
 } else {  
     abortStage(stage, "No active job for stage " + stage.id)  
 }  
}

Note that stages depend on one another, so Spark runs them one at a time: stages currently running are tracked in runningStages, and stages waiting to run are tracked in waitingStages. When a stage completes successfully, DAGScheduler launches the next stage in handleTaskCompletion().
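
A quick way to see these stage boundaries from user code (an illustrative sketch; sc is a SparkContext as in the earlier example) is toDebugString, which prints the RDD lineage. The ShuffleDependency introduced by reduceByKey is exactly where DAGScheduler cuts a new stage:

// Illustrative only: reduceByKey introduces a ShuffleDependency, so this job
// is split into a ShuffleMapStage (map side) and a ResultStage (collect side).
val pairs   = sc.parallelize(1 to 100).map(i => (i % 10, i))
val reduced = pairs.reduceByKey(_ + _)

println(reduced.toDebugString)   // the lineage printout shows the shuffle boundary

reduced.collect()   // finalStage = ResultStage; its parent ShuffleMapStage runs first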

3. DAGScheduler splits each stage into parallel tasks and submits them all to TaskSchedulerImpl
submitMissingTasks() creates one task per partition, wraps them into a TaskSet, and submits the TaskSet to TaskSchedulerImpl.
private def submitMissingTasks(stage: Stage, jobId: Int) {  
......  
if (tasks.size > 0) {  
         logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")  
         stage.pendingTasks ++= tasks  
         logDebug("New pending tasks: " + stage.pendingTasks)  
         taskScheduler.submitTasks(  
             new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))  
         stage.latestInfo.submissionTime = Some(clock.getTimeMillis())  
     } else {  
         // Because we posted SparkListenerStageSubmitted earlier, we should mark  
         // the stage as completed here in case there are no tasks to run  
         markStageAsFinished(stage, None)  

         val debugString = stage match {  
             case stage: ShuffleMapStage =>  
                 s"Stage ${stage} is actually done; " +  
                         s"(available: ${stage.isAvailable}," +  
                         s"available outputs: ${stage.numAvailableOutputs}," +  
                         s"partitions: ${stage.numPartitions})"  
             case stage: ResultStage =>  
                 s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"  
         }  
         logDebug(debugString)  
     }  
 }

TaskSchedulerImpl.submitTasks() wraps the TaskSet in a TaskSetManager and places it into the scheduler (schedulableBuilder) to wait for scheduling. Spark offers two scheduling modes, FIFO and Fair; note that only tasks belonging to the same SparkContext are scheduled against each other (an example of selecting the mode follows the code below). It then calls SparkDeploySchedulerBackend's reviveOffers(). The TaskSetManager is responsible for scheduling the tasks within a single TaskSet, for example assigning a task to a given executor.

override def submitTasks(taskSet: TaskSet) {
val tasks = taskSet.tasks
logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")  
this.synchronized {  
 val manager = createTaskSetManager(taskSet, maxTaskFailures)  
 activeTaskSets(taskSet.id) = manager  
 schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)  

 if (!isLocal && !hasReceivedTask) {  
   starvationTimer.scheduleAtFixedRate(new TimerTask() {  
     override def run() {  
       if (!hasLaunchedTask) {  
         logWarning("Initial job has not accepted any resources; " +  
           "check your cluster UI to ensure that workers are registered " +  
           "and have sufficient resources")  
       } else {  
         this.cancel()  
       }  
     }  
   }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)  
 }  
 hasReceivedTask = true  
}  
backend.reviveOffers()  
}
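
The scheduling mode used by schedulableBuilder is selected through the spark.scheduler.mode configuration (FIFO by default, or FAIR); with the fair scheduler, jobs submitted from a thread can be directed to a named pool. A hedged sketch follows (the app and pool names are assumptions):

import org.apache.spark.{SparkConf, SparkContext}

// Illustrative only: enable the fair scheduler and put this thread's jobs in a pool.
val conf = new SparkConf()
  .setAppName("FairSchedulingExample")          // hypothetical app name
  .set("spark.scheduler.mode", "FAIR")          // default is FIFO

val sc = new SparkContext(conf)

sc.setLocalProperty("spark.scheduler.pool", "production")   // hypothetical pool name
sc.parallelize(1 to 1000).map(_ * 2).count()                // scheduled via the "production" pool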

SparkDeploySchedulerBackend's reviveOffers() sends a ReviveOffers message to the driver; on receiving ReviveOffers, the driver calls makeOffers().

case ReviveOffers =>  
     makeOffers()

4. SparkDeploySchedulerBackend dispatches the tasks to Executors
makeOffers() first calls resourceOffers() to work out which task should run on which Executor, and then calls launchTasks() to send the tasks to those Executors (a simplified sketch of the offer model follows the code below).
private def makeOffers() {
// Filter out executors under killing
val activeExecutors = executorDataMap.filterKeys(!executorsPendingToRemove.contains(_))  
val workOffers = activeExecutors.map { case (id, executorData) =>  
 new WorkerOffer(id, executorData.executorHost, executorData.freeCores)  
}.toSeq  
launchTasks(scheduler.resourceOffers(workOffers))  
}
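
resourceOffers() matches pending tasks against the free cores offered by each active Executor, taking data locality into account. The sketch below captures only the basic offer model; it ignores locality preferences, blacklisting, and the per-TaskSetManager ordering that the real TaskSchedulerImpl handles, and all names are illustrative.

// Much-simplified sketch of offer-based task assignment (illustrative only).
case class Offer(executorId: String, host: String, freeCores: Int)
case class PendingTask(taskId: Long)

def assign(offers: Seq[Offer],
           tasks: Seq[PendingTask],
           coresPerTask: Int = 1): Seq[(String, PendingTask)] = {
  val queue = scala.collection.mutable.Queue(tasks: _*)
  offers.flatMap { offer =>
    val slots = offer.freeCores / coresPerTask   // how many tasks this offer can take
    (1 to slots).flatMap { _ =>
      if (queue.nonEmpty) Some(offer.executorId -> queue.dequeue()) else None
    }
  }
}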

5. Executor executes the task

After receiving a LaunchTask message, CoarseGrainedExecutorBackend calls Executor's launchTask() to run the task.


 case LaunchTask(data) =>  
   if (executor == null) {  
     logError("Received LaunchTask command but executor was null")  
     System.exit(1)  
   } else {  
     val taskDesc = ser.deserialize[TaskDescription](data.value)  
     logInfo("Got assigned task " + taskDesc.taskId)  
     executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,  
       taskDesc.name, taskDesc.serializedTask)  
   }


Internally, the Executor maintains a thread pool; every submitted task is wrapped in a TaskRunner and handed to the thread pool for execution (a minimal sketch of this pool pattern follows the code below).

def launchTask(  
 context: ExecutorBackend,  
 taskId: Long,  
 attemptNumber: Int,  
 taskName: String,  
 serializedTask: ByteBuffer): Unit = {  
val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,  
 serializedTask)  
runningTasks.put(taskId, tr)  
threadPool.execute(tr)  
}
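
The pool is a cached pool of daemon threads, so one worker thread is (re)used per concurrently running task. A minimal sketch of the same pattern follows; Spark builds its pool through its own utilities, and the thread names here are illustrative.

import java.util.concurrent.{Executors, ThreadFactory}
import java.util.concurrent.atomic.AtomicInteger

// Illustrative only: a cached pool of daemon threads, one per running TaskRunner.
val taskLaunchPool = Executors.newCachedThreadPool(new ThreadFactory {
  private val counter = new AtomicInteger(0)
  override def newThread(r: Runnable): Thread = {
    val t = new Thread(r, s"task-launch-worker-${counter.getAndIncrement()}")
    t.setDaemon(true)   // daemon threads let the JVM exit when the executor shuts down
    t
  }
})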

Inside TaskRunner, task.run() carries out the actual work of the task.

class TaskRunner(
   execBackend: ExecutorBackend,
   val taskId: Long,  
   val attemptNumber: Int,  
   taskName: String,  
   serializedTask: ByteBuffer)  
 extends Runnable {  
 ...... 
 override def run(): Unit = {  
   val taskMemoryManager = new TaskMemoryManager(env.executorMemoryManager)  
   val deserializeStartTime = System.currentTimeMillis()  
   Thread.currentThread.setContextClassLoader(replClassLoader)  
   val ser = env.closureSerializer.newInstance()  
   logInfo(s"Running $taskName (TID $taskId)")  
   execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)  
   var taskStart: Long = 0  
   startGCTime = computeTotalGcTime()  

   try {  
     val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)  
     updateDependencies(taskFiles, taskJars)  
     task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)  
     task.setTaskMemoryManager(taskMemoryManager)  
     ......
     env.mapOutputTracker.updateEpoch(task.epoch)  

     // Run the actual task and measure its runtime.  
     taskStart = System.currentTimeMillis()  
     val value = try {  
       task.run(taskAttemptId = taskId, attemptNumber = attemptNumber)  
     } finally {  
       ......
     }  
     ......
 }  
}

Ultimately, every task calls iterator() to compute its RDD recursively. Taking ShuffleMapTask as an example, rdd.iterator(partition, context) computes this task's output partition starting from the root partitions (a simplified sketch of this recursion follows the code below).

private[spark] class ShuffleMapTask(  
 stageId: Int,  
 taskBinary: Broadcast[Array[Byte]],  
 partition: Partition,  
 @transient private var locs: Seq[TaskLocation])  
extends Task[MapStatus](stageId, partition.index) with Logging {  

override def runTask(context: TaskContext): MapStatus = {  
 // Deserialize the RDD using the broadcast variable.  
 val deserializeStartTime = System.currentTimeMillis()  
 val ser = SparkEnv.get.closureSerializer.newInstance()  
 val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](  
   ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)  
 _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime  

 metrics = Some(context.taskMetrics)  
 var writer: ShuffleWriter[Any, Any] = null  
 try {  
   val manager = SparkEnv.get.shuffleManager  
   writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)  
   writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])  
   return writer.stop(success = true).get  
 } catch {  
   case e: Exception =>  
     try {  
       if (writer != null) {  
         writer.stop(success = false)  
       }  
     } catch {  
       case e: Exception =>  
         log.debug("Could not stop writer", e)  
     }  
     throw e  
 }  
}  
}
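
The recursion mentioned above bottoms out in RDD.iterator(): if the partition is cached or checkpointed it is read directly, otherwise compute() is called, and the compute() of a narrow-transformation RDD pulls records from its parent's iterator. A simplified sketch of that structure (not Spark's actual code, and it skips the cache/checkpoint check):

// Illustrative only: how iterator()/compute() recursion walks up the lineage.
abstract class MiniRDD[T] {
  def compute(partition: Int): Iterator[T]

  // In Spark, iterator() first checks the block manager / checkpoint;
  // here we always recompute for simplicity.
  final def iterator(partition: Int): Iterator[T] = compute(partition)
}

class MiniMappedRDD[T, U](parent: MiniRDD[T], f: T => U) extends MiniRDD[U] {
  // compute() pulls records from the parent's iterator, so calling iterator()
  // on the final RDD recursively computes every ancestor partition.
  override def compute(partition: Int): Iterator[U] = parent.iterator(partition).map(f)
}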

At this point, the execution of one stage's TaskSet is complete. Once every task in the TaskSet has finished, execution returns to step 3 to run the next stage, and this repeats until the finalStage is done. :)
