Spark任务从提交到执行完成有很多步骤,整体上可以划分为三个阶段:
- 应用的提交; 
- 执行环境的准备; 
- 任务的调度和执行。 

一、执行流程概述
Spark有多种不同的运行模式,在不同模式下这三个阶段的执行流程也不太相同。
以on yarn模式为例,Spark应用提交shell命令如下:
$SPARK_HOME/bin/spark-submit \
 --class org.apache.spark.examples.SparkPi \
 --master yarn \
 --deploy-mode client \
 $SPARK_HOME/examples/jars/spark-examples*.jar
Spark应用执行过程可以划分如下三个阶段:
第一步:应用的提交
- Driver端: 
- 解析参数,验证参数合法性 
- 检查和准备依赖jar包 
- 确定运行的主类,也就是应用的入口 
- Executor端:未创建 
第二步:执行环境的准备
- Driver端: 
- 进入应用的main函数,开始执行 
- 首先创建SparkContext对象,在创建时会执行 
- 初始化各个服务模块和通信的RPC环境 
- 向cluster manager申请资源 
- Executor端: 
- 在Worker节点启动Executor 
- 初始化Executor,启动各个服务模块 
- 连接到Driver端,汇报Executor的状态 
第三步:任务的调度和执行
- Driver端: 
- 执行处理任务代码 
- Job分解为Stage,并将Stage划分为Task 
- 提交Task到Executor端 
- 接受Executor端的状态和结果信息 
- Executor端: 
- 启动TaskRunner线程,执行接收到的Task 
- 向Driver端汇报执行状态 
- 向Driver端返回执行结果 
二、执行流程详解
以如下代码为例,讲解Spark应用执行的各个阶段。
# HelloWorld.scala
import scala.math.random
import org.apache.spark.sql.SparkSession
object HelloWorld {
  def main(args: Array[String]) {
        val spark = SparkSession.builder.appName("HelloWorld").getOrCreate()
        val rdd = spark.sparkContext.parallelize(Seq("Hello", "World"))
        rdd.collect()
    }
}
1、第一阶段:应用的提交
这个阶段主要在Driver端完成,主要目标是:准备依赖jar包并确定Spark应用的执行主类。具体的任务包括:
- 解析任务提交的参数,并对参数进行解析和保存。 
- 准备任务启动参数制定的依赖文件或者程序包。 
- 根据Spark应用的执行模式和应用的编写语言,来确定执行的主类名称。 
- 实例化执行主类,生成SparkApplication对象,并调用SparkApplication.start()函数来运行Spark应用(如果是Java/Scala代码则执行Spark应用中的main函数)。 
注意:第1阶段完成时,Driver端并没有向资源管理平台申请任何资源,也没有启动任何Spark内部的服务。
2、第二阶段:执行环境的准备
通过第1阶段,已经找到了运行在Driver端的Spark应用的执行主类,并创建了SparkApplication对象:app。此时,在app.start()函数中会直接调用主类的main函数开始执行应用,从而进入第2阶段。
第二阶段主要目标是:创建SparkSession(包括SparkContext和SparkEnv),完成资源的申请和Executor的创建。第2阶段完成后Task的执行环境就准备好了。
也就是说,第2阶段不仅会在Driver端进行初始化,而且还要准备好Executor。这一阶段的任务主要是在Driver端执行创建SparkSession的代码来完成,也就是执行下面一行代码:
val spark = SparkSession.builder.appName("HelloWorld").getOrCreate()
第二阶段的Driver端主要完成以下步骤:
- 创建SparkContext和SparkEnv对象,在创建这两个对象时,向Cluster Manager申请资源,启动各个服务模块,并对服务模块进行初始化。
- 这些服务模块包括:DAG调度服务,任务调度服务,shuffle服务,文件传输服务,数据块管理服务,内存管理服务等。
第2阶段的Executor端主要完成以下步骤:
- Driver端向Cluster Manager申请资源,若是Yarn模式会在NodeManager上创建ApplicationMaster,并由ApplicationMaster向Cluster Manager来申请资源,并启动Container,在Container中启动Executor。
- 在启动Executor时向Driver端注册BlockManager服务,并创建心跳服务RPC环境,通过该RPC环境向Driver汇报Executor的状态信息。
第二阶段执行完成后的Spark集群状态如下:

3、第三阶段:任务的调度和执行
通过第2阶段已经完成了Task执行环境的初始化,此时,在Driver端已经完成了SparkContext和SparkEnv的创建,资源已经申请到了,并且已经启动了Executor。
这一阶段会执行接下来的数据处理的代码:
val rdd = spark.sparkContext.parallelize(Seq("Hello", "World"))
rdd.collect()
第3阶段Driver端主要完成以下步骤:
- 执行Spark的处理代码,当执行map操作时,生成新的RDD; 
- 当执行Action操作时,触发Job的提交,此时会执行以下步骤: 
- 根据RDD的血缘,把Job划分成相互依赖的Stage; 
- 把每个Stage拆分成一个或多个Task; 
- 把这些Task提交给已经创建好的Executor去执行; 
- 获取Executor的执行状态信息,直到Executor完成所有Task的执行; 
- 获取执行结果和最终的执行状态。