前言
YARN是一个资源管理系统,负责集群资源的管理和分配。yarn就好比hadoop集群的操作系统,当用户向YARN中提交一个应用程序后,需要同操作系统做一些交互,这样才能运行在yarn上。yarn分两个阶段运行应用程序: 第一个阶段是启动ApplicationMaster; 第二个阶段是由ApplicationMaster创建应用程序,为它申请资源,并监控它的整个运行过程,直到运行完成。如果想要将一个新的应用程序跑在yarn上,通常需要编写这两个组件:客户端和ApplicationMaster。这两个组件的编写非常复杂,尤其是ApplicationMaster,需要考虑rpc调用,容错等细节,往往由专业的开发人员编写这两个组件,然后提供给上层的应用程序用户使用。如果大量 应用程序可以抽象为一种通用的框架, 那么只需要实现一个客户端和一个ApplicationMaster,这样所有的应用程序直接用这两个组件即可,比如MapReduce是一种通用的计算框架,YARN已经内嵌了一个可以直接使用的客户端MRClientService和ApplicationMaster—MRAppMaster。对于无法用MapReduce计算框架抽象出来的应用程序,要想跑在yarn上,就必须用户自己编写这两个组件,客户端和ApplicationMaster,其中,客户端负责向ResourceManager提交ApplicationMaster,并查询应用程序运行状态,ApplicationMaster负责向ResourceManager申请资源(以Container形式表示),并与NodeManager通信以启动各个Container,此外,ApplicationMaster还负责监控各个任务运行状态,并在失败是为其重新申请资源。
开发Client启动AM
Client部分是用于将应用提交到YARN, 从而可以启动application master.客户端通常只需与ResourceManager交互,期间涉及到多个数据结构和一个RPC协议,具体如下:
客户端通过rpc协议ApplicationClientProtocal向ResourceManager(具体是ApplicationsManager、ASM)发送应用程序提交请求GetNewApplicationRequest,ResourceManager为其返回应答GetNewApplicationResponse,该数据结构中包含多种信息,包括ApplicationId、可资源使用上限和下限等。初始化并启动一个yarnClient:
YarnClient yarnClient = YarnClient.createYarnClient();
yarnClient.init(conf);
yarnClient.start();
YarnClientApplication app = yarnClient.createApplication();
GetNewApplicationResponse appResponse = app.getNewApplicationResponse();
- Client部分最关键的是构建一个ApplicationSubmissionContext。启动ApplicationMaster所需的所有信息打包到数据结构ApplicationSubmissionContext中,主要包括以下几种信息:
(1) application id
(2) application 名称
(3) application优先级
(4) application 所属队列
(5) application 启动用户名
(6) ApplicationMaster对应的Container信息ContainerLaunchContext,包括:启动ApplicationMaster所需各种文件资源、jar包、环境变量、启动命令、运行ApplicationMaster所需的资源(主要指内存)等。
// set the application name
ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
ApplicationId appId = appContext.getApplicationId();
appContext.setKeepContainersAcrossApplicationAttempts(keepContainers);
appContext.setApplicationName(appName);
// Set up the container launch context for the application master
ContainerLaunchContext amContainer = ContainerLaunchContext.newInstance(
localResources, env, commands, null, null, null);
// Set up resource type requirements
// For now, both memory and vcores are supported, so we set memory and
// vcores requirements
Resource capability = Resource.newInstance(amMemory, amVCores);
appContext.setResource(capability);
客户端调用ClientRMProtocol#submitApplication(ApplicationSubmissionContext)将ApplicationMaster提交到ResourceManager上。ResourceManager收到请求后,会为ApplicationMaster寻找合适的节点,并在该节点上启动它。
LOG.info("Submitting application to ASM");
yarnClient.submitApplication(appContext);
客户端可通过多种方式查询应用程序的运行状态,其中一种是调用RPC函数ClientRMProtocol#getApplicationReport获取一个应用程序当前运行状况报告,该报告内容包括应用程序名称、所属用户、所在队列、ApplicationMaster所在节点、一些诊断信息、启动时间等。
// Get application report for the appId we are interested in
ApplicationReport report = yarnClient.getApplicationReport(appId);
LOG.info("Got application report from ASM for"
+ ", appId=" + appId.getId()
+ ", clientToAMToken=" + report.getClientToAMToken()
+ ", appDiagnostics=" + report.getDiagnostics()
+ ", appMasterHost=" + report.getHost()
+ ", appQueue=" + report.getQueue()
+ ", appMasterRpcPort=" + report.getRpcPort()
+ ", appStartTime=" + report.getStartTime()
+ ", yarnAppState=" + report.getYarnApplicationState().toString()
+ ", distributedFinalState=" + report.getFinalApplicationStatus().toString()
+ ", appTrackingUrl=" + report.getTrackingUrl()
+ ", appUser=" + report.getUser());
YarnApplicationState state = report.getYarnApplicationState();
FinalApplicationStatus dsStatus = report.getFinalApplicationStatus();
- 如果有异常或者其他情况,可以通过yarnClient.killApplication(appId);来kill掉应用;
开发ApplicationMaster
ApplicationMaster需要与ResoureManager和NodeManager交互,以申请资源和启动Container,期间涉及到多个数据结构和两个RPC协议。具体步骤如下:
-
ApplicationMaster首先需通过RPC协议AMRMProtocol向ResourceManager发送注册请求RegisterApplicationMasterRequest,该数据结构中包含ApplicationMaster所在节点的host、RPC port和TrackingUrl等信息,而ResourceManager将返回RegisterApplicationMasterResponse,该数据结构中包含多种信息,包括该应用程序的ACL列表、可资源使用上限和下限等。
ApplicationMaster与RM之间的心跳:整个运行过程中,ApplicationMaster需通过心跳与ResourceManager保持联系,这是因为,如果一段时间内(默认是10min),ResourceManager未收到ApplicationMaster信息,则认为它死掉了,会重新调度或者让其失败。通常而言,ApplicationMaster周期性调用RPC函数ApplicationMasterProtocol.allocate向其发送空的AllocateRequest请求即可。
构造Container:根据每个任务的资源需求,ApplicationMaster可向ResourceManager申请一系列用于运行任务的Container,ApplicationMaster使用ResourceRequest类描述每个Container(一个container只能运行一个任务):
1)Hostname:期望Container所在的节点,如果是*,表示可以为任意节点。
2)Resource capability:运行该任务所需的资源量,如(memory/disk/cpu)。
3)Priority:任务优先级。一个应用程序中的任务可能有多种优先级,ResourceManager会优先为高优先级的任务分配资源。
4)numContainers:符合以上条件的container数目。
- 申请资源分配Container:一旦为任务构造了Container后,ApplicationMaster会使用RPC函数AMRMProtocol#allocate向ResourceManager发送一个AllocateRequest对象,以请求分配这些Container,AllocateRequest中包含以下信息:
1)Requested containers:所需的Container列表
2)Released containers:有些情况下,比如有些任务在某些节点上失败过,则ApplicationMaster不想再在这些节点上运行任务,此时可要求释放这些节点上的Container。
3)Progress update information:应用程序执行进度
4)ResponseId:RPC响应ID,每次调用RPC,该值会加1。
ResourceManager会为ApplicationMaster返回一个AllocateResponse对象,该对象中主要信息包含在AMResponse中:
1)reboot:ApplicationMaster是否需要重新初始化.当ResourceManager端出现不一致状态时,会要求对应的ApplicationMaster重新初始化。
2)Allocated Containers:新分配的container列表。
3)Completed Containers:已运行完成的container列表,该列表中包含运行成功和未成功的Container,ApplicationMaster可能需要重新运行那些未运行成功的Container。
-
ApplicationMaster会不断追踪已经获取的container,且只有当需求发生变化时,才允许重新为Container申请资源。
启动Container, 当ApplicationMaster(从ResourceManager端)收到新分配的Container列表后,会使用ContainerManagementProtocol#startContainer向对应的NodeManager发送ContainerLaunchContext以启动Container,ContainerLaunchContext包含以下内容:
1)ContainerId:Container id
2)Resource:该Container可使用的资源量(当前仅支持内存)
3)User:Container所属用户
4)Security tokens:安全令牌,只有持有该令牌才可启动container
5)LocalResource:运行Container所需的本地资源,比如jar包、二进制文件、其他外部文件等。
6)ServiceData:应用程序可能使用其他外部服务,这些服务相关的数据通过该参数指定。
7)Environment:启动container所需的环境变量
8)command:启动container的命令
- 监控Container:ApplicationMaster可以通过2种途径监控启动的Container:
使用ApplicationMasterProtocol.allocate向ResourceManager发送查询请求;
使用ContainerManagementProtocol查询指定的ContainerId对应的Container的状态;
- ApplicationMaster会不断重复前面的步骤,直到所有任务运行成功,此时,它会发送FinishApplicationMasterRequest,以告诉ResourceManage自己运行结束。
distributedshell实例分析
Distributedshell Client的入口main函数如下:
public static void main(String[] args) {
...
Client client = new Client();
boolean doRun = client.init(args);
if (!doRun) {
System.exit(0);
}
result = client.run();
...
}
DistributedShell Client中最重要的是函数为run(),该函数实现过程如下:
利用Hadoop RPC接口创建一个可以直接与ResourceManager交互的RPC client句柄applicationsManager
private void connectToASM() throws IOException {
YarnConfiguration yarnConf = new YarnConfiguration(conf);
InetSocketAddress rmAddress = yarnConf.getSocketAddr(
YarnConfiguration.RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_PORT);
LOG.info("Connecting to ResourceManager at " + rmAddress);
applicationsManager = ((ClientRMProtocol) rpc.getProxy(
ClientRMProtocol.class, rmAddress, conf));
}
- 获取application id。
与ResourceManager通信,请求application id:
GetNewApplicationRequest request = Records.newRecord(GetNewApplicationRequest.class);
GetNewApplicationResponse response = applicationsManager.getNewApplication(request);
- 构造ContainerLaunchContext。
构造一个用于运行ApplicationMaster的container,container相关信息被封装到ContainerLaunchContext对象中:
ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
//添加本地资源
//填充localResources
amContainer.setLocalResources(localResources);
//添加运行ApplicationMaster所需的环境变量
Map<String, String> env = new HashMap<String, String>();
//填充env
amContainer.setEnvironment(env);
//添加启动ApplicationMaster的命令
//填充commands;
amContainer.setCommands(commands);
//设置ApplicationMaster所需的资源
amContainer.setResource(capability);
- 构造ApplicationSubmissionContext。
构造一个用于提交ApplicationMaster的ApplicationSubmissionContext:
ApplicationSubmissionContext appContext =
Records.newRecord(ApplicationSubmissionContext.class);
//设置application id,调用GetNewApplicationResponse#getApplicationId()
appContext.setApplicationId(appId);
//设置Application名称:“DistributedShell”
appContext.setApplicationName(appName);
//设置前面创建的container
appContext.setAMContainerSpec(amContainer);
//设置application的优先级,默认是0
pri.setPriority(amPriority);
//设置application的所在队列,默认是""
appContext.setQueue(amQueue);
//设置application的所属用户,默认是""
appContext.setUser(amUser);
- 提交ApplicationMaster。
将ApplicationMaster提交到ResourceManager上,从而完成作业提交功能:
applicationsManager.submitApplication(appRequest);
- 显示应用程序运行状态。
为了让用户知道应用程序进度,Client会每隔几秒在shell终端上打印一次应用程序运行状态:
while (true) {
Thread.sleep(1000);
GetApplicationReportRequest reportRequest =
Records.newRecord(GetApplicationReportRequest.class);
reportRequest.setApplicationId(appId);
GetApplicationReportResponse reportResponse =
applicationsManager.getApplicationReport(reportRequest);
ApplicationReport report = reportResponse.getApplicationReport();
//打印report内容
...
YarnApplicationState state = report.getYarnApplicationState();
FinalApplicationStatus dsStatus = report.getFinalApplicationStatus();
if (YarnApplicationState.FINISHED == state) {
if (FinalApplicationStatus.SUCCEEDED == dsStatus) {
return true;
} else {
return false;
}
} else if (YarnApplicationState.KILLED == state
|| YarnApplicationState.FAILED == state) {
return false;
}
}
Distributedshell ApplicationMaster源码分析
- ApplicationMaster由ResourceManager分配的一个container启用,之后,它与ResourceManager通信,注册自己,以告知自己所在的节点(host:port),trackingurl(客户端可通过该url直接查询AM运行状态)等。
RegisterApplicationMasterRequest appMasterRequest =
Records.newRecord(RegisterApplicationMasterRequest.class);
appMasterRequest.setApplicationAttemptId(appAttemptID);
appMasterRequest.setHost(appMasterHostname); appMasterRequest.setRpcPort(appMasterRpcPort);
appMasterRequest.setTrackingUrl(appMasterTrackingUrl);
return resourceManager.registerApplicationMaster(appMasterRequest);
ApplicationMaster周期性向ResourceManager发送心跳信息,以告知ResourceManager自己仍然活着,这是通过周期性调用AMRMProtocol#allocate实现的。
为了完成计算任务,ApplicationMaster需向ResourceManage发送一个ResourceRequest描述对资源的需求,包括container个数、期望资源所在的节点、需要的CPU和内存等,而ResourceManager则为ApplicationMaster返回一个AllocateResponse结构以告知新分配到的container列表、运行完成的container列表和当前可用的资源量等信息。
while (numCompletedContainers.get() < numTotalContainers
&& !appDone) {
Thread.sleep(1000);
List<ResourceRequest> resourceReq = new ArrayList<ResourceRequest>();
if (askCount > 0) {
ResourceRequest containerAsk = setupContainerAskForRM(askCount);
resourceReq.add(containerAsk);
}
//如果resourceReq为null,则可看做心跳信息,否则就是申请资源
AMResponse amResp =sendContainerAskToRM(resourceReq);
}
- 对于每个新分配到的container,ApplicationMaster将创建一个ContainerLaunchContext对象,该对象包含container id,启动container所需环境、启动container命令,然后与对应的节点通信,以启动container。
LaunchContainerRunnable runnableLaunchContainer =
new LaunchContainerRunnable(allocatedContainer);
//每个container由一个线程启动
Thread launchThread = new Thread(runnableLaunchContainer);
launchThreads.add(launchThread);
launchThread.start();
- ApplicationMaster通过AMRMProtocol#allocate获取各个container的运行状况,一旦发现某个container失败了,则会重新向ResourceManager发送资源请求,以重新运行失败的container。
- 作业运行失败后,ApplicationMaster向ResourceManager发送FinishApplicationMasterRequest请求,以告知自己运行结束。
FinishApplicationMasterRequest finishReq =
Records.newRecord(FinishApplicationMasterRequest.class);
finishReq.setAppAttemptId(appAttemptID);
boolean isSuccess = true;
if (numFailedContainers.get() == 0) {
finishReq.setFinishApplicationStatus(FinalApplicationStatus.SUCCEEDED);
}