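This article is based on the Spark 1.6.3 source code and reads it step by step, going a little deeper each time. It records my train of thought while reading the source: after one pass it is very easy to forget everything, and writing an article to sort it out helps it stick.
In Spark source reading: executor module ①, we created the DriverEndpoint and explained that it periodically sends itself a ReviveOffers message. Handling that message calls makeOffers(), which offers executor resources and launches tasks on them.
In Spark source reading: executor module ②, we analyzed how an application is submitted, how the driver is initialized, and how executors are allocated on the workers. That path also ends by calling DriverEndpoint's makeOffers() method to assign tasks to the executors.
So in this article we take DriverEndpoint's makeOffers() method as the starting point and analyze how tasks are launched on executors and how those tasks are executed.
Assigning Tasks
Here is the source of the makeOffers method: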
// Make fake resource offers on all executors
private def makeOffers() {
  // Filter out executors under killing
  // Keep only the executors that are still alive and prepare to offer their resources
  val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
  val workOffers = activeExecutors.map { case (id, executorData) =>
    new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
  }.toSeq
  launchTasks(scheduler.resourceOffers(workOffers)) // first let the scheduler assign tasks via resourceOffers, then launch them
}
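Here scheduler is an instance of TaskSchedulerImpl. Its resourceOffers method assigns tasks from the pending TaskSets to the offered executors. This work is done mainly by TaskSchedulerImpl and DAGScheduler; it belongs to the scheduling module, so I will analyze it there.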
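To make the shape of resourceOffers' input and output concrete, here is a simplified sketch. It is not Spark's algorithm: the real TaskSchedulerImpl.resourceOffers also shuffles the offers and walks data-locality levels per TaskSet. `Offer`, `OfferSketch`, and the plain round-robin policy are hypothetical; only the idea of offers carrying free cores and a per-executor `Seq[Seq[...]]` result comes from the code above.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for Spark's WorkerOffer (executor id, host, free cores).
final case class Offer(executorId: String, host: String, freeCores: Int)

object OfferSketch {
  // Assumption: one core per task, matching the default spark.task.cpus = 1 (scheduler.CPUS_PER_TASK).
  val CpusPerTask = 1

  /** Round-robin over the offers, giving each one task per pass while it still has
    * CpusPerTask free cores. Returns one Seq of (taskId, executorId) per offer, in the
    * same shape as the Seq[Seq[TaskDescription]] that launchTasks receives. */
  def assign(offers: Seq[Offer], pending: Seq[Long]): Seq[Seq[(Long, String)]] = {
    val free = offers.map(_.freeCores).toArray
    val assigned = Array.fill(offers.size)(Vector.empty[(Long, String)])
    val queue = mutable.Queue.empty[Long]
    queue ++= pending
    var launchedAny = true
    while (queue.nonEmpty && launchedAny) {
      launchedAny = false
      for (i <- offers.indices if queue.nonEmpty && free(i) >= CpusPerTask) {
        assigned(i) = assigned(i) :+ (queue.dequeue() -> offers(i).executorId)
        free(i) -= CpusPerTask
        launchedAny = true
      }
    }
    assigned.toSeq
  }
}
```

With offers of 2 and 1 free cores and four pending tasks, the first executor gets tasks 1 and 3, the second gets task 2, and task 4 waits for the next round of offers.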
Once the tasks have been assigned, launchTasks is called to launch them:
// Launch tasks returned by a set of resource offers
private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  for (task <- tasks.flatten) {
    val serializedTask: ByteBuffer = ser.serialize(task) // serialize the task
    if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
      scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
        try {
          var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
            "spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " +
            "spark.akka.frameSize or using broadcast variables for large values."
          msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize,
            AkkaUtils.reservedSizeBytes)
          taskSetMgr.abort(msg)
        } catch {
          case e: Exception => logError("Exception in error callback", e)
        }
      }
    }
    else {
      val executorData: ExecutorData = executorDataMap(task.executorId) // look up the ExecutorData that holds this executor's info
      executorData.freeCores -= scheduler.CPUS_PER_TASK
      executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask))) // send a LaunchTask message to the executor's RpcEndpoint
    }
  }
}
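There is a size limit here: if a serialized task is at least spark.akka.frameSize (128 MB by default) minus reservedSizeBytes (a fixed 200 KB), its TaskSet is aborted with an error suggesting that you increase spark.akka.frameSize or use broadcast variables to ship large values.
For a task within the limit, the serialized bytes are wrapped in a SerializableBuffer, the corresponding ExecutorData is looked up, CPUS_PER_TASK cores are deducted from its freeCores, and a LaunchTask message is sent to its executorEndpoint.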
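The size check reduces to simple arithmetic. A minimal sketch using the defaults quoted above (128 MB frame, 200 KB reserved); `FrameSizeCheck` is a hypothetical name. Note that launchTasks compares with `>=`, so a task exactly at the threshold is already rejected.

```scala
object FrameSizeCheck {
  // Defaults described in the text for Spark 1.6: spark.akka.frameSize = 128 MB,
  // AkkaUtils.reservedSizeBytes = 200 KB.
  val akkaFrameSize: Long = 128L * 1024 * 1024   // 134217728 bytes
  val reservedSizeBytes: Long = 200L * 1024      // 204800 bytes

  /** Serialized tasks of this size or larger cause the TaskSet to be aborted. */
  val abortThreshold: Long = akkaFrameSize - reservedSizeBytes

  /** Mirrors the condition in launchTasks: serializedTask.limit >= frameSize - reserved. */
  def mustAbort(serializedTaskBytes: Long): Boolean = serializedTaskBytes >= abortThreshold
}
```

So with the defaults, the largest task that can still be sent is 134012927 bytes, one byte under the 134012928-byte threshold.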
When CoarseGrainedExecutorBackend receives the LaunchTask message, it runs the following code:
case LaunchTask(data) => // the executor has received a LaunchTask request
  if (executor == null) {
    logError("Received LaunchTask command but executor was null")
    System.exit(1)
  } else {
    val taskDesc = ser.deserialize[TaskDescription](data.value) // deserialize the received task
    logInfo("Got assigned task " + taskDesc.taskId)
    executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber, // launch the task
      taskDesc.name, taskDesc.serializedTask)
  }
It first deserializes the received task and then calls the executor's launchTask method, shown below:
def launchTask(
    context: ExecutorBackend,
    taskId: Long,
    attemptNumber: Int,
    taskName: String,
    serializedTask: ByteBuffer): Unit = {
  val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
    serializedTask) // wrap the task's information in a TaskRunner
  runningTasks.put(taskId, tr)
  threadPool.execute(tr) // hand the TaskRunner to threadPool for execution
}
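This method wraps the task data in a TaskRunner and submits it to a thread pool, so the actual work of executing a task happens in TaskRunner's run() method. That method has three parts:
- deserialize the task and download its dependencies
- run the task
- handle the returned result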
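The launch path (wrap the task in a runner, record it in runningTasks, hand it to a pool) can be sketched with plain java.util.concurrent. `MiniExecutor` and `MiniTaskRunner` are hypothetical; Spark's Executor uses a daemon cached thread pool and a much richer TaskRunner, and the cached pool here is chosen only to mirror that.

```scala
import java.util.concurrent.{ConcurrentHashMap, ExecutorService, Executors, TimeUnit}

// Hypothetical stand-in for Executor: each task becomes a Runnable that is tracked in
// runningTasks and executed on a thread pool, as launchTask does with TaskRunner.
final class MiniExecutor {
  private val threadPool: ExecutorService = Executors.newCachedThreadPool()
  val runningTasks = new ConcurrentHashMap[Long, Runnable]()

  final class MiniTaskRunner(taskId: Long, body: () => Any, report: (Long, Any) => Unit)
      extends Runnable {
    override def run(): Unit =
      try report(taskId, body())          // run the task, then report its result
      finally runningTasks.remove(taskId) // stop tracking the task once it is done
  }

  def launchTask(taskId: Long, body: () => Any)(report: (Long, Any) => Unit): Unit = {
    val tr = new MiniTaskRunner(taskId, body, report)
    runningTasks.put(taskId, tr)
    threadPool.execute(tr)
  }

  def shutdown(): Unit = {
    threadPool.shutdown()
    threadPool.awaitTermination(10, TimeUnit.SECONDS)
    ()
  }
}
```

The report callback plays the role of execBackend.statusUpdate; everything else (dependency download, metrics, kill support) is left out.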
Task Execution
The first part is straightforward, so let's look at the second part. Here is an excerpt from the run method:
override def run(): Unit = {
  ...... (less important code omitted)
  // Run the actual task and measure its runtime.
  taskStart = System.currentTimeMillis()
  var threwException = true
  val (value, accumUpdates) = try {
    val res = task.run( // ultimately calls Task.run to execute the task
      taskAttemptId = taskId,
      attemptNumber = attemptNumber,
      metricsSystem = env.metricsSystem)
    threwException = false
    res
  ......
Here task is the Task instance obtained by deserialization; calling its run method executes the task:
final def run(
    taskAttemptId: Long,
    attemptNumber: Int,
    metricsSystem: MetricsSystem)
  : (T, AccumulatorUpdates) = {
  context = new TaskContextImpl(
    stageId,
    partitionId,
    taskAttemptId,
    attemptNumber,
    taskMemoryManager,
    metricsSystem,
    internalAccumulators,
    runningLocally = false) // first build a TaskContext and populate it with these parameters
  TaskContext.setTaskContext(context)
  context.taskMetrics.setHostname(Utils.localHostName())
  context.taskMetrics.setAccumulatorsUpdater(context.collectInternalAccumulators)
  taskThread = Thread.currentThread()
  if (_killed) {
    kill(interruptThread = false)
  }
  try {
    (runTask(context), context.collectAccumulators()) // run the task
  } catch {
    case e: Throwable =>
      // Catch all errors; run task failure callbacks, and rethrow the exception.
      try {
        context.markTaskFailed(e)
      } catch {
        case t: Throwable =>
          e.addSuppressed(t)
      }
      throw e
  } finally {
    // Call the task completion callbacks.
    context.markTaskCompleted()
    try {
      Utils.tryLogNonFatalError {
        // Release memory used by this thread for unrolling blocks
        SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask()
        SparkEnv.get.blockManager.memoryStore.releasePendingUnrollMemoryForThisTask()
        // Notify any tasks waiting for execution memory to be freed to wake up and try to
        // acquire memory again. This makes impossible the scenario where a task sleeps forever
        // because there are no other tasks left to notify it. Since this is safe to do but may
        // not be strictly necessary, we should revisit whether we can remove this in the future.
        val memoryManager = SparkEnv.get.memoryManager
        memoryManager.synchronized { memoryManager.notifyAll() }
      }
    } finally {
      TaskContext.unset()
    }
  }
}
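In this run method, a TaskContext is first created to hold the task's context, and then runTask is called to run the task. If runTask throws, markTaskFailed runs the failure callbacks and the exception is rethrown. Either way, the finally block calls markTaskCompleted to run the task-completion callbacks, releases the unroll memory held by this task, and unsets the TaskContext.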
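The try/catch/finally structure of Task.run is what guarantees that completion callbacks run on both success and failure. A minimal sketch of that control flow, with a hypothetical `MiniTaskContext` that only supports registering callbacks; the listener method names are illustrative and not a claim about Spark's TaskContext API.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified TaskContext: only callback registration and the
// markTaskFailed / markTaskCompleted calls that Task.run makes.
final class MiniTaskContext {
  private val onComplete = ArrayBuffer.empty[MiniTaskContext => Unit]
  private val onFailure = ArrayBuffer.empty[(MiniTaskContext, Throwable) => Unit]

  def addTaskCompletionListener(f: MiniTaskContext => Unit): this.type = { onComplete += f; this }
  def addTaskFailureListener(f: (MiniTaskContext, Throwable) => Unit): this.type = { onFailure += f; this }

  def markTaskFailed(e: Throwable): Unit = onFailure.foreach(_(this, e))
  def markTaskCompleted(): Unit = onComplete.foreach(_(this))
}

object MiniTask {
  /** Same shape as Task.run: failure callbacks on error, completion callbacks always. */
  def run[T](context: MiniTaskContext)(runTask: MiniTaskContext => T): T =
    try {
      runTask(context)
    } catch {
      case e: Throwable =>
        // A failing failure-callback must not hide the original error.
        try { context.markTaskFailed(e) } catch { case t: Throwable => e.addSuppressed(t) }
        throw e
    } finally {
      context.markTaskCompleted()
    }
}
```

On failure the order is: failure callbacks, then completion callbacks, then the original exception propagates to the caller (TaskRunner in the real code).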
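runTask is implemented differently by each Task type; here we focus on ShuffleMapTask and ResultTask.
The tasks generated for the final stage are called ResultTasks. A ResultTask produces the final result that is reported back to the driver; its implementation is: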
override def runTask(context: TaskContext): U = {
  // Deserialize the RDD and the func using the broadcast variables.
  val deserializeStartTime = System.currentTimeMillis()
  val ser = SparkEnv.get.closureSerializer.newInstance() // get a serializer instance for deserialization
  // obtain the RDD and the function to apply to each partition's iterator
  val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
  metrics = Some(context.taskMetrics) // the task's metrics
  // rdd.iterator runs the computation on the RDD; func is then applied to the result
  func(context, rdd.iterator(partition, context))
}
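In other words, a ResultTask just applies `func` to its partition's iterator and returns the value; combining the per-partition values is the driver's job. A hypothetical, Spark-free sketch: each `Seq` stands in for `rdd.iterator(partition, context)`, and the functions in the usage imitate what a count-like and a collect-like action would ship to each task.

```scala
// Hypothetical model: Ctx stands in for TaskContext, and a Seq per partition
// stands in for rdd.iterator(partition, context).
object ResultTaskSketch {
  final case class Ctx(partitionId: Int)

  /** What ResultTask.runTask boils down to: func(context, iterator). */
  def runTask[T, U](partitionId: Int, data: Seq[T], func: (Ctx, Iterator[T]) => U): U =
    func(Ctx(partitionId), data.iterator)

  /** One "task" per partition; the driver receives one U per partition. */
  def runJob[T, U](partitions: Seq[Seq[T]], func: (Ctx, Iterator[T]) => U): Seq[U] =
    partitions.zipWithIndex.map { case (data, pid) => runTask(pid, data, func) }
}
```

For example, shipping `(_, it) => it.size.toLong` gives per-partition counts that the driver sums, while `(_, it) => it.toList` gives per-partition lists that the driver concatenates.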
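A ShuffleMapTask produces intermediate results. These are written to local files by the shuffle mechanism so that downstream tasks can fetch them; the details belong to the shuffle module. Here is its runTask method: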
override def runTask(context: TaskContext): MapStatus = {
  // Deserialize the RDD using the broadcast variable.
  // (deserialize the broadcast taskBinary to get the RDD and its ShuffleDependency)
  val deserializeStartTime = System.currentTimeMillis()
  val ser = SparkEnv.get.closureSerializer.newInstance()
  val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
  metrics = Some(context.taskMetrics)
  var writer: ShuffleWriter[Any, Any] = null
  try {
    // get the shuffle manager
    val manager = SparkEnv.get.shuffleManager
    writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context) // get a shuffle writer
    // Call the RDD's iterator first: if the RDD is cached or checkpointed, the result is read directly; otherwise it is computed.
    // The shuffle writer then writes the computed records to the local file system.
    writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
    // return the output's metadata (a MapStatus), including its location and sizes
    writer.stop(success = true).get
  } catch {
    case e: Exception =>
      try {
        if (writer != null) {
          writer.stop(success = false)
        }
      } catch {
        case e: Exception =>
          log.debug("Could not stop writer", e)
      }
      throw e
  }
}
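The essential difference from ResultTask is that a ShuffleMapTask returns metadata, not data. A simplified model, assuming hash partitioning by key (non-negative modulo of `hashCode`, with null keys going to bucket 0, as HashPartitioner does). In-memory vectors stand in for the shuffle files, and `MiniMapStatus` is a hypothetical stand-in for MapStatus.

```scala
// Hypothetical model of the map side of a shuffle.
object ShuffleMapSketch {
  /** Stand-in for MapStatus: where the output lives and how big each bucket is. */
  final case class MiniMapStatus(executorId: String, bucketSizes: Vector[Long])

  def partitionFor(key: Any, numReducers: Int): Int = key match {
    case null => 0
    case k =>
      val raw = k.hashCode % numReducers
      if (raw < 0) raw + numReducers else raw // non-negative modulo
  }

  /** Splits the records into numReducers buckets and returns the metadata plus the buckets. */
  def runTask[K, V](executorId: String, records: Iterator[(K, V)], numReducers: Int)
      : (MiniMapStatus, Vector[Vector[(K, V)]]) = {
    val buckets = Vector.fill(numReducers)(Vector.newBuilder[(K, V)])
    records.foreach { case kv @ (k, _) => buckets(partitionFor(k, numReducers)) += kv }
    val written = buckets.map(_.result()) // stands in for the shuffle files on local disk
    (MiniMapStatus(executorId, written.map(_.size.toLong)), written)
  }
}
```

Only the `MiniMapStatus` would travel back to the driver; a reducer for bucket `i` would later fetch `written(i)` from every map task's location.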
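Handling Task Results
Next comes the third part, handling the returned result. The following is an excerpt from the run method of TaskRunner (an inner class of Executor):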
val resultSer = env.serializer.newInstance() // get a serializer
val beforeSerialization = System.currentTimeMillis()
val valueBytes = resultSer.serialize(value) // serialize the result value
val afterSerialization = System.currentTimeMillis()
for (m <- task.metrics) {
  // Deserialization happens in two parts: first, we deserialize a Task object, which
  // includes the Partition. Second, Task.run() deserializes the RDD and function to be run.
  m.setExecutorDeserializeTime(
    (taskStart - deserializeStartTime) + task.executorDeserializeTime)
  // We need to subtract Task.run()'s deserialization time to avoid double-counting
  m.setExecutorRunTime((taskFinish - taskStart) - task.executorDeserializeTime)
  m.setJvmGCTime(computeTotalGcTime() - startGCTime)
  m.setResultSerializationTime(afterSerialization - beforeSerialization)
  m.updateAccumulators()
}
// first wrap the result in a DirectTaskResult
val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.orNull)
val serializedDirectResult = ser.serialize(directResult) // serialize the wrapped result
val resultSize = serializedDirectResult.limit // size of the serialized result
// directSend = sending directly back to the driver
// decide how the result is sent back to the driver
val serializedResult: ByteBuffer = {
  if (maxResultSize > 0 && resultSize > maxResultSize) { // larger than maxResultSize (1 GB by default): drop the result
    logWarning(s"Finished $taskName (TID $taskId). Result is larger than maxResultSize " +
      s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " +
      s"dropping it.")
    ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
  } else if (resultSize >= akkaFrameSize - AkkaUtils.reservedSizeBytes) { // a "large" result (akkaFrameSize defaults to 128 MB)
    val blockId = TaskResultBlockId(taskId)
    env.blockManager.putBytes( // store the result in the BlockManager
      blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
    logInfo(
      s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)")
    ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
  } else {
    logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent to driver")
    serializedDirectResult // a small result is sent back to the driver directly
  }
}
// execBackend is an ExecutorBackend instance, i.e. the interface the Executor uses to talk to the driver
execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
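The three branches above can be isolated as a pure function of the result size. A sketch with the limits passed in as parameters (the usage plugs in the defaults the text mentions: 1 GB maxResultSize, 128 MB frame, 200 KB reserved); `ResultRouting` and its route names are hypothetical.

```scala
object ResultRouting {
  sealed trait Route
  case object Dropped extends Route         // only an IndirectTaskResult is sent; nothing is stored
  case object ViaBlockManager extends Route // bytes stored under TaskResultBlockId, IndirectTaskResult sent
  case object Direct extends Route          // the serialized DirectTaskResult itself is sent

  /** Same order of checks as TaskRunner.run; maxResultSize <= 0 means "no limit". */
  def route(resultSize: Long, maxResultSize: Long, frameSize: Long, reserved: Long): Route =
    if (maxResultSize > 0 && resultSize > maxResultSize) Dropped
    else if (resultSize >= frameSize - reserved) ViaBlockManager
    else Direct
}
```

The order of the checks matters: a result over maxResultSize is dropped even though it would also exceed the frame limit.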
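The size limit for results sent back to the driver is the same frame-size limit described earlier for launching tasks. The code above ends by sending the result to the driver. When the driver receives the StatusUpdate message, it calls TaskSchedulerImpl's statusUpdate method to process the task's result; if the task has finished, it returns the task's cores to the executor's freeCores and calls makeOffers(executorId) to offer those resources again: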
case StatusUpdate(executorId, taskId, state, data) => // a task's status and result have arrived
  scheduler.statusUpdate(taskId, state, data.value)
  if (TaskState.isFinished(state)) {
    executorDataMap.get(executorId) match {
      case Some(executorInfo) =>
        executorInfo.freeCores += scheduler.CPUS_PER_TASK
        makeOffers(executorId)
      case None =>
        // Ignoring the update since we don't know about the executor.
        logWarning(s"Ignored task status update ($taskId state $state) " +
          s"from unknown executor with ID $executorId")
    }
  }
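The driver-side core accounting that ties launchTasks and StatusUpdate together is plain bookkeeping. A hypothetical sketch: `ExecData` stands in for ExecutorData, and `onFinished` returning true marks the point where the real code would call makeOffers(executorId).

```scala
import scala.collection.mutable

// Hypothetical stand-in for ExecutorData: only the free-core counter.
final class ExecData(var freeCores: Int)

final class CoreBookkeeping(cpusPerTask: Int) {
  val executorDataMap = mutable.HashMap.empty[String, ExecData]

  /** launchTasks: taking a task away from the pool of free cores. */
  def onLaunch(execId: String): Unit = executorDataMap(execId).freeCores -= cpusPerTask

  /** StatusUpdate with a finished state: give the cores back if the executor is known. */
  def onFinished(execId: String): Boolean = executorDataMap.get(execId) match {
    case Some(data) =>
      data.freeCores += cpusPerTask
      true // the real code calls makeOffers(executorId) here
    case None =>
      false // unknown executor: the update is ignored (Spark logs a warning)
  }
}
```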
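In TaskSchedulerImpl's statusUpdate method, if the returned task state is FINISHED, enqueueSuccessfulTask is called to handle the result; if the task ended abnormally (FAILED, KILLED, or LOST), enqueueFailedTask is called instead. The logic is as follows (excerpt from statusUpdate):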
taskIdToTaskSetManager.get(tid) match {
  case Some(taskSet) =>
    // FINISHED_STATES = Set(FINISHED, FAILED, KILLED, LOST)
    if (TaskState.isFinished(state)) { // the task has reached a terminal state
      taskIdToTaskSetManager.remove(tid)
      taskIdToExecutorId.remove(tid).foreach { execId =>
        if (executorIdToTaskCount.contains(execId)) {
          executorIdToTaskCount(execId) -= 1 // clean up bookkeeping caches
        }
      }
    }
    if (state == TaskState.FINISHED) { // FINISHED means the task completed normally
      taskSet.removeRunningTask(tid)
      taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData) // hand off to enqueueSuccessfulTask
    } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) { // any of these states means the task failed
      taskSet.removeRunningTask(tid)
      taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
    }
  case None =>
    logError(
      ("Ignoring update with state %s for TID %s because its task set is gone (this is " +
        "likely the result of receiving duplicate task finished status updates)")
        .format(state, tid))
}
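The state handling in statusUpdate can be modeled as a small dispatch over a closed set of states. In the sketch below the state names match Spark's TaskState values, while `StatusDispatch`, its string return values, and the plain maps are hypothetical stand-ins for the scheduler's bookkeeping and the TaskResultGetter calls.

```scala
import scala.collection.mutable

object StatusDispatch {
  sealed trait TaskState
  case object LAUNCHING extends TaskState
  case object RUNNING extends TaskState
  case object FINISHED extends TaskState
  case object FAILED extends TaskState
  case object KILLED extends TaskState
  case object LOST extends TaskState

  val FinishedStates: Set[TaskState] = Set(FINISHED, FAILED, KILLED, LOST)
  def isFinished(s: TaskState): Boolean = FinishedStates.contains(s)

  /** Cleans up bookkeeping for terminal states, then names the handler that would run. */
  def statusUpdate(tid: Long, state: TaskState,
                   taskIdToExecutorId: mutable.Map[Long, String],
                   executorIdToTaskCount: mutable.Map[String, Int]): String = {
    if (isFinished(state)) {
      taskIdToExecutorId.remove(tid).foreach { execId =>
        if (executorIdToTaskCount.contains(execId)) executorIdToTaskCount(execId) -= 1
      }
    }
    state match {
      case FINISHED               => "enqueueSuccessfulTask"
      case FAILED | KILLED | LOST => "enqueueFailedTask"
      case _                      => "no-op" // LAUNCHING / RUNNING: still in flight
    }
  }
}
```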
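enqueueSuccessfulTask handles two kinds of task results: DirectTaskResult and IndirectTaskResult. For an IndirectTaskResult, the result must first be fetched from the BlockManager by its blockId. Once the result is deserialized, TaskSchedulerImpl's handleSuccessfulTask method is called to process it: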
def enqueueSuccessfulTask( // process a computed result
    taskSetManager: TaskSetManager, tid: Long, serializedData: ByteBuffer) {
  getTaskResultExecutor.execute(new Runnable {
    override def run(): Unit = Utils.logUncaughtExceptions {
      try {
        val (result, size) = serializer.get().deserialize[TaskResult[_]](serializedData) match {
          case directResult: DirectTaskResult[_] => // a DirectTaskResult
            if (!taskSetManager.canFetchMoreResults(serializedData.limit())) {
              return
            }
            // deserialize "value" without holding any lock so that it won't block other threads.
            // We should call it here, so that when it's called again in
            // "TaskSetManager.handleSuccessfulTask", it does not need to deserialize the value.
            directResult.value()
            (directResult, serializedData.limit())
          case IndirectTaskResult(blockId, size) => // an IndirectTaskResult (a "large" result): fetch it from the BlockManager by blockId
            if (!taskSetManager.canFetchMoreResults(size)) {
              // dropped by executor if size is larger than maxResultSize
              sparkEnv.blockManager.master.removeBlock(blockId)
              return
            }
            logDebug("Fetching indirect task result for TID %s".format(tid))
            scheduler.handleTaskGettingResult(taskSetManager, tid)
            val serializedTaskResult = sparkEnv.blockManager.getRemoteBytes(blockId)
            if (!serializedTaskResult.isDefined) {
              /* We won't be able to get the task result if the machine that ran the task failed
               * between when the task ended and when we tried to fetch the result, or if the
               * block manager had to flush the result. */
              scheduler.handleFailedTask(
                taskSetManager, tid, TaskState.FINISHED, TaskResultLost)
              return
            }
            val deserializedResult = serializer.get().deserialize[DirectTaskResult[_]](
              serializedTaskResult.get)
            sparkEnv.blockManager.master.removeBlock(blockId)
            (deserializedResult, size)
        }
        result.metrics.setResultSize(size)
        scheduler.handleSuccessfulTask(taskSetManager, tid, result) // call handleSuccessfulTask to process the result
      } catch {
        case cnf: ClassNotFoundException =>
          val loader = Thread.currentThread.getContextClassLoader
          taskSetManager.abort("ClassNotFound with classloader: " + loader)
        // Matching NonFatal so we don't catch the ControlThrowable from the "return" above.
        case NonFatal(ex) =>
          logError("Exception while getting task result", ex)
          taskSetManager.abort("Exception while getting task result: %s".format(ex))
      }
    }
  })
}
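Stripped of threading and size checks, the driver's direct/indirect handling looks like this. A hypothetical sketch in which a mutable Map plays the role of the BlockManager and `TaskResultLost` mirrors the failure reported when the block cannot be fetched; the canFetchMoreResults checks are omitted, and the block id string in the usage is illustrative.

```scala
import scala.collection.mutable

object ResultFetch {
  sealed trait TaskResult
  final case class DirectResult(bytes: Vector[Byte]) extends TaskResult
  final case class IndirectResult(blockId: String, size: Long) extends TaskResult

  sealed trait Outcome
  final case class Success(bytes: Vector[Byte], size: Long) extends Outcome
  case object TaskResultLost extends Outcome

  def fetch(result: TaskResult, blockStore: mutable.Map[String, Vector[Byte]]): Outcome =
    result match {
      case DirectResult(bytes) => Success(bytes, bytes.size.toLong) // already carries the data
      case IndirectResult(blockId, size) =>
        blockStore.get(blockId) match {
          case None => TaskResultLost // e.g. the executor died before the driver fetched the block
          case Some(bytes) =>
            blockStore.remove(blockId) // the driver removes the block once it has the bytes
            Success(bytes, size)
        }
    }
}
```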
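TaskSchedulerImpl's handleSuccessfulTask delegates to the TaskSetManager, which marks the task as successful and then calls DAGScheduler's taskEnded method. taskEnded does not process the result itself: it posts a CompletionEvent to DAGScheduler's DAGSchedulerEventProcessLoop instance, and the event loop eventually calls DAGScheduler's handleTaskCompletion method. taskEnded is implemented as follows: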
/**
 * Called by the TaskSetManager to report task completions or failures.
 */
def taskEnded(
    task: Task[_],
    reason: TaskEndReason,
    result: Any,
    accumUpdates: Map[Long, Any],
    taskInfo: TaskInfo,
    taskMetrics: TaskMetrics): Unit = {
  eventProcessLoop.post(
    CompletionEvent(task, reason, result, accumUpdates, taskInfo, taskMetrics))
}
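eventProcessLoop is an instance of DAGSchedulerEventProcessLoop, an event loop that processes DAGScheduler's events one at a time on a dedicated thread. Its doOnReceive method pattern-matches on each event type and dispatches it to the corresponding handler: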
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
  // a JobSubmitted event is handled by handleJobSubmitted
  case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
  case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
    dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)
  case StageCancelled(stageId) =>
    dagScheduler.handleStageCancellation(stageId)
  case JobCancelled(jobId) =>
    dagScheduler.handleJobCancellation(jobId)
  case JobGroupCancelled(groupId) =>
    dagScheduler.handleJobGroupCancelled(groupId)
  case AllJobsCancelled =>
    dagScheduler.doCancelAllJobs()
  case ExecutorAdded(execId, host) =>
    dagScheduler.handleExecutorAdded(execId, host)
  case ExecutorLost(execId) =>
    dagScheduler.handleExecutorLost(execId, fetchFailed = false)
  case BeginEvent(task, taskInfo) =>
    dagScheduler.handleBeginEvent(task, taskInfo)
  case GettingResultEvent(taskInfo) =>
    dagScheduler.handleGetTaskResult(taskInfo)
  case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
    dagScheduler.handleTaskCompletion(completion)
  case TaskSetFailed(taskSet, reason, exception) =>
    dagScheduler.handleTaskSetFailed(taskSet, reason, exception)
  case ResubmitFailedStages =>
    dagScheduler.resubmitFailedStages()
}
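DAGSchedulerEventProcessLoop builds on Spark's EventLoop, which queues posted events in a LinkedBlockingDeque and drains them on a single daemon thread, so handlers never run concurrently with each other. A minimal sketch of that pattern; `MiniEventLoop` and the two simplified event classes are hypothetical, and error handling is reduced to an overridable `onError` hook.

```scala
import java.util.concurrent.LinkedBlockingDeque
import scala.util.control.NonFatal

// Hypothetical minimal event loop: post() enqueues, one daemon thread dequeues
// and calls onReceive, so events are handled strictly one at a time, in order.
abstract class MiniEventLoop[E](name: String) {
  private val queue = new LinkedBlockingDeque[E]()
  @volatile private var stopped = false

  private val thread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit =
      try {
        while (!stopped) {
          val event = queue.take()
          try { onReceive(event) } catch { case NonFatal(e) => onError(e) } // one bad event must not kill the loop
        }
      } catch {
        case _: InterruptedException => () // stop() interrupts a blocked take()
      }
  }

  def start(): Unit = thread.start()
  def stop(): Unit = { stopped = true; thread.interrupt(); thread.join() }
  def post(event: E): Unit = queue.put(event)

  protected def onReceive(event: E): Unit
  protected def onError(e: Throwable): Unit = ()
}

// Hypothetical, heavily simplified events.
sealed trait SchedulerEvent
final case class JobSubmitted(jobId: Int) extends SchedulerEvent
final case class CompletionEvent(taskId: Long) extends SchedulerEvent
```

A subclass then plays the role of doOnReceive by pattern-matching on `SchedulerEvent` and calling the matching handler.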
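So a CompletionEvent leads to DAGScheduler's handleTaskCompletion method, which treats the two task types (ResultTask and ShuffleMapTask) differently. For a ResultTask, the corresponding output partition of the job is marked as finished, and once every partition has finished, the job is marked as finished. A ShuffleMapTask means this is not the final stage: its output location is recorded on the stage, and once the stage has no pending partitions, the map outputs are registered with MapOutputTrackerMaster so that the next stage's tasks can read them through shuffle read (covered in the shuffle module). Here is an excerpt from DAGScheduler's handleTaskCompletion method: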
task match {
  case rt: ResultTask[_, _] =>
    // Cast to ResultStage here because it's part of the ResultTask
    // TODO Refactor this out to a function that accepts a ResultStage
    val resultStage = stage.asInstanceOf[ResultStage]
    resultStage.activeJob match {
      case Some(job) =>
        if (!job.finished(rt.outputId)) {
          updateAccumulators(event)
          job.finished(rt.outputId) = true
          job.numFinished += 1
          // If the whole job has finished, remove it
          if (job.numFinished == job.numPartitions) {
            markStageAsFinished(resultStage)
            cleanupStateForJobAndIndependentStages(job)
            listenerBus.post(
              SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobSucceeded))
          }
          // taskSucceeded runs some user code that might throw an exception. Make sure
          // we are resilient against that.
          try {
            job.listener.taskSucceeded(rt.outputId, event.result)
          } catch {
            case e: Exception =>
              // TODO: Perhaps we want to mark the resultStage as failed?
              job.listener.jobFailed(new SparkDriverExecutionException(e))
          }
        }
      case None =>
        logInfo("Ignoring result from " + rt + " because its job has finished")
    }
  case smt: ShuffleMapTask => // a result reported by a ShuffleMapTask
    val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
    updateAccumulators(event)
    val status = event.result.asInstanceOf[MapStatus]
    val execId = status.location.executorId
    logDebug("ShuffleMapTask finished on " + execId)
    if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
      logInfo(s"Ignoring possibly bogus $smt completion from executor $execId")
    } else {
      shuffleStage.addOutputLoc(smt.partitionId, status)
    }
    if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
      markStageAsFinished(shuffleStage)
      logInfo("looking for newly runnable stages")
      logInfo("running: " + runningStages)
      logInfo("waiting: " + waitingStages)
      logInfo("failed: " + failedStages)
      // We supply true to increment the epoch number here in case this is a
      // recomputation of the map outputs. In that case, some nodes may have cached
      // locations with holes (from when we detected the error) and will need the
      // epoch incremented to refetch them.
      // TODO: Only increment the epoch number if this is not the first time
      // we registered these map outputs.
      mapOutputTracker.registerMapOutputs( // register the tasks' map outputs with mapOutputTracker
        shuffleStage.shuffleDep.shuffleId,
        shuffleStage.outputLocInMapOutputTrackerFormat(),
        changeEpoch = true)
      clearCacheLocs()
      if (!shuffleStage.isAvailable) {
        // Some tasks had failed; let's resubmit this shuffleStage
        // TODO: Lower-level scheduler should also deal with this
        logInfo("Resubmitting " + shuffleStage + " (" + shuffleStage.name +
          ") because some of its tasks had failed: " +
          shuffleStage.findMissingPartitions().mkString(", "))
        submitStage(shuffleStage)
      } else {
        // Mark any map-stage jobs waiting on this stage as finished
        if (shuffleStage.mapStageJobs.nonEmpty) {
          val stats = mapOutputTracker.getStatistics(shuffleStage.shuffleDep)
          for (job <- shuffleStage.mapStageJobs) {
            markMapStageJobAsFinished(job, stats)
          }
        }
      }
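For a ResultTask, the job bookkeeping boils down to a per-partition flag plus a counter, which also makes duplicate completions (for example from speculative execution) harmless. A hypothetical sketch of just that part; `MiniActiveJob` is not Spark's ActiveJob, and storing the result in a map stands in for job.listener.taskSucceeded.

```scala
import scala.collection.mutable

// Hypothetical model of the ResultTask branch of handleTaskCompletion.
final class MiniActiveJob(val numPartitions: Int) {
  val finished: Array[Boolean] = Array.fill(numPartitions)(false)
  var numFinished = 0
  private val results = mutable.Map.empty[Int, Any]

  /** Returns true exactly when this completion finishes the whole job. */
  def onResultTaskSucceeded(outputId: Int, result: Any): Boolean =
    if (finished(outputId)) false // duplicate completion for this partition: ignored
    else {
      finished(outputId) = true
      numFinished += 1
      results(outputId) = result // stands in for job.listener.taskSucceeded
      numFinished == numPartitions
    }

  def resultFor(outputId: Int): Option[Any] = results.get(outputId)
}
```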
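This completes the task's run, and with it our reading of the executor module. Looking back over the three executor articles: starting from SparkContext as the user-facing entry point, we described in detail how an application is registered, how the driver is created, how computing resources are allocated between the driver and the executors, and how executors are launched once allocation is done; we then analyzed how tasks are assigned to executors, how they are computed, and how their results are handled. In one sentence: executors compute tasks and send the results back to the driver.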