Spark Source Code Reading: The Executor Module (Part 2)

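This article is based on the Spark 1.6.3 source code and works through it step by step, going one level deeper each time. I wrote it to record my train of thought while reading the source. It is easy to forget everything after a single pass, and writing an article to organize it helps it stick.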

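Part 1 of this series (Spark Source Code Reading: The Executor Module, Part 1) covered the following steps. AppClient was instantiated and registered an endpoint named ClientEndpoint. The endpoint's onStart method was then invoked, and it in turn called registerWithMaster to register the app with the Master. This article looks in detail at how the app is registered. It then covers how executors and their resources are allocated and launched once registration is done.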

Registering the App with the Master

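registerWithMaster(1) is called with the integer 1, meaning this is the first attempt to register with the Master. The program keeps retrying registration periodically. It stops either when the Master replies that registration succeeded, or when the maximum number of attempts is reached and registration fails. Here is the source: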

/**
 * Register with all masters asynchronously. It will call `registerWithMaster` every
 * REGISTRATION_TIMEOUT_SECONDS seconds until exceeding REGISTRATION_RETRIES times.
 * Once we connect to a master successfully, all scheduling work and Futures will be cancelled.
 *
 * nthRetry means this is the nth attempt to register with master.
 */
private def registerWithMaster(nthRetry: Int) {
  registerMasterFutures.set(tryRegisterAllMasters())
  registrationRetryTimer.set(registrationRetryThread.scheduleAtFixedRate(new Runnable {
    override def run(): Unit = {
      if (registered.get) {
        registerMasterFutures.get.foreach(_.cancel(true))
        registerMasterThreadPool.shutdownNow()
      } else if (nthRetry >= REGISTRATION_RETRIES) {
        markDead("All masters are unresponsive! Giving up.")
      } else {
        registerMasterFutures.get.foreach(_.cancel(true))
        registerWithMaster(nthRetry + 1)
      }
    }
  }, REGISTRATION_TIMEOUT_SECONDS, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS))
}
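The timer logic above can be modeled deterministically. The sketch below is a hypothetical helper, not Spark code. It assumes the reply from a master arrives (if it arrives at all) during attempt `succeedsOnAttempt`. It replaces the scheduled thread and the Futures with a plain loop in which each iteration stands for one REGISTRATION_TIMEOUT_SECONDS tick. The three branches match the three branches of `run()`.

```scala
// Hypothetical model of AppClient.registerWithMaster's retry decision (not Spark code).
sealed trait RegistrationOutcome
final case class RegisteredOn(attempt: Int) extends RegistrationOutcome
case object GaveUp extends RegistrationOutcome

def simulateRegistration(maxRetries: Int, succeedsOnAttempt: Option[Int]): RegistrationOutcome = {
  var nthRetry = 1
  var outcome: Option[RegistrationOutcome] = None
  while (outcome.isEmpty) {
    // tryRegisterAllMasters() fires for attempt nthRetry; a master may reply before the next tick
    val registered = succeedsOnAttempt.exists(_ <= nthRetry)
    // the timer tick: same order of checks as run() above
    if (registered) outcome = Some(RegisteredOn(nthRetry))   // cancel futures, stop retrying
    else if (nthRetry >= maxRetries) outcome = Some(GaveUp)    // markDead(...)
    else nthRetry += 1                                         // registerWithMaster(nthRetry + 1)
  }
  outcome.get
}

println(simulateRegistration(maxRetries = 3, succeedsOnAttempt = Some(2))) // RegisteredOn(2)
println(simulateRegistration(maxRetries = 3, succeedsOnAttempt = None))    // GaveUp
```

The `registered` check comes first in the model, as it does in the source. A reply that arrives during the last allowed attempt therefore still counts as a success.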

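Next, let's look at tryRegisterAllMasters, which issues the asynchronous requests. It returns an array of Futures, so it does not block the calling thread. Whichever request gets a reply first wins, and the other Future handles are then cancelled. For each Master address (derived from the Spark URL), tryRegisterAllMasters sets up an EndpointRef to that Master. It then sends a RegisterApplication message that wraps the app description (appDescription). Here is the source: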

/**
 *  Register with all masters asynchronously and returns an array `Future`s for cancellation.
 */
private def tryRegisterAllMasters(): Array[JFuture[_]] = {
  for (masterAddress <- masterRpcAddresses) yield {
    registerMasterThreadPool.submit(new Runnable {
      override def run(): Unit = try {
        if (registered.get) {
          return
        }
        logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
        val masterRef =
          rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, masterAddress, Master.ENDPOINT_NAME)
        masterRef.send(RegisterApplication(appDescription, self))   // ask the Master to register the application
      } catch {
        case ie: InterruptedException => // Cancelled
        case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
      }
    })
  }
}
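The "one task per master, skip if already registered" pattern can be shown without any RPC. In this hypothetical sketch, `MiniRegistrar` and the `requestsSent` counter are illustrative names. Incrementing the counter stands in for `masterRef.send(RegisterApplication(...))`. Only the thread-pool and AtomicBoolean usage mirrors the source.

```scala
import java.util.concurrent.{ExecutorService, Executors, TimeUnit, Future => JFuture}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

// Hypothetical sketch of the tryRegisterAllMasters pattern: one task per master address,
// each one skipped if some master has already acknowledged the registration.
final class MiniRegistrar(masters: Seq[String]) {
  val registered = new AtomicBoolean(false)
  val requestsSent = new AtomicInteger(0)
  private val pool: ExecutorService = Executors.newFixedThreadPool(math.max(1, masters.size))

  def tryRegisterAllMasters(): Seq[JFuture[_]] =
    masters.map { address =>
      pool.submit(new Runnable {
        override def run(): Unit =
          if (!registered.get) {
            // stands in for sending RegisterApplication to the master at `address`
            requestsSent.incrementAndGet()
          }
      })
    }

  def shutdown(): Unit = {
    pool.shutdownNow()
    pool.awaitTermination(5, TimeUnit.SECONDS)
  }
}
```

The returned Futures are what the retry timer later cancels with `cancel(true)`.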

The Master's Response

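When the Master receives AppClient's RegisterApplication request, it does nothing if it is a standby master. If it is the active master, it does the following in order:

  1. Wraps the incoming app information in a new ApplicationInfo instance.
  2. Registers the app.
  3. Persists the registered app.
  4. Replies to AppClient so that it stops retrying.
  5. Schedules resources for the newly registered app.

Here is the source: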

case RegisterApplication(description, driver) => {    // request from AppClient to register an app
  // TODO Prevent repeated registrations from some driver
  if (state == RecoveryState.STANDBY) {
    // ignore, don't send response
  } else {
    logInfo("Registering app " + description.name)
    val app = createApplication(description, driver)    // create the ApplicationInfo
    registerApplication(app)    // register the app
    logInfo("Registered app " + description.name + " with ID " + app.id)
    persistenceEngine.addApplication(app)   // persist it
    driver.send(RegisteredApplication(app.id, self))    // tell AppClient the app is registered
    schedule()    // reschedule resources: called whenever a new app joins or resources change
  }
}

Next, let's use registerApplication to see what the Master does while registering the app:

private def registerApplication(app: ApplicationInfo): Unit = {
  val appAddress = app.driver.address
  if (addressToApp.contains(appAddress)) {  // a known ClientEndpoint address means the app is already registered
    logInfo("Attempted to re-register application at same address: " + appAddress)
    return
  }
  // the Master adds the app to the data structures it maintains
  applicationMetricsSystem.registerSource(app.appSource)
  apps += app
  idToApp(app.id) = app
  endpointToApp(app.driver) = app
  addressToApp(appAddress) = app
  waitingApps += app
}

On the Master side, registering an app simply means recording it in the Master's own member variables.
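The bookkeeping can be reduced to a few maps. This is a hypothetical sketch: `MiniApp` and `MiniMasterState` are illustrative stand-ins for ApplicationInfo and the Master's fields. It keeps only the duplicate-address check and the lookup structures, and leaves out the metrics source and `endpointToApp`.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for ApplicationInfo (the real class carries much more state).
final case class MiniApp(id: String, name: String, driverAddress: String)

// Mirrors the shape of Master.registerApplication: reject a second app from the same driver
// address, otherwise record the app in every lookup structure and queue it for scheduling.
final class MiniMasterState {
  val apps = mutable.HashSet[MiniApp]()
  val idToApp = mutable.HashMap[String, MiniApp]()
  val addressToApp = mutable.HashMap[String, MiniApp]()
  val waitingApps = mutable.ArrayBuffer[MiniApp]()

  def registerApplication(app: MiniApp): Boolean =
    if (addressToApp.contains(app.driverAddress)) {
      false // already registered from this address
    } else {
      apps += app
      idToApp(app.id) = app
      addressToApp(app.driverAddress) = app
      waitingApps += app // picked up later by schedule()
      true
    }
}
```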

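Next, the Master persists the registered app's metadata. There are two built-in persistence engines: one backed by ZooKeeper, and one that writes directly to the file system. Users can also plug in their own. Production environments usually delegate this to ZooKeeper.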
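As far as I can tell from the Master source, the engine is selected by `spark.deploy.recoveryMode`. The values are ZOOKEEPER, FILESYSTEM or CUSTOM, and the default NONE persists nothing. The sketch below is a hypothetical in-memory engine whose persist/unpersist/read interface is only loosely modeled on Spark's PersistenceEngine. It shows apps being stored under an "app_" prefix and read back by prefix, which is what a recovering Master needs. The payload is a plain String here, not a serialized ApplicationInfo.

```scala
import scala.collection.mutable

// Hypothetical, simplified persistence interface (loosely modeled on Spark's PersistenceEngine).
trait MiniPersistenceEngine {
  def persist(name: String, obj: String): Unit
  def unpersist(name: String): Unit
  def read(prefix: String): Seq[String]
  // assumption: apps are stored under "app_" + appId, as I recall Spark's engine doing
  final def addApplication(appId: String, payload: String): Unit = persist("app_" + appId, payload)
}

// In-memory implementation, standing in for the ZooKeeper or file-system backends.
final class InMemoryPersistenceEngine extends MiniPersistenceEngine {
  private val store = mutable.LinkedHashMap[String, String]()
  override def persist(name: String, obj: String): Unit = store.update(name, obj)
  override def unpersist(name: String): Unit = store -= name
  override def read(prefix: String): Seq[String] =
    store.iterator.collect { case (k, v) if k.startsWith(prefix) => v }.toList
}
```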

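After persisting, the Master replies to AppClient with RegisteredApplication to say the app is registered. On receiving it, AppClient updates some of its own state, including setting registered to true. On its next tick, the retry timer in registerWithMaster sees this flag and cancels all outstanding registration requests. Here is the source: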

case RegisteredApplication(appId_, masterRef) =>
  // FIXME How to handle the following cases?
  // 1. A master receives multiple registrations and sends back multiple
  // RegisteredApplications due to an unstable network.
  // 2. Receive multiple RegisteredApplication from different masters because the master is
  // changing.
  appId.set(appId_)
  registered.set(true)
  master = Some(masterRef)
  listener.connected(appId.get)

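The last thing the Master does for a newly registered app is to reschedule resources. schedule is also called when resources change, for example when a new Worker joins, not only when a new app arrives. Here is the source with my annotations: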

/**
 * Schedule the currently available resources among waiting apps. This method will be called
 * every time a new app joins or resource availability changes.
 */
private def schedule(): Unit = {
  if (state != RecoveryState.ALIVE) {
    return    // ALIVE means this is the active Master and it has finished recovery (COMPLETING_RECOVERY)
  }
  // Drivers take strict precedence over executors
  // keep only the ALIVE workers and shuffle them
  val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
  val numWorkersAlive = shuffledAliveWorkers.size
  var curPos = 0
  for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
    // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
    // start from the last worker that was assigned a driver, and continue onwards until we have
    // explored all alive workers.
    var launched = false
    var numWorkersVisited = 0
    while (numWorkersVisited < numWorkersAlive && !launched) {
      val worker = shuffledAliveWorkers(curPos)
      numWorkersVisited += 1    // number of workers visited so far
      // if this worker's free memory and cores satisfy the driver's requirements
      if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
        launchDriver(worker, driver)  // launch the driver on this worker
        waitingDrivers -= driver    // remove it from the waiting list
        launched = true   // set the flag to leave the loop
      }
      curPos = (curPos + 1) % numWorkersAlive   // advance round-robin, wrapping around
    }
  }
  startExecutorsOnWorkers()   // start launching executors on the workers
}

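For each waiting driver, this method walks over the shuffled alive workers in round-robin order and calls launchDriver on the first one with enough free memory and cores. It then calls startExecutorsOnWorkers to allocate executors. This finally brings us to the main topic: allocating resources for, and launching, the driver and executors. Reading source code really does take a lot of patience.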
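The round-robin placement can be reproduced in isolation. This sketch is hypothetical:

- `MiniWorker` and `MiniDriver` stand in for WorkerInfo and DriverInfo.
- The shuffle is left out so that the result is deterministic.
- Reserving the driver's memory and cores directly on the worker stands in for what launchDriver's call to `worker.addDriver` does to the worker's free resources.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for WorkerInfo and DriverInfo.
final class MiniWorker(val id: String, var memoryFree: Int, var coresFree: Int)
final case class MiniDriver(id: String, mem: Int, cores: Int)

// Round-robin placement in the style of Master.schedule(), without Random.shuffle.
// Returns driverId -> workerId for every driver that could be placed.
def placeDrivers(workers: IndexedSeq[MiniWorker], waitingDrivers: Seq[MiniDriver]): Map[String, String] = {
  val numWorkers = workers.size
  val placed = mutable.LinkedHashMap[String, String]()
  var curPos = 0
  for (driver <- waitingDrivers) {
    var launched = false
    var visited = 0
    while (visited < numWorkers && !launched) {
      val worker = workers(curPos)
      visited += 1
      if (worker.memoryFree >= driver.mem && worker.coresFree >= driver.cores) {
        worker.memoryFree -= driver.mem   // reserve the driver's resources on this worker
        worker.coresFree -= driver.cores
        placed(driver.id) = worker.id
        launched = true
      }
      curPos = (curPos + 1) % numWorkers  // advance round-robin; the next driver continues from here
    }
  }
  placed.toMap
}
```

A driver that fits on no worker, such as one asking for more memory than any worker has, simply stays unplaced. In the Master it remains in waitingDrivers until a later schedule() call.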

Resource Allocation for the Driver and Executors

First, the launchDriver method:

private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
  logInfo("Launching driver " + driver.id + " on worker " + worker.id)
  worker.addDriver(driver)
  driver.worker = Some(worker)    // record which worker this driver runs on
  worker.endpoint.send(LaunchDriver(driver.id, driver.desc))  // send a LaunchDriver request to the worker
  driver.state = DriverState.RUNNING  // mark the driver as RUNNING
}

launchDriver records the relationship between the worker and the driver and sends a LaunchDriver request to the worker. Here is what the worker does when it receives LaunchDriver:

case LaunchDriver(driverId, driverDesc) => {
  logInfo(s"Asked to launch driver $driverId")
  val driver = new DriverRunner(    // create a DriverRunner
    conf,
    driverId,
    workDir,
    sparkHome,
    driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
    self,
    workerUri,
    securityMgr)
  drivers(driverId) = driver
  driver.start()   // call start to launch the driver
  coresUsed += driverDesc.cores   // cores and memory consumed by the driver
  memoryUsed += driverDesc.mem
}

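The worker creates a DriverRunner object and calls its start method, which starts a thread. That thread downloads the user's jar, builds the launch command and runs the driver. When the driver exits, the thread sends a DriverStateChanged message to the worker.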

Next, let's look closely at startExecutorsOnWorkers, which schedules and launches executors on the workers:

/**
 * Schedule and launch executors on workers
 */
private def startExecutorsOnWorkers(): Unit = {
  // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
  // in the queue, then the second app, etc.
  for (app <- waitingApps if app.coresLeft > 0) {   // for each app in the queue that still needs cores
    val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor
    // Filter out workers that don't have enough resources to launch an executor
    // a usable worker is ALIVE and has enough free memory and cores for one of this app's executors
    val usableWorkers: Array[WorkerInfo] = workers.toArray.filter(_.state == WorkerState.ALIVE)
      .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
        worker.coresFree >= coresPerExecutor.getOrElse(1))
      .sortBy(_.coresFree).reverse
    // number of cores to assign on each worker in usableWorkers (same index order)
    val assignedCores: Array[Int] = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)
    // Now that we've decided how many cores to allocate on each worker, let's allocate them
    for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
      allocateWorkerResourceToExecutors(
        app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
    }
  }
}

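This method first takes apps one at a time from a simple FIFO queue. Each app records how much memory and how many cores each of its executors needs. The method then filters for usable workers. A worker qualifies if it is alive and its free memory and free cores are enough for one of this app's executors. The qualifying workers are sorted by free cores in descending order.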
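The filter-then-sort step is easy to check on its own. `WorkerRes` is a hypothetical stand-in for the WorkerInfo fields that the filter reads. The chain of calls is the same as in startExecutorsOnWorkers.

```scala
// Hypothetical stand-in for the WorkerInfo fields the filter reads.
final case class WorkerRes(id: String, alive: Boolean, memoryFree: Int, coresFree: Int)

// Same filter-then-sort as startExecutorsOnWorkers: keep ALIVE workers that can host at least
// one executor of this app, then order them by free cores, most first.
def usableWorkers(workers: Seq[WorkerRes], memoryPerExecutorMB: Int, coresPerExecutor: Option[Int]): Array[WorkerRes] =
  workers.toArray
    .filter(_.alive)
    .filter(w => w.memoryFree >= memoryPerExecutorMB && w.coresFree >= coresPerExecutor.getOrElse(1))
    .sortBy(_.coresFree)
    .reverse
```

When coresPerExecutor is not set, a worker needs only one free core to qualify. That is why a worker with little CPU can still be usable.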

Next, scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps) decides how the app's executors are distributed over the filtered workers.

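One parameter, spreadOutApps, needs special attention. It selects between two allocation strategies, and its value comes from the spark.deploy.spreadOut setting. As the name suggests, it decides whether apps are spread out:

  1. When spreadOutApps is true, executors are spread across as many workers as possible. This is the default.
  2. When spreadOutApps is false, executors are packed onto as few workers as possible, which suits CPU-intensive applications with a small memory footprint.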

Now let's step into scheduleExecutorsOnWorkers and look at its implementation:

private def scheduleExecutorsOnWorkers(
    app: ApplicationInfo,
    usableWorkers: Array[WorkerInfo],
    spreadOutApps: Boolean): Array[Int] = {
  val coresPerExecutor = app.desc.coresPerExecutor  // the app's per-executor core requirement
  val minCoresPerExecutor = coresPerExecutor.getOrElse(1) // minimum cores per executor: coresPerExecutor if set, otherwise 1
  val oneExecutorPerWorker: Boolean = coresPerExecutor.isEmpty  // if coresPerExecutor is not set, oneExecutorPerWorker is true
  val memoryPerExecutor = app.desc.memoryPerExecutorMB  // the app's per-executor memory requirement
  val numUsable = usableWorkers.length  // number of candidate workers
  val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker
  val assignedExecutors = new Array[Int](numUsable) // Number of new executors on each worker
  var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)   // cores still to assign: the smaller of what the app still needs and what the workers offer
  /** Return whether the specified worker can launch an executor for this app. */
  def canLaunchExecutor(pos: Int): Boolean = {
    // condition 1: the cores still to assign are at least the cores one executor needs (minimum 1)
    val keepScheduling = coresToAssign >= minCoresPerExecutor
    // condition 2: this worker's remaining cores are at least minCoresPerExecutor
    val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor
    // If we allow multiple executors per worker, then we can always launch new executors.
    // Otherwise, if there is already an executor on this worker, just give it more cores.
    val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0
    // true if coresPerExecutor is set by the user, or if no executor has been assigned to this worker yet
    if (launchingNewExecutor) {
      val assignedMemory = assignedExecutors(pos) * memoryPerExecutor
      // condition 3: this worker's remaining memory is enough for one more executor
      val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor
      // condition 4: newly assigned executors plus the app's existing executors stay below the app's executor limit
      val underLimit = assignedExecutors.sum + app.executors.size < app.executorLimit
      // true only if conditions 1, 2, 3 and 4 all hold
      keepScheduling && enoughCores && enoughMemory && underLimit
    } else {    // otherwise we are adding cores to the executor already on this worker
      // We're adding cores to an existing executor, so no need
      // to check memory and executor limits
      keepScheduling && enoughCores // only conditions 1 and 2 need to hold
    }
  }
  // Keep launching executors until no more workers can accommodate any
  // more executors, or if we have reached this application's limits
  var freeWorkers = (0 until numUsable).filter(canLaunchExecutor)
  while (freeWorkers.nonEmpty) {
    freeWorkers.foreach { pos =>
      var keepScheduling = true
      while (keepScheduling && canLaunchExecutor(pos)) {
        coresToAssign -= minCoresPerExecutor
        assignedCores(pos) += minCoresPerExecutor
        // If we are launching one executor per worker, then every iteration assigns 1 core
        // to the executor. Otherwise, every iteration assigns cores to a new executor.
        if (oneExecutorPerWorker) {
          assignedExecutors(pos) = 1
        } else {
          assignedExecutors(pos) += 1
        }
        // Spreading out an application means spreading out its executors across as
        // many workers as possible. If we are not spreading out, then we should keep
        // scheduling executors on this worker until we use all of its resources.
        // Otherwise, just move on to the next worker.
        if (spreadOutApps) {
          keepScheduling = false  // leave this loop and move on to the next worker
        }
      }
    }
    freeWorkers = freeWorkers.filter(canLaunchExecutor)
  }
  assignedCores
}

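The code is long and I have annotated it in detail, so here I'll only summarize the conditions under which a worker can take another executor.

Condition 1 (keepScheduling): true if the number of cores still to assign is at least the number of cores one executor needs, so scheduling can continue. The cores still to assign start as the smaller of the cores the app still needs and the cores the workers can offer.

Condition 2 (enoughCores): true if this worker's free cores, minus the cores already assigned to it in this round, are at least the cores one executor needs.

Condition 3 (enoughMemory): true if this worker's free memory, minus the memory already assigned to it in this round, is enough for one more executor.

Condition 4 (underLimit): true if the executors assigned in this round plus the app's existing executors are fewer than the app's executor limit. When an app is first scheduled there is no limit by default. Later the limit can change dynamically, for example under dynamic allocation.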

All four conditions must hold when launching a new executor. When adding cores to an existing executor, only conditions 1 and 2 need to hold.
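To see how the four conditions and spreadOutApps interact, here is a standalone re-implementation of the loop above. It is a sketch, not Spark code:

- `Slot` is a hypothetical stand-in for WorkerInfo.
- The app's fields (coresLeft, coresPerExecutor, memoryPerExecutorMB, executorLimit, executors.size) become plain parameters.
- The control flow follows scheduleExecutorsOnWorkers line for line.

```scala
// Hypothetical stand-in for the two WorkerInfo fields the scheduler reads.
final case class Slot(coresFree: Int, memoryFree: Int)

// Standalone re-implementation of the scheduleExecutorsOnWorkers loop (not Spark code).
def scheduleSketch(
    coresLeft: Int,
    coresPerExecutor: Option[Int],
    memoryPerExecutor: Int,
    executorLimit: Int,
    existingExecutors: Int,
    workers: Array[Slot],
    spreadOut: Boolean): Array[Int] = {
  val minCoresPerExecutor = coresPerExecutor.getOrElse(1)
  val oneExecutorPerWorker = coresPerExecutor.isEmpty
  val numUsable = workers.length
  val assignedCores = new Array[Int](numUsable)
  val assignedExecutors = new Array[Int](numUsable)
  var coresToAssign = math.min(coresLeft, workers.map(_.coresFree).sum)

  def canLaunchExecutor(pos: Int): Boolean = {
    val keepScheduling = coresToAssign >= minCoresPerExecutor                             // condition 1
    val enoughCores = workers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor // condition 2
    val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0
    if (launchingNewExecutor) {
      val assignedMemory = assignedExecutors(pos) * memoryPerExecutor
      val enoughMemory = workers(pos).memoryFree - assignedMemory >= memoryPerExecutor  // condition 3
      val underLimit = assignedExecutors.sum + existingExecutors < executorLimit         // condition 4
      keepScheduling && enoughCores && enoughMemory && underLimit
    } else {
      keepScheduling && enoughCores // only adding cores to the existing executor
    }
  }

  var freeWorkers = (0 until numUsable).filter(pos => canLaunchExecutor(pos))
  while (freeWorkers.nonEmpty) {
    freeWorkers.foreach { pos =>
      var keepScheduling = true
      while (keepScheduling && canLaunchExecutor(pos)) {
        coresToAssign -= minCoresPerExecutor
        assignedCores(pos) += minCoresPerExecutor
        if (oneExecutorPerWorker) assignedExecutors(pos) = 1 else assignedExecutors(pos) += 1
        if (spreadOut) keepScheduling = false // spread out: move on to the next worker
      }
    }
    freeWorkers = freeWorkers.filter(pos => canLaunchExecutor(pos))
  }
  assignedCores
}

val workers = Array.fill(3)(Slot(coresFree = 4, memoryFree = 4096))
println(scheduleSketch(6, None, 1024, Int.MaxValue, 0, workers, spreadOut = true).mkString(","))  // 2,2,2
println(scheduleSketch(6, None, 1024, Int.MaxValue, 0, workers, spreadOut = false).mkString(",")) // 4,2,0
```

The app asks for 6 cores on three identical 4-core workers. With spreading, each worker gets 2 cores. Without spreading, the first worker is filled before the second is touched and the third gets nothing.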

Launching the Executors

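Once resources have been allocated, the executors are launched. allocateWorkerResourceToExecutors calls app.addExecutor to package the worker, its executor and the related resources into an ExecutorDesc object. It then calls launchExecutor to launch that executor. Here is the source: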

private def allocateWorkerResourceToExecutors(
    app: ApplicationInfo,
    assignedCores: Int,
    coresPerExecutor: Option[Int],
    worker: WorkerInfo): Unit = {
  // If the number of cores per executor is specified, we divide the cores assigned
  // to this worker evenly among the executors with no remainder.
  // Otherwise, we launch a single executor that grabs all the assignedCores on this worker.
  val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
  val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
  for (i <- 1 to numExecutors) {
    val exec: ExecutorDesc = app.addExecutor(worker, coresToAssign)
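    // The Master calls launchExecutor to send the request to the worker, and at the same time updates its own record of that worker.
    // These resource figures are not reported by the worker. The Master maintains them itself and does not wait for the executor to start successfully before updating them.
    // If the worker fails to start the executor, it reports the FAILED state back to the Master.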
    launchExecutor(worker, exec)
    app.state = ApplicationState.RUNNING
  }
}
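The core arithmetic in this method fits in a few lines. `splitAssignedCores` is a hypothetical helper that returns the core count of each executor to be launched on one worker.

```scala
// Hypothetical helper reproducing the arithmetic in allocateWorkerResourceToExecutors:
// returns the core count of each executor to launch on one worker.
def splitAssignedCores(assignedCores: Int, coresPerExecutor: Option[Int]): Seq[Int] = {
  // coresPerExecutor set: assignedCores / coresPerExecutor executors of exactly that size
  // (scheduleExecutorsOnWorkers assigns cores in multiples of it, so nothing is left over);
  // not set: a single executor that takes every assigned core.
  val numExecutors = coresPerExecutor.map(assignedCores / _).getOrElse(1)
  val coresEach = coresPerExecutor.getOrElse(assignedCores)
  Seq.fill(numExecutors)(coresEach)
}
```

For example, 6 cores assigned with `spark.executor.cores` set to 2 gives three 2-core executors. The same 6 cores with no per-executor setting gives one 6-core executor.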

In launchExecutor, the Master sends a LaunchExecutor message to the worker. Once the worker receives it, it launches the executors according to the Master's allocation:

private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
  logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
  worker.addExecutor(exec)    // update the Master's record of the worker
  worker.endpoint.send(LaunchExecutor(masterUrl,    // ask the worker to launch the executor
    exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
  exec.application.driver.send(   // tell AppClient that the executor has been added
    ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
}

The method above also sends an ExecutorAdded message to AppClient.
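The ordering matters: the Master charges the resources to the worker first and then fires both messages without waiting for a reply. The types below are hypothetical and only loosely modeled on LaunchExecutor, ExecutorAdded and WorkerInfo. The sketch shows that ordering.

```scala
// Hypothetical message and bookkeeping types, loosely modeled on LaunchExecutor, ExecutorAdded and WorkerInfo.
sealed trait MasterMsg
final case class LaunchExecutorMsg(appId: String, execId: Int, cores: Int, memoryMB: Int) extends MasterMsg
final case class ExecutorAddedMsg(execId: Int, workerId: String, cores: Int, memoryMB: Int) extends MasterMsg

final class WorkerBook(val id: String, val totalCores: Int, val totalMemoryMB: Int) {
  var coresUsed = 0
  var memoryUsed = 0
  def coresFree: Int = totalCores - coresUsed
  def memoryFree: Int = totalMemoryMB - memoryUsed
}

// Charge the resources to the worker first, then produce one message for the worker and one
// for the driver; nothing here waits to learn whether the executor actually started.
def launchExecutorSketch(worker: WorkerBook, appId: String, execId: Int, cores: Int, memoryMB: Int): (MasterMsg, MasterMsg) = {
  worker.coresUsed += cores
  worker.memoryUsed += memoryMB
  (LaunchExecutorMsg(appId, execId, cores, memoryMB), ExecutorAddedMsg(execId, worker.id, cores, memoryMB))
}
```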

First, let's look at what the worker does when it receives LaunchExecutor:

case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>    // request from the Master to launch an executor
  if (masterUrl != activeMasterUrl) {
    logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
  } else {
    try {
      logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
      // Create the executor's working directory
      val executorDir = new File(workDir, appId + "/" + execId)
      if (!executorDir.mkdirs()) {
        throw new IOException("Failed to create directory " + executorDir)
      }
      // Create local dirs for the executor. These are passed to the executor via the
      // SPARK_EXECUTOR_DIRS environment variable, and deleted by the Worker when the
      // application finishes.
      val appLocalDirs = appDirectories.get(appId).getOrElse {
        Utils.getOrCreateLocalRootDirs(conf).map { dir =>
          val appDir = Utils.createDirectory(dir, namePrefix = "executor")
          Utils.chmod700(appDir)
          appDir.getAbsolutePath()
        }.toSeq
      }
      appDirectories(appId) = appLocalDirs
      val manager = new ExecutorRunner(   // create an ExecutorRunner
        appId,
        execId,
        appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
        cores_,
        memory_,
        self,
        workerId,
        host,
        webUi.boundPort,
        publicAddress,
        sparkHome,
        executorDir,
        workerUri,
        conf,
        appLocalDirs, ExecutorState.RUNNING)
      executors(appId + "/" + execId) = manager
      manager.start()   // start the runner thread
      coresUsed += cores_
      memoryUsed += memory_
      sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))    // tell the Master the executor's state changed
    } catch {
      case e: Exception => {
        logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e)
        if (executors.contains(appId + "/" + execId)) {
          executors(appId + "/" + execId).kill()
          executors -= appId + "/" + execId
        }
        sendToMaster(ExecutorStateChanged(appId, execId, ExecutorState.FAILED,
          Some(e.toString), None))
      }
    }
  }

The key step is creating an ExecutorRunner, which wraps everything needed to launch the executor, and calling its start method. start creates a thread, and that thread calls fetchAndRunExecutor:

private[worker] def start() {
  workerThread = new Thread("ExecutorRunner for " + fullId) {
    override def run() { fetchAndRunExecutor() }    // fetchAndRunExecutor launches the executor process
  }
  workerThread.start()
  // Shutdown hook that kills actors on shutdown.
  shutdownHook = ShutdownHookManager.addShutdownHook { () =>
    // It's possible that we arrive here before calling `fetchAndRunExecutor`, then `state` will
    // be `ExecutorState.RUNNING`. In this case, we should set `state` to `FAILED`.
    if (state == ExecutorState.RUNNING) {
      state = ExecutorState.FAILED
    }
    killProcess(Some("Worker shutting down")) }
}

fetchAndRunExecutor creates a ProcessBuilder and sets up its parameters and environment. It then launches CoarseGrainedExecutorBackend by running the resulting command. Here is the source:

private def fetchAndRunExecutor() {
  try {
    // Launch the process
    // build the command line that launches CoarseGrainedExecutorBackend
    val builder: ProcessBuilder = CommandUtils.buildProcessBuilder(appDesc.command, new SecurityManager(conf),
      memory, sparkHome.getAbsolutePath, substituteVariables)
    val command = builder.command()
    val formattedCommand = command.asScala.mkString("\"", "\" \"", "\"")
    logInfo(s"Launch command: $formattedCommand")
    builder.directory(executorDir)
    builder.environment.put("SPARK_EXECUTOR_DIRS", appLocalDirs.mkString(File.pathSeparator))
    // In case we are running this from within the Spark Shell, avoid creating a "scala"
    // parent process for the executor command
    builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")
    // Add webUI log urls
    val baseUrl =
      s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
    builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
    builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")
    process = builder.start()   // run the command with ProcessBuilder, starting CoarseGrainedExecutorBackend
    val header = "Spark Executor Command: %s\n%s\n\n".format(
      formattedCommand, "=" * 40)
    // Redirect its stdout and stderr to files
    val stdout = new File(executorDir, "stdout")
    stdoutAppender = FileAppender(process.getInputStream, stdout, conf)
    val stderr = new File(executorDir, "stderr")
    Files.write(header, stderr, UTF_8)
    stderrAppender = FileAppender(process.getErrorStream, stderr, conf)
    // Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown)
    // or with nonzero exit code
    val exitCode = process.waitFor()
    state = ExecutorState.EXITED
    val message = "Command exited with code " + exitCode
    // report the final state to the worker with an ExecutorStateChanged message
    worker.send(ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode)))
  } catch {
    case interrupted: InterruptedException => {
      logInfo("Runner thread for executor " + fullId + " interrupted")
      state = ExecutorState.KILLED
      killProcess(None)
    }
    case e: Exception => {
      logError("Error running executor", e)
      state = ExecutorState.FAILED
      killProcess(Some(e.toString))
    }
  }
}
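The launch pattern itself is plain `java.lang.ProcessBuilder` usage. This hypothetical `runChild` helper follows the same steps as fetchAndRunExecutor:

1. Set the working directory and extra environment variables.
2. Write a header to the stderr file.
3. Start the child process and capture its output in files.
4. Block until the child exits.

Spark pumps the streams through its own FileAppender threads. This sketch swaps that for ProcessBuilder's built-in redirects (`redirectOutput` and `Redirect.appendTo`). The header text is made up.

```scala
import java.io.File
import java.nio.charset.StandardCharsets
import java.nio.file.Files

// Hypothetical sketch of the launch pattern in fetchAndRunExecutor (not Spark code).
def runChild(command: java.util.List[String], workDir: File, extraEnv: Map[String, String]): Int = {
  val builder = new ProcessBuilder(command)
  builder.directory(workDir)
  extraEnv.foreach { case (k, v) => builder.environment().put(k, v) }
  val stdout = new File(workDir, "stdout")
  val stderr = new File(workDir, "stderr")
  // header first, then the child's stderr is appended after it
  val header = s"Child Command: $command\n${"=" * 40}\n\n"
  Files.write(stderr.toPath, header.getBytes(StandardCharsets.UTF_8))
  builder.redirectOutput(stdout)
  builder.redirectError(ProcessBuilder.Redirect.appendTo(stderr))
  val process = builder.start()
  process.waitFor() // blocks until the child exits and returns its exit code
}
```

Blocking in `waitFor` is the reason Spark runs all of this on a dedicated thread rather than on the Worker's message loop.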

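One thing took me a long time to figure out: why does this command launch a CoarseGrainedExecutorBackend process? Nothing in this code gives a hint. The answer goes back to when SparkDeploySchedulerBackend created the AppClient object. One of the parameters of the ApplicationDescription it built is command, and that command names CoarseGrainedExecutorBackend as the class to run. Creating the AppClient was covered in Part 1; here are the key lines: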

val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
  args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt)
val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory,
  command, appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor)
client = new AppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
client.start()
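How does the Worker end up running CoarseGrainedExecutorBackend with concrete values? As far as I can tell from the 1.6 source, the `args` passed into this Command contain placeholders such as {{EXECUTOR_ID}}, {{HOSTNAME}}, {{CORES}}, {{APP_ID}} and {{WORKER_URL}}. ExecutorRunner's substituteVariables replaces them before the java command line is built. The sketch below re-creates that substitution. The driver URL, host names, app ID and the simplified command layout are all made up for illustration.

```scala
// Hypothetical re-creation of the placeholder substitution done before the executor JVM is launched.
def substitute(arg: String, execId: Int, host: String, cores: Int, appId: String, workerUrl: String): String =
  arg match {
    case "{{WORKER_URL}}"  => workerUrl
    case "{{EXECUTOR_ID}}" => execId.toString
    case "{{HOSTNAME}}"    => host
    case "{{CORES}}"       => cores.toString
    case "{{APP_ID}}"      => appId
    case other             => other // ordinary arguments pass through unchanged
  }

// Template arguments in the style of the Command built on the driver side (values are made up).
val templateArgs = Seq(
  "--driver-url", "spark://CoarseGrainedScheduler@driver-host:40000",
  "--executor-id", "{{EXECUTOR_ID}}",
  "--hostname", "{{HOSTNAME}}",
  "--cores", "{{CORES}}",
  "--app-id", "{{APP_ID}}",
  "--worker-url", "{{WORKER_URL}}")

val resolved = templateArgs.map(a =>
  substitute(a, 0, "worker-1", 2, "app-20170101000000-0000", "spark://Worker@worker-1:7078"))

// Simplified layout: java <memory flag> <main class> <args...>; the real command also carries
// the class path and extra java options.
val commandLine = Seq("java", "-Xmx1024M", "org.apache.spark.executor.CoarseGrainedExecutorBackend") ++ resolved
println(commandLine.mkString(" "))
```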

The launched JVM process enters through CoarseGrainedExecutorBackend's main method. main parses the startup arguments passed on the command line and then calls run:

def main(args: Array[String]) {
  var driverUrl: String = null
  var executorId: String = null
  var hostname: String = null
  var cores: Int = 0
  var appId: String = null
  var workerUrl: Option[String] = None
  val userClassPath = new mutable.ListBuffer[URL]()
  var argv = args.toList
  while (!argv.isEmpty) {
    argv match {
      case ("--driver-url") :: value :: tail =>
        driverUrl = value
        argv = tail
      case ("--executor-id") :: value :: tail =>
        executorId = value
        argv = tail
      case ("--hostname") :: value :: tail =>
        hostname = value
        argv = tail
      case ("--cores") :: value :: tail =>
        cores = value.toInt
        argv = tail
      case ("--app-id") :: value :: tail =>
        appId = value
        argv = tail
      case ("--worker-url") :: value :: tail =>
        // Worker url is used in spark standalone mode to enforce fate-sharing with worker
        workerUrl = Some(value)
        argv = tail
      case ("--user-class-path") :: value :: tail =>
        userClassPath += new URL(value)
        argv = tail
      case Nil =>
      case tail =>
        // scalastyle:off println
        System.err.println(s"Unrecognized options: ${tail.mkString(" ")}")
        // scalastyle:on println
        printUsageAndExit()
    }
  }
  if (driverUrl == null || executorId == null || hostname == null || cores <= 0 ||
    appId == null) {
    printUsageAndExit()
  }
  run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)
}
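The `option :: value :: tail` pattern-matching idiom in main can also be written without mutable variables. Here is a hypothetical, immutable variant. `ExecutorArgs` and `ExecutorArgsParser` are illustrative names, and `--user-class-path` is left out for brevity. The required-option check is the same as in main().

```scala
// Hypothetical, immutable variant of the option parsing in CoarseGrainedExecutorBackend.main.
final case class ExecutorArgs(
    driverUrl: Option[String] = None,
    executorId: Option[String] = None,
    hostname: Option[String] = None,
    cores: Int = 0,
    appId: Option[String] = None,
    workerUrl: Option[String] = None)

object ExecutorArgsParser {
  @scala.annotation.tailrec
  def parse(argv: List[String], acc: ExecutorArgs = ExecutorArgs()): Either[String, ExecutorArgs] =
    argv match {
      case "--driver-url" :: value :: tail  => parse(tail, acc.copy(driverUrl = Some(value)))
      case "--executor-id" :: value :: tail => parse(tail, acc.copy(executorId = Some(value)))
      case "--hostname" :: value :: tail    => parse(tail, acc.copy(hostname = Some(value)))
      case "--cores" :: value :: tail       => parse(tail, acc.copy(cores = value.toInt))
      case "--app-id" :: value :: tail      => parse(tail, acc.copy(appId = Some(value)))
      case "--worker-url" :: value :: tail  => parse(tail, acc.copy(workerUrl = Some(value)))
      case Nil =>
        // everything except workerUrl is required, and cores must be positive
        if (acc.driverUrl.isEmpty || acc.executorId.isEmpty || acc.hostname.isEmpty ||
            acc.cores <= 0 || acc.appId.isEmpty) Left("Missing required options")
        else Right(acc)
      case unknown => Left(s"Unrecognized options: ${unknown.mkString(" ")}")
    }
}
```

Returning an Either instead of calling printUsageAndExit keeps the parser testable. The caller decides whether to exit.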

The key part of run registers an endpoint named "Executor". Registering an endpoint triggers its onStart lifecycle method. run is long, so here is just the key excerpt:

env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
        env.rpcEnv, driverUrl, executorId, sparkHostPort, cores, userClassPath, env))

In onStart, the backend sends a RegisterExecutor request to DriverEndpoint to register the executor. How DriverEndpoint is created was explained in Part 1. Here is the onStart source:

override def onStart() {
  logInfo("Connecting to driver: " + driverUrl)
  rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
    // This is a very fast action so we can use "ThreadUtils.sameThread"
    driver = Some(ref)  // reference to DriverEndpoint
    ref.ask[RegisterExecutorResponse](
      RegisterExecutor(executorId, self, hostPort, cores, extractLogUrls))  // register this executor with the driver
  }(ThreadUtils.sameThread).onComplete {
    // This is a very fast action so we can use "ThreadUtils.sameThread"
    case Success(msg) => Utils.tryLogNonFatalError {
      Option(self).foreach(_.send(msg)) // msg must be RegisterExecutorResponse
    }
    case Failure(e) => {
      logError(s"Cannot register with driver: $driverUrl", e)
      System.exit(1)
    }
  }(ThreadUtils.sameThread)
}
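The shape of onStart is plain Scala Future composition: resolve the driver, flatMap into an ask, then forward the result in onComplete. Here it is with hypothetical stand-ins:

- `lookupDriver` plays the role of `rpcEnv.asyncSetupEndpointRefByURI`.
- `askRegister` plays the role of `ref.ask`.
- A queue serves as the backend's inbox.
- The global execution context replaces ThreadUtils.sameThread.
- The failure branch records a message instead of calling System.exit(1).

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

sealed trait RegisterExecutorResponse
final case class RegisteredExecutorMsg(hostname: String) extends RegisterExecutorResponse

// Hypothetical stand-in for rpcEnv.asyncSetupEndpointRefByURI: fails for a malformed URL.
def lookupDriver(url: String): Future[String] =
  Future {
    require(url.startsWith("spark://"), s"bad driver url: $url")
    url
  }

// Hypothetical stand-in for ref.ask[RegisterExecutorResponse](RegisterExecutor(...)).
def askRegister(driverRef: String, executorId: String, host: String): Future[RegisterExecutorResponse] =
  Future(RegisteredExecutorMsg(host))

// Same shape as onStart: resolve the driver, ask it to register, then forward the reply
// (or the error) to the backend's own inbox.
def registerWithDriver(url: String, executorId: String, host: String, inbox: LinkedBlockingQueue[Any]): Unit =
  lookupDriver(url)
    .flatMap(ref => askRegister(ref, executorId, host))
    .onComplete {
      case Success(msg) => inbox.put(msg) // Spark: self.send(msg)
      case Failure(e)   => inbox.put(s"Cannot register with driver: ${e.getMessage}") // Spark: System.exit(1)
    }
```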

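When DriverEndpoint receives the executor's registration request, it creates an ExecutorData object holding the executor's information and records the executor in its data structures. It then calls makeOffers() to assign tasks to the registered executors. This connects back to the last section of Part 1; task assignment will be covered in the next article. Here is the RegisterExecutor handling code: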

  case RegisterExecutor(executorId, executorRef, hostPort, cores, logUrls) =>   // the driver receives a RegisterExecutor request
    if (executorDataMap.contains(executorId)) { // this executorId is already registered
      context.reply(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
    } else {
      // If the executor's rpc env is not listening for incoming connections, `hostPort`
      // will be null, and the client connection should be used to contact the executor.
      val executorAddress = if (executorRef.address != null) {
          executorRef.address
        } else {
          context.senderAddress
        }
      logInfo(s"Registered executor $executorRef ($executorAddress) with ID $executorId")
      // update the driver's own bookkeeping
      addressToExecutorId(executorAddress) = executorId
      totalCoreCount.addAndGet(cores)
      totalRegisteredExecutors.addAndGet(1)
      val data = new ExecutorData(executorRef, executorRef.address, executorAddress.host,
        cores, cores, logUrls)    // create an ExecutorData holding the executor's information
      // This must be synchronized because variables mutated
      // in this block are read when requesting executors
      CoarseGrainedSchedulerBackend.this.synchronized {
        executorDataMap.put(executorId, data)
        if (numPendingExecutors > 0) {
          numPendingExecutors -= 1
          logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
        }
      }
      // Note: some tests expect the reply to come after we put the executor in the map
      // after adding the executor to executorDataMap, reply that registration is complete
      context.reply(RegisteredExecutor(executorAddress.host))
      listenerBus.post(
        SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
      makeOffers()    // start offering tasks to the registered executors
    }
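Without the RPC, listener bus and makeOffers, the handler reduces to a duplicate check plus some counters. The types below are hypothetical stand-ins for ExecutorData and the two reply messages. The sketch keeps the same update order and the synchronized block around executorDataMap and numPendingExecutors.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable

// Hypothetical stand-ins for ExecutorData and the two possible replies.
final case class MiniExecutorData(host: String, freeCores: Int, totalCores: Int)
sealed trait RegisterReply
final case class RegisteredExecutorReply(host: String) extends RegisterReply
final case class RegisterExecutorFailedReply(message: String) extends RegisterReply

// Bookkeeping in the style of DriverEndpoint's RegisterExecutor handler.
final class MiniDriverBook {
  val executorDataMap = mutable.HashMap[String, MiniExecutorData]()
  val totalCoreCount = new AtomicInteger(0)
  val totalRegisteredExecutors = new AtomicInteger(0)
  var numPendingExecutors = 0

  def registerExecutor(executorId: String, host: String, cores: Int): RegisterReply =
    if (executorDataMap.contains(executorId)) {
      RegisterExecutorFailedReply("Duplicate executor ID: " + executorId)
    } else {
      totalCoreCount.addAndGet(cores)
      totalRegisteredExecutors.addAndGet(1)
      synchronized { // fields read elsewhere when requesting executors
        executorDataMap.put(executorId, MiniExecutorData(host, cores, cores))
        if (numPendingExecutors > 0) numPendingExecutors -= 1
      }
      RegisteredExecutorReply(host)
    }
}
```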

Next, let's see what the executor side does when it receives the registration confirmation:

case RegisteredExecutor(hostname) =>
  logInfo("Successfully registered with driver")
  executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)

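After CoarseGrainedExecutorBackend receives RegisteredExecutor from DriverEndpoint, it creates the Executor instance, and at that point the executor is fully set up. The next question is how tasks are assigned to the executor and run, which the next article will cover.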
