TaskScheduler
TaskScheduler 负责对 DAGScheduler 提交过来的 Task 与最佳位置的 Executor 进行绑定,然后通过 SchedulerBackend 发送到 Executor 上去执行。
在这个版本中,TaskScheduler 只有一个实现类,就是 TaskSchedulerImpl,在 Spark-Core 的 org.apache.spark.scheduler 包下。
源码
在 SparkContext 中对 TaskScheduler 进行了初始化操作,在 SparkContext 概览中提到过:
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_taskScheduler = ts
// 对 TaskScheduler 进行了启动
_taskScheduler.start()
SparkContext.createTaskScheduler() 会根据运行模式的不同,创建不同类型的 SchedulerBackend,我这里以 Standalone 模式为例:
private def createTaskScheduler(...) = {
import SparkMasterRegex._
master match {
// Standalone 模式
case SPARK_REGEX(sparkUrl) =>
val scheduler = new TaskSchedulerImpl(sc)
val masterUrls = sparkUrl.split(",").map("spark://" + _)
val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
scheduler.initialize(backend)
(backend, scheduler)
// ...
}
}
我们先看下 TaskScheduler.initialize() 方法:
def initialize(backend: SchedulerBackend) {
this.backend = backend
// 根据 conf 中的设置,选择不同模式的任务调度器
// 通过设定 SCHEDULER_MODE_PROPERTY 这个值来更改
// 默认为 FIFO
schedulableBuilder = {
schedulingMode match {
// 先进先出
case SchedulingMode.FIFO =>
// RootPool => TaskSetManager 的调度池(一个队列)
new FIFOSchedulableBuilder(rootPool)
// 公平
case SchedulingMode.FAIR =>
new FairSchedulableBuilder(rootPool, conf)
case _ =>
throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +
s"$schedulingMode")
}
}
// 创建树形节点
schedulableBuilder.buildPools()
}
在 DAGScheduler 划分完 Stage 后,会将其封装成 TaskSet 并提交给 TaskScheduler.submitTasks() 来做进一步的工作,我们先看下它的实现细节:
override def submitTasks(taskSet: TaskSet) {
// 取出 TaskSet 中的 Task
val tasks = taskSet.tasks
this.synchronized {
// 为每个 TaskSet 创建一个 TaskSetManager
// TaskSetManager 负责任务失败时的重试工作
val manager = createTaskSetManager(taskSet, maxTaskFailures)
// 将 manager 添加到 schedulableBuilder 中
// schedulableBuilder 负责 TaskSetManager 的调度
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
// 其它操作
hasReceivedTask = true
}
// 资源的分配
backend.reviveOffers()
}
我们先看下 SchedulableBuilder.addTaskSetManager() 的实现细节,以 FIFO 模式为例:
override def addTaskSetManager(manager: Schedulable, properties: Properties) {
rootPool.addSchedulable(manager)
}
rootPool.addSchedulable() 的实现细节:
override def addSchedulable(schedulable: Schedulable) {
require(schedulable != null)
// 将 TaskSetManager 放入到 schedulable 队列中
schedulableQueue.add(schedulable)
schedulableNameToSchedulable.put(schedulable.name, schedulable)
schedulable.parent = this
}
从代码中可以看出,当 DAGScheduler 提交完 TaskSet 后,就会为其创建一个 TaskSetManager,然后将 TaskSetManager 放入到 TaskSetManager 队列(池)中去等待执行。
接下来我们看看 backend.reviveOffers() 的实现细节,看看 TaskSetManager 是如何被调用的:
override def reviveOffers() {
// 发送一个 ReviveOffers 消息
// DriverEndpoint 上文提到过
driverEndpoint.send(ReviveOffers)
}
DriverEndpoint.receive() 会对这个消息进行处理 (CoarseGrainedSchedulerBackend 的内部类):
override def receive: PartialFunction[Any, Unit] = {
case ReviveOffers =>
makeOffers()
// 略略略
}
makeOffers() 的实现细节:
private def makeOffers() {
val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized {
// executorDataMap 为 executor 字典
// 上文提到过,将反注册过来的 Executor 都放到了这里
// 找出活跃的 Executor
val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
// 封装 Executor 可用的资源量和联系方式
val workOffers = activeExecutors.map { case (id, executorData) =>
new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
}.toIndexedSeq
// 将可用的 Executor 资源信息发送给 TaskScheduler.resourceOffers()
// TaskScheduler 会按最优的条件将 Task 与 Executor 进行绑定并返回其集合
scheduler.resourceOffers(workOffers)
}
// 将与 Executor 绑定完的 Task 交给 Eexecutor 去执行
if (!taskDescs.isEmpty) {
// 在每个 Executor 上启动分别启动其对应的 Task
launchTasks(taskDescs)
}
TaskScheduler.resourceOffers() 的实现细节:
def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
// 要执行的任务集
// TaskDescription 中有 Task ID 和 Executor ID
// 下面的代码会将 Task 与 Executor 进行绑定,确定任务要到哪个 Executor 上去执行
val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
// 获取排序后的等待执行的 TaskSetManager
val sortedTaskSets = rootPool.getSortedTaskSetQueue
for (taskSet <- sortedTaskSets) {
if (newExecAvail) {
// 计算每个 Task 执行的位置(多个)
// 有兴趣的可以点进去看看
taskSet.executorAdded()
}
}
// 根据优先级(本地、机架...) 将每个 Task 与 Executor 进行绑定
for (taskSet <- sortedTaskSets) {
var launchedAnyTask = false
var launchedTaskAtCurrentMaxLocality = false
for (currentMaxLocality <- taskSet.myLocalityLevels) {
do {
launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)
launchedAnyTask |= launchedTaskAtCurrentMaxLocality
} while (launchedTaskAtCurrentMaxLocality)
}
if (!launchedAnyTask) {
taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
}
}
// 将与 Executor 绑定完的 Task 集合返回给 SchedulerBackend
// SchedulerBackend 接下来会提交给 Executor 去执行
return tasks
}
SchedulerBackend.launchTasks() 的实现细节:
private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
for (task <- tasks.flatten) {
val serializedTask = TaskDescription.encode(task)
if (serializedTask.limit >= maxRpcMessageSize) {
// 资源不够用
scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
// 其它操作
}
}
else {
// 其它操作
// 向 Executor 发送了一个启动任务的请求
executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
}
}
}
// Executor 收到请求后
// 先简单看一下
case LaunchTask(data) =>
if (executor == null) {
exitExecutor(1, "Received LaunchTask command but executor was null")
} else {
val taskDesc = TaskDescription.decode(data.value)
logInfo("Got assigned task " + taskDesc.taskId)
// 启动任务
executor.launchTask(this, taskDesc)
}
总的来说,TaskScheduler 负责着任务的调度、唤醒与重启的工作。