3.2 Spark调度机制
Spark调度机制是保证Spark应用高效执行的关键。本节从Application、job、stage和task的维度,从上层到底层来一步一步揭示Spark的调度策略。
3.2.1 Application的调度
Spark中,每个Application对应一个SparkContext。SparkContext之间的调度关系取决于Spark的运行模式。对Standalone模式而言,Spark Master节点先计算集群内的计算资源能否满足等待队列中的应用对内存和CPU资源的需求,如果可以,则Master创建Spark Driver,启动应用的执行。宏观上来讲,这种对应用的调度类似于FIFO策略。在Mesos和YARN模式下,底层的资源调度系统的调度策略都是由Mesos和YARN决定的。具体分类描述如下:
- Standalone模式
默认以用户提交Application的顺序来调度,即FIFO策略。每个应用执行时独占所有资源。如果有多个用户要共享集群资源,则可以使用参数spark.cores.max来配置应用在集群中可以使用的最大CPU核数。如果不配置,则采用默认参数spark.deploy. defaultCore的值来确定。
- Mesos模式
如果在Mesos上运行Spark,用户想要静态配置资源的话,可以设置spark.mesos. coarse为true,这样Mesos变为粗粒度调度模式,然后可以设置spark.cores.max指定集群中可以使用的最大核数,与上面的Standalone模式类似。同时,在Mesos模式下,用户还可以设置参数spark.executor.memory来配置每个executor的内存使用量。如果想使Mesos在细粒度模式下运行,可以通过mesos://设置动态共享cpu core的执行模式。在这种模式下,应用不执行时的空闲CPU资源得以被其他用户使用,提升了CPU使用率。
- YARN模式
如果在YARN上运行Spark,用户可以在YARN的客户端上设置--num-executors来控制为应用分配的Executor数量,然后设置--executor-memory指定每个Executor的内存大小,设置--executor-cores指定Executor占用的CPU核数。
3.2.2 job的调度
前面章节提到过,Spark应用程序实际上是一系列对RDD的操作,这些操作直至遇见Action算子,才触发Job的提交。事实上,在底层实现中,Action算子最后调用了runJob函数提交Job给Spark。其他的操作只是生成对应的RDD关系链。如在RDD. scala程序文件中,count函数源码所示。
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
其中sc为SparkContext的对象。可见在Spark中,对Job的提交都是在Action算子中隐式完成的,并不需要用户显式地提交作业。在SparkContext中Job提交的实现中,最后会调用DAGScheduler中的Job提交接口。DAGScheduler最重要的任务之一就是计算Job与Task的依赖关系,制定调度逻辑。
Job调度的基本工作流程如图3-4所示,每个Job从提交到完成,都要经历一系列步骤,拆分成以Tsk为最小单位,按照一定逻辑依赖关系的执行序列。
[插图]
图3-4 Job的调度流程
图3-5则从Job调度流程中的细节模块出发,揭示了工作流程与对应模块之间的关系。从整体上描述了各个类在Job调度流程中的交互关系。
[插图]
图3-5 Job调度流程细节
在Spark1.5.0的调度目录下的SchedulingAlgorithm.scala文件中,描述了Spark对Job的调度模式。
- FIFO模式
默认情况下,Spark对Job以FIFO(先进先出)的模式进行调度。在SchedulingAlgorithm. scala文件中声明了FIFO算法实现。
- FAIR模式
Spark在FAIR的模式下,采用轮询的方式为多个Job分配资源,调度Job。所有的任务优先级大致相同,共享集群计算资源。具体实现代码在SchedulingAlgorithm.scala文件中,声明如下:
3.配置调度池
DAGScheduler构建了具有依赖关系的任务集。TaskScheduler负责提供任务给Task-SetManager作为调度的先决条件。TaskSetManager负责具体任务集内部的调度任务。调度池(pool)则用于调度每个SparkContext运行时并存的多个互相独立无依赖关系的任务集。调度池负责管理下一级的调度池和TaskSetManager对象。
用户可以通过配置文件定义调度池的属性。一般调度池支持如下3个参数:
1)调度模式Scheduling mode:用户可以设置FIFO或者FAIR调度方式。
2)weight:调度池的权重,在获取集群资源上权重高的可以获取多个资源。
3)miniShare:代表计算资源中的CPU核数。
用户可以通过conf/fairscheduler.xml配置调度池的属性,同时要在SparkConf对象中配置属性。
3.2.3 stage(调度阶段)和TasksetManager的调度
- Stage划分
当一个Job被提交后,DAGScheduler会从RDD依赖链的末端触发,遍历整个RDD依赖链,划分Stage(调度阶段)。划分依据主要基于ShuffleDependency依赖关系。换句话说,当某RDD在计算中需要将数据进行Shuffle操作时,这个包含Shuffle操作的RDD将会被用来作为输入信息,构成一个新的Stage。以这个基准作为划分Stage,可以保证存在依赖关系的数据按照正确数据得到处理和运算。在Spark1.5.0的源代码中,DAGScheduler.scala中的getParentStages函数的实现从一定角度揭示了Stage的划分逻辑。
- Stage调度
在第一步的Stage划分过程中,会产生一个或者多个互相关联的Stage。其中,真正执行Action算子的RDD所在的Stage被称为Final Stage。DAGScheduler会从这个final stage生成作业实例。
在Stage提交时,DAGScheduler首先会判断该Stage的父Stage的执行结果是否可用。如果所有父Stage的执行结果都可用,则提交该Stage。如果有任意一个父Stage的结果不可用,则尝试迭代提交该父Stage。所有结果不可用的Stage都将会被加入waiting队列,等待执行,如图3-6所示。
[插图]
图3-6 Stage依赖
在图3-6中,虚箭头表示依赖关系。Stage序号越小,表示Stage越靠近上游。
图3-6中的Stage调度运行顺序如图3-7所示。
[插图]
图3-7 Stage执行顺序
从图3-7可以看出,上游父Stage先得到执行,waiting queue中的stage随后得到执行。
- TasksetManager
每个Stage的提交会被转化为一组task的提交。DAGScheduler最终通过调用taskscheduler的接口来提交这组任务。在taskScheduler内部实现中创建了taskSetManager实例来管理任务集taskSet的生命周期。事实上可以说每个stage对应一个tasksetmanager。至此,DAGScheduler的工作基本完毕。taskScheduler在得到集群计算资源时,taskSet-Manager会分配task到具体worker节点上执行。在Spark1.5.0的taskSchedulerImpl.scala文件中,提交task的函数实现如下: 在Spark1.5.0的taskSchedulerImpl.scala文件中,提交task的函数实现如下:
当taskSetManager进入到调度池中时,会依据job id对taskSetManager排序,总体上先进入的taskSetManager先得到调度。对于同一job内的taskSetManager而言,job id较小的先得到调度。如果有的taskSetManager父Stage还未执行完,则该taskSet-Manager不会被放到调度池。
3.2.4 task的调度
在DAGScheduler.scala中,定义了函数submitMissingTasks,读者阅读完整实现,从中可以看到task的调度方式。