开发yarn应用程序

前言

YARN是一个资源管理系统,负责集群资源的管理和分配。yarn就好比hadoop集群的操作系统,当用户向YARN中提交一个应用程序后,需要同操作系统做一些交互,这样才能运行在yarn上。yarn分两个阶段运行应用程序: 第一个阶段是启动ApplicationMaster; 第二个阶段是由ApplicationMaster创建应用程序,为它申请资源,并监控它的整个运行过程,直到运行完成。如果想要将一个新的应用程序跑在yarn上,通常需要编写这两个组件:客户端和ApplicationMaster。这两个组件的编写非常复杂,尤其是ApplicationMaster,需要考虑rpc调用,容错等细节,往往由专业的开发人员编写这两个组件,然后提供给上层的应用程序用户使用。如果大量 应用程序可以抽象为一种通用的框架, 那么只需要实现一个客户端和一个ApplicationMaster,这样所有的应用程序直接用这两个组件即可,比如MapReduce是一种通用的计算框架,YARN已经内嵌了一个可以直接使用的客户端MRClientService和ApplicationMaster—MRAppMaster。对于无法用MapReduce计算框架抽象出来的应用程序,要想跑在yarn上,就必须用户自己编写这两个组件,客户端和ApplicationMaster,其中,客户端负责向ResourceManager提交ApplicationMaster,并查询应用程序运行状态,ApplicationMaster负责向ResourceManager申请资源(以Container形式表示),并与NodeManager通信以启动各个Container,此外,ApplicationMaster还负责监控各个任务运行状态,并在失败是为其重新申请资源。

开发Client启动AM

Client部分是用于将应用提交到YARN, 从而可以启动application master.客户端通常只需与ResourceManager交互,期间涉及到多个数据结构和一个RPC协议,具体如下:

image.png

客户端通过rpc协议ApplicationClientProtocal向ResourceManager(具体是ApplicationsManager、ASM)发送应用程序提交请求GetNewApplicationRequest,ResourceManager为其返回应答GetNewApplicationResponse,该数据结构中包含多种信息,包括ApplicationId、可资源使用上限和下限等。初始化并启动一个yarnClient:

YarnClient yarnClient = YarnClient.createYarnClient();
yarnClient.init(conf);
yarnClient.start();
YarnClientApplication app = yarnClient.createApplication();
GetNewApplicationResponse appResponse = app.getNewApplicationResponse();
  • Client部分最关键的是构建一个ApplicationSubmissionContext。启动ApplicationMaster所需的所有信息打包到数据结构ApplicationSubmissionContext中,主要包括以下几种信息:
(1) application id
(2) application 名称
(3) application优先级
(4) application 所属队列
(5) application 启动用户名
(6) ApplicationMaster对应的Container信息ContainerLaunchContext,包括:启动ApplicationMaster所需各种文件资源、jar包、环境变量、启动命令、运行ApplicationMaster所需的资源(主要指内存)等。
// set the application name
ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
ApplicationId appId = appContext.getApplicationId();
appContext.setKeepContainersAcrossApplicationAttempts(keepContainers);
appContext.setApplicationName(appName);
// Set up the container launch context for the application master
ContainerLaunchContext amContainer = ContainerLaunchContext.newInstance(
  localResources, env, commands, null, null, null);
// Set up resource type requirements
// For now, both memory and vcores are supported, so we set memory and
// vcores requirements
Resource capability = Resource.newInstance(amMemory, amVCores);
appContext.setResource(capability);

客户端调用ClientRMProtocol#submitApplication(ApplicationSubmissionContext)将ApplicationMaster提交到ResourceManager上。ResourceManager收到请求后,会为ApplicationMaster寻找合适的节点,并在该节点上启动它。

LOG.info("Submitting application to ASM");
yarnClient.submitApplication(appContext);

客户端可通过多种方式查询应用程序的运行状态,其中一种是调用RPC函数ClientRMProtocol#getApplicationReport获取一个应用程序当前运行状况报告,该报告内容包括应用程序名称、所属用户、所在队列、ApplicationMaster所在节点、一些诊断信息、启动时间等。

// Get application report for the appId we are interested in
ApplicationReport report = yarnClient.getApplicationReport(appId);
LOG.info("Got application report from ASM for"
    + ", appId=" + appId.getId()
    + ", clientToAMToken=" + report.getClientToAMToken()
    + ", appDiagnostics=" + report.getDiagnostics()
    + ", appMasterHost=" + report.getHost()
    + ", appQueue=" + report.getQueue()
    + ", appMasterRpcPort=" + report.getRpcPort()
    + ", appStartTime=" + report.getStartTime()
    + ", yarnAppState=" + report.getYarnApplicationState().toString()
    + ", distributedFinalState=" + report.getFinalApplicationStatus().toString()
    + ", appTrackingUrl=" + report.getTrackingUrl()
    + ", appUser=" + report.getUser());
YarnApplicationState state = report.getYarnApplicationState();
FinalApplicationStatus dsStatus = report.getFinalApplicationStatus();
  • 如果有异常或者其他情况,可以通过yarnClient.killApplication(appId);来kill掉应用;

开发ApplicationMaster

ApplicationMaster需要与ResoureManager和NodeManager交互,以申请资源和启动Container,期间涉及到多个数据结构和两个RPC协议。具体步骤如下:

  • ApplicationMaster首先需通过RPC协议AMRMProtocol向ResourceManager发送注册请求RegisterApplicationMasterRequest,该数据结构中包含ApplicationMaster所在节点的host、RPC port和TrackingUrl等信息,而ResourceManager将返回RegisterApplicationMasterResponse,该数据结构中包含多种信息,包括该应用程序的ACL列表、可资源使用上限和下限等。


    image.png
  • ApplicationMaster与RM之间的心跳:整个运行过程中,ApplicationMaster需通过心跳与ResourceManager保持联系,这是因为,如果一段时间内(默认是10min),ResourceManager未收到ApplicationMaster信息,则认为它死掉了,会重新调度或者让其失败。通常而言,ApplicationMaster周期性调用RPC函数ApplicationMasterProtocol.allocate向其发送空的AllocateRequest请求即可。

  • 构造Container:根据每个任务的资源需求,ApplicationMaster可向ResourceManager申请一系列用于运行任务的Container,ApplicationMaster使用ResourceRequest类描述每个Container(一个container只能运行一个任务):

1)Hostname:期望Container所在的节点,如果是*,表示可以为任意节点。
2)Resource capability:运行该任务所需的资源量,如(memory/disk/cpu)。
3)Priority:任务优先级。一个应用程序中的任务可能有多种优先级,ResourceManager会优先为高优先级的任务分配资源。
4)numContainers:符合以上条件的container数目。
  • 申请资源分配Container:一旦为任务构造了Container后,ApplicationMaster会使用RPC函数AMRMProtocol#allocate向ResourceManager发送一个AllocateRequest对象,以请求分配这些Container,AllocateRequest中包含以下信息:
1)Requested containers:所需的Container列表
2)Released containers:有些情况下,比如有些任务在某些节点上失败过,则ApplicationMaster不想再在这些节点上运行任务,此时可要求释放这些节点上的Container。
3)Progress update information:应用程序执行进度
4)ResponseId:RPC响应ID,每次调用RPC,该值会加1。

ResourceManager会为ApplicationMaster返回一个AllocateResponse对象,该对象中主要信息包含在AMResponse中:

1)reboot:ApplicationMaster是否需要重新初始化.当ResourceManager端出现不一致状态时,会要求对应的ApplicationMaster重新初始化。
2)Allocated Containers:新分配的container列表。
3)Completed Containers:已运行完成的container列表,该列表中包含运行成功和未成功的Container,ApplicationMaster可能需要重新运行那些未运行成功的Container。
  • ApplicationMaster会不断追踪已经获取的container,且只有当需求发生变化时,才允许重新为Container申请资源。


    image.png
  • 启动Container, 当ApplicationMaster(从ResourceManager端)收到新分配的Container列表后,会使用ContainerManagementProtocol#startContainer向对应的NodeManager发送ContainerLaunchContext以启动Container,ContainerLaunchContext包含以下内容:

1)ContainerId:Container id
2)Resource:该Container可使用的资源量(当前仅支持内存)
3)User:Container所属用户
4)Security tokens:安全令牌,只有持有该令牌才可启动container
5)LocalResource:运行Container所需的本地资源,比如jar包、二进制文件、其他外部文件等。
6)ServiceData:应用程序可能使用其他外部服务,这些服务相关的数据通过该参数指定。
7)Environment:启动container所需的环境变量
8)command:启动container的命令
  • 监控Container:ApplicationMaster可以通过2种途径监控启动的Container:
使用ApplicationMasterProtocol.allocate向ResourceManager发送查询请求;
使用ContainerManagementProtocol查询指定的ContainerId对应的Container的状态;
  • ApplicationMaster会不断重复前面的步骤,直到所有任务运行成功,此时,它会发送FinishApplicationMasterRequest,以告诉ResourceManage自己运行结束。

distributedshell实例分析

Distributedshell Client的入口main函数如下:

public static void main(String[] args) {
  ...
   Client client = new Client();
   boolean doRun = client.init(args);
    if (!doRun) {
       System.exit(0);
      }
    result = client.run();
    ...
}

DistributedShell Client中最重要的是函数为run(),该函数实现过程如下:
利用Hadoop RPC接口创建一个可以直接与ResourceManager交互的RPC client句柄applicationsManager

private void connectToASM() throws IOException {
  YarnConfiguration yarnConf = new YarnConfiguration(conf);
  InetSocketAddress rmAddress = yarnConf.getSocketAddr(
        YarnConfiguration.RM_ADDRESS,
        YarnConfiguration.DEFAULT_RM_ADDRESS,
        YarnConfiguration.DEFAULT_RM_PORT);
  LOG.info("Connecting to ResourceManager at " + rmAddress);
  applicationsManager = ((ClientRMProtocol) rpc.getProxy(
        ClientRMProtocol.class, rmAddress, conf));
  } 
  • 获取application id。
    与ResourceManager通信,请求application id:
GetNewApplicationRequest request = Records.newRecord(GetNewApplicationRequest.class);
GetNewApplicationResponse response = applicationsManager.getNewApplication(request);

  • 构造ContainerLaunchContext。
    构造一个用于运行ApplicationMaster的container,container相关信息被封装到ContainerLaunchContext对象中:
ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
//添加本地资源
//填充localResources
amContainer.setLocalResources(localResources);
//添加运行ApplicationMaster所需的环境变量
Map<String, String> env = new HashMap<String, String>();
//填充env
amContainer.setEnvironment(env);
//添加启动ApplicationMaster的命令
//填充commands;
amContainer.setCommands(commands);
//设置ApplicationMaster所需的资源
amContainer.setResource(capability);

  • 构造ApplicationSubmissionContext。
    构造一个用于提交ApplicationMaster的ApplicationSubmissionContext:
ApplicationSubmissionContext appContext = 
Records.newRecord(ApplicationSubmissionContext.class);
//设置application id,调用GetNewApplicationResponse#getApplicationId()
appContext.setApplicationId(appId);
//设置Application名称:“DistributedShell”
appContext.setApplicationName(appName);
//设置前面创建的container
appContext.setAMContainerSpec(amContainer);
//设置application的优先级,默认是0
pri.setPriority(amPriority);
//设置application的所在队列,默认是""
appContext.setQueue(amQueue);
//设置application的所属用户,默认是""
appContext.setUser(amUser);
  • 提交ApplicationMaster。
    将ApplicationMaster提交到ResourceManager上,从而完成作业提交功能:
applicationsManager.submitApplication(appRequest);
  • 显示应用程序运行状态。
    为了让用户知道应用程序进度,Client会每隔几秒在shell终端上打印一次应用程序运行状态:
while (true) {
  Thread.sleep(1000);
  GetApplicationReportRequest reportRequest = 
Records.newRecord(GetApplicationReportRequest.class);
  reportRequest.setApplicationId(appId);
  GetApplicationReportResponse reportResponse = 
applicationsManager.getApplicationReport(reportRequest);
  ApplicationReport report = reportResponse.getApplicationReport();
  //打印report内容
  ...
  YarnApplicationState state = report.getYarnApplicationState();
  FinalApplicationStatus dsStatus = report.getFinalApplicationStatus();
  if (YarnApplicationState.FINISHED == state) {
    if (FinalApplicationStatus.SUCCEEDED == dsStatus) {
      return true;
    } else {
      return false;
    }
  } else if (YarnApplicationState.KILLED == state   
          || YarnApplicationState.FAILED == state) {
    return false;
  } 
}

Distributedshell ApplicationMaster源码分析

  • ApplicationMaster由ResourceManager分配的一个container启用,之后,它与ResourceManager通信,注册自己,以告知自己所在的节点(host:port),trackingurl(客户端可通过该url直接查询AM运行状态)等。
RegisterApplicationMasterRequest appMasterRequest = 
Records.newRecord(RegisterApplicationMasterRequest.class);
appMasterRequest.setApplicationAttemptId(appAttemptID);
appMasterRequest.setHost(appMasterHostname); appMasterRequest.setRpcPort(appMasterRpcPort);
appMasterRequest.setTrackingUrl(appMasterTrackingUrl);
return resourceManager.registerApplicationMaster(appMasterRequest);
  • ApplicationMaster周期性向ResourceManager发送心跳信息,以告知ResourceManager自己仍然活着,这是通过周期性调用AMRMProtocol#allocate实现的。

  • 为了完成计算任务,ApplicationMaster需向ResourceManage发送一个ResourceRequest描述对资源的需求,包括container个数、期望资源所在的节点、需要的CPU和内存等,而ResourceManager则为ApplicationMaster返回一个AllocateResponse结构以告知新分配到的container列表、运行完成的container列表和当前可用的资源量等信息。

while (numCompletedContainers.get() < numTotalContainers
        && !appDone) {
   Thread.sleep(1000);
   List<ResourceRequest> resourceReq = new ArrayList<ResourceRequest>();
   if (askCount > 0) {
    ResourceRequest containerAsk = setupContainerAskForRM(askCount);
    resourceReq.add(containerAsk);
   }
   //如果resourceReq为null,则可看做心跳信息,否则就是申请资源
   AMResponse amResp =sendContainerAskToRM(resourceReq);
}
  • 对于每个新分配到的container,ApplicationMaster将创建一个ContainerLaunchContext对象,该对象包含container id,启动container所需环境、启动container命令,然后与对应的节点通信,以启动container。
LaunchContainerRunnable runnableLaunchContainer = 
new LaunchContainerRunnable(allocatedContainer); 
//每个container由一个线程启动
Thread launchThread = new Thread(runnableLaunchContainer); 
launchThreads.add(launchThread);
launchThread.start();
  • ApplicationMaster通过AMRMProtocol#allocate获取各个container的运行状况,一旦发现某个container失败了,则会重新向ResourceManager发送资源请求,以重新运行失败的container。
  • 作业运行失败后,ApplicationMaster向ResourceManager发送FinishApplicationMasterRequest请求,以告知自己运行结束。
FinishApplicationMasterRequest finishReq = 
Records.newRecord(FinishApplicationMasterRequest.class);
finishReq.setAppAttemptId(appAttemptID);
boolean isSuccess = true;
if (numFailedContainers.get() == 0) {
  finishReq.setFinishApplicationStatus(FinalApplicationStatus.SUCCEEDED);
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,651评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,468评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,931评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,218评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,234评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,198评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,084评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,926评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,341评论 1 311
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,563评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,731评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,430评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,036评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,676评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,829评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,743评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,629评论 2 354

推荐阅读更多精彩内容