状态机管理
在YARN中,如果一个对象由若干个状态以及触发这些状态发生转移的事件构成,它将被抽象成一个状态机,在YARN ResourceManager内部,共有4类状态机,分别是RMApp、RMAppAttempt、RMContainer和RMNode。其中,前2类状态机维护了一个应用程序相关的生命周期,包括Application生命周期、一次运行尝试的生命周期;RMContainer则维护了分配出去的各个资源的使用状态;RMNode维护了一个NodeManager(一个节点上可以有多个NodeManager)的生命周期。
YARN中的Application生命周期由状态机RMAppImpl维护,每个Application可能会尝试运行多次,每次称为一次“运行尝试”(Application Attempt,也可称为运行实例),而每次运行尝试的生命周期则由状态机RMAppAttemptImpl维护,如果一次运行尝试(实例)运行失败,RMApp会创建另外一个运行尝试,直到某次运行尝试运行成功或者达到运行尝试上限。对于每次运行尝试,ResourceManager将为它分配一个Container,Container是运行环境的抽象,内部封装了任务的运行环境和资源等信息,而一个应用程序的ApplicationMaster就运行在这个Container中。ApplicationMaster启动之后,会不断向ResourceManager申请Container以运行各类任务。Container的生命周期由状态机RMContainerImpl维护,整个组织结构可参照图5-5。
Application Attempt的生命周期与ApplicationMaster的生命周期基本上是一致的:一个Application内部所有任务均由ApplicationMaster维护和管理,ApplicationMaster本身需要占用一个Container,而这个Container由ResourceManager为其申请和启动。一旦ApplicationMaster成功启动,它就会与ResourceManager通信,为它内部的任务申请Container。如果ApplicationMaster重新启动,则意味着一个新的Application Attempt被启动,换句话说,一个Application Attempt的“生死存亡”与ApplicationMaster的“命运”紧紧绑定在一起。
RMApp状态机
RMApp是ResourceManager中用于维护一个Application生命周期的数据结构,它的实现是RMAppImpl类,该类维护了一个Application状态机,记录了一个Application可能存在的各个状态(RMAppState)以及导致状态间转换的事件(RMAppEvent)。当某个RMAppEventType类型的事件发生时,RMAppImpl会根据实际情况进行状态转移,同时触发一个行为(实际是一个回调函数)。除了维护Application状态机外,RMAppImpl还保存了Application基本信息(比如名称、所在队列名称、启动时间等)和迄今为止所有的运行尝试(Application Attempt)信息。
如图5-6所示,在RM看来,每个Application有9种基本状态(RMAppState)和12种导致这9种状态之间发生转移的事件(RMAppEventType),RMAppImpl的作用是等待接收其他对象发出的RMAppEventType类型的事件,然后根据当前状态和事件类型,将当前状态转移到另外一种状态,同时触发一种行为(实际就是执行一个函数,该函数可能会再次发出某种类型的事件),下面具体进行介绍。
(1)基本状态
基本状态有以下几种:
NEW:状态机初始状态。每个Application对应一个状态机,而每个状态机的初始状态为NEW。
NEW_SAVING:日志记录应用程序基本信息时所处的状态。ResourceManager收到用户提交的应用程序后,将为它创建一个RMAppImpl对象以维护它的状态,之后要做的第一件事是记录该应用程序的基本信息,以便故障重启后可以自动恢复运行该应用程序。
SUBMITTED:应用程序已提交状态。客户端通过RPC函数ApplicationClientProtocol#submitApplication向RM提交一个Application,通过合法性验证以及完成日志记录后,RM会创建了一个RMAppAttemptImpl对象,以进行第一次运行尝试,并将Application(运行)状态置为SUBMITTED。
ACCEPTED:资源调度器同意接受该应用程序后所处状态。应用程序不仅要在ClientRMService中进行合法性检查,也要经资源调度器的合法性检查,比如Capacity Scheduler允许管理员配置应用程序提交数目上限,如果超过该上限,则拒绝接受新提交的应用程序。
RUNNING:该应用程序的ApplicationMaster已经成功在某个节点上开始运行,这意味着该应用程序的RMAppAttemptImpl也已经处于运行状态中
FAILED:该Application的ApplictionMaster运行失败时所处的状态。多种原因可能导致ApplicationMaster运行失败,比如硬件故障、软件bug、OOM(Out Of Memory)等。需要注意的是,状态机收到ATTEMPT_FAILED事件后不一定会立即转入FAILED状态,而是先检查失败次数是否超过用户设置的最大上限(如果用户未设置该值,则采用ResourceManager中参数yarn.resourcemanager.am.max-attempts设置的全局最大值,默认是2),如果没有,则再次创建一个RMAppAttemptImpl对象,并让状态机重新回到SUBMITTED状态,否则才最终会转入FAILED状态,这意味着应用程序彻底运行失败。
KILLED:该Application被杀死时所处的状态,通常是由于收到来自客户端杀死应用程序命令后,ResourceManager主动将ApplicationMaster杀死。
FINISHING:当Application Master通过RPC函数ApplicationMasterProtocol#finishApplicationMaster通知RM,自己运行结束将要退出时,Application将被置为FINISHING状态。
FINISHED:NodeManager通过心跳汇报ApplicationMaster所在的Container运行结束,RMAppImpl将被置为FINISHED状态,这意味着该应用程序成功运行结束。
(2)基本事件
基本事件主要包括:
STARTED:客户端调用RPC函数ApplicationClientProtocol#submitApplication提交应用程序后,会触发STARTED事件
RECOVER:如果管理员开启了应用程序恢复功能(默认不开启,可通过参数yarn.resourcemanager.recovery.enabled配置),则ResourceManager重启后,会向已提交但尚未开始运行的应用程序发送RECOVER事件。
KILL:客户端调用RPC函数ApplicationClientProtocol#forceKillApplication杀死Application,此时会触发一个KILL事件。
APP_REJECTED:多种情况下会触发APP_REJECTED事件,包括客户端通过RPC函数ApplicationClientProtocol#submitApplication向RM提交一个Application时,若抛出IOException异常,则会触发一个APP_REJECTED事件;若资源调度器认为应用程序非法(比如所在队列不存在或者已达到应用程序数目上限等),则拒绝接受该应用程序,同样最终会触发APP_REJECTED事件。
APP_ACCEPTED:当资源调度器认为应用程序合法时,将同意接受该应用程序,最终会触发APP_ACCEPTED事件。
APP_SAVED:用户提交应用程序后,ResourceManager首先要将应用程序信息保存到磁盘上,以便故障恢复时使用。一旦信息保存完成后,将触发一个APP_SAVED事件。
ATTEMPT_REGISTERED:应用程序的ApplicationMaster通过RPC函数ApplicationMasterProtocol#registerApplicationMaster向RM注册时,将触发一个ATTEMPT_REGISTERED事件
ATTEMPT_FINISHING:当应用程序的ApplicationMaster通过RPC函数ApplicationMasterProtocol#finishApplicationMaster 向RM汇报自己运行完成时,会触发一个ATTEMPT_FINISHING事件。
ATTEMPT_FINISHED:某个NodeManager通过心跳汇报ApplicationMaster所在的Container运行结束时,会触发一个ATTEMPT_FINISHED事件
NODE_UPDATE:NodeManager每次汇报心跳信息时会触发一个NODE_UPDATE事件,该事件会被广播给所有先关的应用程序
ATTEMPT_FAILED:应用程序的ApplicationMaster运行失败(可能是由于软件bug、硬件故障等原因导致ApplicationMaster自身运行失败,也可能是它内部任务失败数目过多致使ApplicationMaster主动退出)时,会触发一个ATTEMPT_FAILED事件。需要注意的是,状态机收到ATTEMPT_FAILED事件后不一定会立即转入FAILED状态,而是先检查失败次数是够超过用户设置的最大上限,如果没有,则再次创建一个RMAppAttemptImpl对象,并让状态机重新回到SUBMITTED状态,否则才最终会转入FAILED状态,这意味着应用程序彻底运行失败
ATTEMPT_KILLED:应用程序的ApplicationMaster被杀死时,会触发一个ATTEMPT_KILLED事件
注意: 应用程序状态机处于RUNNING状态时,可能直接转换为FINISHED状态,也可能先转换为FINISHING状态,然后再转换为FINISHED状态,这主要是由于ApplicationMaster汇报自己运行完成和ApplicationMaster所在Container退出运行这两个事件没有明确的时间先后之分,比如应用程序的ApplicationMaster可能没有调用ApplicationMasterProtocol#finishApplicationMaster通知ResourceManager自己将退出运行,而是直接退出运行;导致状态机进入FINISHED状态的唯一事件是ApplicationMaster所在Container正常退出
图5-7描述了各个事件的来源,这些事件主要来自ClientRMService和RMAppAttemptImpl两类组件。
RMAppAttempt状态机
RMAppAttempt是ResourceManager中用于维护一个Application 运行尝试(或者称为"Application Attempt")的生命周期的数据结构,它的实现是RMAppAttemptImpl,该类维护了一个状态机,记录了一个Application Attempt可能存在的各个状态以及导致状态间转换的事件。当某个事件发生时,RMAppAttemptImpl会根据实际情况进行Application Attempt状态转移,同时触发一个行为。除了维护状态机外,RMAppAttempt还保存了本次运行尝试的基本信息,包括当前使用的Container信息、ApplicationMaster Container对外tracking URL和RPC端口号等。由于在一次运行尝试中,最重要的组件是ApplicationMaster,它的当前状态可代表整个应用程序的当前状态,因此,RMAppAttemptImpl本质上是维护的ApplicationMaster生命周期。
RMAppAttemptImpl的作用是等待接收其他对象发出的RMAppAttemptEventType类型的事件,然后根据当前状态和事件类型,将当前状态转移到另外一种状态,同时触发另外一种行为(实际上执行一个函数,该函数可能会再次发出一种其他类型的事件),下面具体进行介绍。
1)基本状态
基本状态包括:
NEW:状态机初始状态,每个Application Attempt对应一个状态机,而每个状态机的初始状态为NEW。
SUBMITTED:RMAppImpl创建RMAppAttempt之后,将在第一时间向它发送一个START事件,为其创建各种Token后,将其状态置为SUBMITTED。
SCHEDULED:RMAppAttemptImpl被创建之后,ResourceManager会将它添加到ResouceScheduler中,通过ResouceScheduler合法性检查后,状态将被置为SCHEDULED,这表明ResouceScheduler开始为该Application的ApplicationMaster分配资源。
ALLOCATED_SAVING:RMAppAttemptImpl接收到ResouceScheduler分配的一个Container后(用于启动ApplicationMaster),会将该Container信息写入到磁盘上以便故障恢复使用。信息保存完成之前,RMAppAttemptImpl将处于ALLOCATED_SAVING状态。
ALLOCATED:RMAppAttemptImpl将收到的Container信息保存到文件中,以便于失败后从磁盘上恢复,信息保存完成之后,RMAppAttemptImpl状态转换为ALLOCATED
LAUNCHED:分配到Container后,ResourceManager中的ApplicationMasterLauncher与对应的NodeManager通信,以启动ApplicationMaster,此时RMAppAttemptImpl将被置为LAUNCHED状态。
RUNNING:ApplicationMaster在NodeManager上成功启动后,将通过RPC函数ApplicationMasterProtocol#registeApplicationMaster向ResourceManager注册,此时RMAppAttemptImpl状态被置为RUNNING。
FAILED:如果Application的ApplictionMaster运行失败(通过超时机制检测),RMAppAttemptImpl的状态将转换为FAILED
KILLED:如果客户端发出杀死应用程序命令,RMAppAttemptImpl将被置为KILLED
FINISHING:Application Master通过RPC函数ApplicationMasterProtocol#finishApplicationMaster通知RM,自己将运行结束,此时RMAppAttemptImpl被置为FINISHING状态
FINISHED:NodeManager通过心跳汇报ApplicationMaster所在的Container运行结束,此时RMAppAttemptImpl被置为FINISHED状态。
LAUNCHED_UNMANAGED_SAVING:为了方便对ApplictionMaster进行测试和满足特殊情况下对权限的要求,ResourceManager允许用户直接将ApplicationMaster启动在客户端而不是由ResourceManager启动,此时仍需对其记录日志以便故障恢复时使用,正在记录日志的RMAppAttemptImpl所处的状态是LAUNCHED_UNMANAGED_SAVING
*RECOVERED:如果管理员开启了应用程序恢复功能(默认不开启,可通过参数yarn.resourcemanager.recovery.enabled配置),则ResourceManager重启后,会从日志中恢复ApplicationAttemptImpl,恢复完成后所处的状态为RECOVERED。
2)基本事件
基本事件包括:
START:应用程序的状态机RMAppImpl创建运行尝试RMAppAttemptImpl后,会第一时间向它发送一个START事件,接收到该事件后,RMAppAttemptImpl会进行一些初始化工作,比如设置启动时间、获取各种安全Token等
KILL:当ClientRMService接收到来自客户端的杀死应用程序的命令后,它会向该应用程序的RMAppImpl对象发送一个RMAppEventType.KILL事件,而RMAppImpl对象则会进一步向它启动的RMAppAttemptImpl对象发送一个RMAppAttemptEventType.KILL事件,最终触发一个杀死ApplicationMaster所在RMContainer的命令,将应用程序杀死。
APP_ACCEPTED:资源调度器对RMAppAttemptImpl进行各种限制性检查(比如是否超过了应用程序运行数目上限)后,如果同意提交该应用程序,则会向RMAppAttemptImpl发送一个APP_ACCEPTED事件。一旦接收到该事件后,RMAppAttemptImpl将调用ApplicationMasterProtocol#allocate向资源管理器申请资源以启动ApplicationMaster。
CONTAINER_ALLOCATED:资源调度器将某个节点上的Container分配给该应用程序的RMAppAttemptImpl后,会创建一个RMContainerImpl对象,并向该对象发送一个RMContainerEventType.START事件。RMContainerImpl收到该事件后,会进一步向RMAppAttemptImpl发送一个RMAppAttemptEventType.CONTAINER_ALLOCATED事件。一旦接收到该事件后,RMAppAttemptImpl将调用ApplicationMasterProtocol#allocate获取资源管理器分配的资源,并向RMStateStore发送一个记录日志事件,以将资源分配信息写到磁盘上用于故障恢复。
ATTEMPT_SAVED:RMAppAttemptImpl收到分配的Container后,将<Application-AttemptId,Container,AppAttemptTokens>等信息存入RMStateStore(MemoryRMStateStore或者FileSystemRMStateStore)中,保存成功后,RMStateStore将向RMAppAttemptImpl发送一个ATTEMPT_SAVED事件
LAUNCHED:ApplicationMasterLauncher与NodeManager通信,成功启动应用程序的ApplicationMaster后,会向RMAppAttemptImpl发送一个LAUNCHED事件。收到该事件后,RMAppAttemptImpl会向AMLivelinessMonitor组件注册,以开启对ApplicationMaster的心跳监控。
REGISTERED:ApplicationMaster在NodeManager启动后,所要做的第一件事是调用RPC函数ApplicationMasterProtocol#registerApplicationMaster向ResourceManager注册,ResourceManager中的ApplicationMasterService处理收到该RPC请求后,会向RMAppAttemptImpl发送一个REGISTERED事件。
UNREGISTERED:ApplicationMaster运行完成后,将调用RPC函数Application-MasterProtocol#finishApplicationMaster通知ResourceManager自己运行结束,Resource-Manager中的ApplicationMasterService处理收到该RPC请求后,会向RMApp-AttemptImpl发送一个UNREGISTERED事件。需要注意的是,如果Application-Master不是由ResourceManager启动的(由用户程序启动,通常启动在客户端),则该事件会导致RMAppAttemptImpl直接进入FINISHED状态,否则将进入FINISHING状态,等待ApplicationMaster所在的Container退出后,再进一步转换为FINISHED状态。
CONTAINER_FINISHED:NodeManager周期性调用RPC函数ResourceTracker-#nodeHeartbeat向ResourceManager汇报心跳信息(包括各个Container状态、完成的Container列表、节点健康状况等),当ApplicationMaster所在Container退出运行后,NodeManager将它的运行状态汇报给ResourceManager,这会导致资源调度器触发一个RMContainerEventType.FINISHED事件,而RMContainerImpl收到该事件后,会进一步向RMAppAttemptImpl发送一个RMAppAttemptEventType.CONTAINER_FINISHED事件。
EXPIRE:如果一个RMAppAttemptImpl(ApplicationMaster)在一定时间内未汇报心跳信息,则AMLivelinessMonitor会向它发送一个EXPIRE事件,RMAppAttemptImpl收到该事件后,会进一步触发AMLauncherEventType.CLEANUP和RMAppEventType.ATTEMPT_FAILED两个事件,分别用于清理ApplicationMaster和正在使用的Container。
....
RMContainer状态机
在YARN中,根据应用程序需求,资源被切分成大小不同的资源块,每份资源基本信息由Container描述,而具体的使用状态追踪则是由RMContainer完成的。RMContainer是ResourceManager中用于维护一个Container生命周期的数据结构,它的实现是RMContainerImpl类,该类维护了一个Container状态机,记录了一个Container可能存在的各个状态以及导致状态间转换的事件,当某个事件发生时,RMContainerImpl会根据实际情况进行Container状态转移,同时触发一个行为。
在RM看来,每个Container有9种基本状态(RMContainerState)和8种导致这9种状态之间发生转移的事件(RMContainerEventType),RMContainerImpl的作用是等待接收其他对象发出的RMContainerEventType类型的事件,然后根据当前状态和事件类型,将当前状态转移到另外一种状态,同时触发另外一种行为(实际上执行一个函数,该函数可能会再次发出一种其他类型的事件)。下面具体进行介绍。
(1)基本状态
基本状态包括:
NEW:状态机初始状态,每个Container对应一个状态机,而每个状态机的初始状态均为NEW。
RESERVED:当一个节点上的资源不能够满足一个应用程的需求但又不得不分配给它时,YARN会让该节点为它预留资源,直到累计的空闲资源能够满足应用程序的需求后,才会将之封装成一个Container发送给应用程序的ApplicationMaster。当一个Container已被创建,但包含的资源尚不能满足应用程序需求时所处的状态为RESERVED。
ALLOCATED:当资源调度器将一个Container分配给一个Application时,该Container处于ALLOCATED状态。需要注意的是,此时Container处于可使用状态(只在ResourceManager内部进行了标记),但是ApplicationMaster还未获取该Container。
ACQUIRED:ApplicationMaster通过RPC函数ApplicationMasterProtocol#allocate获取分配给自己的Container列表,此时,这些Container状态将被置为ACQUIRED。
RUNNING:ApplicationMaster通过RPC函数ApplicationMasterProtocol#allocate拉取分配给自己的Container后,将与对应的NodeManager通信以启动这些Container,接着NodeManager通过心跳机制将这些Container状态汇报给ResourceManager,最终ResourceManager将这些Container状态置为RUNNING。
ApplicationMaster通过RPC函数ApplicationMasterProtocol#allocate向ResourceManager发送请求,要求它释放一些Container(可能是由于资源过剩或者内部的抢占机制要求释放一些Container),ResourceManager收到请求后将这些Container状态置为RELEASED,同时向RMNodeImpl发送RMNodeEventType.CLEANUP_CONTAINER事件以清理该Container。
COMPLETED:NodeManager通过RPC函数ResourceTracker#nodeHeartbeat告诉ResourceManager已经运行完成的Container列表,ResourceManager收到该信息后,会将这些Container状态置为COMPLETED。
EXPIRED:ResourceManager将一个Container分配给ApplicationMaster后,Application-Master必须在一定时间内使用该Container(默认是10min),否则ResourceManager会强制回收该Container,即将Container状态置为EXPIRED,同时向NodeManager发送杀死Container的命令。
KILLED:当出现以下几种情况时,将导致Container置为KILLED状态。
- 资源调度器为了保证公平性或者更高优先级的应用程序的服务质量,不得不杀死一些应用程序占用的Container以满足另外一些应用程序的要求。
- 某个NodeManager在一定时间内未向ResourceManager汇报心跳信息,则Resource-Manager认为它死掉了,会将它上面所有正运行的Container状态置为KILLED。
- 用户(使用API或者Shell命令)强制杀死一个RMAppAttemptImpl实例时,会导致它所有的Container状态置为KILLED。
RMNode状态机
RMNode是ResourceManager中用于维护一个节点生命周期的数据结构,它的实现是RMNodeImpl类,该类维护了一个节点状态机,记录了节点可能存在的各个状态以及导致状态间转换的事件。当某个事件发生时,RMNodeImpl会根据实际情况进行节点状态转移,同时触发一个行为。
如图5-12所示,在RM看来,每个节点有6种基本状态(NodeState)和8种导致这6种状态之间发生转移的事件(RMNodeEventType),RMNodeImpl的作用是等待接收其他对象发出的RMNodeEventType类型的事件,然后根据当前状态和事件类型,将当前状态转移到另外一种状态,同时触发另外一种行为(实际上执行一个函数,该函数可能会再次发出一种其他类型的事件)。
- NEW:状态机初始状态,每个NodeManager对应一个状态机,而每个状态机的初始状态均为NEW。
- RUNNING:NodeManager启动后,会通过RPC函数ResourceTracker#registerNodeManager向ResourceManager注册,此时NodeManager会进入RUNNING状态
- DECOMMSIONED:如果一个节点被加入到exlude list(黑名单)中,则对应的NodeManager将被置为DECOMMSIONED状态,这样,该NodeManager将无法与ResourceManager通信(直接在RPC层抛出异常导致NodeManager异常退出)。
- UNHEALTHY:管理员可在每个NodeManager上配置一个健康状况监测脚本,NodeManager中有一个专门线程周期性执行该脚本,以判定NodeManager是否处于健康状态。NodeManager会通过心跳机制将脚本执行结果汇报给Resource-Manager,如果ResourceManager发现它处于不健康状态下,则会将其状态置为UNHEALTHY,此后ResouceManager不会再为该节点分配新的任务,直到它重新变为健康状态
- LOST:ResourceManager中的组件NMLivelinessMonitor会跟踪每一个NodeManager的心跳信息,如果一个NodeManager在一定时间间隔内未汇报心跳信息,则认为它死掉了,RMNodeImpl会将其置为LOST状态,之后它上面所有正运行的Container信息将被置为FAILED。
- REBOOTED:如果ResourceManager发现NodeManager汇报的心跳ID与自己保存的不一致,则会将其置为REBOOTED状态,从而要求它重新启动以达到同步的目的。
几个常见行为分析
启动ApplicationMaster
下面将介绍从应用程序提交到启动ApplicationMaster的整个过程,期间涉及Client-RMService、RMAppManager、RMAppImpl、RMAppAttemptImpl、RMNode、ResouceScheduler等几个主要组件
当客户端调用RPC函数ApplicationClientProtocol#submitApplication后,ResourceManager端的处理过程(假设整个过程未出现任何异常)如图5-14所示。
具体步骤如下:
步骤1 ResourceManager中的ClientRMService实现了ApplicationClientProtocol协议,它处理来自客户端的请求,并调用RMAppManager#submitApplication通知其他相关服务作进一步处理。
步骤2 RMAppManager为该应用程序创建一个RMAppImpl对象以维护它的运行状态,并判断系统状态,如果是故障重启状态,则向它发送一个RMAppEventType.RECOVER事件,否则发送一个RMAppEventType.START事件
步骤3 RMAppImpl收到RMAppEventType.START事件后(暂不介绍RMAppEvent-Type.RECOVER事件的处理过程),会调用RMStateStore#storeApplication(RMStateStore是插拔式组件,在不启用RM恢复机制的前提下,默认实现是NullRMStateStore,它不会进行任何保存工作,其他实现还有MemoryRMStateStore、FileSystemRMStateStore等),以日志记录RMAppImpl当前信息,至此,RMAppImpl的运行状态由NEW转移为NEW_SAVING
步骤4 日志记录完成后,RMStateStore进一步向RMAppImpl发送RMAppEventType.APP_SAVED事件。
步骤5 RMAppImpl收到RMAppEventType.APP_SAVED事件后,将创建一个运行实例对象RMAppAttemptImpl,同时向它发送一个RMAppAttemptEventType.START事件,至此,RMAppImpl的运行状态由NEW_SAVING转移为SUBMITTED。
步骤6 RMAppAttemptImpl收到RMAppAttemptEventType.START事件后,进行一些必要的初始化工作(设置初始事件,各种安全Token等),然后向ResourceScheduler发送SchedulerEventype.APP_ADDED事件,至此,RMAppAttemptImpl状态由NEW转移为SUBMITTED。
步骤7 ResourceScheduler收到SchedulerEventype.APP_ADDED事件后,首先进行一些权限检查(如果通不过这些检查,则拒绝接受应用程序提交),然后将应用程序信息保存到内部的数据结构中,并向RMAppAttemptImpl发送RMAppAttemptEventType.APP_ACCEPTED事件。
步骤8 RMAppAttemptImpl收到RMAppAttemptEventType.APP_ACCEPTED事件后,首先向RMAppImpl发送一个RMAppEventType.APP_ACCEPTED事件(RMAppImpl收到该事件后直接将状态从SUBMITTED转移为ACCEPTED),然后调用ResourceScheduler#allocate为应用程序的ApplicationMaster申请资源,该资源描述如下:
<AM_CONTAINER_PRIORITY, ResourceRequest.ANY, appAttempt.getSubmissionContext().getResource(), 1 >
即一个优先级为AM_CONTAINER_PRIORITY(值为0)、可在任意节点(ResourceRequest.ANY)上、资源量为X(用户提交应用程序时指定)的Container。
至此,RMAppAttemptImpl状态由SUBMITTED转移为SCHEDULED。
步骤9 ResourceManager为应用程序的ApplicationMaster分配资源后,创建一个RMContainerImpl,并向它发送一个RMContainerEventType.START事件。
步骤10 RMContainerImpl收到RMContainerEventType.START事件后,直接向RM-AppAttemptImpl发送一个RMAppAttemptEventType.CONTAINER_ALLOCATED事件,至此,RMContainerImpl状态从NEW转移为ALLOCATED。
步骤11 RMAppAttemptImpl收到RMAppAttemptEventType.CONTAINER_ALLOCATED事件后,调用ResourceScheduler#allocate获取分配的资源,ResourceScheduler将资源返回给它之前,会向RMContainerImpl发送一个RMContainerEventType.ACQUIRED事件(它收到该事件后没有后续处理工作),而RMAppAttemptImpl收到资源后,第一时间向RMStateStore发送MStateStoreEventType.STORE_APP_ATTEMPT事件请求记录日志,至此,RMAppAttemptImpl状态从SCHEDULED转移为ALLOCATED_SAVING。
步骤12 日志记录完成后,RMStateStore进一步向RMAppAttemptImpl发送RMApp-AttemptEventType.ATTEMPT_SAVED事件。
步骤13 RMAppAttemptImpl收到RMAppAttemptEventType.ATTEMPT_SAVED事件后,将向ApplicationMasterLauncher发送AMLauncherEventType.LAUNCH事件,至此,RMAppAttemptImpl状态从ALLOCATED_SAVING转移为ALLOCATED。
步骤14 ApplicationMasterLauncher收到AMLauncherEventType.LAUNCH事件后,会将该事件放到事件队列中,等待AMLauncher线程池中的线程处理该事件。处理方法是,与对应的NodeManager通信,启动ApplicationMaster,一旦成功启动后,将进一步向RMAppAttemptImpl发送RMAppAttemptEventType.LAUNCHED事件。RMAppAttemptImpl收到RMAppAttemptEventType.LAUNCHED事件后,会向AMLivelinessMonitor注册,以监控运行状态。至此,RMAppAttemptImpl状态从ALLOCATED转移为LAUNCHED。
步骤15 NodeManager通过心跳机制汇报ApplicationMaster所在Container已经成功启动,收到该信息后,ResourceScheduler将发送一个RMContainerEventType.LAUNCHED事件,RMContainerImpl收到该事件后,会从ContainerAllocationExpirer监控列表中移除
步骤16 启动的ApplicationMaster通过RPC函数ApplicationMasterProtocol#register-ApplicationMaster向ResourceManager注册,ResourceManager中的ApplicationMaster-Service服务接收到该请求后,将向RMAppAttemptImpl发送一个RMAppAttemptEventType.REGISTERED事件,而RMAppAttemptImpl收到该事件后,首先保存该ApplicationMaster的基本信息(比如所在host、启用的RPC端口号等),比如所在host、启用的RPC端口号等),然后向RMAppImpl发送一个RMApp-EventType.ATTEMPT_REGISTERED事件。至此,RMAppAttemptImpl状态从LAUNCHED转移为RUNNING。
步骤17 RMAppImpl收到RMAppEventType.ATTEMPT_REGISTERED事件后,所做的事情仅是将状态从ACCEPTED转换为RUNNING
申请与分配Container
下面介绍应用程序的ApplicationMaster在NodeManager上成功启动并向Resource-Manager注册后,向ResourceManager请求资源(Container)到获取到资源的整个过程中,及ResourceManager内部涉及的主要工作流程。整个过程可看做以下两个阶段的迭代循环:
阶段1 ApplicationMaster汇报资源需求并领取已经分配到的资源;
阶段2 NodeManager向ResourceManager汇报各个Container运行状态,如果Resource-Manager发现它上面有空闲的资源,则进行一次资源分配,并将分配的资源保存到对应的应用程序数据结构中,等待下次ApplicationMaster发送心跳信息时获取(即阶段1)
这两个阶段流程如图5-15所示。
Container分配与申请流程的具体步骤如下
(1)阶段1
步骤1 ApplicationMaster通过RPC函数ApplicationMasterProtocol#allocate向Resource-Manager汇报资源需求(由于该函数被周期性调用,我们通常也称之为“心跳”),包括新的资源需求描述、待释放的Container列表、请求加入黑名单的节点列表、请求移除黑名单的节点列表等。
步骤2 ResourceManager中的ApplicationMasterService负责处理来自ApplicationMaster的请求,一旦收到该请求,会向RMAppAttemptImpl发送一个RMAppAttemptEventType.STATUS_UPDATE类型事件,而RMAppAttemptImpl收到该事件后,将更新应用程序执行进度和AMLivenessMonitor中记录的应用程序最近更新时间。
步骤3 ApplicationMasterService调用ResourceScheduler#allocate函数,将该Application-Master资源需求汇报给ResourceScheduler。
步骤4 ResourceScheduler首先读取待释放Container列表,依次向对应的RMContainerImpl发送RMContainerEventType.RELEASED类型事件,以杀死正在运行的Conainer,然后将新的资源需求更新到对应的数据结构中,并返回已经为该应用程序分配的资源。
2)阶段2
步骤1 NodeManager通过RPC函数ResourceTracker#nodeHeartbeat向Resource-Manager汇报各个Container运行状态。
步骤2 ResourceManager中的ResourceTrackerService负责处理来自NodeManager的请求,一旦收到该请求,会向RMNodeImpl发送一个RMNodeEventType.STATUS_UPDATE类型事件,而RMNodeImpl收到该事件后,将更新各个Container的运行状态,并进一步向ResourceScheduler发送一个SchedulerEventType.NODE_UPDATE类型事件
步骤3 ResourceScheduler收到事件后,如果该节点上有可分配的空闲资源,则会将这些资源分配给各个应用程序,而分配后的资源仅是记录到对应的数据结构中,等待ApplicationMaster下次通过心跳机制来领取。
杀死Application
“杀死Application”行为通常是由客户端发起的,比如用户使用命令"bin/yarn application -kill XXX"杀死一个已经提交的应用程序。ResourceScheduler中的服务ClientRM-Service负责处理该请求,通过权限检查后(确保该用户有权限杀死该应用程序),它会向该应用程序状态对应的(RMAppImpl类型)状态维护对象发送一个RMAppEventType.KILL类型的事件,RMAppImpl收到该事件后,根据当前运行状态调用相应的行为函数,这些函数的主要工作是清理该应用程序已经运行完成(但运行失败)或者正在运行的实例(RMAppAttemptImpl),总体上讲,这一行为主要分为两种情况:
情况1 不存在正在运行的RMAppAttemptImpl。
当应用程序不存在处于运行状态的运行实例RMAppAttemptImpl时,整个过程比较简单,如图5-16所示,RMAppImpl向前几个运行实例(如果有的话,这些实例通常已经运行失败)所在的节点状态维护对象RMNodeImpl发送RMNodeEventType.CLEANUP_APP事件,以记录节点曾运行过的应用程序;同时,也会向RMAppManager发送RMApp-ManagerEventType.APP_COMPLETED事件,以标注该应用程序运行状态并移除保存在RMStateStore中的日志信息
情况2 存在正在运行的RMAppAttemptImpl。
如图5-17所示,当应用程序存在正在运行的RMAppAttemptImpl时,除了完成情况1描述的步骤外,RMAppImpl还要向RMAppAttemptImpl发送RMAppAttemptEventType.KILL,以回收RMAppAttemptImpl已经申请和占用的资源。而RMAppAttemptImpl收到该事件后,将在第一时间向RMAppImpl发送RMAppEventType.ATTEMPT_KILLED作为(对RMAppAttemptEventType.KILL事件的)应答,而真正的资源回收操作则由资源调度器ResourceScheduler异步完成的。
- 回收ApplicationMaster占用资源:向ApplicationMasterLauncher发送AMLauncherEventType.CLEANUP类型的事件,以杀死ApplicationMaster并回收它占用的资源(如果ApplicationMaster尚未启动则跳过)
回收Container资源:向各个已经启动的RMContainerImpl(如果没有则跳过)发送RMContainerEventType.KILL类型的事件,以杀死已经启动的Container并回收它们占用的资源。
Container超时
在YARN中,存在两种类型的Container,分别是用于运行ApplicationMaster的Container(后面简称为"AM Container")和运行普通任务的Container(后面简称为“普通Container”),第一种Container的超时将导致整个Application运行失败,而第二种Container超时则会触发一次资源回收。需要注意的是,第二种Container超时导致任务运行失败后,YARN不会主动将其调度到另外一个节点上运行,而是将状态告诉应用程序的ApplicationMaster,由它决定是否重新申请资源或者重新执行。
(1)AM Container超时
AM Container是由RMAppAttemptImpl根据用户设置的资源需求向ResourceScheduler申请的,一旦申请到满足要求的Container后,它会将之(RMContainerImpl)放到ContainerAllocationExpirer组件中,以确保对应的NodeManager能够在一定时间内启动它。如果NodeManager在一定时间内没有启动ApplicationMaster则会触发一系列资源回收流程,如图5-18所示,具体如下
步骤1 该NodeManager没能在一定时间内(默认是10min)启动应用程序的ApplicationMaster,导致ContainerAllocationExpirer触发一个SchedulerEventType.CONTAINER_EXPIRED类型的事件,而ResourceScheduler收到该事件后,将进一步向RMContainerImpl发送一个RMContainerEventType.EXPIRE类型事件。
步骤2 RMContainerImpl收到该事件后(正常情况下,它处于RMContainerState.ACQUIRED状态中),首先从ContainerAllocationExpirer中移除监控,然后向AM Container所在节点的RMNodeImpl发送RMNodeEventType.CLEANUP_CONTAINER事件,向RMApp-AttemptImpl发送RMAppAttemptEventType.CONTAINER_FINISHED事件
步骤3 RMNodeImpl收到RMNodeEventType.CLEANUP_CONTAINER事件后,将之放入待清理Container列表中,等到对应的NodeManager汇报心跳时,将该Container返回给它以对其进行清理。
步骤4 RMAppAttemptImpl收到RMAppAttemptEventType.CONTAINER_FINISHED事件后(正常情况下,它处于RMAppAttemptState.LAUNCHED状态中),它将进一步向RMAppImpl发送RMAppEventType.ATTEMPT_FAILED事件,向ResourceScheduler发送SchedulerEventType.APP_REMOVED事件。
步骤5 RMAppImpl收到事件后,如果未超过用户设置的运行次数上限,将尝试启动一个新的RMAppAttemptImpl或者(否则)直接宣布该应用程序运行失败;ResourceScheduler收到事件后,会清理该应用程序相关信息。
(2)普通Container超时
相对于AM Container超时而言,普通Container超时产生的资源回收流程则简单一些。普通Container超时发生在普通Container被分配给一个ApplicationMaster后,没能够在一定时间内运行起来(Container运行成功后,NodeManager会通过心跳将它的状态汇报给ResourceManager),也就是说,ApplicationMaster没能够在一定时间内使用Container
如图5-19所示,普通Container超时触发的资源回收流程跟AM Container的回收流程的前三个步骤是一样的,不同的是后几个流程:RMAppAttemptImpl收到RMApp-AttemptEventType.CONTAINER_FINISHED事件后(正常情况下,它处于RMAppAttempt-State.RUNNING状态中),将该Container保存到已完成列表(该列表中的Container可能处于COMPLETED、KILLED、RELEASED或EXPIRED四种状态之一)中,等到ApplicationMaster下次汇报心跳时,将该列表返回给它,至于如何处理这些失败的Container中的任务,是重新申请资源运行该任务还是舍弃该任务,完全由ApplicationMaster内部的策略决定。
ApplicationMaster超时
ApplicationMaster向ResourceManager注册后,必须周期性通过RPC函数ApplicationMasterProtocol#allocate向ResourceManager汇报心跳以表明自己还活着。如果一段时间(默认是10min)内它未汇报心跳,则ResourceManager宣布它死亡,进而导致(如果重试次数未超过用户设置的运行上限值)应用程序重新运行或者(超过运行次数上限)直接退出。ApplicationMaster超时是由监控组件AMLivenessMonitor发现并触发的(RMAppAttemptEventType.EXPIRE事件)
NodeManager超时
NodeManager启动后将通过RPC函数ResourceTracker#registerNodeManager向Resource-Manager注册,之后它将被加入到NMLivenessMonitor中进行监控。它必须周期性地通过RPC函数ResourceTracker#nodeHeartbeat向ResourceManager汇报心跳以表明自己还活着,如果一段时间(默认是10min)内它未汇报心跳,则ResourceManager宣布它死亡,所以正运行在它上面的Container将被回收,如图5-20所示。
上述过程具体如下:
步骤1 NMLivenessMonitor发现NodeManager在一段时间内未汇报心跳,则认为它死掉了,会触发一个RMNodeEventType.EXPIRE类型的事件。
步骤2 RMNodeImpl收到该事件后,分别向ResourceScheduler和NodesListManager发送一个SchedulerEventType.NODE_REMOVED事件和NodesListManagerEventType.NODE_UNUSABLE事件,同时将节点状态由RUNNING转为LOST。
步骤3 ResourceScheduler收到NODE_REMOVED事件后,会向运行在死亡节点上的RMContainerImpl发送RMContainerEventType.KILL事件,以清理占用的内存空间;NodesListManager收到NODE_UNUSABLE事件后,会向所有当前正在运行的RMAppImpl发送RMAppNodeUpdateType.NODE_UNUSABLE事件
容错
在Hadoop 1.0中,HDFS和MapReduce(MRv1)均采用了Master/Slave结构,这种结构虽然具有设计非常简单的优点,但同时也存在Master单点故障问题。由于存在单点故障问题的系统不适合在线应用场景,这使得Hadoop在相当长时间内仅用于离线存储和计算。在Hadoop 2.0中,HDFS同样面临着单点故障问题,但由于每个MapReduce作业拥有自己的作业管理组件(ApplicationMaster),因此不再存在单点问题,但新引入的资源管理系统YARN也采用了Master/Slave结构,同样出现了单点故障问题。
作为一个分布式系统,YARN必须具备的一个特点是高容错性,因此YARN需要考虑ApplicationMaster、NodeManager、Container和ResourceManager等服务或组件的容错性。这些服务或组件的容错机制如下。
ApplicationMaster容错:前面提到,不同类型的应用程序拥有不同的ApplicationMaster,而ResourceManager负责监控ApplicationMaster的运行状态,一旦发现它运行失败或者超时,会为其重新分配资源并启动它。至于启动之后ApplicationMaster内部的状态如何恢复需要由自己保证,比如MRAppMaster(MapReduce ApplicationMaster)在作业运行过程中将状态信息动态记录到HDFS上,一旦出现故障重启后,它能够从HDFS读取并恢复之前的运行状态,以减少重新计算带来的开销。
NodeManager容错:如果NodeManager在一定时间内未向ResourceManager汇报心跳信息(可能是网络原因或者自身原因),则ResourceManager认为它已经死掉了,会将它上面所有正在运行的Container状态置为失败,并告诉对应的ApplicationMaster(如果AM Container运行失败,则需重新分配资源启动ApplicationMaster),以决定如何处理这些Container中运行的任务。
Container容错:如果ApplicationMaster在一定时间内未启动分配到的Container,则ResourceManager会将该Container状态置为失败并回收它;如果一个Container在运行过程中,因为外界原因(比如资源不足、误杀等)导致运行失败,则ResourceManager会转告给对应的ApplicationMaster,由它决定如何处理。
ResourceManager容错:ResourceManager负责整个集群的资源管理和调度,它的重要性不言而喻,因此它自身的容错性直接决定了YARN的可用性和可靠性。下面也将重点介绍ResourceManager的容错机制。
Hadoop HA基本框架
在Master/Slave架构中,为了解决Master的单点故障问题(也称为高可用问题,即HA,High Availability),通常采用热备方案,即集群中存在一个对外服务的Active Master和若干个处于就绪状态的Standby Master,一旦Active Master出现故障,立即采用一定的策略选取某个Standby Master转换为Active Master以正常对外提供服务。同样,Hadoop 2.0也正是采用这种方案解决各系统的单点故障问题。
总体上说,Hadoop 2.0中的HDFS和YARN均采用了基于共享存储的HA解决方案,即Active Master不断将信息写入一个共享存储系统,而Standby Master则不断读取这些信息,以与Active Master的内存信息保持同步。当需要主备切换时,选中的Standby Master需先保证信息完全同步后,再将自己的角色切换至Active Master。目前而言,常用的共享存储系统有以下几个
Zookeeper:Zookeeper是一个针对大型分布式系统的可靠协调系统,提供的功能包括统一命名服务、配置管理、集群管理、共享锁和队列管理等。需要注意的是,Zookeeper设计目的并不是数据存储,但它的确可以安全可靠地存储少量数据以解决分布式环境下多个服务之间的数据共享问题
NFS(Network File System):NFS是一种非常经典的数据共享方式,它可以透过网络,让不同的机器和不同的操作系统之间彼此共享文件
HDFS:Hadoop自带的分布式文件系统,由于它本身存在单点故障问题,因此Hadoop的单点问题不能够通过它解决。
在Hadoop 2.0中,YARN HA采用了基于Zookeeper的方案。