任务执行-源码分析

CoarseGrainedExecutorBackend.receive
收到LaunchTask消息

override def receive: PartialFunction[Any, Unit] = {
   case RegisteredExecutor =>
     logInfo("Successfully registered with driver")
     try {
       executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false,
         resources = _resources)
       driver.get.send(LaunchedExecutor(executorId))
     } catch {
       case NonFatal(e) =>
         exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
     }

   case LaunchTask(data) =>
     if (executor == null) {
       exitExecutor(1, "Received LaunchTask command but executor was null")
     } else {
       //反序列化task对象
       val taskDesc = TaskDescription.decode(data.value)
       logInfo("Got assigned task " + taskDesc.taskId)
       taskResources(taskDesc.taskId) = taskDesc.resources
      //计算对象运行task
       executor.launchTask(this, taskDesc)
     }

使用executor的线程池threadPool执行task

 def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
    val tr = new TaskRunner(context, taskDescription, plugins)
    runningTasks.put(taskDescription.taskId, tr)
    threadPool.execute(tr)
    if (decommissioned) {
      log.error(s"Launching a task while in decommissioned state.")
    }
  }

TaskRunner.run

//任务运行
   val res = task.run(
            taskAttemptId = taskId,
            attemptNumber = taskDescription.attemptNumber,
            metricsSystem = env.metricsSystem,
            resources = taskDescription.resources,
            plugins = plugins)

Task.run

runTask(context)

计算对象运行,计算逻辑在每个任务中

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容