spark源码分析之任务调度篇

DAG的生成

概述

spark作为一套高效的分布式运算框架,但是想要更深入的学习它,就要通过分析spark的源码,不但可以更好的帮助理解spark的工作过程,还可以提高对集群的排错能力,本文主要关注的是Spark的Stage任务的执行流程的流程。

DAG(Directed Acyclic Graph)叫做有向无环图,原始的RDD通过一系列的转换就就形成了DAG,根据RDD之间的依赖关系的不同将DAG划分成不同的Stage,对于窄依赖,partition的转换处理在Stage中完成计算。对于宽依赖,由于有Shuffle的存在,只能在parent RDD处理完成后,才能开始接下来的计算,因此宽依赖是划分Stage的依据。

窄依赖 指的是每一个父RDD的Partition最多被子RDD的一个Partition使用
宽依赖 指的是多个子RDD的Partition会依赖同一个父RDD的Partition

DAGScheduler调度队列

当我们看完Executor的创建与启动流程后,我们继续在SparkContext的构造方法中继续查看

class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient {
  。。。。。。
    
 private[spark] def createSparkEnv(
      conf: SparkConf,
      isLocal: Boolean,
      listenerBus: LiveListenerBus): SparkEnv = {
    //通过SparkEnv来创建createDriverEnv
    SparkEnv.createDriverEnv(conf, isLocal, listenerBus)
  }
  //在这里调用了createSparkEnv,返回一个SparkEnv对象,这个对象里面有很多重要属性,最重要的ActorSystem
  private[spark] val env = createSparkEnv(conf, isLocal, listenerBus)
  SparkEnv.set(env)

  //创建taskScheduler
  // Create and start the scheduler
  private[spark] var (schedulerBackend, taskScheduler) =
    SparkContext.createTaskScheduler(this, master)

  //创建DAGScheduler
  dagScheduler = new DAGScheduler(this)

  //启动TaksScheduler
  taskScheduler.start()
    。。。。。
}

在构造方法中还创建了一个DAGScheduler对象,这个类的任务就是用来划分Stage任务的,构造方法中初始化了 private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
DAGSchedulerEventProcessLoop是一个事件总线对象,用来负责任务的分发,在构造方法eventProcessLoop.start()被调用,该方法是父类EventLoop的start

  def start(): Unit = {
    if (stopped.get) {
      throw new IllegalStateException(name + " has already been stopped")
    }
    // Call onStart before starting the event thread to make sure it happens before onReceive
    onStart()
    eventThread.start()
  }

调用了eventThread的start方法,开启了一个线程

  private val eventThread = new Thread(name) {
    setDaemon(true)

    override def run(): Unit = {
      try {
        while (!stopped.get) {
          val event = eventQueue.take()
          try {
            onReceive(event)
          } catch {
            case NonFatal(e) => {
              try {
                onError(e)
              } catch {
                case NonFatal(e) => logError("Unexpected error in " + name, e)
              }
            }
          }
        }
      } catch {
        case ie: InterruptedException => // exit even if eventQueue is not empty
        case NonFatal(e) => logError("Unexpected error in " + name, e)
      }
    }
  }

run方法中不断的从LinkedBlockingDeque阻塞队列中取消息,然后调用onReceive(event)方法,该方法是由子类DAGSchedulerEventProcessLoop实现的

  override def onReceive(event: DAGSchedulerEvent): Unit = event match {
    case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
      //调用dagScheduler来出来提交任务
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,
        listener, properties)

    case StageCancelled(stageId) =>
      dagScheduler.handleStageCancellation(stageId)

    case JobCancelled(jobId) =>
      dagScheduler.handleJobCancellation(jobId)

    case JobGroupCancelled(groupId) =>
      dagScheduler.handleJobGroupCancelled(groupId)

    case AllJobsCancelled =>
      dagScheduler.doCancelAllJobs()

    case ExecutorAdded(execId, host) =>
      dagScheduler.handleExecutorAdded(execId, host)

    case ExecutorLost(execId) =>
      dagScheduler.handleExecutorLost(execId, fetchFailed = false)

    case BeginEvent(task, taskInfo) =>
      dagScheduler.handleBeginEvent(task, taskInfo)

    case GettingResultEvent(taskInfo) =>
      dagScheduler.handleGetTaskResult(taskInfo)

    case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
      dagScheduler.handleTaskCompletion(completion)

    case TaskSetFailed(taskSet, reason) =>
      dagScheduler.handleTaskSetFailed(taskSet, reason)

    case ResubmitFailedStages =>
      dagScheduler.resubmitFailedStages()
  }

onReceive中会匹配到传入的任务类型,执行相应的逻辑。到此DAGScheduler的调度队列会一直挂起,不断轮询队列中的任务。

DAG提交Task任务流程

当RDD经过一系列的转换Transformation方法后,最终要执行Action动作方法,这里比如WordCount程序中最后调用collect()方法时会将数据提交到Master上运行,任务真正的被执行,这里的方法执行过程如下

  /**
   * Return an array that contains all of the elements in this RDD.
   */
  def collect(): Array[T] = {
    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
    Array.concat(results: _*)
  }

sc 是SparkContext对象,这里调用 一个runJob该方法调用多次重载的方法后,该方法最终会调用 dagScheduler.runJob

  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      allowLocal: Boolean,
      resultHandler: (Int, U) => Unit) {
    if (stopped) {
      throw new IllegalStateException("SparkContext has been shutdown")
    }
    val callSite = getCallSite
    val cleanedFunc = clean(func)
    logInfo("Starting job: " + callSite.shortForm)
    if (conf.getBoolean("spark.logLineage", false)) {
      logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
    }
    //dagScheduler出现了,可以切分stage
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
      resultHandler, localProperties.get)
    progressBar.foreach(_.finishAll())
    rdd.doCheckpoint()
  }

dagScheduler的runJob是我们比较关心的

 def runJob[T, U: ClassTag](
 
    。。。。。

    val waiter = submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler, properties)
    waiter.awaitResult() match {
      case JobSucceeded => {
        logInfo("Job %d finished: %s, took %f s".format
          (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
      }
      case JobFailed(exception: Exception) =>
        logInfo("Job %d failed: %s, took %f s".format
          (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
        throw exception
    }
  }

这里面的我们主要看的是submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler, properties)提交任务

def submitJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      allowLocal: Boolean,
      resultHandler: (Int, U) => Unit,
      properties: Properties): JobWaiter[U] = {

     。。。。。。

    //把job加入到任务队列里面
    eventProcessLoop.post(JobSubmitted(
      jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties))
    waiter
  }

这里比较关键的地方是eventProcessLoop.post往任务队列中加入一个JobSubmitted类型的任务,eventProcessLoop是在构造方法中就初始化好的事件总线对象,内部有一个线程不断的轮询队列里的任务

轮询到任务后调用onReceive方法匹配任务类型,在这里我们提交的任务是JobSubmitted类型

    case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
      //调用dagScheduler来出来提交任务
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,
        listener, properties)

调用了handleJobSubmitted方法,接下来查看该方法

private[scheduler] def handleJobSubmitted(jobId: Int,
      finalRDD: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      allowLocal: Boolean,
      callSite: CallSite,
      listener: JobListener,
      properties: Properties) {
    var finalStage: Stage = null
    try {
      // New stage creation may throw an exception if, for example, jobs are run on a
      // HadoopRDD whose underlying HDFS files have been deleted.
      //最终的stage
      finalStage = newStage(finalRDD, partitions.size, None, jobId, callSite)
    } catch {
      case e: Exception =>
        logWarning("Creating new stage failed due to exception - job: " + jobId, e)
        listener.jobFailed(e)
        return
    }
        。。。。
        submitStage(finalStage)
   }

上面的代码中,调用了newStage进行任务的划分,该方法是划分任务的核心方法,划分任务的根据最后一个依赖关系作为开始,通过递归,将每个宽依赖做为切分Stage的依据,切分Stage的过程是流程中的一环,但在这里不详细阐述,当任务切分完毕后,代码继续执行来到submitStage(finalStage)这里开始进行任务提交
这里以递归的方式进行任务的提交

//递归的方式提交stage
  private def submitStage(stage: Stage) {
    val jobId = activeJobForStage(stage)
    if (jobId.isDefined) {
      logDebug("submitStage(" + stage + ")")
      if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
        val missing = getMissingParentStages(stage).sortBy(_.id)
        logDebug("missing: " + missing)
        if (missing == Nil) {
          logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
            //提交任务
          submitMissingTasks(stage, jobId.get)
        } else {
          for (parent <- missing) {
            submitStage(parent)
          }
          waitingStages += stage
        }
      }
    } else {
      abortStage(stage, "No active job for stage " + stage.id)
    }
  }

调用submitMissingTasks(stage, jobId.get)提交任务,将每一个Stage和jobId传入

  private def submitMissingTasks(stage: Stage, jobId: Int) {
   。。。。。

    if (tasks.size > 0) {
      logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
      stage.pendingTasks ++= tasks
      logDebug("New pending tasks: " + stage.pendingTasks)
      //taskScheduler提交task
      taskScheduler.submitTasks(
        new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
      stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
    } else {
      // Because we posted SparkListenerStageSubmitted earlier, we should mark
      // the stage as completed here in case there are no tasks to run
      markStageAsFinished(stage, None)
      logDebug("Stage " + stage + " is actually done; %b %d %d".format(
        stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions))
    }
  }

这里的代码我们需要关注的是 taskScheduler.submitTasks( new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
创建了一个TaskSet对象,将所有任务的信息封装,包括task任务列表,stageId,任务id,分区数参数等

Task任务调度

override def submitTasks(taskSet: TaskSet) {
    val tasks = taskSet.tasks
    logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
    this.synchronized {
        //创建TaskSetManager保存了taskSet任务列表
      val manager = createTaskSetManager(taskSet, maxTaskFailures)
      activeTaskSets(taskSet.id) = manager
     //将任务加入调度池
      schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

      if (!isLocal && !hasReceivedTask) {
        starvationTimer.scheduleAtFixedRate(new TimerTask() {
          override def run() {
            if (!hasLaunchedTask) {
              logWarning("Initial job has not accepted any resources; " +
                "check your cluster UI to ensure that workers are registered " +
                "and have sufficient resources")
            } else {
              this.cancel()
            }
          }
        }, STARVATION_TIMEOUT, STARVATION_TIMEOUT)
      }
      hasReceivedTask = true
    }
    //接受任务
    backend.reviveOffers()
  }

该方法比较重要,主要将任务加入调度池,最后调用了backend.reviveOffers()这里的backend是CoarseGrainedSchedulerBackend一个Executor任务调度对象

  override def reviveOffers() {
    //自己给自己发消息
    driverActor ! ReviveOffers
  }

这里用了内部的DriverActor对象发送了一个内部消息给自己,接下来查看receiver方法接受的消息

      case ReviveOffers =>
        makeOffers()

收到消息后调用了makeOffers()方法

    def makeOffers() {
      launchTasks(scheduler.resourceOffers(executorDataMap.map { case (id, executorData) =>
        new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
      }.toSeq))
    }

makeOffers方法中,将Executor的信息集合与调度池中的Tasks封装成WokerOffers列表传给了
launchTasks

    def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
      for (task <- tasks.flatten) {
           。。。。。。
        //把task序列化
        val serializedTask = ser.serialize(task)

            。。。。。
          val executorData = executorDataMap(task.executorId)
          executorData.freeCores -= scheduler.CPUS_PER_TASK
          //把序列化好的task发送给Executor
          executorData.executorActor ! LaunchTask(new SerializableBuffer(serializedTask))
        }
      }
    }

launchTasks方法将遍历Tasks集合,每个Task任务序列化,发送启动Task执行消息的给Executor
Executor的onReceive方法

  //DriverActor发送给Executor的启动Task的消息
    case LaunchTask(data) =>
      if (executor == null) {
        logError("Received LaunchTask command but executor was null")
        System.exit(1)
      } else {
        val ser = env.closureSerializer.newInstance()
        //把Task反序列化
        val taskDesc = ser.deserialize[TaskDescription](data.value)
        logInfo("Got assigned task " + taskDesc.taskId)
        //启动task
        executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
          taskDesc.name, taskDesc.serializedTask)
      }

Executor收到DriverActor发送的启动Task的消息,这里才开始真正执行任务了,将收到的Task序列化信息反序列化,调用ExecutorlaunchTask方法执行任务

  def launchTask(
      context: ExecutorBackend,
      taskId: Long,
      attemptNumber: Int,
      taskName: String,
      serializedTask: ByteBuffer) {
    //把task的描述信息放到了一份TaskRunner
    val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
      serializedTask)
    runningTasks.put(taskId, tr)
    //然后把TaskRunner丢到线程池里面
    threadPool.execute(tr)
  }

launchTask内将Task提交到线程池去运行,TaskRunner是Runnable对象,里面的run方法执行了我们app生成的每一个RDD的链上的逻辑。 到此,RDD的整个作业方式就结束了。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,047评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,807评论 3 386
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,501评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,839评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,951评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,117评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,188评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,929评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,372评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,679评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,837评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,536评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,168评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,886评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,129评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,665评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,739评论 2 351

推荐阅读更多精彩内容

  • YarnYarn产生背景:Yarn直接来自于MR1.0MR1.0 问题:采用的是master slave结构,ma...
    时待吾阅读 5,607评论 2 23
  • 上一篇文章讲解了RDD的基本概念, 这篇文章尝试分析当Spark拿到一个RDD之后是如何处理它的. 文中会涉及到S...
    福克斯记阅读 5,978评论 2 17
  • 本文基于spark2.11 1. 前言 1.1 基本概念 RDD关于RDD已经有很多文章了,可以参考一下理解Spa...
    aaron1993阅读 1,802评论 0 3
  • RDD的概述 RDD是只读的、分区记录的集合,是Spark编程模型的最主要抽象,它是一种特殊的集合,支持多种数据源...
    木戎阅读 3,256评论 0 2
  • 成熟的心思吗? 理想中的自己 要面子的童鞋 失去活动的心思 只想自己吗 不想笨拙的参与这个游戏
    明雾心阅读 131评论 0 0