应用程序(Application)是用户编写的处理数据的程序的统称,它从YARN中申请资源以完成自己的计算任务.YARN自身对应用程序类型没有任何限制,它可以是处理短类型任务的MapReduce作业,也可以是部署长时间运行的服务的应用程序。应用程序可以向YARN申请资源完成各类计算任务。比如MapReduce应用程序向YARN申请资源,用于运行Map Task和Reduce Task两类任务。
由于YARN应用程序编写比较复杂,且需对YARN本身的架构有一定了解,因此通常由专业人员开发,通过回调的形式供其他普通用户使用。比如,专业人员实现可以直接运行在YARN之上的MapReduce框架库(假设打包后为yarn-mapreduce.jar,主要完成数据切分、资源申请、任务调度与容错、网络通信等功能),而普通用户只需编写map()和reduce()两个函数完成MapReduce程序设计(假设打包后为my-app.jar,主要完成自己计算所需的逻辑)。这样用户提交应用程序时,YARN会自动将yarn-mapreduce.jar和my-app.jar两个JAR包同时提交到YARN之上,以完成一个分布式应用的计算。下面将重点介绍的就是专业人员应如何编写一个可直接运行在YARN之上的框架(即前面例子中的yarn-mapreduce.jar)
YARN是一个资源管理系统,负责集群资源的管理和调度。如果想要将一个新的应用程序运行在YARN之上,通常需要编写两个组件Client(客户端)和ApplicationMaster。这两个组件编写非常复杂,尤其ApplicationMaster,需要考虑RPC调用、任务容错等细节。如果大量应用程序可抽象成一种通用框架,只需实现一个客户端和一个ApplicationMaster,然后让所有应用程序重用这两个组件即可。比如MapReduce是一种通用的计算框架,YARN已经为其实现了一个直接可以使用的客户端(JobClient)和ApplicationMaster(MRAppMaster)。
我们知道运行在YARN上的应用程序主要分为短应用程序和长应用程序两类,其中,短应用程序是短时间内可运行完成的程序,比如MapReduce作业、Tez作业等。长应用程序是永不终止运行的服务,比如Storm Service、HBase Service等。尽管这两类应用程序作用不同,但它们在YARN上的工作流程和编写方式是相同的。
用户需要编写客户端和ApplicationMaster两个组件完成该功能,其中,客户端负责向ResourceManager提交ApplicationMaster,并查询应用程序运行状态;ApplicationMaster负责向ResourceManager申请资源(以Container形式表示),并与NodeManager通信以启动各个Container,此外,ApplicationMaster还负责监控各个任务运行状态,并在失败时为其重新申请资源。
通常而言,编写一个YARN Appcalition会涉及3个RPC协议,如图4-1所示,分别为:
- ApplicationClientProtocol(用于Client与ResourceManager之间)。Client通过该协议可实现将应用程序提交到ResourceManager上、查询应用程序的运行状态或者杀死应用程序等功能。
- ApplicationMasterProtocol(用于ApplicationMaster与ResourceManager之间)。App-licationMaster使用该协议向ResourceManager注册、申请资源、获取各个任务运行情况等。
- ContainerManagementProtocol(用于ApplicationMaster与NodeManager之间)。App-licationMaster使用该协议要求NodeManager启动/撤销Container或者查询Container的运行状态。
客户端设计
YARN Application客户端的主要作用是提供一系列访问接口供用户与YARN交互,包括提交Application、查询Application运行状态,修改Application属性(如优先级)等。其中,最重要的访问接口之一是提交Application的函数。
客户端编写流程
通常而言,客户端提交一个应用程序需经过以下两个步骤。
步骤1 Client通过RPC函数ApplicationClientProtocol#getNewApplication从ResourceManager中获取唯一的application ID。
刚开始,客户端应创建一个ApplicationClientProtocol协议的RPC Client,并通过该Client与ResourceManager通信:
private ApplicationClientProtocol rmClient; //RPC Client;
// rmAddress为服务器端地址,conf为配置对象
this.rmClient = (ApplicationClientProtocol) rpc.getProxy(
ApplicationClientProtocol.class, rmAddress, conf)
然后调用ApplicationClientProtocol #getNewApplication从ResourceManager上领取唯一的Application ID,代码如下:
GetNewApplicationRequest request =
Records.newRecord(GetNewApplicationRequest.class);
GetNewApplicationResponse newApp = rmClient.getNewApplication(request);
ApplicationId appId = newApp.getApplicationId();
上面的例子中,静态方法是Records#newRecord,常用于构造一个可序列化对象,具体采用的序列化工厂由参数yarn.ipc.record.factory.class指定,默认是org.apache.hadoop.yarn.factories.impl.pb.RecordFactoryPBImpl,即构造的是Protocol Buffers序列化对象。
该函数的返回一个GetNewApplicationResponse类型的对象,它主要包含两项信息:Application ID和最大可申请资源量。
步骤2 Client通过RPC函数ApplicationClientProtocol#submitApplication将Application-Master提交到ResourceManager上。
如图4-2所示,客户端将启动ApplicationMaster所需的所有信息打包到数据结构ApplicationSubmissionContext中,该数据结构的定义在Protocol Buffers文件yarn_protos.proto中,主要包括以下几个字段(字段名称使用了Protocol Buffers文件中定义的名称)。
- application_id:Application ID(可通过函数ApplicationSubmissionContext#setXXX设置,与以下几个字段类似)
- application_name:Application 名称
- priority:Application优先级
- queue:Application 所属队列。
- user:Applications所属用户名
- unmanaged_am:是否由客户端自己启动ApplicationMaster。
- cancel_tokens_when_complete:当应用程序运行完成时,是否取消Token。通常将该值设为true,除非特殊的应用需求,需要将该应用程序的Token共享给其他应用程序。
- am_container_spec:启动ApplicationMaster相关的信息,主要包括下面几项。
- user:ApplicationMaster启动用户(可通过函数ContainerLaunchContext#setXXX设置,与以下几个字段类似)
- resource:启动ApplicationMaster所需的资源,当前支持CPU和内存两种。
- localResources:ApplicationMaster运行所需的本地资源,通常是一些外部文件,比如字典等
- localResources:ApplicationMaster运行所需的本地资源,通常是一些外部文件,比如字典等
- environment:ApplicationMaster运行时所需的环境变量。
一段(简化的)提交应用程序的实例代码如下:
ApplicationSubmissionContext context = Records.newRecord
(ApplicationSubmissionContext.class);
appContext.setApplicationName(appName); //设置应用程序名称
... //设置应用程序其他属性,比如优先级、队列名称等
ContainerLaunchContext amContainer =
Records.newRecord(ContainerLaunchContext.class); //构造一个AM启动上下文对象
...//设置AM相关的变量
amContainer.setLocalResources(localResources);//设置AM启动所需的本地资源
amContainer.setEnvironment(env); //设置AM启动所需的环境变量
appContext.setAMContainerSpec(amContainer);
appContext.setApplicationId(appId);//appId是上一步获取的ApplicationId
SubmitApplicationRequest request =
Records.newRecord(SubmitApplicationRequest.class);
request.setApplicationSubmissionContext(appContext);
rmClient.submitApplication(request); //将应用程序提交到ResourceManager上
除了提交Application接口外,ResourceManager还需提供以下几种接口的实现,如图4-3所示,这些接口的定义在RPC协议ApplicationClientProtocol中,具体如下:
public interface ApplicationClientProtocol {
//获取Application运行报告,包括用户、队列、运行状态等信息
public GetApplicationReportResponse getApplicationReport(
GetApplicationReportRequest request)
throws YarnRemoteException;
//杀死Application
public KillApplicationResponse forceKillApplication(
KillApplicationRequest request)
throws YarnRemoteException;
//获取集群的metric信息
public GetClusterMetricsResponse getClusterMetrics(
GetClusterMetricsRequest request)
throws YarnRemoteException;
//查看当前系统中所有应用程序信息
public GetAllApplicationsResponse getAllApplications(
GetAllApplicationsRequest request)
throws YarnRemoteException;
//查询当前系统中所有节点信息
public GetClusterNodesResponse getClusterNodes(
GetClusterNodesRequest request)
throws YarnRemoteException;
...//其他接口
}
以上介绍的RPC函数主要用于客户端与ResourceManager之间的通信,这一部分对所有类型的应用程序来说都是一致的,故可以做成通用的代码模块。但在实际应用环境中,为了减轻ResourceManager的负载,一旦应用程序的ApplicationMaster成功启动后,客户端通常直接与ApplicationMaster通信,以查询它的运行状态或者控制它的执行流程(比如杀死一个任务等)。这一部分与ApplicationMaster的设计相关,因此,不同类型的应用程序是不一样的。也正因如此,用户需要针对不同类型的应用程序开发不同的客户端,如图4-4所示。以MapReduce的客户端为例,当用户提交一个MapReduce应用程序时,需通过RPC协议ApplicationClientProtocol与ReourceManager通信,而一旦MapReduce的ApplicationMaster—MRAppMaster成功启动后,客户端通过另外一个RPC协议—MRClientProtocol直接与MRAppMaster通信,以查询应用程序运行状况和控制应用程序的执行(比如杀死一个Map Task等)
客户端编程库
前面提到,不同类型应用程序与ResourceManager交互逻辑是类似的,为了避免简化客户端重复开发,YARN提供了能与ResourceManager交互完成各种操作的编程库org.apache.hadoop.yarn.client.YarnClient。该库对用常用函数进行了封装,并提供了重试、容错等机制,用户使用该库可以快速开发一个包含应用程序提交、状态查询和控制等逻辑的YARN客户端,目前YARN本身自带的各类客户端均使用该编程库实现。该编程库用法如下(经过简化)
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
... //其他Java包
private YarnClient client;
//构造一个YarnClient客户端句柄并初始化
this.client = YarnClient.createYarnClient();
client.init(conf);
//启动YarnClient
yarnClient.start();
// 获取一个新的 Application ID
YarnClientApplication app = yarnClient.createApplication();
// 构造ApplicationSubmissionContext,用于提交作业
ApplicationSubmissionContext appContext =
app.getApplicationSubmissionContext();
ApplicationId appId = appContext.getApplicationId();
appContext.setApplicationName(appName);
...
yarnClient.submitApplication(appContext); // 将应用程序提交到ResourceManager上
一个功能完备的YARN客户端,不仅需要与ResourceManager交互,还需要与ApplicationMaster交互以查询应用程序内部的信息(通常ResourceManager中没有与某个具体应用程序相关的信息)或者控制应用程序内部的任务(比如杀死任务,同样,ResourceManager中也不会有具体任务相关的信息),这一部分需要由应用程序自己设计通信协议。
ApplicationMaster设计
ApplicationMaster(AM)需要与ResourceManager(RM)和NodeManager(NM)两个服务交互,通过与ResourceManager交互,ApplicationMaster可获得任务计算所需的资源;通过与NodeManager交互,ApplicationMaster可启动计算任务(container),并监控它直到运行完成。本节将详细介绍ApplicationMaster与这两个服务交互逻辑的实现方法。
ApplicationMaster编写流程
ApplicationMaster与ResourceManager之间通信涉及三个步骤(分别对应三个API),如图4-5所示,具体如下。
步骤1 ApplicationMaster通过RPC函数ApplicationMasterProtocol#registerApplicationMaster向ResourceManager注册。
ApplicationMaster启动时,首先向ResourceManager注册,注册信息封装到Protocol Buffers消息RegisterApplicationMasterRequest中,主要包括以下字段(字段名称使用Protocol Buffers文件中定义的名称)。
- host:ApplicationMaster本次启动所在的节点host。用户可通过函数RegisterApplicationMasterRequest#getHost/RegisterApplicationMasterRequest#setHost设置或修改该值。
- rpc_port:ApplicationMaster本次启动对外的RPC端口号。用户可通过函数RegisterApplicationMasterRequest#getRpcPort/RegisterApplicationMasterRequest#setRpcPort设置或修改该值
- tracking_url:ApplicationMaster对外提供的追踪Web URL,客户端可通过该tracking_url查询应用程序执行状态。用户可通过函数RegisterApplicationMasterRequest#getTrackingUrl/RegisterApplicationMasterRequest#setTrackingUrl设置或修改该值。
ApplicationMaster注册成功后,将收到一个RegisterApplicationMasterResponse类型的返回值,主要包含以下信息。
- maximumCapability:最大可申请的单个Container占用的资源量。用户可通过函数RegisterApplicationMasterResponse#getMaximumResourceCapability获取该值。
- client_to_am_token_master_key:ClientToAMToken master key。用户可通过函数RegisterApplicationMasterResponse#getClientToAMTokenMasterKey获取该值。
- application_ACLs:应用程序访问控制列表。用户可通过函数RegisterApplicationMasterResponse#getApplicationACLs获取该值
一段(简化的)ApplicationMaster向ResourceManager注册的实例代码如下:
//定义一个ApplicationMasterProtocol协议的RPC Client
ApplicationMasterProtocol rmClient = (ApplicationMasterProtocol) rpc.getProxy
(ApplicationMasterProtocol.class, rmAddress, conf);
RegisterApplicationMasterRequest request = recordFactory
.newRecordInstance(RegisterApplicationMasterRequest.class);
synchronized (this) {
request.setApplicationAttemptId(appAttemptId);
}
...//变量赋值
request.setHost(appHostName); //设置所在的host
request.setRpcPort(appHostPort); //设置对外的host端口号
request.setTrackingUrl(appTrackingUrl); //设置tracking URL
RegisterApplicationMasterResponse response = rmClient
.registerApplicationMaster(request);//向ResourceManager注册
一旦ApplicationMaster注册成功,ResourceManager会为它返回一个RegisterApplicationMasterResponse类型的返回值,该对象包含应用程序可申请的最大资源量、应用程序访问控制列表等信息。
步骤2 ApplicationMaster通过RPC函数ApplicationMasterProtocol#allocate向Resource-Manager申请资源(以Container形式表示)。
ApplicationMaster负责将应用程序需要的资源转化成ResourceManager能识别的格式,并通过RPC函数ApplicationMasterProtocol#allocate告诉ResourceManager。该函数只有一个AllocateRequest类型的参数,主要包含以下几个字段(字段名称使用了Protocol Buffers文件中定义的名称)。
- ask:ApplicationMaster请求的资源列表,每个资源请求用ResourceRequest表示,用户可使用函数AllocateRequest#getAskList/AllocateRequest#setAskList获取或者设置请求资源列表。ResourceRequest包含以下字段。
- priority:资源优先级,为一个正整数,值越小,优先级越高。
- resource_name:期望资源所在的节点或者机架,如果是“*”,表示任何节点上的资源均可以。
- capability:所需的资源量,当前支持CPU和内存两种资源。
- num_containers:需要满足以上条件的资源数目
- relax_locality:是否松弛本地性,即是否在没有满足节点本地性资源时,自动选择机架本地性资源或者其他资源,默认值是true。本地化松弛源于类MapReduce应用中的数据本地性优化机制,它可以最大幅度地提高任务的数据本地性,提高任务运行效率。
release:ApplicationMaster释放的container列表,当出现任务运行完成、收到的资源无法使用而主动释放资源或者主动放弃分配的Container等情况时,Application-Master将释放Container。用户可使用函数AllocateRequest#getReleaseList/Allocate-Request#setReleaseList获取或者设置释放的Container列表。
response_id:本次通信的应答ID,每次通信,该值会加一。用户可使用函数Allocate-Request#getResponseId/AllocateRequest#setResponseId获取或者设置response_id值。
progress:应用程序执行进度。用户可使用函数AllocateRequest#getProgress/AllocateRequest #setProgress获取或者设置progress值。
blacklist_request:请求加入/移除黑名单的节点列表,主要包含以下两个字段。
- blacklist_additions:请求加入黑名单的节点列表
- blacklist_removals:请求移除黑名单的节点列表
用户可使用函数AllocateRequest#getResourceBlacklistRequest/AllocateRequest#setResourceBlacklistRequest获取或者设置黑名单列表。通常情况下,当应用程序发现一个节点对自己运行任务不利时(比如该节点上失败的任务数目明显高于其他节点),可申请将其加入自己的黑名单,之后ResourceManager不再为它分配来自该节点的资源。
注意 即使ApplicationMaster不需要任何资源,它仍需周期性调用ApplicationMasterProtocol#allocate函数以维持与ResourceManager之间的心跳,否则,如果一定时间内ResourceManager未收到任何来自ApplicationMaster的消息,则系统会认为它已死掉了,会将其从系统中移除或者触发容错机制。除此之外,维持心跳的另外一个功能是周期性询问ResourceManager是否存在分配给应用程序的资源,如果有,需主动获取。
ApplicationMaster每次调用ApplicationMasterProtocol#allocate后,会收到一个Allocate-Response类型的返回值,该值包含以下字段(字段名称使用了Protocol Buffers文件中定义的名称)。
a_m_command:ApplicationMaster需执行的命令,目前主要有两个取值,分别是AM_RESYNC和AM_SHUTDOWN,分别表示重启和关闭。当ResourceManager重启或者应用程序信息出现不一致状态时,可能要求ApplicationMaster重新启动;当节点处于黑名单中时,ResourceManager则让ApplicationMaster关闭。用户可使用函数AllocateResponse#getAMCommand获取该值
response_id:本次通信的应答ID,每次通信,该值会加一。用户可使用函数AllocateResponse#getResponseId获取该值
allocated_containers:分配给该应用程序的Container列表。ResourceManager将每份可用的资源封装成一个Container,该Container中有关于这份资源的详细信息,通常而言,ApplicationMaster收到一个Container后,会在这个Container中运行一个任务。用户可使用函数AllocateResponse#getAllocatedContainers获取该值。
completed_container_statuses:运行完成的Container状态列表。需要注意的是,该列表中的Container所处的状态可能是运行成功、运行失败和被杀死。用户可使用函数AllocateResponse#getCompletedContainersStatuses获取该值。
limit:目前集群可用的资源总量。用户可使用函数AllocateResponse#getAvailableResources获取该值。
updated_nodes:当前集群中所有节点运行状态列表。用户可使用函数Allocate-Response#getUpdatedNodes获取该值
num_cluster_nodes:当前集群中可用节点总数。用户可使用函数AllocateResponse#getNumClusterNodes获取该值
preempt:资源抢占信息。当ResourceManager将要抢占某个应用程序的资源时,会提前发送一个资源列表让ApplicationMaster主动释放这些资源,如果Application-Master在一定时间内未释放这些资源,则强制进行回收。Preempt中包含以下两类信息:
- strictContract:必须释放的Container列表,ResourceManager指定要求一定要释放这些Container占用的资源。
- contract:它包含资源总量和Container列表两类信息,ApplicationMaster可释放这些Container占用的资源,或者释放任意几个占用资源总量达到指定资源量的Container。
用户可使用函数AllocateResponse#getPreemptionMessage获取该值。
一段(简化的)ApplicationMaster向ResourceManager申请资源的实例代码如下:
...//变量赋值
while(1) { //维持与ResourceManager之间的周期性心跳
synchronized (this) {
askList = new ArrayList<ResourceRequest>(ask);
releaseList = new ArrayList<ContainerId>(release);
allocateRequest = BuilderUtils newAllocateRequest(appAttemptId,
lastResponseId, progressIndicator,
askList, releaseList, null);//构造一个AllocateRequest对象
}
//向ResourceManager申请资源,同时领取新分配的资源
allocateResponse = rmClient.allocate(allocateRequest);
//根据ResourceManager的应答信息设计接下来的逻辑(比如将资源分配任务)
...
Thread.sleep(1000);
}
...
步骤3 ApplicationMaster通过RPC函数ApplicationMasterProtocol#finishApplicationMaster告诉ResourceManager应用程序执行完毕,并退出。
当ApplicationMaster运行完毕后,它会调用ApplicationMasterProtocol#finishApplicationMaster通知ResourceManager,该RPC函数的参数类型为FinishApplicationMasterRequest,主要包含以下字段。
- diagnostics:诊断信息。当ApplicationMaster运行失败时,会记录错误原因以便于后期诊断
- tracking_url:ApplicationMaster对外提供的追踪Web URL。
- final_application_status:ApplicationMaster最终所处状态,可以是APP_UNDEFINED(未定义)、APP_SUCCEEDED(运行成功)、APP_FAILED(运行失败)、APP_KILLED(被杀死)。
成功执行该RPC函数后,ApplicationMaster将收到一个FinishApplicationMasterResponse类型的返回值,目前该返回值未包含任何信息。
一段(简化的)处理ApplicationMaster退出的实例代码如下:
FinishApplicationMasterRequest request = recordFactory
.newRecordInstance(FinishApplicationMasterRequest.class);
request.setAppAttemptId(appAttemptId);//设置Application ID
request.setFinishApplicationStatus(appStatus); //设置最终状态
if(appMessage != null) {
request.setDiagnostics(appMessage); //设置诊断信息
}
if(appTrackingUrl != null) {
request.setTrackingUrl(appTrackingUrl); //设置trackingURL
}
rmClient.finishApplicationMaster(request); //通知ResourceManager自己退出
ApplicationMaster将重复步骤2,不断为应用程序申请资源,直到资源得到满足或者整个应用程序运行完成。
2.AM-NM编写流程
ApplicationMaster与NodeManager之间通信涉及三个步骤(分别对应三个API),如图4-6所示,具体如下。
步骤1 ApplicationMaster将申请到的资源二次分配给内部的任务,并通过RPC函数ContainerManagementProtocol#startContainer与对应的NodeManager通信以启动Container(包含任务描述、资源描述等信息),该函数的参数类型为StartContainersRequest,主要包含一个类型为StartContainersRequest的字段,它自身包含以下两个字段(字段名称使用了Protocol Buffers文件中定义的名称):
- container_launch_context:封装了Container执行环境,主要包括以下几个字段。
- localResources:Container执行所需的本地资源,比如字典文件、JAR包或者可执行文件等,以key/value格式保存。
- tokens:Container执行所需的各种Token。
- service_data:附属服务所需的数据,以key/value格式保存。
- environment:Container执行所需的环境变量,以key/value格式保存。
- command:Container执行命令,需要是一条Shell命令。
- container_token:Container启动时的安全令牌。
ContainerManagementProtocol#startContainer执行成功后,会收到一个StartContainers-Response类型的返回值,该值包含以下几个字段:
- services_meta_data:附属服务返回的元数据信息,用户可通过函数StartContainersResponse#getAllServicesMetaData获取该值。
- succeeded_requests:成功运行的Container列表,用户可通过函数StartContainersResponse#getSuccessfullyStartedContainers获取该值。
- failed_requests:运行失败的Container列表,用户可通过函数StartContainersResponse#getFailedRequests获取该值。
示例代码如下:
...
String cmIpPortStr = container.getNodeId().getHost() + ":"
+ container.getNodeId().getPort();
InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr);
LOG.info("Connecting to ContainerManager at " + cmIpPortStr);
this.cm = ((ContainerManagementProtocol) rpc.getProxy(ContainerManagementProtocol.class,
cmAddress, conf));
ContainerLaunchContext ctx = Records
.newRecord(ContainerLaunchContext.class);
//设置ctx 变量
...
StartContainerRequest startReq = Records
.newRecord(StartContainerRequest.class);
startReq.setContainerLaunchContext(ctx);
startReq.setContainer(container);
try {
cm.startContainer(startReq);
} catch (YarnRemoteException e) {
LOG.info("Start container failed for :" + ", containerId="
+ container.getId());
e.printStackTrace();
}
步骤2 为了实时掌握各个Container运行状态,ApplicationMaster可通过RPC函数ContainerManagementProtocol#getContainerStatus向NodeManager询问Container运行状态,一旦发现某个Container运行失败,ApplicationMaster可尝试重新为对应的任务申请资源。
步骤3 一旦一个Container运行完成后,ApplicationMaster可通过RPC函数ContainerManagementProtocol#stopContainer释放Container。注意,YARN是一个资源管理系统,它不仅负责分配资源,还负责回收资源。当一个Container运行完成后,它会主动确认Container是否将对应的资源释放了,也就是说,任何一个Container运行结束后(此时Container可能已经退出且释放资源),ApplicationMaster必须调用RPC函数ContainerManagementProtocol#stopContainer释放Container(确保资源真的得到释放)。
另外,在应用程序运行过程中,用户可使用ApplicationClientProtocol#getApplicationReport查询应用程序运行状态,也可以使用ApplicationClientProtocol#forceKillApplication将应用程序杀死。
ApplicationMaster编程库
对AM编程库的介绍我们同样分AM-RM和AM-NM两部分进行
1.AM-RM编程库
同客户端编写一样,为了简化应用程序开发,ApplicationMaster与ResourceManager交互部分也可以做成一个通用的编程库。YARN提供的编程库涉及的类如图4-7所示。
ApplicationMaster与ResourceManager交互的核心逻辑均由AMRMClientImpl和AMRM-ClientAsync实现。其中,AMRMClientImpl是阻塞式实现,即每个函数执行完成之后才返回;AMRMClientAsync则是基于AMRMClientImpl的非阻塞式实现,ApplicationMaster触发任何一个操作后,AMRMClientAsync将之封装成事件放入事件队列后便返回,而事件的处理是由一个专门的线程池完成,这样整个过程变成了异步非阻塞的。当用户想要实现自己的ApplicationMaster时,只需实现回调类AMRMClientAsync.CallbackHandler,该类主要提供了5个回调函数,分别是:
public interface CallbackHandler {
/**
* 被调用时机:ResourceManager为ApplicationMaster返回的心跳应答中包含完成的
* Container信息。
* 注意:如果心跳应答中同时包含完成的Container和新分配的container,则该回调函数将在
* containersAllocated之前调用
*/
public void onContainersCompleted(List<ContainerStatus> statuses);
/**
* 被调用时机:ResourceManager为ApplicationMaster返回的心跳应答中包含新分配的
* Container信息。
* 注意:如果心跳应答中同时包含完成的Container和新分配的container,则该回调函数将在
* onContainersCompleted之后调用
*/
public void onContainersAllocated(List<Container> containers);
// 被调用时机:ResourceManager通知ApplicationMaster停止运行
public void onShutdownRequest();
// 被调用时机:ResourceManager管理的节点发生变化(比如健康节点变得不健康,节点不可用)
public void onNodesUpdated(List<NodeReport> updatedNodes);
// 被调用时机:任何出现异常的时候
public void onError(Exception e);
}
下面我们举例说明,假设用户实现了一个MyCallbackHandler,代码如下:
class MyCallbackHandler implements AMRMClientAsync.CallbackHandler{
...
}
可以采用如下非阻塞方式使用该MyCallbackHandler(经简化):
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
…… //其他Java包
AMRMClientAsync.CallbackHandler allocListener = new MyCallbackHandler();
// 构造一个AMRMClientAsync句柄
asyncClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
asyncClient.init(conf); //通过传入一个YarnConfiguration对象进行初始化
asyncClient.start(); //启动asyncClient
// ApplicationMaster向ResourceManager注册
RegisterApplicationMasterResponse response = asyncClient
.registerApplicationMaster(appMasterHostname, appMasterRpcPort,
appMasterTrackingUrl);
... // 添加Container请求
asyncClient.addContainerRequest(containerRequest);
... // 等待应用程序运行结束
asyncClient.unregisterApplicationMaster(status, appMsg, null);
asyncClient.stop();
除了实现以上几个回调函数外,AMRMClientAsync还提供了以下几个接口供用户编写ApplicationMaster使用,分别是:
public RegisterApplicationMasterResponse registerApplicationMaster(
String appHostName, int appHostPort, String appTrackingUrl)
throws YarnRemoteException;//向ResourceManager注册ApplicationMaster
// 通知ResourceManager注销ApplicationMaster
public void unregisterApplicationMaster(FinalApplicationStatus appStatus,
String appMessage, String appTrackingUrl)
throws YarnRemoteException;
// 添加资源(Container)请求
public void addContainerRequest(AMRMClient.ContainerRequest req);
// 移除资源(Container)请求
public void removeContainerRequest(AMRMClient.ContainerRequest req);
// 请求释放资源
public void releaseAssignedContainer(ContainerId containerId);
...
2.AM-NM编程库
同客户端和AM-RM编写一样,为了简化应用程序开发,ApplicationMaster与NodeManager交互部分也可以做成一个通用的编程库.
需要注意的是,由于ResourceManager也要与NodeManager交互以启动应用程序的ApplicationMaster,因此该编程库也可用在ResourceManager实现逻辑中,因此该编程库的名称不再是AMNMClient,而是NMClient。YARN提供的NMClient编程库涉及的类如图4-8所示。
ApplicationMaster与NodeManager交互的核心逻辑均由NMClientImpl和NMClientAsync实现,其中,NMClientImpl是阻塞式实现,即每个函数执行完成之后才返回;NMClient-Async则是基于NMClientImpl的非阻塞式实现,ApplicationMaster触发任何一个操作后,NMClientAsync将之封装成事件放入事件队列后便返回,而事件的处理是由一个专门的线程池完成,这样整个过程变成了异步非阻塞的。当用户想要实现自己的ApplicationMaster时,只需实现回调类NMClientAsync.CallbackHandler ,该类主要提供了6个回调函数,分别是:
public static interface CallbackHandler {
// 当接收到启动Container请求时被调用
void onContainerStarted(ContainerId containerId,
Map<String, ByteBuffer> allServiceResponse);
// 当NodeManager应答(对之前发送查询状态指令的应答)Container当前状态时被调用
void onContainerStatusReceived(ContainerId containerId,
ContainerStatus containerStatus);
// 当NodeManager应答(对之前发送停止container指令的应答)Container已停止时被调用
void onContainerStopped(ContainerId containerId);
// 当NodeManager启动Container过程中抛出异常时被调用
void onStartContainerError(ContainerId containerId, Throwable t);
// 当查询container运行状态时抛出异常时被调用
void onGetContainerStatusError(ContainerId containerId, Throwable t);
// 当NodeManager停止container过程中抛出异常时被调用
void onStopContainerError(ContainerId containerId, Throwable t);
}
下面举例说明,假设用户实现了一个MyCallbackHandler,代码如下:
class MyCallbackHandler implements NMClientAsync.CallbackHandler{
...
}
可以采用如下非阻塞方式使用该MyCallbackHandler(经简化)
import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
... //其他Java包
// 构造一个NMClientAsync句柄
NMClientAsync asyncClient = new NMClientAsyncImpl(new MyCallbackhandler());
asyncClient.init(conf); //初始化asyncClient
asyncClient.start(); //启动asyncClient
//构造Container信息
ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
... //设置ctx变量
//启动Container
asyncClient.startContainerAsync(container, ctx);
// 获取Container状态
asyncClient.getContainerStatusAsync(container.getId(),
container.getNodeId(), container.getContainerToken());
// 停止Container
asyncClient.stopContainerAsync(container.getId(), container.getNodeId(),
container.getContainerToken());
asyncClient.stop();
YARN 应用程序实例
本节介绍两个YARN自己带的Application示例程序:DistributedShell和UnManaged AM。需要注意的是,尽管这两个示例程序非常简单,但是它们已经具有了一个应用程序具备的所有功能,其他任何应用程序均可在这两个程序基础上扩展而来。
DistributedShell
DistributedShell是一个可以分布式运行Shell命令的应用程序,它可以并行执行用户提供的Shell命令或者Shell脚本。
DistributedShell的使用方法如下:
bin/hadoop jar \
share/hadoop/yarn/hadoop-yarn-applications-distributedshell-*.jar \
org.apache.hadoop.yarn.applications.distributedshell.Client
[COMMAND_OPTIONS]
其中,COMMAND_OPTIONS的参数及其含义如表4-1所示。
运行示例如下:
bin/hadoop jar\
share/hadoop/yarn/hadoop-yarn-applications-distributedshell-*.jar\
org.apache.hadoop.yarn.applications.distributedshell.Client\
--jar share/hadoop/yarn/hadoop-yarn-applications-distributedshell-*.jar\
--shell_command ls\
--num_containers 10\
--container_memory 350\
--master_memory 350\
--priority 10
接下来重点介绍DistributedShell的客户端和ApplicationMaster实现方法。
(1)客户端实现
客户端的实现方法与前面描述的步骤完全一致,主要提供了两个函数用于应用程序提交(并监控它的运行过程直到运行完成)和杀死应用程序。在应用程序提交时,Client首先从参数中获取各个属性值,并构造出ApplicationSubmissionContext对象,将应用程序提交到YARN上,但需要注意两点。
1)Shell脚本处理。如果用户指定了一个Shell脚本,则客户端首先将脚本内容写入HDFS上的"/user/appName/$appId/ExecShellScript.sh"文件中,并将该文件信息写入到以下几个环境变量中。
- DISTRIBUTEDSHELLSCRIPTLOCATION:Shell脚本在HDFS上的位置。
- DISTRIBUTEDSHELLSCRIPTTIMESTAMP:Shell脚本最近修改时间。
- DISTRIBUTEDSHELLSCRIPTTIMESTAMP:Shell脚本最近修改时间。
而ApplicationMaster则会获取以上三个环境变量信息,用于验证文件的可用性以及进一步将文件加入到文件分发列表中
2)构建ApplicationMaster启动命令。ApplicationSubmissionContext对象中最重要的一个字段是ApplicationMaster启动命令,而DistributedShell客户端构造内容如下
/bin/java -Xmx 350m \
org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster \
--container_memory 350 \
--num_containers 10 \
--priority 10 \
--shell_command ls \
1> <LOG_DIR>/AppMaster.stdout \
2> <LOG_DIR>/AppMaster.stderr
ApplicationMaster将会读取这些参数进一步完成资源申请和Container启动等工作。
(2)ApplicationMaster实现
ApplicationMaster启动时,将所需的所有资源一次性发送给ResourceManager,相关代码如下:
for (int i = 0; i < numTotalContainers; ++i) {
ContainerRequest containerAsk = setupContainerAskForRM();
// resourceManager是一个AMRMClientAsync对象
resourceManager.addContainerRequest(containerAsk);
}
private ContainerRequest setupContainerAskForRM() {
Priority pri = Records.newRecord(Priority.class);// 设置优先级
pri.setPriority(requestPriority);// 设置资源量
Resource capability = Records.newRecord(Resource.class);
capability.setMemory(containerMemory);
ContainerRequest request = new ContainerRequest(capability, null, null,pri);
return request;
}
一旦ApplicationMaster收到一个Container后,将启动一个独立线程与对应的NodeManager通信,以运行任务;与此同时,如果发现某个Container被杀死,ApplicationMaster会为它重新申请资源。DistributedShell ApplicationMaster中的ResourceManager回调类实现如下:
private class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
public void onContainersCompleted(List<ContainerStatus> completedContainers) {
for (ContainerStatus containerStatus : completedContainers) {
// Container应该处于完成状态,可能是成功、失败或者被杀死三个状态之一
assert (containerStatus.getState() == ContainerState.COMPLETE);
// increment counters for completed/failed containers
int exitStatus = containerStatus.getExitStatus();
if (0 != exitStatus) { // Container运行失败
if (ContainerExitStatus.ABORTED != exitStatus) {
numCompletedContainers.incrementAndGet();
numFailedContainers.incrementAndGet();
} else { // Container被杀死,此时需重新为它申请资源
numAllocatedContainers.decrementAndGet();
numRequestedContainers.decrementAndGet();
}
} else { // Container成功运行完成
numCompletedContainers.incrementAndGet();
}
}
// 如果Container是被杀死的,需重新为它申请资源
int askCount = numTotalContainers - numRequestedContainers.get();
numRequestedContainers.addAndGet(askCount);
if (askCount > 0) {
ContainerRequest containerAsk = setupContainerAskForRM(askCount);
resourceManager.addContainerRequest(containerAsk);
}
// 将进度汇报给ResourceManager
float progress = (float) numCompletedContainers.get()
/ numTotalContainers;
resourceManager.setProgress(progress);
if (numCompletedContainers.get() == numTotalContainers) {
done = true;
}
}
public void onContainersAllocated(List<Container>> allocatedContainers) {
numAllocatedContainers.addAndGet(allocatedContainers.size());
for (Container allocatedContainer : allocatedContainers) {
LaunchContainerRunnable runnableLaunchContainer = new LaunchContainerRunnable(
allocatedContainer);
Thread launchThread = new Thread(runnableLaunchContainer);
// 每个Container由一个独立的线程启动
launchThreads.add(launchThread);
launchThread.start();
}
}
public void onRebootRequest() {}
public void onNodesUpdated(List<NodeReport> updatedNodes) {}
}
Unmanaged AM
在YARN中,一个ApplicationMaster需要占用一个Container,该Container可能位于任意一个NodeManager上,这给ApplicationMaster调试带来很大麻烦。为了解决该问题,YARN引入了一种新的ApplicationMaster—Unmanaged AM。这种AM运行在客户端,不再由ResourceManager启动和销毁。用户只需在提交应用程序时设置一个参数,YARN便允许用户将ApplicationMaster运行在客户端的一个单独进程中。
Unmanaged AM工作步骤如下。
1)通过RPC函数ApplicationClientProtocol#getNewApplication获取一个Application ID。
2)创建一个ApplicationSubmissionContext对象,填充各个字段,并通过调用函数ApplicationClientProtocol.setUnmanagedAM(true)设置启用Unmanaged AM的标志位。
3)通过RPC函数ApplicationClientProtocol#submitApplication将应用程序提交到ResourceManage上,并监控Application运行状态,直到其状态变为YarnApplicationState.ACCEPTED。
4)在客户端中的一个独立线程中启动ApplicationMaster,然后等待ApplicationMaster运行结束和ResourceManage报告应用程序运行结束。
小结
本文介绍了YARN Application的程序设计方法。当用户想要编写一个可以运行在YARN上的应用程序时,通常需要实现两个组件,分别是客户端和ApplicationMaster,其中,客户端主要用于提交应用程序和管理应用程序,而ApplicationMaster则负责实现应用程序的任务切分、调度、监控等功能。
为了便于用户实现应用程序的客户端和ApplicationMaster,YARN提供了可供用户直接使用的客户端编程库、AM-RM编程库和AM-NM编程库。