【Spark Core】任务执行机制和Task源码浅析2

引言

上一小节《任务执行机制和Task源码浅析1》介绍了Executor的注册过程。
这一小节,我将从Executor端,就接收LaunchTask消息之后Executor的执行任务过程进行介绍。

1. Executor的launchTasks函数

DriverActor提交任务,发送LaunchTask指令给CoarseGrainedExecutorBackend,接收到指令之后,让它内部的executor来发起任务,即调用空闲的executor的launchTask函数。
下面是CoarseGrainedExecutorBackend中receiveWithLogging的部分代码:

    case LaunchTask(data) =>
      if (executor == null) {
        logError("Received LaunchTask command but executor was null")
        System.exit(1)
      } else {
        val ser = env.closureSerializer.newInstance()
        val taskDesc = ser.deserialize[TaskDescription](data.value)
        logInfo("Got assigned task " + taskDesc.taskId)
        executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
          taskDesc.name, taskDesc.serializedTask)
      }

Executor执行task:

  def launchTask(
      context: ExecutorBackend,
      taskId: Long,
      attemptNumber: Int,
      taskName: String,
      serializedTask: ByteBuffer) {
    val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
      serializedTask)
    runningTasks.put(taskId, tr)
    threadPool.execute(tr)
  }

Executor内部维护一个线程池,可以跑多个task,每一个提交的task都会包装成TaskRunner交由threadPool执行。

2. TaskRunner的run方法

run方法中val value = task.run(taskAttemptId = taskId, attemptNumber = attemptNumber)是真正执行task中的任务。

下面是TaskRunner中run方法的部分代码:

      try {
        val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
        updateDependencies(taskFiles, taskJars)
        // 反序列化Task
        task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)

        // If this task has been killed before we deserialized it, let's quit now. Otherwise,
        // continue executing the task.
        if (killed) {
          // Throw an exception rather than returning, because returning within a try{} block
          // causes a NonLocalReturnControl exception to be thrown. The NonLocalReturnControl
          // exception will be caught by the catch block, leading to an incorrect ExceptionFailure
          // for the task.
          throw new TaskKilledException
        }

        attemptedTask = Some(task)
        logDebug("Task " + taskId + "'s epoch is " + task.epoch)
        env.mapOutputTracker.updateEpoch(task.epoch)

        // Run the actual task and measure its runtime.
        // 运行Task, 具体可以去看ResultTask和ShuffleMapTask
        taskStart = System.currentTimeMillis()
        val value = task.run(taskAttemptId = taskId, attemptNumber = attemptNumber)
        val taskFinish = System.currentTimeMillis()

        // If the task has been killed, let's fail it.
        if (task.killed) {
          throw new TaskKilledException
        }

        // 对结果进行序列化
        val resultSer = env.serializer.newInstance()
        val beforeSerialization = System.currentTimeMillis()
        val valueBytes = resultSer.serialize(value)
        val afterSerialization = System.currentTimeMillis()

        // 更新任务的相关监控信息,会反映到监控页面上的
        for (m <- task.metrics) {
          m.setExecutorDeserializeTime(taskStart - deserializeStartTime)
          m.setExecutorRunTime(taskFinish - taskStart)
          m.setJvmGCTime(gcTime - startGCTime)
          m.setResultSerializationTime(afterSerialization - beforeSerialization)
        }

        val accumUpdates = Accumulators.values
        // 对结果进行再包装,包装完再进行序列化
        val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.orNull)
        val serializedDirectResult = ser.serialize(directResult)
        val resultSize = serializedDirectResult.limit

        // directSend = sending directly back to the driver
        val serializedResult = {
          if (maxResultSize > 0 && resultSize > maxResultSize) {
            logWarning(s"Finished $taskName (TID $taskId). Result is larger than maxResultSize " +
              s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " +
              s"dropping it.")
            ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
          } else if (resultSize >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
            // 如果中间结果的大小超过了spark.akka.frameSize(默认是10M)的大小,就要提升序列化级别了,超过内存的部分要保存到硬盘的
            val blockId = TaskResultBlockId(taskId)
            env.blockManager.putBytes(
              blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
            logInfo(
              s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)")
            ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
          } else {
            logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent to driver")
            serializedDirectResult
          }
        }
        // 将任务完成和taskresult,通过statusUpdate报告给driver
        execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)

      } catch {
        //异常处理代码,略去...
      } finally {
        // 清理为ResultTask注册的shuffle内存,最后把task从正在运行的列表当中删除
        // Release memory used by this thread for shuffles
        env.shuffleMemoryManager.releaseMemoryForThisThread()
        // Release memory used by this thread for unrolling blocks
        env.blockManager.memoryStore.releaseUnrollMemoryForThisThread()
        // Release memory used by this thread for accumulators
        Accumulators.clear()
        runningTasks.remove(taskId)
      }
    }

3. Task执行过程

TaskRunner会启动一个新的线程,我们看一下run方法中的调用过程:
TaskRunner.run --> Task.run --> Task.runTask --> RDD.iterator --> RDD.computeOrReadCheckpoint --> RDD.compute

Task的run函数代码:

  /**
   * Called by Executor to run this task.
   *
   * @param taskAttemptId an identifier for this task attempt that is unique within a SparkContext.
   * @param attemptNumber how many times this task has been attempted (0 for the first attempt)
   * @return the result of the task
   */
  final def run(taskAttemptId: Long, attemptNumber: Int): T = {
    context = new TaskContextImpl(stageId = stageId, partitionId = partitionId,
      taskAttemptId = taskAttemptId, attemptNumber = attemptNumber, runningLocally = false)
    TaskContextHelper.setTaskContext(context)
    context.taskMetrics.setHostname(Utils.localHostName())
    taskThread = Thread.currentThread()
    if (_killed) {
      kill(interruptThread = false)
    }
    try {
      runTask(context)
    } finally {
      context.markTaskCompleted()
      TaskContextHelper.unset()
    }
  }

ShuffleMapTask和ResultTask分别实现了不同的runTask函数。

ShuffleMapTask的runTask函数代码:

  override def runTask(context: TaskContext): MapStatus = {
    // Deserialize the RDD using the broadcast variable.
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
    //此处的taskBinary即为在org.apache.spark.scheduler.DAGScheduler#submitMissingTasks序列化的task的广播变量取得的  

    metrics = Some(context.taskMetrics)
    var writer: ShuffleWriter[Any, Any] = null
    try {
      val manager = SparkEnv.get.shuffleManager
      writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
      writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
      // 将rdd计算的结果写入memory或者disk  
      return writer.stop(success = true).get
    } catch {
      case e: Exception =>
        try {
          if (writer != null) {
            writer.stop(success = false)
          }
        } catch {
          case e: Exception =>
            log.debug("Could not stop writer", e)
        }
        throw e
    }
  }

ResultTask的runTask函数代码:

  override def runTask(context: TaskContext): U = {
    // Deserialize the RDD and the func using the broadcast variables.
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)

    metrics = Some(context.taskMetrics)
    func(context, rdd.iterator(partition, context))
  }

4. Task状态更新

Task执行是通过TaskRunner来运行,它需要通过ExecutorBackend和Driver通信,通信消息是StatusUpdate:

  1. Task运行之前,告诉Driver当前Task的状态为TaskState.RUNNING。
  1. Task运行之后,告诉Driver当前Task的状态为TaskState.FINISHED,并返回计算结果。
  2. 如果Task运行过程中发生错误,告诉Driver当前Task的状态为TaskState.FAILED,并返回错误原因。
  3. 如果Task在中途被Kill掉了,告诉Driver当前Task的状态为TaskState.FAILED。


5. Task执行完毕

Task执行完毕,在TaskRunner的run函数中,通过statusUpdate通知ExecuteBackend,结果保存在DirectTaskResult中。
SchedulerBackend接收到StatusUpdate之后做如下判断:如果任务已经成功处理,则将其从监视列表中删除。如果整个作业中的所有任务都已经完成,则将占用的资源释放。
TaskSchedulerImpl将当前顺利完成的任务放入完成队列,同时取出下一个等待运行的Task。

下面CoarseGrainedSchedulerBackend是中处理StatusUpdate消息的代码:

      case StatusUpdate(executorId, taskId, state, data) =>
        //statusUpdate函数处理处理从taskset删除已完成的task等工作
        scheduler.statusUpdate(taskId, state, data.value)
        if (TaskState.isFinished(state)) {
          executorDataMap.get(executorId) match {
            case Some(executorInfo) =>
              executorInfo.freeCores += scheduler.CPUS_PER_TASK
              makeOffers(executorId)
            case None =>
              // Ignoring the update since we don't know about the executor.
              logWarning(s"Ignored task status update ($taskId state $state) " +
                "from unknown executor $sender with ID $executorId")
          }
        }

scheduler.statusUpdate函数进行如下步骤:

  1. TaskScheduler通过TaskId找到管理这个Task的TaskSetManager(负责管理一批Task的类),从TaskSetManager里面删掉这个Task,并把Task插入到TaskResultGetter(负责获取Task结果的类)的成功队列里;
  1. TaskResultGetter获取到结果之后,调用TaskScheduler的handleSuccessfulTask方法把结果返回;
  2. TaskScheduler调用TaskSetManager的handleSuccessfulTask方法,处理成功的Task;
  3. TaskSetManager调用DAGScheduler的taskEnded方法,告诉DAGScheduler这个Task运行结束了,如果这个时候Task全部成功了,就会结束TaskSetManager。

DAGScheduler在taskEnded方法里触发CompletionEvent事件,在处理CompletionEvent消息事件中调用DAGScheduler的handleTaskCompletion函数,针对ResultTask和ShuffleMapTask区别对待结果:
1)ResultTask:
job的numFinished加1,如果numFinished等于它的分片数,则表示任务该Stage结束,标记这个Stage为结束,最后调用JobListener(具体实现在JobWaiter)的taskSucceeded方法,把结果交给resultHandler(经过包装的自己写的那个匿名函数)处理,如果完成的Task数量等于总任务数,任务退出。
2)ShuffleMapTask:

  1. 调用Stage的addOutputLoc方法,把结果添加到Stage的outputLocs列表里
  1. 如果该Stage没有等待的Task了,就标记该Stage为结束
  2. 把Stage的outputLocs注册到MapOutputTracker里面,留个下一个Stage用
  3. 如果Stage的outputLocs为空,表示它的计算失败,重新提交Stage
  4. 找出下一个在等待并且没有父亲的Stage提交

转载请注明作者Jason Ding及其出处
GitCafe博客主页(http://jasonding1354.gitcafe.io/)
Github博客主页(http://jasonding1354.github.io/)
CSDN博客(http://blog.csdn.net/jasonding1354)
简书主页(http://www.jianshu.com/users/2bd9b48f6ea8/latest_articles)
Google搜索jasonding1354进入我的博客主页

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,293评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,604评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,958评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,729评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,719评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,630评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,000评论 3 397
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,665评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,909评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,646评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,726评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,400评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,986评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,959评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,996评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,481评论 2 342

推荐阅读更多精彩内容