原创-Spark源码分析一:Standalone模式下Master启动源码分析

一:概述

  •   Master节点是Spark Standalone运行模式下的主节点,主要用于管理集群,负责资源的调度,其继承了ThreadSafeRpcEndpoint 、LeaderElectable两个类。
  •  ThreadSafeRpcEndpoint 类功能:线程安全的RpcEndpoint,可理解对消息有序处理,启动时默认先执行onStart方法,由receive和receiveAndReply方法处理接收到的所有消息,区别是一个无返回值,一个有返回值
  •  LeaderElectable类功能:实现master主节点选举
  •   代码片段如下:
private[deploy] class Master(  
    override val rpcEnv: RpcEnv,
    address: RpcAddress,
    webUiPort: Int,
    val securityMgr: SecurityManager,
    val conf: SparkConf)
  extends ThreadSafeRpcEndpoint with Logging with LeaderElectable {

其主要变量:

//周期检测work节点状态守护线程
  private val forwardMessageThread =
    ThreadUtils.newDaemonSingleThreadScheduledExecutor("master-forward-message-thread")
  //保存注册的work节点信息
  val workers = new HashSet[WorkerInfo]
//保存workId和work对应关系
  private val idToWorker = new HashMap[String, WorkerInfo]
 //master节点状态
  private var state = RecoveryState.STANDBY
  //持久化对象
  private var persistenceEngine: PersistenceEngine = _
  //zookeeper高可用选举实现
  private var leaderElectionAgent: LeaderElectionAgent = _
 // Drivers currently spooled for scheduling
  //保存提交的、待执行任务
  private val waitingDrivers = new ArrayBuffer[DriverInfo]

二:Master启动过程

  • 启动一个Master是通过Shell命令启动了一个脚本start-master.sh开始的,这个脚本的启动流程如下

      start-master.sh  -> spark-daemon.sh start org.apache.spark.deploy.master.Master
    
  • 执行main方法,启动masterEndpoint消息服务,在启动服务过程中首先会执行onStart方法,进行一些必要的初始化动作

override def onStart(): Unit = {
    .....................
    省略部分,后面对关键实现有详解
   ...........................
 }

  在onstart方法中有几个重要的步骤:
1、周期性检查work节点状态

      checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = Utils.tryLogNonFatalError {
        self.send(CheckForWorkerTimeOut)
      }
    }, 0, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)

  如代码片段所示,每隔WORKER_TIMEOUT_MS秒(默认60*1000)给自身发送CheckForWorkerTimeOut消息检测work节点存活状态,紧接着消息会匹配到receive方法中的CheckForWorkerTimeOut 消息

case CheckForWorkerTimeOut =>
      timeOutDeadWorkers()

在timeOutDeadWorkers()方法中真正去检测处理work节点

private def timeOutDeadWorkers() {
    /** Check for, and remove, any timed-out workers */
    // Copy the workers into an array so we don't modify the hashset while iterating through it
    val currentTime = System.currentTimeMillis()
    //过滤心跳时间超过一分钟的work节点
    val toRemove = workers.filter(_.lastHeartbeat < currentTime - WORKER_TIMEOUT_MS).toArray
    for (worker <- toRemove) {
      //判断work节点的状态是否为DEAD
      if (worker.state != WorkerState.DEAD) {
        logWarning("Removing %s because we got no heartbeat in %d seconds".format(
          worker.id, WORKER_TIMEOUT_MS / 1000))
        //移除work节点
        removeWorker(worker)
      } else {
        //work节点状态处于DEAD,但works集合中还存在,则移除
        if (worker.lastHeartbeat < currentTime - ((REAPER_ITERATIONS + 1) * WORKER_TIMEOUT_MS)) {
          workers -= worker // we've seen this DEAD worker in the UI, etc. for long enough; cull it
        }
      }
    }
  }

接着进入removeWorker方法

 private def removeWorker(worker: WorkerInfo) {
    logInfo("Removing worker " + worker.id + " on " + worker.host + ":" + worker.port)
    //设置work状态为DEAD
    worker.setState(WorkerState.DEAD)
    //从workId和work映射集合中移除work
    idToWorker -= worker.id
    addressToWorker -= worker.endpoint.address
    if (reverseProxy) {
      webUi.removeProxyTargets(worker.id)
    }
    //更新移除work节点上executor状态
    for (exec <- worker.executors.values) {
      logInfo("Telling app of lost executor: " + exec.id)
      exec.application.driver.send(ExecutorUpdated(
        exec.id, ExecutorState.LOST, Some("worker lost"), None, workerLost = true))
      exec.state = ExecutorState.LOST
      exec.application.removeExecutor(exec)
    }
    //移除work节点上的driver,同时对未执行的driver重新分配work节点进行执行
    for (driver <- worker.drivers.values) {
      if (driver.desc.supervise) {
        logInfo(s"Re-launching ${driver.id}")
       //重新执行driver
        relaunchDriver(driver)
      } else {
        logInfo(s"Not re-launching ${driver.id} because it was not supervised")
        removeDriver(driver.id, DriverState.ERROR, None)
      }
    }
    //移除持久化的work节点信息,如删除zookeeper上work节点信息
    persistenceEngine.removeWorker(worker)
  }

2、根据spark.deploy.recoveryMode属性配置的高可用模式,创建对应实现

    /**
      * 1、根据spark.deploy.recoveryMode属性,配置高可用模式,匹配不同的模式,创建不同实现
      * 2、创建ZooKeeperLeaderElectionAgent对象,进行master节点选举
      */
    val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
      case "ZOOKEEPER" =>
        logInfo("Persisting recovery state to ZooKeeper")
        val zkFactory =
          new ZooKeeperRecoveryModeFactory(conf, serializer)
        (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
      case "FILESYSTEM" =>
        val fsFactory =
          new FileSystemRecoveryModeFactory(conf, serializer)
        (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
      case "CUSTOM" =>
        val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
        val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
          .newInstance(conf, serializer)
          .asInstanceOf[StandaloneRecoveryModeFactory]
        (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
      case _ =>
        (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
    }
    persistenceEngine = persistenceEngine_
    leaderElectionAgent = leaderElectionAgent_
  }

以zookeeper作高可用为例:
   由ZooKeeperRecoveryModeFactory创建ZooKeeperPersistenceEngine对应操作zookeeper的实现,如addWorker、removeWorker、readPersistedData等方法
3、创建LeaderElectionAgent对象,进行master主节点选举
以zookeeper作高可用为例:
   由ZooKeeperRecoveryModeFactory创建ZooKeeperLeaderElectionAgent对象,默认调用该类中的start方法开始进行master主节点选举

....略.....
  start()
  private def start() {
    logInfo("Starting ZooKeeper LeaderElection agent")
    zk = SparkCuratorUtil.newClient(conf)
    leaderLatch = new LeaderLatch(zk, WORKING_DIR)
    leaderLatch.addListener(this)
    leaderLatch.start()
  }

如被选举成leader节点会调用isLeader()方法,如和zookeeper连接断开丢失leader,会调用notLeader()方法

override def isLeader() {
    synchronized {
      // could have lost leadership by now.
      if (!leaderLatch.hasLeadership) {
        return
      }

      logInfo("We have gained leadership")
      updateLeadershipStatus(true)
    }
  }

  override def notLeader() {
    synchronized {
      // could have gained leadership by now.
      if (leaderLatch.hasLeadership) {
        return
      }

      logInfo("We have lost leadership")
      updateLeadershipStatus(false)
    }
  }

如上代码片段可看到都会进一步调用updateLeadershipStatus()方法

private def updateLeadershipStatus(isLeader: Boolean) {
    if (isLeader && status == LeadershipStatus.NOT_LEADER) {
      status = LeadershipStatus.LEADER
// 如是leader则调用Master中的electedLeader
      masterInstance.electedLeader()
    } else if (!isLeader && status == LeadershipStatus.LEADER) {
      status = LeadershipStatus.NOT_LEADER
// 如不是leader则调用Master中的revokedLeadership
      masterInstance.revokedLeadership()
    }
  }

接着查看Master类中的revokedLeadership()方法,即给自身发送了RevokedLeadership消息,

override def revokedLeadership() {
    self.send(RevokedLeadership)
  }

而对RevokedLeadership消息处理比较简单,即直接退出

case RevokedLeadership =>
      logError("Leadership has been revoked -- master shutting down.")
      System.exit(0)

同样查看Master类中的electedLeader()方法,也是给自身发送了ElectedLeader消息

override def electedLeader() {
    self.send(ElectedLeader)
  }

后续接着分析,笔者水平有限,如有误欢迎指正

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,530评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 86,403评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,120评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,770评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,758评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,649评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,021评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,675评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,931评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,659评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,751评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,410评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,004评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,969评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,042评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,493评论 2 343

推荐阅读更多精彩内容