Spark Source Code: Starting the TaskScheduler

Source code series index


When SparkContext is initialized, a TaskScheduler is created. This article walks through how the TaskScheduler is started.

1 Starting the TaskScheduler

The TaskScheduler is started by calling _taskScheduler.start().

  • Enter org.apache.spark.scheduler.TaskSchedulerImpl.scala
  private val speculationScheduler =
    ThreadUtils.newDaemonSingleThreadScheduledExecutor("task-scheduler-speculation")

  override def start() {
    backend.start()

    if (!isLocal && conf.getBoolean("spark.speculation", false)) {
      logInfo("Starting speculative execution thread")
      speculationScheduler.scheduleWithFixedDelay(new Runnable {
        override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
          checkSpeculatableTasks()
        }
      }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
    }
  }
  1. backend.start() starts the SchedulerBackend;
  2. In non-local mode with spark.speculation = true (i.e. speculative execution enabled), a daemon thread periodically runs checkSpeculatableTasks to look for tasks that can be speculatively re-launched (see the configuration sketch below).
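
A minimal configuration sketch for the branch above; spark.speculation and spark.speculation.interval are standard Spark settings (SPECULATION_INTERVAL_MS is read from the latter), while the app name and master URL below are purely illustrative:

  import org.apache.spark.SparkConf

  val conf = new SparkConf()
    .setAppName("speculation-demo")
    .setMaster("spark://master-host:7077")       // any non-local master, so isLocal is false
    .set("spark.speculation", "true")            // enables the speculative-execution check loop
    .set("spark.speculation.interval", "100ms")  // how often checkSpeculatableTasks() runs; 100ms is the default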

2 Starting the SchedulerBackend

  • Enter org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend.scala
private[spark] class StandaloneSchedulerBackend(
    scheduler: TaskSchedulerImpl,
    sc: SparkContext,
    masters: Array[String])
  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv)
  with StandaloneAppClientListener
  with Logging {

  override def start() {
    super.start()

    // SPARK-21159. The scheduler backend should only try to connect to the launcher when in client
    // mode. In cluster mode, the code that submits the application to the Master needs to connect
    // to the launcher instead.
    if (sc.deployMode == "client") {
      launcherBackend.connect()
    }

    // The endpoint for executors to talk to us
    val driverUrl = RpcEndpointAddress(
      sc.conf.get("spark.driver.host"),
      sc.conf.get("spark.driver.port").toInt,
      CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
    val args = Seq(
      "--driver-url", driverUrl,
      "--executor-id", "{{EXECUTOR_ID}}",
      "--hostname", "{{HOSTNAME}}",
      "--cores", "{{CORES}}",
      "--app-id", "{{APP_ID}}",
      "--worker-url", "{{WORKER_URL}}")
    val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
      .map(Utils.splitCommandString).getOrElse(Seq.empty)
    val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath")
      .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
    val libraryPathEntries = sc.conf.getOption("spark.executor.extraLibraryPath")
      .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)

    // When testing, expose the parent class path to the child. This is processed by
    // compute-classpath.{cmd,sh} and makes all needed jars available to child processes
    // when the assembly is built with the "*-provided" profiles enabled.
    val testingClassPath =
      if (sys.props.contains("spark.testing")) {
        sys.props("java.class.path").split(java.io.File.pathSeparator).toSeq
      } else {
        Nil
      }

    // Start executors with a few necessary configs for registering with the scheduler
    val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
    val javaOpts = sparkJavaOpts ++ extraJavaOpts
    val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
      args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
    val webUrl = sc.ui.map(_.webUrl).getOrElse("")
    val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt)
    // If we're using dynamic allocation, set our initial executor limit to 0 for now.
    // ExecutorAllocationManager will send the real initial limit to the Master later.
    val initialExecutorLimit =
      if (Utils.isDynamicAllocationEnabled(conf)) {
        Some(0)
      } else {
        None
      }
    val appDesc = ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
      webUrl, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit)
    client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
    client.start()
    launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
    waitForRegistration()
    launcherBackend.setState(SparkAppHandle.State.RUNNING)
  }

}

StandaloneSchedulerBackend extends CoarseGrainedSchedulerBackend.

  1. super.start() invokes the start method of the parent class CoarseGrainedSchedulerBackend;
  2. Various parameters are assembled (driverUrl, args, extraJavaOpts, classPathEntries, libraryPathEntries, javaOpts, etc.) and used to build Command("org.apache.spark.executor.CoarseGrainedExecutorBackend", ......) (see the driverUrl sketch right after this list);
  3. An ApplicationDescription is created with the Command embedded in it; it will be used later;
  4. A StandaloneAppClient is created and started;
  5. The app state is set to SUBMITTED;
  6. The backend waits for the app to be registered and started;
  7. The app state is set to RUNNING.
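
A quick note on the driverUrl built in step 2: RpcEndpointAddress.toString produces the spark:// URL that executors later use to reach the DriverEndpoint. A rough illustration with made-up host and port values:

  // Illustrative only: assuming spark.driver.host = "192.168.1.10" and spark.driver.port = "7078"
  val driverUrl = RpcEndpointAddress(
    "192.168.1.10", 7078, CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
  // => "spark://CoarseGrainedScheduler@192.168.1.10:7078"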

Note: steps 2 and 3 here closely mirror how the Driver is launched when an Application is submitted:

  • When launching the Driver, various parameters are assembled into Command("org.apache.spark.deploy.worker.DriverWrapper", ......) and a DriverDescription is created;

  • Here, various parameters are assembled into Command("org.apache.spark.executor.CoarseGrainedExecutorBackend", ......) and an ApplicationDescription is created.

For details, see Spark Source Code: Submitting an Application to the Spark Cluster.

2.1 Starting CoarseGrainedSchedulerBackend

  • Enter org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.scala
  override def start() {
    val properties = new ArrayBuffer[(String, String)]
    for ((key, value) <- scheduler.sc.conf.getAll) {
      if (key.startsWith("spark.")) {
        properties += ((key, value))
      }
    }

    // TODO (prashant) send conf instead of properties
    driverEndpoint = createDriverEndpointRef(properties)
  }


  protected def createDriverEndpointRef(
      properties: ArrayBuffer[(String, String)]): RpcEndpointRef = {
    rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint(properties))
  }


  protected def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
    new DriverEndpoint(rpcEnv, properties)
  }

A DriverEndpoint is created and registered to SparkContext.DriverSparkEnv.RpcEnv under the name "CoarseGrainedScheduler" (see Spark Source Code: Initializing SparkContext for details).

Note: whenever an RpcEndpoint is registered with an RpcEnv, an OnStart message is enqueued into that endpoint's Inbox, so RpcEndpoint.onStart() is guaranteed to be invoked.
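
A minimal sketch of that contract; DemoEndpoint and the name "demo" are made up for illustration, and since RpcEndpoint is a private[spark] trait this only compiles inside the org.apache.spark package:

  import org.apache.spark.rpc.{RpcEnv, ThreadSafeRpcEndpoint}

  class DemoEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint {
    // Dispatched from this endpoint's Inbox right after setupEndpoint registers it
    override def onStart(): Unit = println("DemoEndpoint started")

    override def receive: PartialFunction[Any, Unit] = {
      case msg => println(s"DemoEndpoint got: $msg")
    }
  }

  // Registering the endpoint enqueues OnStart, so onStart() is guaranteed to run:
  // rpcEnv.setupEndpoint("demo", new DemoEndpoint(rpcEnv))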

Now let's look at DriverEndpoint.onStart.

  • Enter org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.DriverEndpoint
  class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
    extends ThreadSafeRpcEndpoint with Logging {

    override def onStart() {
      // Periodically revive offers to allow delay scheduling to work
      val reviveIntervalMs = conf.getTimeAsMs("spark.scheduler.revive.interval", "1s")

      reviveThread.scheduleAtFixedRate(new Runnable {
        override def run(): Unit = Utils.tryLogNonFatalError {
          Option(self).foreach(_.send(ReviveOffers))
        }
      }, 0, reviveIntervalMs, TimeUnit.MILLISECONDS)
    }

  }

onStart spins up a thread that periodically sends a ReviveOffers message to the endpoint itself.
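
The period of this revive loop comes from spark.scheduler.revive.interval (the "1s" literal above is only the fallback value); a minimal, illustrative override:

  import org.apache.spark.SparkConf

  // Illustrative value; 1s is used when the key is not set
  val conf = new SparkConf().set("spark.scheduler.revive.interval", "500ms")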

  • Enter org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.DriverEndpoint
    override def receive: PartialFunction[Any, Unit] = {

      case ReviveOffers =>
        makeOffers()
    }


    // Make fake resource offers on all executors
    private def makeOffers() {
      // Make sure no executor is killed while some task is launching on it
      val taskDescs = withLock {
        // Filter out executors under killing
        val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
        val workOffers = activeExecutors.map {
          case (id, executorData) =>
            new WorkerOffer(id, executorData.executorHost, executorData.freeCores,
              Some(executorData.executorAddress.hostPort))
        }.toIndexedSeq
        scheduler.resourceOffers(workOffers)
      }
      if (!taskDescs.isEmpty) {
        launchTasks(taskDescs)
      }
    }

This method iterates over CoarseGrainedSchedulerBackend.executorDataMap, but at this point executorDataMap is still empty, so the call effectively does nothing. It becomes meaningful later, once executors have registered; we will come back to it then.
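
For context, a WorkerOffer is just a small value describing one executor's free capacity. Roughly how it is declared in this version of the scheduler (paraphrased, so treat the exact field list as approximate):

  // org.apache.spark.scheduler.WorkerOffer, approximately
  private[spark] case class WorkerOffer(
      executorId: String,
      host: String,
      cores: Int,
      address: Option[String] = None)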

2.2 Creating and Starting StandaloneAppClient

  • Enter org.apache.spark.deploy.client.StandaloneAppClient.scala
private[spark] class StandaloneAppClient(
    rpcEnv: RpcEnv,
    masterUrls: Array[String],
    appDescription: ApplicationDescription,
    listener: StandaloneAppClientListener,
    conf: SparkConf)
  extends Logging {

  def start() {
    // Just launch an rpcEndpoint; it will call back into the listener.
    endpoint.set(rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv)))
  }

  // ... remaining members omitted
}

A ClientEndpoint is created and registered to SparkContext.DriverSparkEnv.RpcEnv under the name "AppClient" (see Spark Source Code: Initializing SparkContext for details).

  • Enter org.apache.spark.deploy.client.StandaloneAppClient.ClientEndpoint
    override def onStart(): Unit = {
      try {
        registerWithMaster(1)
      } catch {
        case e: Exception =>
          logWarning("Failed to connect to master", e)
          markDisconnected()
          stop()
      }
    }


    private def registerWithMaster(nthRetry: Int) {
      registerMasterFutures.set(tryRegisterAllMasters())
      registrationRetryTimer.set(registrationRetryThread.schedule(new Runnable {
        override def run(): Unit = {
          if (registered.get) {
            registerMasterFutures.get.foreach(_.cancel(true))
            registerMasterThreadPool.shutdownNow()
          } else if (nthRetry >= REGISTRATION_RETRIES) {
            markDead("All masters are unresponsive! Giving up.")
          } else {
            registerMasterFutures.get.foreach(_.cancel(true))
            registerWithMaster(nthRetry + 1)
          }
        }
      }, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS))
    }


    private def tryRegisterAllMasters(): Array[JFuture[_]] = {
      for (masterAddress <- masterRpcAddresses) yield {
        registerMasterThreadPool.submit(new Runnable {
          override def run(): Unit = try {
            if (registered.get) {
              return
            }
            logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
            val masterRef = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
            masterRef.send(RegisterApplication(appDescription, self))
          } catch {
            case ie: InterruptedException => // Cancelled
            case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
          }
        })
      }
    }
  1. Iterate over all masterRpcAddresses (see the sketch below for where these addresses come from);
  2. For each masterAddress, look up the Master's RpcEndpointRef via the address and Master.ENDPOINT_NAME;
  3. Use that masterRef to send a RegisterApplication(appDescription, self) message, where self is the ClientEndpoint's own RpcEndpointRef.

Creating and starting StandaloneAppClient, then, is really about messaging the Master to get the Application registered.
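
For reference, masterRpcAddresses in StandaloneAppClient is derived from the spark://host:port master URLs handed to the backend. A rough illustration (the URL below is made up):

  import org.apache.spark.rpc.RpcAddress

  val masterUrls = Array("spark://master-host:7077")               // illustrative master URL
  val masterRpcAddresses = masterUrls.map(RpcAddress.fromSparkURL) // => Array(RpcAddress("master-host", 7077))
  // The endpoint looked up on each address is Master.ENDPOINT_NAME, i.e. "Master"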

2.3 Registering the Application

  • Enter org.apache.spark.deploy.master.Master.scala
  override def receive: PartialFunction[Any, Unit] = {

    case RegisterApplication(description, driver) =>
      // TODO Prevent repeated registrations from some driver
      if (state == RecoveryState.STANDBY) {
        // ignore, don't send response
      } else {
        logInfo("Registering app " + description.name)
        val app = createApplication(description, driver)
        registerApplication(app)
        logInfo("Registered app " + description.name + " with ID " + app.id)
        persistenceEngine.addApplication(app)
        driver.send(RegisteredApplication(app.id, self))
        schedule()
      }

  }
  1. If this Master is in STANDBY state, the message is ignored and no response is sent;
  2. createApplication(description, driver) is called to build an ApplicationInfo (a sketch of the generated app id follows below);
  3. registerApplication registers the app, i.e. adds the ApplicationInfo created above to Master.waitingApps;
  4. A RegisteredApplication message is sent back through driver (the RpcEndpointRef carried in the RegisterApplication message), telling the driver side that the application has been registered;
  5. schedule() is called.
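
For reference, createApplication wraps the ApplicationDescription and the client's RpcEndpointRef into an ApplicationInfo and gives it an id of the form app-yyyyMMddHHmmss-nnnn. A rough paraphrase of the id generation (not the exact Master source):

  import java.text.SimpleDateFormat
  import java.util.{Date, Locale}

  // Roughly how Master builds application ids (paraphrased)
  def newApplicationId(submitDate: Date, nextAppNumber: Int): String =
    "app-%s-%04d".format(
      new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(submitDate), nextAppNumber)

  // e.g. newApplicationId(new Date(), 0) might return "app-20190611103000-0000"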

2.4 Starting the Application

  • Enter org.apache.spark.deploy.master.Master.scala
  private def schedule(): Unit = {
    if (state != RecoveryState.ALIVE) {
      return
    }
    // Drivers take strict precedence over executors
    val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
    val numWorkersAlive = shuffledAliveWorkers.size
    var curPos = 0
    for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
      // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
      // start from the last worker that was assigned a driver, and continue onwards until we have
      // explored all alive workers.
      var launched = false
      var numWorkersVisited = 0
      while (numWorkersVisited < numWorkersAlive && !launched) {
        val worker = shuffledAliveWorkers(curPos)
        numWorkersVisited += 1
        if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
          launchDriver(worker, driver)
          waitingDrivers -= driver
          launched = true
        }
        curPos = (curPos + 1) % numWorkersAlive
      }
    }
    startExecutorsOnWorkers()
  }

In Spark Source Code: Submitting an Application to the Spark Cluster, this same method was called to launch the Driver right after the Driver was registered.

Back then nothing had been added to Master.waitingApps yet, so startExecutorsOnWorkers did nothing. Now an app has been added to Master.waitingApps, so startExecutorsOnWorkers will actually launch Executors for it.

A few points worth noting:

  1. The ApplicationInfo created here while registering the Application is added to Master.waitingApps; in Spark Source Code: Submitting an Application to the Spark Cluster, the DriverInfo created while registering the Driver was added to Master.waitingDrivers;

  2. schedule() does two things:
    1) iterate over Master.waitingDrivers and launch each Driver;
    2) iterate over Master.waitingApps and launch Executors for each app.

The details of how startExecutorsOnWorkers launches Executors for an app will be covered in a later article.

3 Summary

  1. Calling TaskSchedulerImpl.start to start the TaskScheduler in turn calls SchedulerBackend.start to start the SchedulerBackend;
  2. The SchedulerBackend is the TaskScheduler's backend, responsible for receiving and handling some of the messages destined for the TaskScheduler;
  3. When StandaloneSchedulerBackend starts, it calls the start method of its parent class CoarseGrainedSchedulerBackend, which starts CoarseGrainedSchedulerBackend;
  4. Starting CoarseGrainedSchedulerBackend creates a DriverEndpoint and registers it to SparkContext.DriverSparkEnv.RpcEnv (see Spark Source Code: Initializing SparkContext for details);
  5. DriverEndpoint.onStart is then invoked; it starts a thread that periodically sends ReviveOffers messages to the endpoint itself, and handling ReviveOffers calls makeOffers. This loop is essentially the periodic scheduling that submits Tasks;
  6. A StandaloneAppClient is created and its start method is called; it sends a RegisterApplication message to every Master, asking to register the Application;
  7. On receiving RegisterApplication, the Master creates an ApplicationInfo and adds it to Master.waitingApps, at which point the Application is registered, and replies to the Driver;
  8. The Master then calls schedule to start the Application; schedule does two things:
    1) iterate over Master.waitingDrivers and launch each Driver;
    2) iterate over Master.waitingApps and launch Executors for each app.
  9. How Executors are launched for the App will be analyzed in later articles.