1: Overview
- The Master is the leader node in Spark's Standalone deploy mode. It manages the cluster and is responsible for resource scheduling, and it mixes in two traits: ThreadSafeRpcEndpoint and LeaderElectable.
- ThreadSafeRpcEndpoint: a thread-safe RpcEndpoint, which can be understood as processing messages one at a time, in order. When the endpoint starts, onStart runs first; after that, all incoming messages are handled by receive and receiveAndReply. The difference is that receive sends no reply, while receiveAndReply returns a response to the caller (an abridged sketch of the trait contract appears at the end of this section).
- LeaderElectable: the callback interface through which a Master is told it has been elected leader or has had its leadership revoked.
- Code snippet:
private[deploy] class Master(
    override val rpcEnv: RpcEnv,
    address: RpcAddress,
    webUiPort: Int,
    val securityMgr: SecurityManager,
    val conf: SparkConf)
  extends ThreadSafeRpcEndpoint with Logging with LeaderElectable {
Its main fields:
// Daemon scheduler thread, used among other things to periodically check worker liveness
private val forwardMessageThread =
  ThreadUtils.newDaemonSingleThreadScheduledExecutor("master-forward-message-thread")
// Information about every registered worker
val workers = new HashSet[WorkerInfo]
// Mapping from worker id to WorkerInfo
private val idToWorker = new HashMap[String, WorkerInfo]
// State of this Master node (starts as STANDBY)
private var state = RecoveryState.STANDBY
// Engine that persists recovery state (workers, applications, drivers)
private var persistenceEngine: PersistenceEngine = _
// Leader election agent, e.g. the ZooKeeper-based implementation in HA mode
private var leaderElectionAgent: LeaderElectionAgent = _
// Drivers currently spooled for scheduling, i.e. submitted but not yet running
private val waitingDrivers = new ArrayBuffer[DriverInfo]
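To close out the overview, here is roughly what the RpcEndpoint contract referenced above looks like (abridged from Spark 2.x; method bodies and some members are omitted). ThreadSafeRpcEndpoint itself adds no methods; it is a marker trait telling the RpcEnv to deliver this endpoint's messages sequentially:

private[spark] trait RpcEndpoint {
  val rpcEnv: RpcEnv

  // Reference to this endpoint, usable as self.send(...)
  final def self: RpcEndpointRef = ...

  // Invoked before the endpoint processes any messages
  def onStart(): Unit = {}

  // Handles messages delivered via RpcEndpointRef.send; no reply expected
  def receive: PartialFunction[Any, Unit] = ...

  // Handles messages delivered via RpcEndpointRef.ask; must reply through context
  def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = ...

  // Invoked when the endpoint is stopping
  def onStop(): Unit = {}
}

// Marker trait: messages to such an endpoint are processed one at a time, in order
private[spark] trait ThreadSafeRpcEndpoint extends RpcEndpoint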
2: Master Startup Process
- A Master is started from the shell via the start-master.sh script; the launch flow is:
start-master.sh -> spark-daemon.sh start org.apache.spark.deploy.master.Master
This ends up running the Master's main method, which starts the masterEndpoint message service. While the endpoint is being brought up, its onStart method is invoked first to perform the necessary initialization.
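For context, the entry point looks roughly like this in Spark 2.x (abridged; the ask call is askWithRetry or askSync depending on the exact version). main parses the arguments, and startRpcEnvAndEndpoint creates the RpcEnv and registers the Master endpoint, which is what triggers onStart:

def main(argStrings: Array[String]) {
  val conf = new SparkConf
  val args = new MasterArguments(argStrings, conf)
  // Registering the endpoint with the RpcEnv kicks off onStart
  val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)
  rpcEnv.awaitTermination()
}

def startRpcEnvAndEndpoint(
    host: String,
    port: Int,
    webUiPort: Int,
    conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
  val securityMgr = new SecurityManager(conf)
  val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
  val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
    new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
  val portsResponse = masterEndpoint.askSync[BoundPortsResponse](BoundPortsRequest)
  (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
}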
override def onStart(): Unit = {
  // ... body omitted; the key steps are covered in detail below ...
}
The onStart method performs several important steps:
1. Periodically check worker liveness
checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(new Runnable {
  override def run(): Unit = Utils.tryLogNonFatalError {
    self.send(CheckForWorkerTimeOut)
  }
}, 0, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
As the snippet shows, every WORKER_TIMEOUT_MS milliseconds (60 * 1000 by default, i.e. 60 seconds) the Master sends itself a CheckForWorkerTimeOut message to check worker liveness. The message is then matched in the receive method:
case CheckForWorkerTimeOut =>
  timeOutDeadWorkers()
The actual detection and handling of workers happens in timeOutDeadWorkers():
private def timeOutDeadWorkers() {
  /** Check for, and remove, any timed-out workers */
  // Copy the workers into an array so we don't modify the hashset while iterating through it
  val currentTime = System.currentTimeMillis()
  // Select workers whose last heartbeat is older than the timeout window
  val toRemove = workers.filter(_.lastHeartbeat < currentTime - WORKER_TIMEOUT_MS).toArray
  for (worker <- toRemove) {
    // Only remove workers that have not already been marked DEAD
    if (worker.state != WorkerState.DEAD) {
      logWarning("Removing %s because we got no heartbeat in %d seconds".format(
        worker.id, WORKER_TIMEOUT_MS / 1000))
      // Remove the worker
      removeWorker(worker)
    } else {
      // The worker is already DEAD but still lingers in the workers set;
      // cull it once it has been dead for (REAPER_ITERATIONS + 1) timeout intervals
      if (worker.lastHeartbeat < currentTime - ((REAPER_ITERATIONS + 1) * WORKER_TIMEOUT_MS)) {
        workers -= worker // we've seen this DEAD worker in the UI, etc. for long enough; cull it
      }
    }
  }
}
Execution then enters removeWorker:
private def removeWorker(worker: WorkerInfo) {
  logInfo("Removing worker " + worker.id + " on " + worker.host + ":" + worker.port)
  // Mark the worker as DEAD
  worker.setState(WorkerState.DEAD)
  // Drop the worker from the id and address mappings
  idToWorker -= worker.id
  addressToWorker -= worker.endpoint.address
  if (reverseProxy) {
    webUi.removeProxyTargets(worker.id)
  }
  // Mark every executor on this worker as LOST and notify the owning application's driver
  for (exec <- worker.executors.values) {
    logInfo("Telling app of lost executor: " + exec.id)
    exec.application.driver.send(ExecutorUpdated(
      exec.id, ExecutorState.LOST, Some("worker lost"), None, workerLost = true))
    exec.state = ExecutorState.LOST
    exec.application.removeExecutor(exec)
  }
  // Handle the drivers on this worker: relaunch supervised drivers on another worker,
  // and remove the rest with state ERROR
  for (driver <- worker.drivers.values) {
    if (driver.desc.supervise) {
      logInfo(s"Re-launching ${driver.id}")
      // Put the driver back into the scheduling queue
      relaunchDriver(driver)
    } else {
      logInfo(s"Not re-launching ${driver.id} because it was not supervised")
      removeDriver(driver.id, DriverState.ERROR, None)
    }
  }
  // Remove the worker from the persistence engine, e.g. delete its node in ZooKeeper
  persistenceEngine.removeWorker(worker)
}
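For completeness, relaunchDriver (abridged from the same class; the details differ slightly across Spark versions) simply resets the driver and puts it back into the waitingDrivers queue introduced in the overview, then triggers a scheduling pass:

private def relaunchDriver(driver: DriverInfo) {
  driver.worker = None                    // detach from the dead worker
  driver.state = DriverState.RELAUNCHING
  waitingDrivers += driver                // queue it for scheduling again
  schedule()                              // pick a new worker to run it on
}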
2. Create the recovery/HA implementation selected by the spark.deploy.recoveryMode property
/**
 * 1. Match on the spark.deploy.recoveryMode property and create the corresponding
 *    persistence implementation for each HA mode.
 * 2. Create the leader election agent (e.g. a ZooKeeperLeaderElectionAgent) that
 *    runs Master leader election.
 */
val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
  case "ZOOKEEPER" =>
    logInfo("Persisting recovery state to ZooKeeper")
    val zkFactory =
      new ZooKeeperRecoveryModeFactory(conf, serializer)
    (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
  case "FILESYSTEM" =>
    val fsFactory =
      new FileSystemRecoveryModeFactory(conf, serializer)
    (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
  case "CUSTOM" =>
    val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
    val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
      .newInstance(conf, serializer)
      .asInstanceOf[StandaloneRecoveryModeFactory]
    (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
  case _ =>
    (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
}
persistenceEngine = persistenceEngine_
leaderElectionAgent = leaderElectionAgent_
} // end of onStart
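As a concrete example, ZooKeeper mode is typically enabled by setting these properties on every Master, e.g. through SPARK_DAEMON_JAVA_OPTS in spark-env.sh (the ZooKeeper hosts below are placeholders):

export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER \
  -Dspark.deploy.zookeeper.url=zk1:2181,zk2:2181,zk3:2181 \
  -Dspark.deploy.zookeeper.dir=/spark"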
Taking ZooKeeper HA as an example:
ZooKeeperRecoveryModeFactory creates a ZooKeeperPersistenceEngine, the ZooKeeper-backed implementation behind operations such as addWorker, removeWorker, and readPersistedData.
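It helps to know that addWorker and friends are defined once in the abstract PersistenceEngine in terms of two primitives, persist and unpersist, which each backend implements (abridged from Spark 2.x):

abstract class PersistenceEngine {
  // The primitives each backend (ZooKeeper, filesystem, ...) must implement
  def persist(name: String, obj: Object): Unit
  def unpersist(name: String): Unit
  def read[T: ClassTag](prefix: String): Seq[T]

  // Higher-level operations are expressed via the primitives
  final def addWorker(worker: WorkerInfo): Unit = persist("worker_" + worker.id, worker)
  final def removeWorker(worker: WorkerInfo): Unit = unpersist("worker_" + worker.id)
  ...
}

ZooKeeperPersistenceEngine then implements persist, unpersist, and read as serialized znode writes, deletes, and reads under the directory configured by spark.deploy.zookeeper.dir.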
3. Create the LeaderElectionAgent that runs Master leader election
Again taking ZooKeeper HA as an example:
ZooKeeperRecoveryModeFactory creates a ZooKeeperLeaderElectionAgent, whose constructor calls its start method to begin leader election:
// ... constructor body omitted ...
start()

private def start() {
  logInfo("Starting ZooKeeper LeaderElection agent")
  zk = SparkCuratorUtil.newClient(conf)
  leaderLatch = new LeaderLatch(zk, WORKING_DIR)
  leaderLatch.addListener(this)
  leaderLatch.start()
}
If this node is elected leader, Curator invokes the isLeader() callback; if it loses leadership (for example because the ZooKeeper connection drops), notLeader() is invoked:
override def isLeader() {
  synchronized {
    // could have lost leadership by now.
    if (!leaderLatch.hasLeadership) {
      return
    }
    logInfo("We have gained leadership")
    updateLeadershipStatus(true)
  }
}

override def notLeader() {
  synchronized {
    // could have gained leadership by now.
    if (leaderLatch.hasLeadership) {
      return
    }
    logInfo("We have lost leadership")
    updateLeadershipStatus(false)
  }
}
As both snippets show, each path ends up calling updateLeadershipStatus():
private def updateLeadershipStatus(isLeader: Boolean) {
  if (isLeader && status == LeadershipStatus.NOT_LEADER) {
    status = LeadershipStatus.LEADER
    // Gained leadership: notify the Master via electedLeader()
    masterInstance.electedLeader()
  } else if (!isLeader && status == LeadershipStatus.LEADER) {
    status = LeadershipStatus.NOT_LEADER
    // Lost leadership: notify the Master via revokedLeadership()
    masterInstance.revokedLeadership()
  }
}
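To make the LeaderLatch mechanics concrete, here is a minimal, self-contained Curator sketch outside of Spark (the connection string and latch path are placeholders). Every process that starts a latch on the same path joins the election; exactly one holds leadership at a time, and the listener callbacks fire on transitions, just as in the agent above:

import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.framework.recipes.leader.{LeaderLatch, LeaderLatchListener}
import org.apache.curator.retry.ExponentialBackoffRetry

object LeaderLatchDemo {
  def main(args: Array[String]): Unit = {
    val client = CuratorFrameworkFactory.newClient(
      "localhost:2181", new ExponentialBackoffRetry(1000, 3))
    client.start()

    val latch = new LeaderLatch(client, "/demo/leader_election")
    latch.addListener(new LeaderLatchListener {
      override def isLeader(): Unit = println("gained leadership")
      override def notLeader(): Unit = println("lost leadership")
    })
    latch.start()       // join the election; callbacks fire as leadership changes

    Thread.sleep(60000) // do work; a Spark Master simply keeps running here
    latch.close()
    client.close()
  }
}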
Turning to revokedLeadership() in the Master class: it simply sends itself a RevokedLeadership message:
override def revokedLeadership() {
  self.send(RevokedLeadership)
}
Handling the RevokedLeadership message is straightforward: the process exits immediately:
case RevokedLeadership =>
  logError("Leadership has been revoked -- master shutting down.")
  System.exit(0)
Similarly, electedLeader() in the Master class sends itself an ElectedLeader message:
override def electedLeader() {
  self.send(ElectedLeader)
}
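As a preview of where that leads (abridged from the Spark 2.x receive method; the full recovery flow deserves its own walkthrough), the ElectedLeader handler reads the persisted state back and decides whether recovery is needed:

case ElectedLeader =>
  // Read back everything that was persisted (e.g. from ZooKeeper)
  val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData(rpcEnv)
  state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) {
    RecoveryState.ALIVE       // nothing to recover; become the active Master directly
  } else {
    RecoveryState.RECOVERING  // replay the persisted state first
  }
  logInfo("I have been elected leader! New state: " + state)
  if (state == RecoveryState.RECOVERING) {
    beginRecovery(storedApps, storedDrivers, storedWorkers)
    // ... then schedule a CompleteRecovery message after WORKER_TIMEOUT_MS ...
  }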
The analysis continues in a follow-up post. The author's knowledge is limited, so corrections are welcome if you spot any mistakes.