[Chapter 6] How the Master Is Notified When Driver and Executor State Changes

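This section continues from the previous one, which covered the Master's registration mechanism: how Drivers, Applications, and Workers register with the Master, so that the Master knows the startup state of each of them.
While a job is running, whenever the state of a Driver, an Application (Executor), or a Worker changes, the registration information held by the Master is updated as well.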

Here is what the Master does when a Driver's state changes:

case DriverStateChanged(driverId, state, exception) => {
      state match {
        // If the Driver has reached any of these terminal states, remove it
        case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED =>
          removeDriver(driverId, state, exception)
        case _ =>
          throw new Exception(s"Received unexpected state update for driver $driverId: $state")
      }
    }
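The dispatch above can be tried out in isolation. The following is a minimal sketch, not Spark's code: `SketchDriverState` is a hypothetical, reduced stand-in for `DriverState`, and `onDriverStateChanged` is an illustrative helper that returns an `Either` instead of calling `removeDriver` or throwing.

```scala
// Hypothetical stand-ins for illustration only; these are not Spark's classes.
object SketchDriverState extends Enumeration {
  val SUBMITTED, RUNNING, FINISHED, KILLED, FAILED, ERROR = Value
}

object DriverStateSketch {
  import SketchDriverState._

  // Right(...) means "terminal state, the driver should be removed";
  // Left(...) stands in for the exception thrown on any other state.
  def onDriverStateChanged(driverId: String, state: SketchDriverState.Value): Either[String, String] =
    state match {
      case ERROR | FINISHED | KILLED | FAILED =>
        Right(s"remove driver $driverId")
      case other =>
        Left(s"unexpected state update for driver $driverId: $other")
    }
}
```

In this sketch only the four terminal states lead to removal; a state such as `RUNNING` falls through to the catch-all branch, just as the handler above treats it as unexpected.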


def removeDriver(driverId: String, finalState: DriverState, exception: Option[Exception]) {
    // Look up the driver object by its driverId
    drivers.find(d => d.id == driverId) match {
      // Some is a case class; this branch matches when the driver was found
      case Some(driver) =>
        logInfo(s"Removing driver: $driverId")
        // Remove the driver from the in-memory set of active drivers
        drivers -= driver
        // If the completed list is full, drop the oldest entries first
        if (completedDrivers.size >= RETAINED_DRIVERS) {
          val toRemove = math.max(RETAINED_DRIVERS / 10, 1)
          completedDrivers.trimStart(toRemove)
        }
        // Add the driver to the completedDrivers list
        completedDrivers += driver
        // Remove the driver from the persistence engine
        persistenceEngine.removeDriver(driver)
        // Record the driver's final state and exception
        driver.state = finalState
        driver.exception = exception
        // Remove the driver's entry from the worker it was running on
        driver.worker.foreach(w => w.removeDriver(driver))
        // Call schedule() to run another round of scheduling
        schedule()
      case None =>
        logWarning(s"Asked to remove unknown driver: $driverId")
    }
  }
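The handling of `completedDrivers` in `removeDriver` is a bounded history: when the list is full, the oldest tenth (at least one entry) is dropped before the new entry is appended. Below is a minimal, self-contained sketch of that idea. `RetentionSketch`, `retain`, and `limit` are hypothetical names introduced for illustration, and `remove(0, n)` is used here as a stand-in for the `trimStart(n)` call in the excerpt.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical helper for illustration; not part of Spark.
object RetentionSketch {
  // Appends `item`; if the buffer already holds `limit` entries,
  // the oldest max(limit / 10, 1) entries are dropped first.
  def retain[A](completed: ArrayBuffer[A], item: A, limit: Int): Unit = {
    require(limit > 0, "limit must be positive")
    if (completed.size >= limit) {
      val toRemove = math.max(limit / 10, 1)
      completed.remove(0, toRemove) // drop from the front, i.e. the oldest entries
    }
    completed += item
  }
}
```

With a limit of 20, for example, adding a 21st item first drops the two oldest entries, so the buffer ends up with 19 entries. Trimming in batches rather than one entry at a time means the trim does not run on every insertion once the list is full.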

The method above should look familiar: it is essentially the reverse of the steps performed earlier when a Driver registers with the Master.

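Below is the source code for when an Executor's state changes. The code is simple and I have annotated all of it; it closely mirrors, in reverse, the steps taken when an Application registers with the Master.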

case ExecutorStateChanged(appId, execId, state, message, exitStatus) => {
      // Look up the application by appId, then the executor within it by execId
      val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
      execOption match {
        case Some(exec) => {
          val appInfo = idToApp(appId)
          exec.state = state
          if (state == ExecutorState.RUNNING) { appInfo.resetRetryCount() }
          // Notify the Driver of the executor's application so it can update its state
          exec.application.driver ! ExecutorUpdated(execId, state, message, exitStatus)
          // Check whether the executor has finished
          if (ExecutorState.isFinished(state)) {
            // Remove this executor from the worker and app
            logInfo(s"Removing executor ${exec.fullId} because it is $state")
            // Remove the executor's entry from the application
            appInfo.removeExecutor(exec)
            // Remove the executor's entry from its worker
            exec.worker.removeExecutor(exec)

            val normalExit = exitStatus == Some(0)
            // Only retry certain number of times so we don't go into an infinite loop.
            if (!normalExit) {
              // The executor exited abnormally: while the retry count is below
              // MAX_NUM_RETRY (10), schedule again
              if (appInfo.incrementRetryCount() < ApplicationState.MAX_NUM_RETRY) {
                schedule()
              } else {
                // The retry limit has been reached: check the application's remaining executors
                val execs = appInfo.executors.values
                if (!execs.exists(_.state == ExecutorState.RUNNING)) {
                  logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " +
                    s"${appInfo.retryCount} times; removing it")
                  // No executor is still running, so remove the application as well, marked FAILED
                  removeApplication(appInfo, ApplicationState.FAILED)
                }
              }
            }
          }
        }
        // The excerpt ends here; the branch for an unknown executor is not shown.
      }
    }

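A brief note on the last part of the code: each abnormal executor exit increments the application's retry count. While the count stays below 10, the Master simply schedules again. Once it reaches 10 and none of the application's executors is still RUNNING, the Master removes the application itself and marks it FAILED, not just the executor that exited.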
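The retry decision can be isolated in a small sketch. Everything here is a hypothetical stand-in rather than Spark's code: `AppRetryState` models only the retry counter, `MaxNumRetry` is assumed to be 10 as described above, and `incrementRetryCount()` is assumed to return the updated count.

```scala
// Hypothetical model of the retry decision; not Spark's classes.
object RetrySketch {
  sealed trait Action
  case object NoRetryNeeded extends Action     // executor exited with status 0
  case object Reschedule extends Action        // abnormal exit, limit not yet reached
  case object KeepApplication extends Action   // limit reached, but an executor is still running
  case object RemoveApplication extends Action // limit reached and nothing is running

  val MaxNumRetry = 10 // assumed value, taken from the description in the text

  final class AppRetryState {
    private var retryCount = 0
    // Assumption: returns the count after incrementing
    def incrementRetryCount(): Int = { retryCount += 1; retryCount }
    def resetRetryCount(): Unit = { retryCount = 0 }
  }

  def onExecutorFinished(app: AppRetryState,
                         exitStatus: Option[Int],
                         anyExecutorRunning: Boolean): Action =
    if (exitStatus.contains(0)) NoRetryNeeded
    else if (app.incrementRetryCount() < MaxNumRetry) Reschedule
    else if (anyExecutorRunning) KeepApplication
    else RemoveApplication
}
```

Under these assumptions, the first nine abnormal exits each yield `Reschedule`, and the tenth yields `RemoveApplication` when no executor is running. In the source shown earlier, an executor entering RUNNING calls `resetRetryCount()`, so the limit only trips after a run of failures without any successful start in between.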
