Spark1.3.1源码分析 Spark job 提交流程

spark提交

./bin/spark-submit \
  --class <main-class> \
  --master <master-url> \
  --deploy-mode <deploy-mode> \
  --conf <key>=<value> \
  ... # other options
  <application-jar> \
  [application-arguments]

例如WordCount代码

import org.apache.spark.{SparkConf, SparkContext}
/**
  * Created by lancerlin on 2018/2/2. 
  */
object WordCount {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "hadoop");

    //非常重要,是通向Spark集群的入口
    val conf = new SparkConf().setAppName("WC")
    val sc = new SparkContext(conf)

    sc.textFile(args(0)) 
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .saveAsTextFile(args(1))
    sc.stop()
  }
}

提交脚本

./bin/spark-submit \
  --class WordCount  \
  --master master01:7070 \
  --deploy-mode client \
  --executor-memory 8G \ 
  -- total-executor-cores 3  \
  wordcount.jar \
  hdfs://wordcount.txt

所以,想要分析程序提交的流程,必须从spark-submit脚本开始分析,--deploy-mode有两种模式,clientcluster,client是client 跟driver都在客户端启动,同一个jvm中,一种是client跟driver分开启动的方式,为什么叫client和driver,在源码中我们会找到答案。

spark-submit

image.png

查看分析org.apache.spark.deploy.SparkSubmit
image.png

image.png

image.png

image.png

  • 查看源码,我们可以知道,SparkSubmit类中的main方法中,调用Submit方法,submit方法调用doRunMain,doRunMain方法通过反射,实例化主程序,这里就是WordCount,在调用主程序的main方法,所以Submit的工作就完成了,接下来分析我们的主程序代码。
  • 总体流程如下图所示


    image.png

主程序

WordCount 代码

import org.apache.spark.{SparkConf, SparkContext}
/**
  * Created by lancerlin on 2018/2/2. 
  */
object WordCount {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "hadoop");

    //非常重要,是通向Spark集群的入口
    val conf = new SparkConf()
    val sc = new SparkContext(conf)

    sc.textFile(args(0)) 
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .saveAsTextFile(args(1))
    sc.stop()
  }
}

SparkContext

sparkContext是spark程序的入口,通过sc来连接集群

sparkEnv

272行代码,创建了sparkDriverEnv,sparkEnv里面,翻译文档就是
保存正在运行的Spark实例(master或worker)的所有运行时环境对象, 包括序列化器,Akka ActorSystem,块管理器,地图输出追踪器等
这里最重要的是,前面我们分析master worker的时候,看到的ActorSystem

image.png

image.png

image.png

SparkContext.DAGScheduler

image.png

主要作用
主要就是计算生成stages,并跟踪RDD跟stage的输入输出,将stage的tasks封装成一个taskSet并提交,每个task的失败重试,推测,都由DAGScheduler处理


image.png

DAGSchedulerEventProcessLoop

DAGScheduler里有个很重要的成员变量DAGSchedulerEventProcessLoop
private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler) extends EventLoop[DAGSchedulerEvent]("dag-scheduler-event-loop") with Logging
可以学习下这个类的编程模式,设计模式中,这个叫做模板方法模式
父类EventLoop定义好了编程模型,子类DAGSchedulerEventProcessLoop重写recieve方法

image.png

schedulerBackend, taskScheduler

创建了两个非常重要的成员变量,schedulerBackend, taskScheduler,然后我们查看SparkContext.createTaskScheduler(this, master),这个方法的实现

image.png

根据传入的master信息,来决定调度器的实现,我们查看spark的模式实现

  • 首先创建TaskSchedulerImpl(sc),这是task任务的实现类
  • 然后创建SparkDeploySchedulerBackend,接着调用scheduler.initialize(backend)

image.png

image.png

scheduler.initialize(backend)方法中,创建了一个调度器,默认是FIFO调度器
image.png

image.png

381行代码中,将刚才创建的taskScheduler启动了起来,backend是实现类是SparkDeploySchedulerBackend,查看SparkDeploySchedulerBackendstart方法
image.png

image.png

SparkDeploySchedulerBackend继承了CoarseGrainedSchedulerBackend,spark-on-yarn的时候,backend的实现类就是这个CoarseGrainedSchedulerBackend

image.png

CoarseGrainedSchedulerBackendstart方法中,创建了一个driverActor,通过查看DriverActor类,结合查看master worker的源码,我们知道,DriverActor会走生命周期方法,然而,CoarseGrainedSchedulerBackend只是创建了DriverActor,并没有启动,分析DriverActor,我们可以知道,他是发送task到executor上执行的,所以等待分配好资源后,才启动
class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor with ActorLogReceive
image.png

image.png

image.png

image.png

image.png

看回SparkDeploySchedulerBackend的start方法

image.png

重要的代码,查看AppClient

    client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
    client.start()

start方法中通过actorSystem创建了一个ClientActor,执行preStart(),receiveWithLogging()生命周期方法

image.png

image.png

image.png

image.png

master接收到RegisterApplication(appDescription),保存app信息,告诉client注册完毕,RegisteredApplication(app.id, masterUrl),然后调用schedule(),这个方法我们在分析master、worker的时候已经看到过,后面我们会重点分析,主要就是mater指挥worker启动executor

image.png

Master:schedule

image.png

这里是启动executor的两种方式,一种是尽量打散,默认的方式,一种是尽量集中,通过
val spreadOutApps = conf.getBoolean("spark.deploy.spreadOut", true)配置
image.png

尽量打散

分析源码我们可以看到尽量打散的源码分析, 比如此时需要--executor-memory 4G --total-executor-cores 8

  • 首先判断内存大于app资源,这里是4G的worker并且该worker没有该app的executor,源码里说的是,standalone的情况下,不允许app的两个executor在同一个worker上


    image.png
  • 符合条件的worker按照剩余内存降序,判断cpu cores,如果app需要的cores > 符合条件workers的cores的总数,则取小的,比如app需要10cores,而符合条件的worker一共只有9cores,那么就是用9cores

  • 分配好每个worker的cores和memory后,launchExecutor(usableWorkers(pos), exec)
    比如现在有3个Worker来执行WordCount,程序,按照尽量打散的逻辑,分配前后的executor如下图所示

    image.png

image.png
集中
  • 找到内存资源符合的worker,计算该worker的cores是否大于app需要的cores,取两者的最小值
  • 启动ExecutorlaunchExecutor(worker, exec)
    image.png
launchExecutor

接下来重点分析如何启动Executor,分析到这里,我们知道,SparkDeploySchedulerBackend start()方法中,将将app信息封装成一个appDesc,然后通过ClientActor,将appDesc封装成case class发送给Master,master接收到后,通过集中或者打散的规则给Worker分配需要启动的Executor资源,调用launchExecutor,重点分析launchExecutor

image.png

image.png

master发送信息给Worker,LaunchExecutor,然后又给driver发送ExecutorAdded,需要分别查看Worker跟DirverActor

image.png

Worker接收到LaunchExecutor,首先创建executor工作目录,然后启动一个ExecutorRunner.start,start方法中创建了一个线程workerThread去启动executor,因为启动executor进程可能会消耗很多时间,需要异步处理,所以开启线程去启动。fetchAndRunExecutor是启动executor的方法。通过fetchAndRunExecutor启动的类是clientActor指定的,org.apache.spark.executor.CoarseGrainedExecutorBackend,所以此时,需要到org.apache.spark.executor.CoarseGrainedExecutorBackend类中查看main方法,启动Executor后,worker会相应的减少cores和memory。

image.png

image.png

image.png

org.apache.spark.executor.CoarseGrainedExecutorBackend

Executor也是一个Actor,会走跟master、worker一样的Actor生命周期方法。executor启动包含dirverUrl,appId,cores,worker等信息
private[spark] class CoarseGrainedExecutorBackend( driverUrl: String, executorId: String, hostPort: String, cores: Int, userClassPath: Seq[URL], env: SparkEnv) extends Actor with ActorLogReceive with ExecutorBackend with Logging

image.png

run方法中,启动了两个actorCoarseGrainedExecutorBackend, WorkerWatcher
image.png

org.apache.spark.executor.CoarseGrainedExecutorBackend.preStart & receiveWithLogging

向dirver发送信息注册executor,dirver回复注册executor成功,executor接收driver注册成功后,创建一个Executor对象

image.png

image.png

image.png

image.png

Executor

构造了一个线城池threadPoollaunchTask方法中,将接收到的task封装成一个TaskRunner,然后将他提交给threadPool执行

image.png

image.png

创建executorActor与driver交互


image.png

startDriverHeartbeater,查看代码,这是executor与driver保持心跳的,默认是val interval = conf.getInt("spark.executor.heartbeatInterval", 10000),10秒钟发送一次,但是在driverActor中,没有找到跟他交互的代码

image.png

image.png

CoarseGrainedExecutorBackend中还启动了个WorkerWatcher,负责跟worker保持心跳

image.png

image.png

总结

说些了那么多,最终我们可以得到如下图所示的关系,一切都准备好了,此时,就等待driver task的发送,
executor接收task,开始执行task


image.png
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,843评论 6 502
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,538评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,187评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,264评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,289评论 6 390
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,231评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,116评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,945评论 0 275
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,367评论 1 313
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,581评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,754评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,458评论 5 344
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,068评论 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,692评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,842评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,797评论 2 369
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,654评论 2 354

推荐阅读更多精彩内容