Spark原理 | 任务执行流程

Spark任务从提交到执行完成有很多步骤,整体上可以划分为三个阶段:

  • 应用的提交;

  • 执行环境的准备;

  • 任务的调度和执行。

Spark任务执行流程

一、执行流程概述

Spark有多种不同的运行模式,在不同模式下这三个阶段的执行流程也不太相同。

以on yarn模式为例,Spark应用提交shell命令如下:

$SPARK_HOME/bin/spark-submit \
 --class org.apache.spark.examples.SparkPi \
 --master yarn \
 --deploy-mode client \
 $SPARK_HOME/examples/jars/spark-examples*.jar

Spark应用执行过程可以划分如下三个阶段:

第一步:应用的提交

  • Driver端:

  • 解析参数,验证参数合法性

  • 检查和准备依赖jar包

  • 确定运行的主类,也就是应用的入口

  • Executor端:未创建

第二步:执行环境的准备

  • Driver端:

  • 进入应用的main函数,开始执行

  • 首先创建SparkContext对象,在创建时会执行

  • 初始化各个服务模块和通信的RPC环境

  • 向cluster manager申请资源

  • Executor端:

  • 在Worker节点启动Executor

  • 初始化Executor,启动各个服务模块

  • 连接到Driver端,汇报Executor的状态

第三步:任务的调度和执行

  • Driver端:

  • 执行处理任务代码

  • Job分解为Stage,并将Stage划分为Task

  • 提交Task到Executor端

  • 接受Executor端的状态和结果信息

  • Executor端:

  • 启动TaskRunner线程,执行接收到的Task

  • 向Driver端汇报执行状态

  • 向Driver端返回执行结果

二、执行流程详解

以如下代码为例,讲解Spark应用执行的各个阶段。

# HelloWorld.scala

import scala.math.random
import org.apache.spark.sql.SparkSession

object HelloWorld {
  def main(args: Array[String]) {
        val spark = SparkSession.builder.appName("HelloWorld").getOrCreate()
        val rdd = spark.sparkContext.parallelize(Seq("Hello", "World"))
        rdd.collect()
    }
}

1、第一阶段:应用的提交

这个阶段主要在Driver端完成,主要目标是:准备依赖jar包并确定Spark应用的执行主类。具体的任务包括:

  1. 解析任务提交的参数,并对参数进行解析和保存。

  2. 准备任务启动参数制定的依赖文件或者程序包。

  3. 根据Spark应用的执行模式和应用的编写语言,来确定执行的主类名称。

  4. 实例化执行主类,生成SparkApplication对象,并调用SparkApplication.start()函数来运行Spark应用(如果是Java/Scala代码则执行Spark应用中的main函数)。

注意:第1阶段完成时,Driver端并没有向资源管理平台申请任何资源,也没有启动任何Spark内部的服务。

2、第二阶段:执行环境的准备

通过第1阶段,已经找到了运行在Driver端的Spark应用的执行主类,并创建了SparkApplication对象:app。此时,在app.start()函数中会直接调用主类的main函数开始执行应用,从而进入第2阶段。

第二阶段主要目标是:创建SparkSession(包括SparkContext和SparkEnv),完成资源的申请和Executor的创建。第2阶段完成后Task的执行环境就准备好了。

也就是说,第2阶段不仅会在Driver端进行初始化,而且还要准备好Executor。这一阶段的任务主要是在Driver端执行创建SparkSession的代码来完成,也就是执行下面一行代码:

val spark = SparkSession.builder.appName("HelloWorld").getOrCreate()

第二阶段的Driver端主要完成以下步骤:

  • 创建SparkContext和SparkEnv对象,在创建这两个对象时,向Cluster Manager申请资源,启动各个服务模块,并对服务模块进行初始化。
  • 这些服务模块包括:DAG调度服务,任务调度服务,shuffle服务,文件传输服务,数据块管理服务,内存管理服务等。

第2阶段的Executor端主要完成以下步骤:

  • Driver端向Cluster Manager申请资源,若是Yarn模式会在NodeManager上创建ApplicationMaster,并由ApplicationMaster向Cluster Manager来申请资源,并启动Container,在Container中启动Executor。
  • 在启动Executor时向Driver端注册BlockManager服务,并创建心跳服务RPC环境,通过该RPC环境向Driver汇报Executor的状态信息。

第二阶段执行完成后的Spark集群状态如下:

Spark集群状态

3、第三阶段:任务的调度和执行

通过第2阶段已经完成了Task执行环境的初始化,此时,在Driver端已经完成了SparkContext和SparkEnv的创建,资源已经申请到了,并且已经启动了Executor。

这一阶段会执行接下来的数据处理的代码:

val rdd = spark.sparkContext.parallelize(Seq("Hello", "World"))
rdd.collect()

第3阶段Driver端主要完成以下步骤:

  • 执行Spark的处理代码,当执行map操作时,生成新的RDD;

  • 当执行Action操作时,触发Job的提交,此时会执行以下步骤:

  • 根据RDD的血缘,把Job划分成相互依赖的Stage;

  • 把每个Stage拆分成一个或多个Task;

  • 把这些Task提交给已经创建好的Executor去执行;

  • 获取Executor的执行状态信息,直到Executor完成所有Task的执行;

  • 获取执行结果和最终的执行状态。

参考资料

  1. Spark Scheduler 内部原理剖析
  2. 如何理解Spark应用的执行过程
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容