Spark 源码浅析之 Executor 和 Task 部分

Executor 和 Task

Executor 是在 Worker 上启动的为某个 Application 提供专属服务的进程,Task 为 TaskScheduler 提交过来的计算任务。

流程概览

Worker 在收到启动 Executor 请求后,Woker 会通过 ExecutorRunner 启动一个 ExecutorBackend 进程,我这里以 CoarseGrainedExecutorBackend 为例。

CoarseGrainedExecutorBackend 在 SparkCore 的 org.apache.spark.executor 包下。

Executor

Executor 类关系:

类关系

我们先从 CoarseGrainedExecutorBackend 的 main() 方法作为切入点,看看CoarseGrainedExecutorBackend 做了哪些工作:

def main(args: Array[String]) {
  var driverUrl: String = null
  var executorId: String = null
  // ...

  var argv = args.toList
  while (!argv.isEmpty) {
    argv match {
      case ("--driver-url") :: value :: tail =>
        driverUrl = value
        argv = tail
      case ("--executor-id") :: value :: tail =>
        executorId = value
        argv = tail
        
       // ...
    }
  }

  // ...
    
  run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)
  System.exit(0)
}

run() 方法实现细节:

private def run(
    driverUrl: String,
    executorId: String,
    hostname: String,
    cores: Int,
    appId: String,
    workerUrl: Option[String],
    userClassPath: Seq[URL]) {

  Utils.initDaemon(log)

  SparkHadoopUtil.get.runAsSparkUser { () =>
    
    // ...

    // 创建 ExecutorEnv
    val env = SparkEnv.createExecutorEnv(
      driverConf, executorId, hostname, port, cores, cfg.ioEncryptionKey, isLocal = false)

    // 设置通信端 CoarseGrainedExecutorBackend
    env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
      env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))
    workerUrl.foreach { url =>
      env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
    }
    env.rpcEnv.awaitTermination()
    SparkHadoopUtil.get.stopCredentialUpdater()
  }
}

从上面的代码可以看出,CoarseGrainedExecutorBackend 相当于一个监听器,用于接收并处理集群中传递过来的消息。

接下来,我们看一个 CoarseGrainedExecutorBackend 中一个比较重要的成员变量:

var executor: Executor = null

我们再看看 CoarseGrainedExecutorBackend 的 onStart() 方法做了哪些初始化工作:

override def onStart() {
  rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
    driver = Some(ref)
    // 向 Driver 进行反向注册
    // 在 SparkEnv 文章中提到过,这个消息会被 DriverEndpoint 接收
    // 在一切顺利的情况下,DriverEndpint 会给我们返回一个 RegisteredExecutor 消息
    ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))
  }(ThreadUtils.sameThread).onComplete {
    // 略略略...
  }(ThreadUtils.sameThread)
}

既然 CoarseGrainedExecutorBackend 相当于一个监听器,那么接下来,我们就需要看看它会处理哪些消息:

override def receive: PartialFunction[Any, Unit] = {
  // 注册成功 Executor
  // 相当于延迟初始化
  case RegisteredExecutor =>
    // 实例化 Executor,真正干活的
    executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)


  case RegisterExecutorFailed(message) =>
    exitExecutor(1, "Slave registration failed: " + message)

  // 启动任务请求
  case LaunchTask(data) =>
    if (executor == null) {
      exitExecutor(1, "Received LaunchTask command but executor was null")
    } else {
      // 解码
      val taskDesc = TaskDescription.decode(data.value)
      // 使用 Executor 启动任务
      executor.launchTask(this, taskDesc)
    }

  case KillTask(taskId, _, interruptThread, reason) =>
    // ...

  case StopExecutor =>
    // ...

  case Shutdown =>
    // ...
    
}

TaskScheduler 通过 SchedulerBackend 将任务发送给 CoarseGrainedExecutorBackend 进行具体的计算,发送的是 LaunchTask 消息,上段代码中可以看到:

// 从上面截取的一部分
case LaunchTask(data) =>
  if (executor == null) {
    exitExecutor(1, "Received LaunchTask command but executor was null")
  } else {
    // 解码
    val taskDesc = TaskDescription.decode(data.value)
    // 使用 Executor 启动任务
    executor.launchTask(this, taskDesc)
  }

任务的具体执行,交给了 Eexcutor 去处理。
我们先看下 Executor 中的一个比较重要的成员变量,稍后在看 Executor.launchTask() 的内部实现细节:

// 执行 Task 的线程池
private val threadPool = {
  val threadFactory = new ThreadFactoryBuilder()
    // 守护线程
    .setDaemon(true)
    .setNameFormat("Executor task launch worker-%d")
    .setThreadFactory(new ThreadFactory {
      override def newThread(r: Runnable): Thread =
        // 略略略
    })
    .build()
  Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
}

现在我们再看 Executor.launchTask() 的实现细节:

def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
  // 创建了一个 TaskRunner
  // TaskRunner 实现了 Runnable 接口,可以直接交给 Task 线程池去执行
  val tr = new TaskRunner(context, taskDescription)
  runningTasks.put(taskDescription.taskId, tr)
  // 执行
  threadPool.execute(tr)
}

接下来我们就需要看看 TaskRunner 的实现细节了。

简单的总结一下,Executor 在启动的时候,会进行反向注册,在收到 Application 提交过来的任务后,会将其丢到线程池中去执行。

Task

在这个版本的 Spark 中,有 ShuffleMapTask 和 ResultTask 两种 Task:

类关系

Executor 会将 TaskRunner 放到线程池中去执行,我们看看 TaskRunner.run() 做了哪些工作:

// 这里只精简了一部分
// 原文大概 200 多行
override def run(): Unit = {

  // 内存管理器
  val taskMemoryManager = new TaskMemoryManager(env.memoryManager, taskId)
 
  try {
    // 反序列化的相关配置
    Executor.taskDeserializationProps.set(taskDescription.properties)
    // 下载并加载一些 Task 需要的 Jar 和文件
    updateDependencies(taskDescription.addedFiles, taskDescription.addedJars)
    // 反序列化 Task
    task = ser.deserialize[Task[Any]](
      taskDescription.serializedTask, Thread.currentThread.getContextClassLoader)
    task.localProperties = taskDescription.properties
    // 为 Task 设置内存管理器
    task.setTaskMemoryManager(taskMemoryManager)


    var threwException = true
    val value = try {
      // 运行 Task
      // 很关键
      // res = result
      val res = task.run(
        taskAttemptId = taskId,
        attemptNumber = taskDescription.attemptNumber,
        metricsSystem = env.metricsSystem)
      threwException = false
      res
    } finally {
      // 释放锁
      val releasedLocks = env.blockManager.releaseAllLocksForTask(taskId)
      // 释放内存
      val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()

      // 略略略
    }
    
    // 将运行结果序列化
    val valueBytes = resultSer.serialize(value)
    
    // 包含累加器等信息的 TaskResult
    val directResult = new DirectTaskResult(valueBytes, accumUpdates)
    // 序列化
    val serializedDirectResult = ser.serialize(directResult)
    // Result 字节大小
    val resultSize = serializedDirectResult.limit
    
    // directSend = sending directly back to the driver
    val serializedResult: ByteBuffer = {
      if (maxResultSize > 0 && resultSize > maxResultSize) {
        ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
      } else if (resultSize > maxDirectResultSize) {
        // 通过 Task ID 创建 Block ID
        val blockId = TaskResultBlockId(taskId)
        // 将运行结果以 Block 的形式 (有Block ID) 
        // 交给 BlockManager 去缓存
        env.blockManager.putBytes(
          blockId,
          new ChunkedByteBuffer(serializedDirectResult.duplicate()),
          StorageLevel.MEMORY_AND_DISK_SER) // 缓存等级
      
        ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
      } else {
        logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent to driver")
        serializedDirectResult
      }
    }
    
    // 略略略

    // 这里其实会向 DriverEndpoint 发送一条 StatusUpdate 消息
    // DriverEndpoint 会修改其维护的 Executor 的信息,然后唤醒其它的 Executor 去执行
    execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
    
  } catch {
    // 略略略
  } finally {
    // 将 Task 移出
    runningTasks.remove(taskId)
  }
}

TaskRunner 会先调用 Task.run() 方法去执行任务并获取运行结果。然后,将运行结果交给 BlockManager 去缓存,BlockManager.putBytes() 最终会调用 BlockManager 的 doPutBytes() 方法 【Task.run() 方法稍后再看】:

private def doPutBytes[T](
    blockId: BlockId,
    bytes: ChunkedByteBuffer,
    level: StorageLevel,
    classTag: ClassTag[T],
    tellMaster: Boolean = true,
    keepReadLock: Boolean = false): Boolean = {
  doPut(blockId, level, classTag, tellMaster = tellMaster, keepReadLock = keepReadLock) { info =>

    // 略略略
    
    val size = bytes.size

    if (level.useMemory) {
      
      // 缓存到内存
      
      val putSucceeded = if (level.deserialized) { // 串行化
        // 反序列化数据流
        val values =
          serializerManager.dataDeserializeStream(blockId, bytes.toInputStream())(classTag)
        // memory put
        memoryStore.putIteratorAsValues(blockId, values, classTag) match {
          case Right(_) => true
          case Left(iter) =>
            // If putting deserialized values in memory failed, we will put the bytes directly to
            // disk, so we don't need this iterator and can close it to free resources earlier.
            iter.close()
            false
        }
      } else {
        // 略略略
      }
      if (!putSucceeded && level.useDisk) {
        // 放入内存失败,并且缓存级别有磁盘
        // 放入磁盘中
        diskStore.putBytes(blockId, bytes)
      }
    } else if (level.useDisk) {
      // 磁盘级别 直接放入磁盘
      diskStore.putBytes(blockId, bytes)
    }
    
    // 数据的 Block 信息
    val putBlockStatus = getCurrentBlockStatus(blockId, info)
    val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
    if (blockWasSuccessfullyStored) {
      // 略略路
      if (tellMaster && info.tellMaster) {
        // 将 Block 块信息发送到 Driver 端
        // BlockManagerMasterEndpoint 收到消息后最终会将 Block 信息放入到 BlockManager 中
        // 具体细节在 BlockManager 文章中再讨论
        reportBlockStatus(blockId, putBlockStatus)
      }
      addUpdatedBlockStatusToTaskMetrics(blockId, putBlockStatus)
    }
      
    if (level.replication > 1) {
      // Wait for asynchronous replication to finish
      try {
        ThreadUtils.awaitReady(replicationFuture, Duration.Inf)
      } catch {
        case NonFatal(t) =>
          throw new Exception("Error occurred while waiting for replication to finish", t)
      }
    }
    if (blockWasSuccessfullyStored) {
      None
    } else {
      Some(bytes)
    }
  }.isEmpty
}

接下来,我们再把目光放到 Task.run() 方法上:

final def run(
    taskAttemptId: Long,
    attemptNumber: Int,
    metricsSystem: MetricsSystem): T = {
    
  // 想 Executor 端 BlockManager 注册 Task
  SparkEnv.get.blockManager.registerTask(taskAttemptId)
    
  context = new TaskContextImpl(...)
    
  TaskContext.setTaskContext(context)
  taskThread = Thread.currentThread()

  new CallerContext(...).setCurrentContext()

  try { 
    // 运行任务
    runTask(context)
  } catch {
        // ...
  } finally {
        // ...
    } finally {
        // ...
      }
    }
  }
}

由于有两种 Task,所以我们需要分开来看,先看 ShuffleMapTask.runTask():

override def runTask(context: TaskContext): MapStatus = {
  // Deserialize the RDD using the broadcast variable.
  val threadMXBean = ManagementFactory.getThreadMXBean
  
  // ...
  
  val ser = SparkEnv.get.closureSerializer.newInstance()
  // 反序列化
  val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
  _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
  } else 0L

  var writer: ShuffleWriter[Any, Any] = null
  try {
    // 获取 ShuffleManager,默认为 SortShuffleManager
    val manager = SparkEnv.get.shuffleManager
    writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
    // 计算并写出
    writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
    writer.stop(success = true).get
  } catch {
    // ...
  }
}

我们看下 RDD.iterator():

final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {
    // 获取缓存或者计算
    // 第一次肯定没有缓存,所以需要计算
    getOrCompute(split, context)
  } else {
    computeOrReadCheckpoint(split, context)
  }
}

接下来,我们再看下 ResultTask.runTask() 方法:

override def runTask(context: TaskContext): U = {
  // Deserialize the RDD and the func using the broadcast variables.
  val threadMXBean = ManagementFactory.getThreadMXBean

  // ...
    
  val ser = SparkEnv.get.closureSerializer.newInstance()
    
 // 反序列化
  val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
  _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
  } else 0L
    
  // 执行业务逻辑代码,我们编写的
  func(context, rdd.iterator(partition, context))
}

ResultTask 会直接产生运算的结果。

简单的总结一下,TaskRunner 会先反序列化 Task,然后调用其 run() 方法进行运算,ShuffleMapTask 会通过 ShuffleWriter 将中间结果写出,ResultTask 会直接产生运行结果。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,530评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 86,403评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,120评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,770评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,758评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,649评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,021评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,675评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,931评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,659评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,751评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,410评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,004评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,969评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,042评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,493评论 2 343

推荐阅读更多精彩内容