Spark2.x---6. Spark Yarn Client模式解析

最近在定位Yarn的crash问题时,顺便把spark怎么使用yarn的好好的梳理了一遍。不过我先了解一下Yarn和怎么提交yarn的job的。

首先我们先看看Yarn的架构:



图1 Yarn分布式架构

ResourceManager

a)一个纯粹的调度器

b)根据应用程序的资源请求严格限制系统的可用资源

c)在保证容量、公平性及服务等级的情况下,优化集群资源利用率,让所有资源都得到充分的利用

d)由可插拔的调度器来应用不同的调度算法,如注重容量调度还是注意公平调度

ApplicationManager

a)负责与ResourceManager协商资源,并和NodeManager协同工作来执行和监控Container以及他们的资源消耗

b)有责任与ResourceManager协商并获取合适的资源Container,跟踪他们的状态,以及监控其进展

c)在真实环境中,每一个应用都有自己的ApplicationMaster的实例,但是也可为一组提供一个ApplicationMaster,比如Pig或者Hive的ApplicationMaster

一、Yarn

Client编写

1.创建Yarn客户端

YarnClient yarnClient =

YarnClient.createYarnClien。t();

yarnClient.init(conf);

yarnClient.start();

2.创建Yarn应用

YarnClientApplication app =yarnClient.createApplication();

3.设置Applicaton的名字,内存和cpu需求以及优先级和Queue信息,YARN将根据这些信息来调度AppMaster

app.getApplicationSubmissionContext().setApplicationName("jenkins.ApplicationMaster");

app.getApplicationSubmissionContext().setResource(Resource.newInstance(100,1));

app.getApplicationSubmissionContext().setPriority(Priority.newInstance(0));

app.getApplicationSubmissionContext().setQueue("default");

4.设置ContainerLaunchContext,这一步,amContainer中包含了App Master执行需要的资源文件,环境变量和启动命令,这里将资源文件上传到了HDFS,这样在NODE Manager就可以通过HDFS取得这些文件

app.getApplicationSubmissionContext().setAMContainerSpec(amContainer);

5.提交应用

ApplicationId appId =yarnClient.submitApplication(app.getApplicationSubmissionContext());

二、YARN ApplicationMaster编写

ApplicationMaster编写的编写比较复杂,其需要通Resource Manager和Node Manager交互,

通过ResourceManager:申请Container,并接收Resource Manager的一些消息,如可用的Container,结束的Container等。

通过NodeManage:启动Container,并接收Node Manage的一些消息,如Container的状态变化以及Node状态变化。

1.创建一个AMRMClientAsync对象,这个对象负责与Resource

Manager交互

amRMClient= AMRMClientAsync.createAMRMClientAsync( 1000, new RMCallbackHandler());

这里的RMCallbackHandler是我们编写的继承自AMRMClientAsync.CallbackHandler的一个类,其功能是处理由Resource

Manager收到的消息,

其需要实现的方法由如下

publicvoid onContainersCompleted(List statuses);

publicvoid onContainersAllocated(List containers) ;

publicvoid onShutdownRequest() ;

publicvoid onNodesUpdated(List updatedNodes) ;

publicvoid onError(Throwable e) ;

这里不考虑异常的情况下,只写onContainersAllocated,onContainersCompleted这两个既可以,一个是当有新的Container可以使用,一个是Container运行结束。

在onContainersAllocated我们需要编写启动container的代码,amNMClient.startContainerAsync(container, ctx);这里的ctx同Yarn Client中第4步中的amContainer是同一个类型,即这个container执行的一些资源,环境变量与命令等,因为这是在回调函数中,为了保证时效性,这个操作最好放在线程池中异步操作。

在onContainersCompleted中,如果是失败的Container,我们需要重新申请并启动Container,(这一点有可能是YARN的Fair Schedule中会强制退出某些Container以释放资源)成功的将做记录既可以。

2.创建一个NMClientAsyncImpl对象,这个对象负责与Node

Manager交互

amNMClient= new NMClientAsyncImpl(new NMCallbackHandler());

这里NMCallbackHandler使我们需要编写的继承自NMClientAsync.CallbackHandler的对象,其功能是处理由Node

Manager收到的消息

publicvoid onContainerStarted(ContainerId containerId,MapallServiceResponse);

publicvoid onContainerStatusReceived(ContainerId containerId,ContainerStatus containerStatus);

publicvoid onContainerStopped(ContainerId containerId) ;

publicvoid onStartContainerError(ContainerId containerId, Throwable t);

publicvoid onGetContainerStatusError(ContainerId containerId,Throwable t) ;

publicvoid onStopContainerError(ContainerId containerId, Throwable t);

这里简单的不考虑异常的情况下,这些函数可以写一个空函数体,忽略掉处理

3.将ApplicationMaster注册到Resource

Manager上

RegisterApplicationMasterResponseresponse = amRMClient.registerApplicationMaster(NetUtils.getHostname(), -1,"");

这个函数将自己注册到RM上,这里没有提供RPC

port和TrackURL.

4. ApplicationMaster向Resource

Manager申请Container

ContainerRequestcontainerAsk = new ContainerRequest(

//100*10M + 1vcpu

Resource.newInstance(100, 1), null, null,

Priority.newInstance(0));

amRMClient.addContainerRequest(containerAsk);

这里一个containerAsk表示申请一个Container,这里的对nodes和rasks设置为NULL,猜测MapReduce应该由参数来尝试申请靠近HDFS

block的container的

5.申请到Container后,回调AMRMClientAsync.CallbackHandler的onContainersAllocated就会响应,然后通过amNMClient在Container运行计算任务:

Listcommands = new LinkedList();

commands.add("sleep" + sleepSeconds.addAndGet(1));

ContainerLaunchContextctx = ContainerLaunchContext.newInstance(null, null, commands, null, null,null);

amNMClient.startContainerAsync(container,ctx);

6.等待Container执行完毕,清理退出

我的代码如下,循环等待container执行完毕,并上报执行结果

voidwaitComplete() throws YarnException, IOException{

while(numTotalContainers.get() !=numCompletedConatiners.get()){

try{

Thread.sleep(1000);

LOG.info("waitComplete"+

", numTotalContainers=" +numTotalContainers.get() +

", numCompletedConatiners=" +numCompletedConatiners.get());

} catch (InterruptedException ex){}

}

exeService.shutdown();

amNMClient.stop();

amRMClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,"dummy Message", null);

amRMClient.stop();

}

三、YARN Container Application

真正处理数据的是由ApplicationMaster由amNMClient.startContainerAsync(container, ctx)提交的Containerapplication,然后这这个应用并不需要特殊编写,任何程序通过提交相应的运行信息都可以在这些Node中的某个Container中执行,所以这个程序可以是一个复杂的MapReduce Task或者是一个简单的脚本。

总结:

YARN提供了对cluster资源管理和作业调度的功能。

编写一个应用运行在YARN之上,比较复杂的是ApplicationMaster的编写,其需要维护container的状态并能共做一些错误恢复,重启应用的操作。比较简答的是Client的编写,只需要提交必须的信息既可以,不需要维护状态。真正运行处理数据的是Container Application,这个程序可以不需要针对YARN做代码编写

四、Spark Yarn Client模式

Spark Yarn有两种模式,一直是client模式,一种是cluster模式,今天我们先说说client模式,以下是Spark YarnClient的交互图。


图2 Spark Yarn Client模式


图3 Spark Yarn类图 


图4 Spark Yarn Client模式job提交过程
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,122评论 6 505
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,070评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,491评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,636评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,676评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,541评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,292评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,211评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,655评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,846评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,965评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,684评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,295评论 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,894评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,012评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,126评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,914评论 2 355

推荐阅读更多精彩内容