Spark 任务调度之 Register App

接着Spark 任务调度之Launch Driver,继续介绍Driver启动过程中,当SparkContext初始化时,Driver端注册DriverEndpoint到RpcEnv及Driver向Master注册APP信息的流程。

Register App

当 Driver 启动之后,就会运行 用户编写的代码的 Main方法,Main中首先 创建了 SparkContext,在 SparkContext 中 首先创建了 TaskScheduler,并调用 start 方法,如图:


在 Standalone 模式下,_taskScheduler 代表 StandaloneSchedulerBackend,_taskScheduler.start() 调用 StandaloneSchedulerBackend.scala 下的 start() 方法,先看下 StandaloneSchedulerBackend 类的定义:

private[spark] class StandaloneSchedulerBackend(
    scheduler: TaskSchedulerImpl,
    sc: SparkContext,
    masters: Array[String])
  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) {
  ...
}

可以看到 其继承自 CoarseGrainedSchedulerBackend。

StandaloneSchedulerBackend 的 start 方法:

//  步骤1
super.start()
...
//  步骤2   初始化 StandaloneAppClient,并调用 start 方法
client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
client.start()

首先 步骤1: 调用 父类也就是 CoarseGrainedSchedulerBackend(在下面子项中 介绍) 的 start 方法,主要是注册DriverEndpoint(在 CoarseGrainedSchedulerBackend.scala 下的一个类)到RpcEnv,DriverEndpoint用于提交task到Executor,接收Executor返回的计算结果

其次 步骤2:注册ClientEndpoint,ClientEndpoint的生命周期方法onStart中会和 Master
通信,注册APP。
看下StandaloneAppClient 的 start() 方法:

  def start() {
    endpoint.set(rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv)))
  }

这样触发 ClientEndpoint 的 onStart 方法,onStart 中最终调用 tryRegisterAllMasters,主要是向 Master 发送注册App 的消息:

val masterRef = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
masterRef.send(RegisterApplication(appDescription, self))

看下 Master 接收 RegisterApplication 消息的处理:

//先创建Application,再注册
val app = createApplication(description, driver)
registerApplication(app)

看下 registerApplication:

    apps += app
    idToApp(app.id) = app
    endpointToApp(app.driver) = app
    addressToApp(appAddress) = app
    waitingApps += app

完成流程如下(步骤3 SparkDeploySchedulerBackend 应该替换成 StandaloneSchedulerBackend,步骤6 AppClient 替换成 StandaloneAppClient):

主要步骤如下

  1. 如上图①,Driver端注册DriverEndpoint到RpcEnv的流程,之后DriverEndpoint用于和Executor通信,包括send task和接收返回的计算结果
  2. 如上图②,Driver向Master注册APP的流程。

下面介绍下 CoarseGrainedSchedulerBackend:

CoarseGrainedSchedulerBackend

CoarseGrainedSchedulerBackend 是粗粒度的SchedulerBackend实现,使用集合 executorDataMap 维护和 Executor 通信的RpcEndpointRef

executorDataMap

存储了 ExecutorData 的数据集
private val executorDataMap = new HashMap[String, ExecutorData]
看下ExecutorData :

private[cluster] class ExecutorData(
   val executorEndpoint: RpcEndpointRef,
   val executorAddress: RpcAddress,
   override val executorHost: String,
   var freeCores: Int,
   override val totalCores: Int,
   override val logUrlMap: Map[String, String]
) extends ExecutorInfo(executorHost, totalCores, logUrlMap)

重点关注下面几个参数:

  • val executorEndpoint: RpcEndpointRef:负责发送消息给 ExecutorBackEnd
    实际应用:
    在 DriverEndpoint 的 receive方法中 KillTask 消息中 ,发送 killTask 消息:
executorDataMap.get(executorId) match {
    executorInfo.executorEndpoint.send(KillTask(taskId, executorId, interruptThread, reason))
}
  • var freeCores: Int,:剩余核数
    当Task 运行完,freeCores会增加:
if (TaskState.isFinished(state)) {
  executorDataMap.get(executorId) match {
    case Some(executorInfo) =>
      executorInfo.freeCores += scheduler.CPUS_PER_TASK
  }
}

当 Task 开始运行,freeCores会减少:

private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  for (task <- tasks.flatten) {
    val executorData = executorDataMap(task.executorId)
    executorData.freeCores -= scheduler.CPUS_PER_TASK
  }
}
DriverEndpoint

DriverEndpoint 是 CoarseGrainedSchedulerBackend 的内部类
主要有两个职责:
职责一:
负责接收 var driverEndpoint: RpcEndpointRef = null 发送的消息。

主要在 CoarseGrainedSchedulerBackend 的 public 和
override 方法中调用 driverEndpoint send 方法:

  def stopExecutors() {
    driverEndpoint.askSync[Boolean](StopExecutors)
  }
  
  override def stop() {
    driverEndpoint.askSync[Boolean](StopDriver)
  }

driverEndpoint 在 CoarseGrainedSchedulerBackend 的 start 中创建,接着创建了 DriverEndpoint:

override def start() {
    driverEndpoint = createDriverEndpointRef(properties)
}

protected def createDriverEndpointRef(
        properties: ArrayBuffer[(String, String)]): RpcEndpointRef = {
    rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint(properties))
}

protected def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
    new DriverEndpoint(rpcEnv, properties)
}

DriverEndpoint:

class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
    extends ThreadSafeRpcEndpoint with Logging {
        // 接收 RpcEndpointRef 发送的消息
    override def receive: PartialFunction[Any, Unit] = {
        case StatusUpdate =>
            ...
        case ReviveOffers =>
            ...
        case KillTask =>
            ...
    }
}

职责二:
通过 ExecutorData 的 executorEndpoint 发送消息给 ExecutorBackEnd。主要包括:RegisteredExecutor、LaunchTask、KillTask、StopExecutor,一般和 DriverEndpoint 收到的事件对应。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,496评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,407评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,632评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,180评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,198评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,165评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,052评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,910评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,324评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,542评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,711评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,424评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,017评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,668评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,823评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,722评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,611评论 2 353

推荐阅读更多精彩内容