Spark架构师3-spark初始化和任务启动源码

1、上次总结 spark初始化环境资源 0:18:00~ 0:41:00

1、Spark RPC(Endpoint:DriverEndpoint ClientEndpoint)
2、利用 akka(endpoint类似于actor) 模拟实现 YARN(Flink 就是基于 akka实现的 RPC)
3、Spark Standalone 集群启动脚本start-all.sh 分析
4、Master 启动分析
5、Worker 启动分析
6、Spark APP 提交脚本 spark-submit 脚本分析
7、SparkSubmit 分析(重点是进入main方法,通过反射的方式运行用户编写的application的主类main方法)

8、SparkContext 初始化

1.1 SparkContext 初始化 分析 0:10~ 0:41

Spark任务执行流程分析.jpg

示例程序demo sparkPI
val spark: SparkSession = SparkSession.builder.appName("Spark Pi").getOrCreate()

注: 上图粉红色半边 ,7件事

0:18:16 代码开始

 ——》SparkSession#Builder#getOrCreate()
                val sparkContext: SparkContext = userSuppliedContext.getOrElse{
                     SparkContext.getOrCreate(sparkConf)
         ——》 SparkContext#getOrCreate()
                   setActiveContext(new SparkContext(config), allowMultipleContexts = false)                
                   ——》SparkContext类块#try
                            //以下为类级代码块 
                           try {
                                       _conf = config.clone()
                                       _conf.validateSettings()
 ############# 注释:第一步   创建Spark Env############         
                                * 
                               *  除了创建 sparkEnv之外,还创建了各种 manager 对象。                                
                                */ Create the Spark execution environment (cache, map output tracker, etc)
                                   _env = createSparkEnv(_conf, isLocal, listenerBus)
                          
                                ——》SparkEnv#createDriverEnv
                                    ——》SparkEnv#create
                                               //注释: 初始化 SecurityManager
                                          val securityManager = new SecurityManager(conf, ioEncryptionKey)                                              
                                                   //注释: 初始化 NettyRpcEnv
                                           val systemName = if (isDriver) driverSystemName  
                                           val rpcEnv = RpcEnv.create(systemName, 
                                                       bindAddress, advertiseAddress, 
                                                        port.getOrElse(-1), conf, securityManager,
                                                         numUsableCores, !isDriver)
                                         //注释: 初始化 SerializerManager   
        val serializerManager = new SerializerManager(serializer, conf, ioEncryptionKey)      
                                    //注释: 初始化 BroadcastManager      
        val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)
                                //注释: 初始化 MapOutputTracker 
        val mapOutputTracker = if (isDriver) {
            new MapOutputTrackerMaster(conf, broadcastManager, isLocal)
                               //注释: 初始化 SortShuffleManager    
        val shortShuffleMgrNames = Map("sort" ->  
                               // 注释: 初始化 UnifiedMemoryManager  统一内存管理模型
                                // StaticMemoryManaager  静态内存管理模型
        val memoryManager: MemoryManager = if (useLegacyMemoryManager) 
                                //注释: 初始化 BlockManagerMaster 
        val blockManagerMaster = new BlockManagerMaster(
                                //注释: 初始化 BlockManager 
        val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,  
                               //注释: 初始化 OutputCommitCoordinator 
        val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse 
                               //注释: 正式初始化SparkEnv
        val envInstance = new SparkEnv(executorId, rpcEnv, serializer, closureSerializer,
         return   envInstance

 ############# 注释:第二步   创建SparkUI############         
/**   注释:第二步
         *  创建并初始化Spark UI
         */
        _ui = if (conf.getBoolean("spark.ui.enabled", true)) {
            // TODO_MA 注释:_jobProgressListener跟踪要在UI中显示的任务级别信息,startTime就是SparkContext的初始时的系统时间
            // TODO_MA 注释:返回SparkUI,它的父类是WebUI,和MasterWebUI是一个级别的
            Some(SparkUI.create(Some(this), _statusStore, _conf, _env.securityManager, appName, "", startTime))
        } else {
            // For tests, do not enable the UI
            None
        }    
 ############# 注释:第三步   hadoop相关配置以及Executor环境变量############    
_hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)
if (jars != null) {
            jars.foreach(addJar)
        }
        
        if (files != null) {
            files.foreach(addFile)
        }

 executorEnvs("SPARK_EXECUTOR_MEMORY") = executorMemory + "m"
        executorEnvs ++= _conf.getExecutorEnv
        executorEnvs("SPARK_USER") = sparkUser

 ############# 注释: 第四步:创建心跳接收器 ############
            *  1、我们需要在“createTaskScheduler”之前注册“HeartbeatReceiver”,因为Executor将在构造函数中检索“HeartbeatReceiver”
         *  2、创建一个HeartbeatReceiver 的RpcEndpoint注册到RpcEnv中,每分钟给自己发送ExpireDeadHosts,去检测Executor是否存在心跳,
         *  3、如果当前时间减去最一次心跳时间,大于1分钟,就会用CoarseGrainedSchedulerBackend将Executor杀死
         */
        
        _heartbeatReceiver = env.rpcEnv.setupEndpoint(HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))
 

 ############# 注释:第五步:创建任务调度TaskScheduler############           

val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
        _schedulerBackend = sched
        _taskScheduler = ts

 ############# 注释:第六步:创建和启动DAGScheduler############      
     
 /*  *  1、内部初始化了一个:DAGSchedulerEventProcessLoop 用来处理各种任务
         *  2、在 DAGSchedulerEventProcessLoop 创建的时候,构造函数的内部的最后一句代码执行了 DAGSchedulerEventProcessLoop的启动。
         *  将来任务的提交,取消等,都会发送一个事件给 DAGSchedulerEventProcessLoop
         *  从而触发 dagScheduler.onRecevie() 的运行。
         */
        _dagScheduler = new DAGScheduler(this)
        _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)



 ############# 注释:第七步:TaskScheduler的启动,backend.start()############  
 /** :第七步:TaskScheduler的启动,主要任务:backend.start()
         */
        
        _taskScheduler.start()
                                       ↓
        TaskSchedulerImpl#start()
                  .......
                               backend.start()
                                                    ↓
StandaloneSchedulerBackend.start()
                                                     ▼
/** :调用父类方法 start() 方法启动一个 DriverEndPoint
         *   super 粗粒度的 CoarseGrainedSchedulerBackend
         */
                           super.start()        
                                            ↓
                CoarseGrainedSchedulerBackend.start()
                                          ▼
                              /**  创建一个 DriverEndPoint 负责跟 master 打交道的         
                             driverEndpoint = createDriverEndpointRef(properties)

                          /**  注释:里面维护了一个 DriverEndPoint 主要用来向 Executor分发任务 
        client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
        
        /*    注释:client start  启动了 ClientEndpoint 
                        client.start()

            ——》 StandaloneAppClient.start()
                     endpoint.set(rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv)))
                            ——》 ClientEndpoint#Onstart()方法
                                    /** 注释:clientEndPoint 向 Master 执行注册 
                                       registerWithMaster(1)
                                                      ▼
                                       registerMasterFutures.set(tryRegisterAllMasters())
                                    ——》tryRegisterAllMasters()
                                                      ▼
                             /** 注释:创建了一个 Master 的 RPC 代理
                         */
                        val masterRef = rpcEnv.setupEndpointRef(masterAddress,  
                        
                        /**
                         *   注释:注册
                         */
                        masterRef.send(RegisterApplication(appDescription, self))
                  ——》RegisterApplication()
                                            ↓
                                Master#类 receive 方法 
                                         ...................
                                     case RegisterApplication(description, driver) => 
                                          ...................
                                     case RegisterWorker
                                                schedule()
                                                    launchDriver(worker, driver)    启动driver
                                              ——》startExecutorsOnWorkers() 
                                                                      ▼
for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
                    allocateWorkerResourceToExecutors(app, assignedCores(pos), app.desc.coresPerExecutor, usableWorkers(pos))
                }
                                                  ——》allocateWorkerResourceToExecutors
                                                       ——》launchExecutor(worker, exec)
                                                                                  ▼
                                                           /* 注释:发送命令让 worker 启动 executor
         *   worker 节点的一个 RPC 节点,负责通信的。
         */
        worker.endpoint.send(LaunchExecutor(masterUrl, exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
        
        / 注释:发消息告诉 Driver 该 worker 上的 executor 已经启动
         */
        exec.application.driver.send(ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))          
                                                                           ↓
                                                       Worker类  receive 方法
                                                                      ▼            
/* 注释: 接收到 Master 发送过来的启动 Executor 的命令
     */
    case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) => 
                
/* 注释:把启动Executor必要的一些信息,封装在 ExeuctorRunner 中
             */
            val manager = new ExecutorRunner(appId, execId, appDesc.copy(command =
                 /* : 启动好了 Executor 之后,返回给 Master一个信号。
             *   信息封装在 ExecutorStateChanged 对象中。
             */
            sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))

           /* 启动 executor
             *   worker.send(ExecutorStateChanged)
             */
            manager.start()
                          ↓
   ——》ExecutorRunner#start
              ——》fetchAndRunExecutor()
                     /* :构建jvm  进程启动命令  */
            process = builder.start()
            val header = "Spark Executor Command: %s\n%s\n\n".format(
                           ↓   跳转到Executor类构造方法
            Executor # 
                        // 实际上是构建了一个用于执行task的线程池
    private val threadPool = {
        val threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Executor task launch worker-%d")
            .setThreadFactory(new ThreadFactory {

2、 本次内容概述 spark 任务提交 分析 0:41:00 ~

1、Spark Application 提交流程分析
2、Spark Application 的 DAG 生成和 Stage 切分分析
3、Spark 的 Task 分发和执行源码分析
4、Spark 的 Shuffle 机制源码分析

2.1 Spark Application 提交流程分析 0:43 ~0:58

入口:spark application 中的 action 算子!(SparkPi 程序中的 reduce 函数)
以 SparkPi 程序举例:reduce() 算子就是提交 job 的入口

reduce()

sc.runJob
——》SparkContext#runJob
——》DAGScheduler#runJob

/*
* 1、应用程序调用action算子
* 2、sparkcontext。runjob
* 3、dagscheduler。runjob
* 4、taskscheduler。submittasks
* 5、schedulerbackend。driverEndpoint 提交任务
/
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
——》DAGScheduler#runJob

/
* 注释: 提交任务
* 参数解析:
* 1、rdd:要在其上运行任务的参数RDD目标RDD
* 2、func:在RDD的每个分区上运行的函数
* 3、partitions:要运行的分区的集;某些作业可能不希望在目标RDD的所有分区上进行计算,例如,对于 first() 之类的操作。
* 4、callSite:在用户程序中调用此作业的位置
* 5、resultHandler:回调函数,以将每个分区结果传递给Xxx
* 6、properties:要附加到此作业的scheduler属性,例如fair scheduler pool name
*/
val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)

——》DAGScheduler#runJob

/**
* 第一步:封装一个JobWaiter对象;
* 第二步:将JobWaiter对象赋值给JobSubmitted的listener属性,
* 并将JobSubmitted(DAGSchedulerEvent事件)对象传递给eventProcessLoop事件循环处理器。eventProcessLoop
* 内部事件消息处理线程将会接收JobSubmitted事件,并调用dagScheduler.handleJobSubmitted(...)方法来处理事件;
* 第三步:返回JobWaiter对象。
*/
val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)

    /**         
     *   注释:这是提交任务运行
     *   eventProcessLoop 就是当初 DAGScheduler 在初始化的时候,创建的一个 DAGSchedulerEventProcessLoop
     *   这个组件主要负责:任务的提交执行
     */
    eventProcessLoop.post(JobSubmitted(jobId, rdd, func2, partitions.toArray, callSite, waiter, SerializationUtils.clone(properties)))

——》 eventProcessLoop 构造函数 DAGSchedulerEventProcessLoop
——》EventLoop#post

   ——》DAGSchedulerEventProcessLoop#onRecive   
        ——》DAGSchedulerEventProcessLoop#doOnRecive
          ——》DAGSchedulerEventProcessLoop#handleJobSubmitted 

从此,任务的提交就交给了 dagScheduler#handleJobSubmitted 方法

/* 注释: RDD DAG划分Stages:Stage的划分是从最后一个Stage开始逆推的,

  • 每遇到一个宽依赖处,就分裂成另外一个Stage

    • 依此类推直到Stage划分完毕为止。并且,只有最后一个Stage的类型是ResultStage类型。
    • 注意Dataset、DataFrame、sparkSession.sql("select ...")
    • 经过catalyst代码解析会将代码转化为RDD
  • 做了2件最主要的事

  • 1、stage切分

  • 2、 stage 提交
    /
    /
    注释: Stage 切分
    * 这个 finalRDD 就是 rdd链条中的最后一个 RDD,也就是触发 sc.runJob() 方法执行的 RDD
    */

         finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
     /* 
      *   注释: 提交 Stage
      */
     submitStage(finalStage)
    

2.2 Spark Application 的 DAG 生成和 Stage 切分分析 0:58~ 1:46

入口:EventLoop 中的 eventQueue.take() 方法

如果任务提交,则有 JobSubmitted 事件提交到 eventQueue 中,则 eventQueue.take() 阻塞返回,此时的 event 就是 JobSubmitted。
根据事件机制,跳转到:DAGScheduler.handleJobSubmitted() 方法
根据 driver 发送过来的 事件类型,来决定到底做什么!

两个核心的方法:

// stage切分入口
finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
// 提交stage执行入口
submitStage(finalStage)

方法依赖关系:
1、createResultStage(传入finalRDD获得ResultStage) ->2
2、getOrCreateParentStages(传入rdd获得父stage) ->3->4
3、getShuffleDependencies(传入rdd获得宽依赖)
4、getOrCreateShuffleMapStage(传入宽依赖获得ShuffleMapStage) ->5->6
5、getMissingAncestorShuffleDependencies(传入一个rdd获得所有宽依赖) ->3
6、createShuffleMapStage(传入宽依赖获得ShuffleMapStage) ->2

image.png

image.png

RDD任务切分中间分为:Application、Job、Stage 和 Task
1、Application:初始化一个 SparkContext 即生成一个 Application;
2、Job:一个 Action 算子就会生成一个 Job;
3、Stage:Stage 等于宽依赖的个数加 1;
4、Task:一个 Stage 阶段中,最后一个 RDD 的分区个数就是 Task 的个数。
注意:Application->Job->Stage->Task每一层都是1对n的关系

dagScheduler#handleJobSubmitted 主方法

一 finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)


// TODO_MA 注释:获取当前Stage的parent Stage,这个方法是划分Stage的核心实现
val parents = getOrCreateParentStages(rdd, jobId)

image.png

image.png


1 getShuffleDependencies(rdd).map { shuffleDep => 2 getOrCreateShuffleMapStage(shuffleDep, firstJobId) }.toList

                  1 ——》DAGScheduler#getShuffleDependencies   
                                               ▼
              /**  重点方法  找RDD依赖链 
              * Returns shuffle dependencies that are immediate parents of the given RDD.
              * This function will not return more distant ancestors.
              * For example, if C has a shuffle dependency on B which has a shuffle dependency on A:
               * A <-- B <-- C
                 * calling this function with rdd C will only return the B <-- C dependency.
               * This function is scheduler-visible for the purpose of unit testing.
                * TODO_  采用的是深度优先遍历找到Action算子的父依赖中的宽依赖
                  */
        
                         2 ——》   getOrCreateShuffleMapStage
               /* TODO_MA 如果shuffleIdToMapStage中存在shuffle,则获取shuffle map stage。
                  *  否则,如果shuffle map stage不存在,该方法将创建shuffle map stage
                  *   以及任何丢失的parent shuffle map stage。
   ***************************************

二 、 /* 注释: 递归提交 Stage /
submitStage(finalStage)
——>submitMissingTasks()

/
注释: 把 stage 变成 Tasks
* Step3: 为每个需要计算的partiton生成一个task
*/
val tasks: Seq[Task[_]] = try {
//如果是 ShuffleMapStage 阶段的 Task,则构建 ShuffleMapTask
case stage: ShuffleMapStage => stage.pendingPartitions.clear()
.............
//如果是 ResultStage 阶段的 Task,则构建 ResultTask
case stage: ResultStage => partitionsToCompute.map

   /* 注释: 如果该阶段有 Task 需要执行
     *   Step4: 提交tasks
     */
    if (tasks.size > 0) {          
           //注释: taskScheduler 的具体类型是:TaskSchedulerImpl
          taskScheduler.submitTasks(new TaskSet(tasks.toArray, 

2.3 Spark Task 分发和执行分析 2:13 ~ 2:45

入口接上面的: taskScheduler.submitTasks(new TaskSet(tasks.toArray, stage.id,
taskScheduler 的具体类型是:TaskSchedulerImpl

TaskSchedulerImpl#submitTasks 方法

/** 注释:TaskScheduler提交job,最后交由 SchedulerBackEnd 进行提交
*/
backend.reviveOffers()

CoarseGrainedSchedulerBackend.reviveOffers()

// 给 DriverEndpoint (自己 )发送 ReviveOffers 消息
driverEndpoint.send(ReviveOffers)

CoarseGrainedSchedulerBackend#DriverEndpoint#receive

case ReviveOffers => makeOffers()

scheduler.resourceOffers(workOffers) //申请计算资源
.........
if (!taskDescs.isEmpty) {
launchTasks(taskDescs)

// 注释: 发送 LaunchTask 消息给:CoarseGrainedExecutorBackend 类
executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))

CoarseGrainedExecutorBackend# receive()
▼ //data 是任务的反序列化
case LaunchTask(data) => if (executor == null) {
exitExecutor(1, "Received LaunchTask command but executor was null")
} else
{ ....... executor.launchTask(this, taskDesc)
}

Executor#launchTask

// TODO_MA 注释: 封装一个 TaskRunner 线程对象,来运行一个 Task
val tr = new TaskRunner(context, taskDescription)

  • 注释: 提交到线程池运行,那么就转到 TaskRunner 的 run() 方法
    */
    threadPool.execute(tr)
    ↓ 具体运行一个 Task 的地方
    Executor#TaskRunner#run()

    val value = Utils.tryWithSafeFinally {
    val res = task.run(taskAttemptId = taskId,

    Task#run()//

    runTask(context)// 两种task , 一种是shuffer ,一种是result
    ↓shuffer
    1. ShuffleMapTask#runTask()

writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
writer.stop(success = true).get


2.3 Spark Suffle 源码分析 2:45~ 3:05
HashShuffleManager的运行原理 2:45~ 2:52
SortShuffleManager运行原理 2:52 ~3:13

                                       ↓   有4个writer ; 以SortShuffleWriter为例
               SortShuffleWriter#write
                                      ▼ 
                *   注释: 先排序 
    sorter = if (dep.mapSideCombine) {
                插入数据到排序区
    sorter.insertAll(records)          

val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)

     val tmp = Utils.tempFileWith(output)

val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)



                                              ↓     Result
                       2.     ResultMapTask#runTask()
                                               ▼ 
image.png
image.png
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,258评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,335评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,225评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,126评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,140评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,098评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,018评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,857评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,298评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,518评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,678评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,400评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,993评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,638评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,801评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,661评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,558评论 2 352