抽象概念:
1)Application
用户编写的Spark程序,完成一个计算任务的处理。它是由一个Driver程序和一组运行于Spark集群上的Executor组成。
1.1. driver:开发者需要编写一个Driver程序,它被提交到集群以调度运行Worker,Driver中定义了一个或多个RDD,并调用RDD上的action,Worker则执行RDD分区计算任务;Standalone模式下Driver内部的StandaloneSchedulerBackend与Master进行资源请求协商(实际上是其内部的StandaloneAppClient真正与Master通信)。
1.2. Executor:真正执行任务的进程,本身拥有若干cpu和内存,可以执行以线程为单位的计算任务,
2)Job
spark程序中,每次调用Action会逻辑上生成job,每个job包括多个stage。
3)Stage
分为两类:shufflemapStage和ResultStage,程序运行中调用需要shuffle计算的Operator(与Rdd的transformation和action区别)如:reduce(),以shuffle为边界的分为Shufflemap,ResultStage。
4)TaskSet
TaskSet是在程序执行中由Stage映射为TaskSet,1个阶段->1个任务集,交由TaskSchedule操作,粗粒度的调度是以TaskSet为单位的。
5)Task
Task实在物理节点运行基本单位,依赖于窄依赖的串行Rdd集;Task包含两类:ShuffleMapTask和ResultTask,分别对应于Stage中ShuffleMapStage和ResultStage中的一个执行基本单元,RDD在计算的时候,每个分区都会起一个task,所以rdd的分区数目决定了总的的task数目。
先从Executor和SchedulerBackend说起。Executor是真正执行任务的进程,本身拥有若干cpu和内存,可以执行以线程为单位的计算任务,它是资源管理系统能够给予的最小单位。SchedulerBackend是spark提供的接口,定义了许多与Executor事件相关的处理,包括:新的executor注册进来的时候记录executor的信息,增加全局的资源量(核数),进行一次makeOffer;executor更新状态,若任务完成的话,回收core,进行一次makeOffer;其他停止executor、remove executor等事件。下面由makeOffer展开。
这件事情要从Spark的DAG切割说起。Spark RDD通过其transaction和action操作,串起来形成了一个DAG。action的调用,触发了DAG的提交和整个job的执行。触发之后,由DAGScheduler这个全局唯一的面向stage的DAG调度器来切分DAG,根据是否 shuffle来切成多个小DAG,即stage。凡是RDD之间是窄依赖的,都归到一个stage里,这里面的每个操作都对应成MapTask,并行度就是各自RDD的partition数目。凡是遇到宽依赖的操作,那么就把这一次操作切为一个stage,这里面的操作对应成ResultTask,结果 RDD的partition数就是并行度。MapTask和ResultTask分别可以简单理解为传统MR的Map和Reduce,切分他们的依据本质上就是shuffle。所以shuffle之前,大量的map是可以同partition内操作的。每个stage对应的是多个MapTask或多个 ResultTask,这一个stage内的task集合成一个TaskSet类,由TaskSetManager来管理这些task的运行状态,locality处理(比如需要delay scheduling)。这个TaskSetManager是Spark层面上的,如何管理自己的tasks,即任务线程,这一层与底下资源管理是剥离的。我们上面提到的TaskSetManager的resourceOffer方法,是task与底下资源的交互,这个资源交互的协调人是 TaskScheduler,也是全局的,TaskScheduler对接的是不同的SchedulerBackend的实现(比如 mesos,yarn,standalone),如此来对接不同的资源管理系统。同时,对资源管理系统来说,他们要负责的是进程,是worker上起几个进程,每个进程分配多少资源。所以这两层很清楚,spark本身计算框架内管理线程级别的task,每个stage都有一个TaskSet,本身是个小DAG,可以丢到全局可用的资源池里跑;spark下半身的双层资源管理部分掌控的是进程级别的executor,不关心task怎么摆放,也不关心task运行状态,这是TaskSetManager管理的事情,两者的协调者就是TaskScheduler及其内的SchedulerBackend实现。
SchedulerBackend 的实现,除去local模式的不说,分为细粒度和粗粒度两种。细粒度只有Mesos(mesos有粗细两种粒度的使用方式)实现了,粗粒度的实现者有 yarn,mesos,standalone。启动时候可以指定每个worker起几个executor,即进程,每个executor的cpu和内存是多少。
standalone模式可指定参数:
--driver-cores NUM //控制driver的内核数
--total-executor-cores NUM //用于所有executor的总的内核数
--executor-cores //每个执行器的内核数,yarn模式是1,standalone是所有可能内核。
spark-shell --driver-cores NUM //控制driver的内核数
standalone模式可指定参数:
--driver-cores NUM //driver内核数,只用于cluster模式(Default: 1).
--num-executors NUM //启动的执行器个数(Default: 2).
--executor-cores //每个执行器的内核数,yarn模式是1,standalone是所有可能内核。