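This section continues from the previous one, which covered the Master's registration mechanism: how Drivers, Applications, and Workers register with the Master, so that the Master always knows the startup state of each of them.
While a job is running, whenever the state of a Driver, Application (Executor), or Worker changes, the registration information held on the Master is updated as well.
Here is what happens when a Driver's state changes: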
case DriverStateChanged(driverId, state, exception) => {
  state match {
    // If the driver is in any of these terminal states, remove it
    case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED =>
      removeDriver(driverId, state, exception)
    case _ =>
      throw new Exception(s"Received unexpected state update for driver $driverId: $state")
  }
}
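To make the match above easier to try in isolation, here is a minimal sketch of the same "is this a terminal state?" decision. The `DriverState` object below is a hypothetical stand-in that only borrows the state names from the excerpt (it is not Spark's own definition), and `isTerminal` is a helper name I made up for illustration.

```scala
// Hypothetical stand-in for Spark's DriverState; only the names mirror the excerpt.
object DriverState extends Enumeration {
  val SUBMITTED, RUNNING, FINISHED, KILLED, FAILED, ERROR = Value
}

object DriverStateDemo {
  import DriverState._

  // True for the states in which the handler above would call removeDriver.
  def isTerminal(state: DriverState.Value): Boolean = state match {
    case ERROR | FINISHED | KILLED | FAILED => true
    case _ => false
  }
}
```

The alternative pattern (`A | B | C`) is what lets a single branch cover all four terminal states.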
def removeDriver(driverId: String, finalState: DriverState, exception: Option[Exception]) {
  // Look up the driver object by driverId
  drivers.find(d => d.id == driverId) match {
    // find returns an Option; Some(driver) matches when the driver was found
    case Some(driver) =>
      logInfo(s"Removing driver: $driverId")
      // Remove the driver from the in-memory set of active drivers
      drivers -= driver
      // Cap the completed list: once it is full, drop the oldest 10% (at least one entry)
      if (completedDrivers.size >= RETAINED_DRIVERS) {
        val toRemove = math.max(RETAINED_DRIVERS / 10, 1)
        completedDrivers.trimStart(toRemove)
      }
      // Add the driver to the completedDrivers list
      completedDrivers += driver
      // Remove the driver from the persistence engine
      persistenceEngine.removeDriver(driver)
      // Record the driver's final state and exception, if any
      driver.state = finalState
      driver.exception = exception
      // Find the worker this driver ran on and remove the driver's entry from it
      driver.worker.foreach(w => w.removeDriver(driver))
      // Call schedule() so the Master runs scheduling again
      schedule()
    case None =>
      logWarning(s"Asked to remove unknown driver: $driverId")
  }
}
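The way removeDriver caps the completed list is worth a closer look. Below is a minimal, self-contained sketch of that retention pattern. `CompletedLog` and its `retained` parameter are names I invented for illustration; `retained` plays the role of RETAINED_DRIVERS, and the value 20 in the demo is arbitrary.

```scala
import scala.collection.mutable.ArrayBuffer

// Sketch of "keep at most `retained` completed entries", not Spark's actual class.
class CompletedLog[A](retained: Int) {
  require(retained > 0, "retained must be positive")

  private val completed = new ArrayBuffer[A]

  def add(entry: A): Unit = {
    if (completed.size >= retained) {
      // Drop the oldest 10% (at least one entry) before appending.
      val toRemove = math.max(retained / 10, 1)
      // Same effect as trimStart(toRemove) in the excerpt above.
      completed.remove(0, toRemove)
    }
    completed += entry
  }

  // Immutable snapshot, oldest entry first.
  def entries: Seq[A] = completed.toList
}

object CompletedLogDemo {
  def main(args: Array[String]): Unit = {
    val log = new CompletedLog[String](retained = 20)
    (1 to 21).foreach(i => log.add(s"driver-$i"))
    println(log.entries.size) // 19
    println(log.entries.head) // driver-3
  }
}
```

With a cap of 20, the 21st entry triggers a trim of the two oldest entries, which leaves 18 and then grows to 19. Trimming in batches rather than one at a time means the buffer is not shifted on every insert once it is full.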
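Doesn't the method above look familiar? It is essentially the registration steps we walked through earlier, performed in reverse.
Next is the source for an Executor state change. The code is simple and I have annotated all of it; it closely resembles the Application registration steps from earlier, again in reverse.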
case ExecutorStateChanged(appId, execId, state, message, exitStatus) => {
  // Look up the executor inside its application, using appId and execId
  val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
  execOption match {
    case Some(exec) => {
      val appInfo = idToApp(appId)
      exec.state = state
      // A RUNNING executor resets the application's retry counter
      if (state == ExecutorState.RUNNING) { appInfo.resetRetryCount() }
      // Notify the driver of the executor's application about the state update
      exec.application.driver ! ExecutorUpdated(execId, state, message, exitStatus)
      // Check whether the executor has finished
      if (ExecutorState.isFinished(state)) {
        // Remove this executor from the worker and app
        logInfo(s"Removing executor ${exec.fullId} because it is $state")
        // Remove the executor's entry from its application
        appInfo.removeExecutor(exec)
        // Remove the executor's entry from its worker
        exec.worker.removeExecutor(exec)
        val normalExit = exitStatus == Some(0)
        // Only retry certain number of times so we don't go into an infinite loop.
        if (!normalExit) {
          // Abnormal exit: reschedule while the retry count is below MAX_NUM_RETRY (10)
          if (appInfo.incrementRetryCount() < ApplicationState.MAX_NUM_RETRY) {
            schedule()
          } else {
            // Retry limit reached: check whether any executor of the application is still running
            val execs = appInfo.executors.values
            if (!execs.exists(_.state == ExecutorState.RUNNING)) {
              logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " +
                s"${appInfo.retryCount} times; removing it")
              // None is running, so remove the application itself and mark it FAILED
              removeApplication(appInfo, ApplicationState.FAILED)
            }
          }
        }
      }
    }
    // (the branch for an unknown executor is not shown in this excerpt)
  }
}
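A brief note on the last part of the code: each abnormal executor exit increments the application's retry count. Once that count reaches the limit of 10 and the application has no executor left in the RUNNING state, the Master stops rescheduling and removes the application as well, marking it FAILED.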
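The retry decision can be sketched on its own as follows. This is a simplified model under my own assumptions, not Spark's classes: `AppInfo`, `Action`, and `onAbnormalExit` are hypothetical names, and `MaxNumRetry = 10` simply mirrors the limit mentioned in the comments above.

```scala
// Simplified, hypothetical model of the retry bookkeeping described above.
object RetryDemo {
  val MaxNumRetry = 10 // assumed to match ApplicationState.MAX_NUM_RETRY

  sealed trait Action
  case object Reschedule extends Action        // try to launch executors again
  case object RemoveApplication extends Action // give up and mark the app FAILED
  case object KeepApplication extends Action   // limit hit, but something still runs

  class AppInfo {
    private var retryCount = 0
    def incrementRetryCount(): Int = { retryCount += 1; retryCount }
    def resetRetryCount(): Unit = { retryCount = 0 }
  }

  // Decide what to do after one executor of `app` exits abnormally.
  def onAbnormalExit(app: AppInfo, anyExecutorRunning: Boolean): Action =
    if (app.incrementRetryCount() < MaxNumRetry) Reschedule
    else if (!anyExecutorRunning) RemoveApplication
    else KeepApplication
}
```

In this model the first nine abnormal exits each lead to a reschedule, and the tenth removes the application if none of its executors is running. An executor reaching RUNNING would call `resetRetryCount()`, which is why only consecutive failures count toward the limit.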