Executor and Task
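An Executor is a process started on a Worker that serves exactly one Application; a Task is a unit of computation submitted by the TaskScheduler.
When a Worker receives a request to start an Executor, it uses ExecutorRunner to launch an ExecutorBackend process. This article uses CoarseGrainedExecutorBackend as the example.
CoarseGrainedExecutorBackend lives in the org.apache.spark.executor package of Spark Core.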
Executor
[Figure: Executor class relationships]
We start from the main() method of CoarseGrainedExecutorBackend to see what work it does:
def main(args: Array[String]) {
  var driverUrl: String = null
  var executorId: String = null
  // ...
  var argv = args.toList
  while (!argv.isEmpty) {
    argv match {
      case ("--driver-url") :: value :: tail =>
        driverUrl = value
        argv = tail
      case ("--executor-id") :: value :: tail =>
        executorId = value
        argv = tail
      // ...
    }
  }
  // ...
  run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)
  System.exit(0)
}
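The argument loop above peels options off a List two elements at a time with the `::` pattern. As a minimal sketch of the same technique (not Spark's code), the version below does it with a tail-recursive function instead of mutable variables. `ArgParsingSketch`, `BackendArgs`, and `parse` are hypothetical names introduced only for illustration, and the sketch handles just the two options shown in the excerpt.

```scala
object ArgParsingSketch {
  // Hypothetical holder for the parsed options; this is not a Spark class.
  final case class BackendArgs(
      driverUrl: Option[String] = None,
      executorId: Option[String] = None)

  // Consume the argument list one "--flag value" pair at a time,
  // mirroring the while/match loop but without mutable state.
  @scala.annotation.tailrec
  def parse(argv: List[String], acc: BackendArgs = BackendArgs()): BackendArgs =
    argv match {
      case "--driver-url" :: value :: tail =>
        parse(tail, acc.copy(driverUrl = Some(value)))
      case "--executor-id" :: value :: tail =>
        parse(tail, acc.copy(executorId = Some(value)))
      case Nil =>
        acc
      case unknown :: _ =>
        // Assumption of this sketch: unknown options are rejected outright.
        throw new IllegalArgumentException(s"Unrecognized option: $unknown")
    }
}
```

Calling `ArgParsingSketch.parse(List("--driver-url", "some-url", "--executor-id", "1"))` yields a `BackendArgs` with both fields set; the URL value here is only a placeholder.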
Implementation details of the run() method:
private def run(
    driverUrl: String,
    executorId: String,
    hostname: String,
    cores: Int,
    appId: String,
    workerUrl: Option[String],
    userClassPath: Seq[URL]) {
  Utils.initDaemon(log)
  SparkHadoopUtil.get.runAsSparkUser { () =>
    // ...
    // Create the executor-side SparkEnv
    val env = SparkEnv.createExecutorEnv(
      driverConf, executorId, hostname, port, cores, cfg.ioEncryptionKey, isLocal = false)
    // Register CoarseGrainedExecutorBackend as an RPC endpoint
    env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
      env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))
    workerUrl.foreach { url =>
      env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
    }
    env.rpcEnv.awaitTermination()
    SparkHadoopUtil.get.stopCredentialUpdater()
  }
}
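As the code above shows, CoarseGrainedExecutorBackend acts like a listener: it receives and handles the messages sent to it from the rest of the cluster.
Next, let's look at an important member variable of CoarseGrainedExecutorBackend: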
var executor: Executor = null
Now let's see what initialization the onStart() method of CoarseGrainedExecutorBackend performs:
override def onStart() {
  rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
    driver = Some(ref)
    // Register back with the Driver (reverse registration)
    // As mentioned in the SparkEnv article, this message is received by DriverEndpoint
    // If all goes well, DriverEndpoint replies with a RegisteredExecutor message
    ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))
  }(ThreadUtils.sameThread).onComplete {
    // ... (omitted)
  }(ThreadUtils.sameThread)
}
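The registration in onStart() is a two-step asynchronous chain: first resolve a reference to the Driver, then ask it to register this Executor. The sketch below shows that flatMap chaining with plain scala.concurrent Futures. It does not use Spark's RPC classes: `DriverRef`, `resolveDriver`, `register`, and `registerBlocking` are hypothetical stand-ins, and the fake driver simply accepts any non-empty executor id.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object RegistrationSketch {
  // Hypothetical stand-in for an RPC reference to the driver.
  trait DriverRef {
    def askRegister(executorId: String): Future[Boolean]
  }

  // Step 1 (assumed behaviour): resolving the driver may fail asynchronously.
  def resolveDriver(driverUrl: String): Future[DriverRef] =
    if (driverUrl.isEmpty) Future.failed(new IllegalArgumentException("empty driver URL"))
    else Future.successful(new DriverRef {
      def askRegister(executorId: String): Future[Boolean] =
        Future.successful(executorId.nonEmpty)
    })

  // Step 2 is chained onto step 1 with flatMap, as in onStart().
  def register(driverUrl: String, executorId: String)(
      implicit ec: ExecutionContext): Future[Boolean] =
    resolveDriver(driverUrl).flatMap(ref => ref.askRegister(executorId))

  // Convenience wrapper for the sketch: wait for the outcome and
  // turn a failed Future into a Left carrying the error message.
  def registerBlocking(driverUrl: String, executorId: String): Either[String, Boolean] = {
    import ExecutionContext.Implicits.global
    val outcome = register(driverUrl, executorId)
      .map[Either[String, Boolean]](ok => Right(ok))
      .recover { case e => Left(e.getMessage) }
    Await.result(outcome, 5.seconds)
  }
}
```

A failure in either step surfaces in the same place, which is why the original code can handle both with a single onComplete callback.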
Since CoarseGrainedExecutorBackend acts like a listener, the next step is to see which messages it handles:
override def receive: PartialFunction[Any, Unit] = {
  // The Executor registered successfully
  // This amounts to delayed initialization
  case RegisteredExecutor =>
    // Instantiate the Executor, which does the real work
    executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
  case RegisterExecutorFailed(message) =>
    exitExecutor(1, "Slave registration failed: " + message)
  // Request to launch a task
  case LaunchTask(data) =>
    if (executor == null) {
      exitExecutor(1, "Received LaunchTask command but executor was null")
    } else {
      // Decode
      val taskDesc = TaskDescription.decode(data.value)
      // Launch the task through the Executor
      executor.launchTask(this, taskDesc)
    }
  case KillTask(taskId, _, interruptThread, reason) =>
    // ...
  case StopExecutor =>
    // ...
  case Shutdown =>
    // ...
}
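The receive method above is an ordinary Scala PartialFunction whose cases dispatch on the message type, with the worker object created only after registration succeeds. The sketch below illustrates that pattern in isolation. The messages (`Registered`, `Launch`, `Stop`) and the `Backend` class are hypothetical, and unlike the real backend, which exits the process when a task arrives before registration, this sketch only records a rejection.

```scala
object ReceiveSketch {
  // Hypothetical messages, loosely modelled on the ones handled above.
  sealed trait Message
  case object Registered extends Message
  final case class Launch(taskId: Long) extends Message
  case object Stop extends Message

  final class Backend {
    // Stays empty until registration succeeds (delayed initialization).
    private var worker: Option[String] = None
    // Record of what happened, so the behaviour can be inspected.
    val log = scala.collection.mutable.ArrayBuffer.empty[String]

    val receive: PartialFunction[Any, Unit] = {
      case Registered =>
        worker = Some("executor")
        log += "registered"
      case Launch(id) =>
        worker match {
          case Some(_) => log += s"launch $id"
          // Assumption of this sketch: reject instead of exiting the process.
          case None => log += s"reject $id"
        }
      case Stop =>
        log += "stop"
    }
  }
}
```

Because receive is partial, a message with no matching case is simply not defined for it, which `isDefinedAt` makes visible.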
TaskScheduler sends tasks through SchedulerBackend to CoarseGrainedExecutorBackend for the actual computation. They arrive as LaunchTask messages, as the code above shows:
// An excerpt from the code above
case LaunchTask(data) =>
  if (executor == null) {
    exitExecutor(1, "Received LaunchTask command but executor was null")
  } else {
    // Decode
    val taskDesc = TaskDescription.decode(data.value)
    // Launch the task through the Executor
    executor.launchTask(this, taskDesc)
  }
The actual execution of the task is handed to the Executor.
Before looking at the internals of Executor.launchTask(), let's look at an important member variable of Executor:
// Thread pool that runs Tasks
private val threadPool = {
  val threadFactory = new ThreadFactoryBuilder()
    // Daemon threads
    .setDaemon(true)
    .setNameFormat("Executor task launch worker-%d")
    .setThreadFactory(new ThreadFactory {
      override def newThread(r: Runnable): Thread =
        // ... (omitted)
    })
    .build()
  Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
}
Now let's look at the implementation of Executor.launchTask():
def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
  // Create a TaskRunner
  // TaskRunner implements Runnable, so it can be handed straight to the Task thread pool
  val tr = new TaskRunner(context, taskDescription)
  runningTasks.put(taskDescription.taskId, tr)
  // Execute
  threadPool.execute(tr)
}
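The pattern in launchTask() is: wrap the work in a Runnable, record it in a concurrent map, and submit it to a cached pool of daemon threads. The sketch below reproduces that pattern with only java.util.concurrent (no Guava ThreadFactoryBuilder). `LaunchSketch`, its `running` and `results` maps, and the toy "task" that sums a sequence are assumptions made for illustration.

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadFactory, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object LaunchSketch {
  private val counter = new AtomicInteger(0)

  // Daemon threads with a readable name, built by hand.
  private val factory = new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, s"sketch task launch worker-${counter.getAndIncrement()}")
      t.setDaemon(true)
      t
    }
  }
  private val pool = Executors.newCachedThreadPool(factory)

  // Tasks currently in flight, and the values they produced.
  val running = new ConcurrentHashMap[Long, Runnable]()
  val results = new ConcurrentHashMap[Long, Int]()

  // Register the runner first, then hand it to the pool.
  def launch(taskId: Long, input: Seq[Int]): Unit = {
    val runner = new Runnable {
      override def run(): Unit = {
        try {
          results.put(taskId, input.sum) // the toy "task"
        } finally {
          running.remove(taskId) // always deregister, even on failure
        }
      }
    }
    running.put(taskId, runner)
    pool.execute(runner)
  }

  // Stop accepting work and wait briefly for in-flight tasks.
  def shutdownAndWait(): Boolean = {
    pool.shutdown()
    pool.awaitTermination(5, TimeUnit.SECONDS)
  }
}
```

A cached pool grows on demand and reuses idle threads, so the number of concurrently running tasks is bounded by whatever submits them, not by the pool itself.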
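Next we need to look at how TaskRunner is implemented.
To summarize briefly: when an Executor starts, it registers itself back with the Driver; when it receives a task submitted by the Application, it hands the task to a thread pool for execution.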
Task
In this version of Spark there are two kinds of Task: ShuffleMapTask and ResultTask.
Executor runs each TaskRunner on the thread pool. Let's see what TaskRunner.run() does:
// Only a condensed excerpt is shown here
// The original is a little over 200 lines
override def run(): Unit = {
  // Memory manager
  val taskMemoryManager = new TaskMemoryManager(env.memoryManager, taskId)
  try {
    // Properties used during deserialization
    Executor.taskDeserializationProps.set(taskDescription.properties)
    // Download and load the Jars and files the Task needs
    updateDependencies(taskDescription.addedFiles, taskDescription.addedJars)
    // Deserialize the Task
    task = ser.deserialize[Task[Any]](
      taskDescription.serializedTask, Thread.currentThread.getContextClassLoader)
    task.localProperties = taskDescription.properties
    // Give the Task its memory manager
    task.setTaskMemoryManager(taskMemoryManager)
    var threwException = true
    val value = try {
      // Run the Task
      // This is the key step
      // res = result
      val res = task.run(
        taskAttemptId = taskId,
        attemptNumber = taskDescription.attemptNumber,
        metricsSystem = env.metricsSystem)
      threwException = false
      res
    } finally {
      // Release locks
      val releasedLocks = env.blockManager.releaseAllLocksForTask(taskId)
      // Free memory
      val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()
      // ... (omitted)
    }
    // Serialize the result value
    val valueBytes = resultSer.serialize(value)
    // TaskResult that also carries accumulator updates and similar information
    val directResult = new DirectTaskResult(valueBytes, accumUpdates)
    // Serialize
    val serializedDirectResult = ser.serialize(directResult)
    // Size of the result in bytes
    val resultSize = serializedDirectResult.limit
    // directSend = sending directly back to the driver
    val serializedResult: ByteBuffer = {
      if (maxResultSize > 0 && resultSize > maxResultSize) {
        // Larger than maxResultSize: only the Block ID and size are sent, not the data
        ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
      } else if (resultSize > maxDirectResultSize) {
        // Build a Block ID from the Task ID
        val blockId = TaskResultBlockId(taskId)
        // Hand the result to BlockManager as a Block (identified by the Block ID)
        // so that it is cached there
        env.blockManager.putBytes(
          blockId,
          new ChunkedByteBuffer(serializedDirectResult.duplicate()),
          StorageLevel.MEMORY_AND_DISK_SER) // Storage level
        ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
      } else {
        logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent to driver")
        serializedDirectResult
      }
    }
    // ... (omitted)
    // This sends a StatusUpdate message to DriverEndpoint
    // DriverEndpoint updates the Executor info it maintains, then offers resources again so that more tasks can run
    execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
  } catch {
    // ... (omitted)
  } finally {
    // Remove the Task from the running set
    runningTasks.remove(taskId)
  }
}
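The three-way branch on result size in the code above can be isolated as a small decision function. The sketch below is a simplified model, not Spark's implementation: `Direct`, `Indirect`, and `Dropped` are hypothetical result types, `store` is an in-memory map standing in for BlockManager, and the block id string format is only illustrative.

```scala
object ResultRoutingSketch {
  sealed trait Routed
  // Small result: the bytes travel inside the status update itself.
  final case class Direct(bytes: Array[Byte]) extends Routed
  // Medium result: only a block id travels; the bytes stay in a local store.
  final case class Indirect(blockId: String, size: Int) extends Routed
  // Oversized result: the bytes are dropped; only id and size are reported.
  final case class Dropped(blockId: String, size: Int) extends Routed

  // Hypothetical in-memory stand-in for the BlockManager.
  val store = scala.collection.mutable.Map.empty[String, Array[Byte]]

  def route(
      taskId: Long,
      bytes: Array[Byte],
      maxResultSize: Long,
      maxDirectResultSize: Long): Routed = {
    val size = bytes.length
    val blockId = s"taskresult_$taskId" // illustrative id format
    if (maxResultSize > 0 && size > maxResultSize) {
      Dropped(blockId, size)
    } else if (size > maxDirectResultSize) {
      store.put(blockId, bytes) // keep the data locally, send a pointer
      Indirect(blockId, size)
    } else {
      Direct(bytes)
    }
  }
}
```

The order of the checks matters: the upper limit is tested first, so a result that exceeds both thresholds is never written to the store.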
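TaskRunner first calls Task.run() to execute the task and obtain its result. When the serialized result is larger than maxDirectResultSize but still within maxResultSize, it is handed to BlockManager for caching; BlockManager.putBytes() eventually calls BlockManager's doPutBytes() method (we will look at Task.run() a little later):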
private def doPutBytes[T](
    blockId: BlockId,
    bytes: ChunkedByteBuffer,
    level: StorageLevel,
    classTag: ClassTag[T],
    tellMaster: Boolean = true,
    keepReadLock: Boolean = false): Boolean = {
  doPut(blockId, level, classTag, tellMaster = tellMaster, keepReadLock = keepReadLock) { info =>
    // ... (omitted)
    val size = bytes.size
    if (level.useMemory) {
      // Cache in memory
      val putSucceeded = if (level.deserialized) { // store as deserialized objects
        // Deserialize the byte stream
        val values =
          serializerManager.dataDeserializeStream(blockId, bytes.toInputStream())(classTag)
        // memory put
        memoryStore.putIteratorAsValues(blockId, values, classTag) match {
          case Right(_) => true
          case Left(iter) =>
            // If putting deserialized values in memory failed, we will put the bytes directly to
            // disk, so we don't need this iterator and can close it to free resources earlier.
            iter.close()
            false
        }
      } else {
        // ... (omitted)
      }
      if (!putSucceeded && level.useDisk) {
        // The memory put failed and the storage level allows disk
        // Write to disk
        diskStore.putBytes(blockId, bytes)
      }
    } else if (level.useDisk) {
      // Disk-only level: write straight to disk
      diskStore.putBytes(blockId, bytes)
    }
    // Block status for the data
    val putBlockStatus = getCurrentBlockStatus(blockId, info)
    val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
    if (blockWasSuccessfullyStored) {
      // ... (omitted)
      if (tellMaster && info.tellMaster) {
        // Report the Block status to the Driver
        // After receiving the message, BlockManagerMasterEndpoint eventually records the Block info under the corresponding BlockManager
        // The details are left to the BlockManager article
        reportBlockStatus(blockId, putBlockStatus)
      }
      addUpdatedBlockStatusToTaskMetrics(blockId, putBlockStatus)
    }
    if (level.replication > 1) {
      // Wait for asynchronous replication to finish
      try {
        ThreadUtils.awaitReady(replicationFuture, Duration.Inf)
      } catch {
        case NonFatal(t) =>
          throw new Exception("Error occurred while waiting for replication to finish", t)
      }
    }
    if (blockWasSuccessfullyStored) {
      None
    } else {
      Some(bytes)
    }
  }.isEmpty
}
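The storage logic in doPutBytes() boils down to "try memory first, fall back to disk if the level allows it". The sketch below models just that fallback. `Level`, `TieredStore`, and the fixed `memoryCapacity` budget are assumptions for illustration; the real MemoryStore and DiskStore are far more involved.

```scala
object TieredPutSketch {
  // Hypothetical, reduced version of a storage level: two flags only.
  final case class Level(useMemory: Boolean, useDisk: Boolean)

  final class TieredStore(memoryCapacity: Long) {
    private val memory = scala.collection.mutable.Map.empty[String, Array[Byte]]
    private val disk = scala.collection.mutable.Map.empty[String, Array[Byte]]
    private var memoryUsed = 0L

    // A memory put succeeds only while the budget is not exceeded.
    private def putInMemory(id: String, bytes: Array[Byte]): Boolean =
      if (memoryUsed + bytes.length <= memoryCapacity) {
        memory.put(id, bytes)
        memoryUsed += bytes.length
        true
      } else {
        false
      }

    // Returns where the block ended up: "memory", "disk", or "none".
    def put(id: String, bytes: Array[Byte], level: Level): String =
      if (level.useMemory && putInMemory(id, bytes)) {
        "memory"
      } else if (level.useDisk) {
        // Reached both for disk-only levels and for failed memory puts.
        disk.put(id, bytes)
        "disk"
      } else {
        "none"
      }
  }
}
```

A "none" outcome corresponds to the case in the original where the block was not stored and the bytes are returned to the caller.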
Next, let's turn to the Task.run() method:
final def run(
    taskAttemptId: Long,
    attemptNumber: Int,
    metricsSystem: MetricsSystem): T = {
  // Register the Task with the Executor-side BlockManager
  SparkEnv.get.blockManager.registerTask(taskAttemptId)
  context = new TaskContextImpl(...)
  TaskContext.setTaskContext(context)
  taskThread = Thread.currentThread()
  new CallerContext(...).setCurrentContext()
  try {
    // Run the task
    runTask(context)
  } catch {
    // ...
  } finally {
    try {
      // ...
    } finally {
      // ...
    }
  }
}
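Task.run() follows the template-method pattern: a final run() sets up a per-thread context, delegates to an abstract runTask(), and always cleans up afterwards. The sketch below shows that shape with a plain ThreadLocal. `Context`, `SketchTask`, and `Doubling` are hypothetical names, and the sketch leaves out metrics, kill handling, and completion callbacks.

```scala
object TaskTemplateSketch {
  final class Context(val attemptId: Long)

  // Per-thread slot holding the context of the task running on that thread.
  private val current = new ThreadLocal[Context]
  def currentContext: Option[Context] = Option(current.get())

  abstract class SketchTask[T] {
    // Subclasses supply only the work itself.
    protected def runTask(context: Context): T

    // Fixed skeleton: set up, delegate, always clean up.
    final def run(attemptId: Long): T = {
      val context = new Context(attemptId)
      current.set(context)
      try {
        runTask(context)
      } finally {
        current.remove() // runs on success and on failure alike
      }
    }
  }

  // Example subclass: checks that the thread-local context is visible.
  final class Doubling(n: Int) extends SketchTask[Int] {
    protected def runTask(context: Context): Int = {
      require(currentContext.exists(_ eq context))
      n * 2
    }
  }
}
```

Making run() final keeps the setup and cleanup order out of the hands of subclasses, which only override runTask().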
Since there are two kinds of Task, we look at them separately, starting with ShuffleMapTask.runTask():
override def runTask(context: TaskContext): MapStatus = {
  // Deserialize the RDD using the broadcast variable.
  val threadMXBean = ManagementFactory.getThreadMXBean
  // ...
  val ser = SparkEnv.get.closureSerializer.newInstance()
  // Deserialize
  val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
  _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
  } else 0L
  var writer: ShuffleWriter[Any, Any] = null
  try {
    // Get the ShuffleManager (SortShuffleManager by default)
    val manager = SparkEnv.get.shuffleManager
    writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
    // Compute the partition and write the output
    writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
    writer.stop(success = true).get
  } catch {
    // ...
  }
}
Let's look at RDD.iterator():
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {
    // Read from the cache, or compute
    // On the first access nothing is cached yet, so the partition must be computed
    getOrCompute(split, context)
  } else {
    computeOrReadCheckpoint(split, context)
  }
}
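The branch in RDD.iterator() is a get-or-compute cache keyed by partition: a persisted dataset computes each partition once and reuses it, while a non-persisted one recomputes on every access. The sketch below models only that behaviour. `CachedDataset`, its `persisted` flag, and the `computeCalls` counter are hypothetical, and checkpoints and storage levels are ignored.

```scala
object IteratorSketch {
  final class CachedDataset[T](persisted: Boolean, compute: Int => Seq[T]) {
    // Cache of already computed partitions, keyed by partition index.
    private val cache = scala.collection.mutable.Map.empty[Int, Seq[T]]
    // Counts how often a partition was actually computed.
    var computeCalls = 0

    private def computePartition(split: Int): Seq[T] = {
      computeCalls += 1
      compute(split)
    }

    def iterator(split: Int): Iterator[T] =
      if (persisted) {
        // First access computes and stores; later accesses hit the cache.
        cache.getOrElseUpdate(split, computePartition(split)).iterator
      } else {
        computePartition(split).iterator
      }
  }
}
```

With `persisted` set to true, two calls to `iterator(3)` trigger a single computation; with it set to false, each call computes again.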
Next, let's look at the ResultTask.runTask() method:
override def runTask(context: TaskContext): U = {
  // Deserialize the RDD and the func using the broadcast variables.
  val threadMXBean = ManagementFactory.getThreadMXBean
  // ...
  val ser = SparkEnv.get.closureSerializer.newInstance()
  // Deserialize
  val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
  _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
  } else 0L
  // Run the business logic, i.e. the code we wrote
  func(context, rdd.iterator(partition, context))
}
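ResultTask produces the result of the computation directly.
To summarize briefly: TaskRunner first deserializes the Task and then calls its run() method to perform the computation. ShuffleMapTask writes its intermediate output through a ShuffleWriter, while ResultTask produces the result directly.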
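The difference between the two task kinds can be sketched in a few lines. This is a toy model under stated assumptions, not Spark's shuffle: `runShuffleMapLike` buckets key/value records by key hash into an in-memory `sink` (standing in for a ShuffleWriter) and returns a `MapOutputSummary` (a hypothetical stand-in for MapStatus), while `runResultLike` simply applies a user function to the partition's iterator.

```scala
object TaskKindsSketch {
  // Hypothetical stand-in for MapStatus: record count per reducer bucket.
  final case class MapOutputSummary(recordsPerReducer: Map[Int, Int])

  // ShuffleMapTask-like: partition records by key, "write" them to the
  // sink, and return only a summary of what was written.
  def runShuffleMapLike[K, V](
      records: Iterator[(K, V)],
      numReducers: Int,
      sink: scala.collection.mutable.Map[Int, Vector[(K, V)]]): MapOutputSummary = {
    records.foreach { case (k, v) =>
      // floorMod keeps the bucket non-negative for negative hash codes.
      val bucket = Math.floorMod(k.hashCode, numReducers)
      sink.update(bucket, sink.getOrElse(bucket, Vector.empty) :+ (k -> v))
    }
    MapOutputSummary(sink.map { case (b, rs) => b -> rs.size }.toMap)
  }

  // ResultTask-like: apply the user function and return its value.
  def runResultLike[T, U](records: Iterator[T])(func: Iterator[T] => U): U =
    func(records)
}
```

The map-side function returns metadata about where its output went, and the result-side function returns the computed value itself, which mirrors the MapStatus and U return types in the two runTask() signatures.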