TaskScheduler负责不同的Application的Jobs之间的调度,在Task执行失败时启动重试机制,并为执行缓慢的Task启动备份任务
SchedulerBackend负责与Cluster Manager交互,取得分配给Application的资源,并将资源传给TaskScheduler,由TaskScheduler为Task最终分配计算资源
TaskScheduler的创建
TaskScheduler通过org.apache.spark.SparkContext#createTaskScheduler创建。
// Create and start the scheduler
val(sched,ts) = SparkContext.createTaskScheduler(this,master)
def master:String = _conf.get("spark.master")
SparkContext.createTaskScheduler根据传入Master的URL的规则判断集群的部署方式,根据不同的部署方式生成不同的TaskScheduler和SchedulerBackend
createTaskScheduler会case不同的URL表达式生成TaskScheduler和SchedulerBackend。
SchedulerBackend是一个trait,作用是向等待分配计算资源的Task分配Executor,并启动Task。
Task的提交
DAGScheduler完成对Stage划分后,按照Stage的划分顺序调用:org.apache.spark.scheduler.TaskScheduler#submitTasks 将Stage提交到TaskScheduler
submitTasks开始Task的资源调度,Task被分配到Executor,在Worker上完成任务执行,调用堆栈如下:
1)org.apache.spark.scheduler.TaskSchedulerImpl#submitTasks
2)org.apache.spark.scheduler.SchedulableBuilder#addTaskSetManager
3)org.apache.spark.scheduler.SchedulerBackend#reviveOffers
4)org.apache.spark.scheduler.TaskSchedulerImpl#resourceOffers
5)org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.DriverEndpoint#launchTasks
6)org.apache.spark.executor.Executor#launchTask
此处分为Driver端、Executor执行,1~5在Driver端,6在Executor端。
1)将TaskSet加入TaskSetManager :
2)schedulableBuilder.addTaskSetManager(manager,manager.taskSet.properties)
schedulableBuilder是调度器,支持两种策略,FIFO(默认)和FAIR(公平调度)。通过spark.scheduler.mode设置。schedulableBuilder会确定TaskSetManager的调度顺序,根据就近原则确定运行Task的Executor。
5)def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]]
响应资源调度请求,为Task分配具体的资源,输入是Executor列表,输出是TaskDescription二维数据,存储Task ID、Executor ID和Task执行环境依赖信息等。