Spark源码之TaskScheduler

Spark源码之TaskScheduler介绍篇

前面DAGScheduler将stage划分好之后,又将生成的TaskSet提交给TaskScheduler,那么本章节就要叙述下TaskScheduler如何启动Task的;

TaskScheduler任务源码分析

DAGScheduler将TaskSet提交给TaskScheduler,那么就先看下submitTasks(),打开TaskScheduler的实现类TaskSchedulerImpl,在这个方法里面,先生成了一个TaskManager对象来封装taskSet,然后判断当前stage中是否只正常运行一个taskSet,以及taskManager是否是僵尸进程;随后将生成的TaskManager放入到schedulableBuilder调度策略中,做完以上工作后开始想backend申请资源backend.reviveOffers();

  override def submitTasks(taskSet: TaskSet) {
    val tasks = taskSet.tasks
    logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
    this.synchronized {
      //创建一个TaskManager maxTaskFailures:最大失败重试次数 默认4次
      val manager = createTaskSetManager(taskSet, maxTaskFailures)
      val stage = taskSet.stageId
      val stageTaskSets =
        taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
      stageTaskSets(taskSet.stageAttemptId) = manager
      // 在这里判断当前stage的是否有两个taskSet在运行,因为同一个stage中只能运行一个taskSet
      // 一方面判断当前的TaskSet是否已经在运行了,
      // 另一方面判断当前的taskSetManager是否是僵尸进程
      val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
        ts.taskSet != taskSet && !ts.isZombie
      }
      if (conflictingTaskSet) {
        throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
          s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
      }
      //将TaskSetManager添加到调度策略中(FIFOSchedulableBuilder/FIARSchedulableBuilder)
      schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

      if (!isLocal && !hasReceivedTask) {
        starvationTimer.scheduleAtFixedRate(new TimerTask() {
          override def run() {
            if (!hasLaunchedTask) {
              logWarning("Initial job has not accepted any resources; " +
                "check your cluster UI to ensure that workers are registered " +
                "and have sufficient resources")
            } else {
              this.cancel()
            }
          }
        }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
      }
      hasReceivedTask = true
    }
    //在这里开始向backend的发送资源申请的请求,其实是向DriverEndPoint发送的请求
    backend.reviveOffers()
  }

我们进入CoarseGrainedSchedulerBackend的reviveOffers方法,可以看到在方法里面driverEndpoint向自己发送了一个ReviveOffers消息,而这个driverEndpoint我们前面也讲过,就是当前应用程序的Driver;

  override def reviveOffers() {
    driverEndpoint.send(ReviveOffers)
  }

进入driverEndpointreviveOffers方法,最终调用的是makeOffers()方法,在这个方法里面先过滤出状态为alive的executor,然后将这些activeExecutor封装成WorkerOffer对象,关键点是在最后的lanchTasks方法,我们先看下scheduler.resourceOffers(workOffers)这个方法的作用;

   case ReviveOffers =>
     makeOffers()


    // Make fake resource offers on all executors
    private def makeOffers() {
      // Filter out executors under killing
      val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
      val workOffers = activeExecutors.map { case (id, executorData) =>
        new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
      }.toSeq
      launchTasks(scheduler.resourceOffers(workOffers))
    }

我们再回到TaskSchedulerImpl,查看resourceOffers方法:
TaskSchedulerImpl.resourceOffers:为每一个Task具体分配计算资源,输入时ExecutorBackend以及可用的Cores,输出是 TaskDescription的二维数组,在其中确定了每个task具体运行在哪个ExecutorBackend;
在方法内部先将可用的executors添加到数据结构中,然后在将可用的executors进行shuffle以便做到负载均衡,为每个executor创建一个task数组用于存放TaskDescription,最后遍历调度策略中的TaskSet,使用就近原则为task分配executor,在这里需要腔调一点的是DAGScheduler.submitMissingTasks()方法中我们是获取了每个task的对应数据的位置,而在本方法中的taskSet.myLocalityLevels)是为了获取Task对应数据位置的级别,如下代码所示:

  def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
    // Mark each slave as alive and remember its hostname
    // Also track if new executor is added
    //todo 标记是否有新的executor
    var newExecAvail = false
    //todo 遍历每个executor
    for (o <- offers) {
      //todo 向数据结构中添加executor信息
      executorIdToHost(o.executorId) = o.host
      executorIdToTaskCount.getOrElseUpdate(o.executorId, 0)
      if (!executorsByHost.contains(o.host)) {
        executorsByHost(o.host) = new HashSet[String]()
        executorAdded(o.executorId, o.host)
        newExecAvail = true
      }
      for (rack <- getRackForHost(o.host)) {
        hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
      }
    }

    // Randomly shuffle offers to avoid always placing tasks on the same set of workers.
    //todo 将可用的executor进行shuffle打乱,以便做到负载均衡
    val shuffledOffers = Random.shuffle(offers)
    // Build a list of tasks to assign to each worker.
    //todo 为每个executor构建一个task数组
    val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
    //todo 所有executor可用的core资源
    val availableCpus = shuffledOffers.map(o => o.cores).toArray
    //todo 从调度池中获取TaskSetManagers
    val sortedTaskSets = rootPool.getSortedTaskSetQueue
    for (taskSet <- sortedTaskSets) {
      logDebug("parentName: %s, name: %s, runningTasks: %s".format(
        taskSet.parent.name, taskSet.name, taskSet.runningTasks))
      if (newExecAvail) {
        taskSet.executorAdded()
      }
    }

    // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
    // of locality levels so that it gets a chance to launch local tasks on all of them.
    // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
    //todo 我们在DAGScheduler.submitMissingTasks()中已经获取了每个Task中数据所在的位置,
    //todo 这是的taskSet.myLocalityLevels只是根据Task数据所在的host来获取它的的数据本地
    //todo 性级别(PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY)
    var launchedTask = false
    //todo  对每一个taskSet,按照就近顺序分配最近的executor来执行task
    for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
      do {
        //todo 将前面随机打散的WorkOffers计算资源按照就近原则分配给taskSet,用于执行其中的task
        launchedTask = resourceOfferSingleTaskSet(
            taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
      } while (launchedTask)
    }

    if (tasks.size > 0) {
      hasLaunchedTask = true
    }
    return tasks
  }

接着进入resourceOfferSingleTaskSet方法,遍历所有的executor的索引地址,以便作为tasks的索引,将每个task分配给相应的executor,并填充tasks;
而这个tasks数据结构是调用resourceOfferSingleTaskSet的方法里传进来的,它存储着每个executor内的tasks信息,详细信息见下图源码:

  private def resourceOfferSingleTaskSet(
      taskSet: TaskSetManager,
      maxLocality: TaskLocality,
      shuffledOffers: Seq[WorkerOffer],
      availableCpus: Array[Int],
      tasks: Seq[ArrayBuffer[TaskDescription]]) : Boolean = {
    var launchedTask = false
    //todo 遍历所有的executor的索引
    for (i <- 0 until shuffledOffers.size) {
      val execId = shuffledOffers(i).executorId
      val host = shuffledOffers(i).host
      //todo 判断该executor可以用的资源是否>=CPUS_PER_TASK(默认为1)
      if (availableCpus(i) >= CPUS_PER_TASK) {
        try {
          //todo 为taskSet中的task分配executor,并将信息存储在tasks中,注意这个tasks是从
          //todo 上面的方法传进来的
          for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
            //todo 将每个task信息写入下面的数据结构中
            tasks(i) += task
            val tid = task.taskId
            taskIdToTaskSetManager(tid) = taskSet
            taskIdToExecutorId(tid) = execId
            executorIdToTaskCount(execId) += 1
            executorsByHost(host) += execId
            availableCpus(i) -= CPUS_PER_TASK
            assert(availableCpus(i) >= 0)
            launchedTask = true
          }
        } catch {
          case e: TaskNotSerializableException =>
            logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
            // Do not offer resources for this task, but don't throw an error to allow other
            // task sets to be submitted.
            return launchedTask
        }
      }
    }
    return launchedTask
  }

再到DriverEndPoint的makeOffers方法中,scheduler.resourceOffers(workOffers)已经执行完毕,taskSet已经分配完毕,接着执行launchTasks()方法,该方法遍历每个task,并向每个task所对应的executor发送launchTask消息;如下代码所示:

    // Launch tasks returned by a set of resource offers
    private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
      //todo 遍历每个task
      for (task <- tasks.flatten) {
        val serializedTask = ser.serialize(task)
        //todo 检查序列化后的task大小
        if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
          scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
            try {
              var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
                "spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " +
                "spark.akka.frameSize or using broadcast variables for large values."
              msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize,
                AkkaUtils.reservedSizeBytes)
              taskSetMgr.abort(msg)
            } catch {
              case e: Exception => logError("Exception in error callback", e)
            }
          }
        }
        else {
          //todo 获取task对应的executor
          val executorData = executorDataMap(task.executorId)
          executorData.freeCores -= scheduler.CPUS_PER_TASK
          //todo 向该executor发送launchTask请求
          executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
        }
      }
    }

我们继续追踪,进入CoarseGrainedExecutorBackend的LaunchTask的LaunchTask方法,该方法又调用了executor.launchTask

    case LaunchTask(data) =>
      if (executor == null) {
        logError("Received LaunchTask command but executor was null")
        System.exit(1)
      } else {
        val taskDesc = ser.deserialize[TaskDescription](data.value)
        logInfo("Got assigned task " + taskDesc.taskId)
        executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
          taskDesc.name, taskDesc.serializedTask)
      }

我们进入Executor的launchTask方法,在该方法内实例化除了TaskRunner对象,而TaskRunner对象是一个线程,通过线程池threadPool来运行;

  def launchTask(
      context: ExecutorBackend,
      taskId: Long,
      attemptNumber: Int,
      taskName: String,
      serializedTask: ByteBuffer): Unit = {
    val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
      serializedTask)
    runningTasks.put(taskId, tr)
    threadPool.execute(tr)
  }

总结:

DAGScheduler划分好stage,并将生成的TaskSet提交给TaskScheduler,TaskScheduler向Driver请求分配资源,Driver将可用的ExecutorBackend资源发给TaskScheduler,在TaskScheduler中将Task分配给ExecutorBackend,最后向ExecutorBackend发送launchTask请求,在ExecutorBackend中调executor对象的launchTask,在Executor对象的launchTask方法中启动TaskRunner线程并用线程池去执行TaskRunner线程;

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,029评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,238评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,576评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,214评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,324评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,392评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,416评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,196评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,631评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,919评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,090评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,767评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,410评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,090评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,328评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,952评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,979评论 2 351

推荐阅读更多精彩内容