1. 前言
本文主要讲述一下我对spark任务调度过程的理解,必要的地方会配合源码。
2 . Spark任务调度
2.1 基本概念
- Job
用户提交的spark应用程序中任何一个action操作(foreach,count...)都对应一个job的提交(提交job动作对应到源码SparkContext #runJob方),因此一个用户spark应用程序往往对应一到多个job。比如下面的例子:def main(args:Array[String]){ val sparkConf = new SparkConf().setAppName("Log Query") val sc = new SparkContext(sparkConf) val lines = sc.textFile("README.md",3) val words = lines.flatMap(line => line.split(" ")) val wordOne = words.map(word => (word,1)) val wordCount = wordOne.reduceByKey(_ + _,3) // foreach是一个action,对应一个job wordCount.foreach(println) // collect是一个action,对应一个job val resultAsArry = wordCount.collect() }
- Stage
Job提交之后,首先会被划分为一到多个Stage,划分Stage的原因在于一个job中有些操作(Transformation)是可以连在一起在同一个线程里执行的,这些连在一起的操作就像一根管道一样,数据从顺着管道流下去就行(比如.map.filter就可以连在一起),可是有些操作(shuffle操作,reduce,group等)会导致管道出现分支,数据不得不分流到不同管道,Stage的划分就以这中会导致分流(shuffle)的操作为分割,划分成不同的Stage,显然划分会导致Stage的依赖,上游Stage必须运行完,才能让下游Stage运行。Stage和Job一样是一种静态的东西,一个Stage里包含没有Shuffle依赖(也就是没有分流)的一连串RDD。真正提交运行的是Task。 - Task
Task是依据Stage建立起来的,上面说Stage包含了一连串RDD,RDD是一种数据的抽象描述,对应物理数据是包含了n个分区数据的。每一个Task就处理一个分区数据,一个包含了n个分区的Stage就会创建出n个Task,只有这n个Task都执行成功了,这个Stage才算成功,然后才可以执行下游的Stage。 - TaskSet
TaskSet是task的集合,包含了同一个Stage中的部分或者全部task,每次提交的是TaskSet,然后根据TaskSet创建TaskSetManager,spark中TaskSetManager是任务调度器一个调度单元,当一个TaskSetManager被调度器调度到时,就会从TaskSetManager中拿若干个task去执行。task会失败重试,重试的那些task又会组成一个新的TaskSetManager去让调度器调度,因此,一个正在运行的Stage可能会有多个TaskSetManager正在等待调度。 - TaskSchedulerImpl
任务调度器,它按照一定的策略调度TaskSetManager,然后会从被调度到的TaskSetManager获取若干个task发送到Executor去执行。只要TaskSetManager中有task没有运行完,那么这个TaskSetManager还是会继续被调度。目前有两种调度策略:FIFO和Fair模式。 - CoarseGrainedSchedulerBackend
运行在driver端,可以当作是Rpc的一个端点,从任务调度器获取任务并发送到Executor上执行(LaunchTask),以及接收Executor汇报的Task运行状态信息(StatusUpdate) - CoarseGrainedExecutorBackend
运行在Executor上,从CoarseGrainedSchedulerBackend上接收运行任务的请求(LaunchTask),任务运行结束后通过它向CoarseGrainedSchedulerBackend汇报任务状态(StatusUpdate).
整个过程大致如下图: