Spark的Driver和ApplicationMaster进程核数设置之我见

配置

Configuration Default Value Meaning
spark.driver.cores 1 Number of cores to use for the driver process, only in cluster mode.
cluster模式下driver进程的核数,因为改模式下am和driver实为一体,故也是am的的核数。
spark.yarn.am.cores 1 Number of cores to use for the YARN Application Master in client mode.In cluster mode, use spark.driver.cores instead.
yarn client模式下,am的核数,driver和am在该模式下是分离的,故此时是没有driver核数这样一个概念的

源码

 private val amCores = if (isClusterMode) {
    sparkConf.get(DRIVER_CORES)
  } else {
    sparkConf.get(AM_CORES)
  }

具体可以参见yarn.Client.scala#L87,这边用去读取参数设置ApplicationMaster的对应核数。

val capability = Records.newRecord(classOf[Resource])
    capability.setMemory(amMemory + amMemoryOverhead)
    capability.setVirtualCores(amCores)

    sparkConf.get(AM_NODE_LABEL_EXPRESSION) match {
      case Some(expr) =>
        try {
          val amRequest = Records.newRecord(classOf[ResourceRequest])
          amRequest.setResourceName(ResourceRequest.ANY)
          amRequest.setPriority(Priority.newInstance(0))
          amRequest.setCapability(capability)
...

具体可以参见yarn.Client.scala#L251,这边被封装进am请求消息,提交到Yarn,一系列操作后最终交给NodeManager生成一个ApplicationMaster的Container,也即一个JVM进程。

分析

我们知道,我们在玩JVM的时候可以设置各种内存参数,比如Xmx, Xss等,也可以设置GC的线程数,比如-XX:ParallelGCThreads等,貌似没有直接设置JVM用多少Thread的参数,参见,

 java -XX:+PrintFlagsInitial | grep "Thread" | grep -v "bool"
     intx CompilerThreadPriority                    = -1              {product}
     intx CompilerThreadStackSize                   = 0               {pd product}
    uintx ConcGCThreads                             = 0               {product}
     intx DefaultThreadPriority                     = -1              {product}
    uintx G1ConcRefinementThreads                   = 0               {product}
    uintx HeapSizePerGCThread                       = 87241520        {product}
    uintx NewSizeThreadIncrease                     = 5320            {pd product}
    uintx ParallelGCThreads                         = 0               {product}
     intx ThreadPriorityPolicy                      = 0               {product}
    uintx ThreadSafetyMargin                        = 52428800        {product}
     intx ThreadStackSize                           = 1024            {pd product}
     intx VMThreadPriority                          = -1              {product}
     intx VMThreadStackSize                         = 1024            {pd product}

那我们设置这个核数有什么用处呢?提高ApplicationMaster的并发?BUT HOW?

对比

相比而言,spark.executor.cores容易理解的多,该参数用以设置executor的核数,是一个简单的数值,比如我们设置为4,driver端根据executor们当前的空闲核数进行task的分配,分配了空闲核数就-1,收到task完成的消息就+1,空出来让driver在分配task过来,如此而已。

但看ApplicationMaster的源码,亦没有诸如此类的受控操作。

一个错误

18/02/04 06:27:52 ERROR yarn.ApplicationMaster: Exception from Reporter thread.
org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException: Application attempt appattempt_1515478669260_917050_000001 doesn't exist in ApplicationMasterService cache.
    at org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService.allocate(ApplicationMasterService.java:439)
    at org.apache.hadoop.yarn.api.impl.pb.service.ApplicationMasterProtocolPBServiceImpl.allocate(ApplicationMasterProtocolPBServiceImpl.java:60)
    at org.apache.hadoop.yarn.proto.ApplicationMasterProtocol$ApplicationMasterProtocolService$2.callBlockingMethod(ApplicationMasterProtocol.java:99)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2045)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at org.apache.hadoop.yarn.ipc.RPCUtil.instantiateException(RPCUtil.java:53)
    at org.apache.hadoop.yarn.ipc.RPCUtil.unwrapAndThrowException(RPCUtil.java:101)
    at org.apache.hadoop.yarn.api.impl.pb.client.ApplicationMasterProtocolPBClientImpl.allocate(ApplicationMasterProtocolPBClientImpl.java:79)
    at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
    at com.sun.proxy.$Proxy24.allocate(Unknown Source)
    at org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.allocate(AMRMClientImpl.java:277)
    at org.apache.spark.deploy.yarn.YarnAllocator.allocateResources(YarnAllocator.scala:260)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$1.run(ApplicationMaster.scala:458)
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException): Application attempt appattempt_1515478669260_917050_000001 doesn't exist in ApplicationMasterService cache.
    at org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService.allocate(ApplicationMasterService.java:439)
    at org.apache.hadoop.yarn.api.impl.pb.service.ApplicationMasterProtocolPBServiceImpl.allocate(ApplicationMasterProtocolPBServiceImpl.java:60)
    at org.apache.hadoop.yarn.proto.ApplicationMasterProtocol$ApplicationMasterProtocolService$2.callBlockingMethod(ApplicationMasterProtocol.java:99)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2045)
    at org.apache.hadoop.ipc.Client.call(Client.java:1475)
    at org.apache.hadoop.ipc.Client.call(Client.java:1412)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
    at com.sun.proxy.$Proxy23.allocate(Unknown Source)
    at org.apache.hadoop.yarn.api.impl.pb.client.ApplicationMasterProtocolPBClientImpl.allocate(ApplicationMasterProtocolPBClientImpl.java:77)
    ... 9 more
18/02/04 06:27:52 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 12, (reason: Application attempt appattempt_1515478669260_917050_000001 doesn't exist in ApplicationMasterService cache.
    at org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService.allocate(ApplicationMasterService.java:439)
    at org.apache.hadoop.yarn.api.impl.pb.service.ApplicationMasterProtocolPBServiceImpl.allocate(ApplicationMasterProtocolPBServiceImpl.java:60)
    at org.apache.hadoop.yarn.proto.ApplicationMasterProtocol$ApplicationMasterProtocolService$2.callBlockingMethod(ApplicationMasterProtocol.java:99)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2045)
)
18/02/04 06:27:52 INFO streaming.StreamingContext: Invoking stop(stopGracefully=true) from shutdown hook

错误大意是ApplicationMaster作为客户端向ResourceManager请求分配Container的时候,发现自己已经不被ResourceManager认识,得失忆症了?如果ApplicationMaster没有向ResourceManager unregister自个儿,那必然是时间太长没有联系,断绝了关系。

看下ApplicationMaster的源码

private def launchReporterThread(): Thread = {
    // The number of failures in a row until Reporter thread give up
    val reporterMaxFailures = sparkConf.get(MAX_REPORTER_THREAD_FAILURES)

    val t = new Thread {
      override def run() {
        var failureCount = 0
        while (!finished) {
          try {
            if (allocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
              finish(FinalApplicationStatus.FAILED,
                ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES,
                s"Max number of executor failures ($maxNumExecutorFailures) reached")
            } else {
              logDebug("Sending progress")
              allocator.allocateResources()
            }
            failureCount = 0
          } catch {
            case i: InterruptedException =>
            case e: Throwable =>
              failureCount += 1
              // this exception was introduced in hadoop 2.4 and this code would not compile
              // with earlier versions if we refer it directly.
              if ("org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException" ==
                e.getClass().getName()) {
                logError("Exception from Reporter thread.", e)
                finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_REPORTER_FAILURE,
                  e.getMessage)
              } else if (!NonFatal(e) || failureCount >= reporterMaxFailures) {
                finish(FinalApplicationStatus.FAILED,
                  ApplicationMaster.EXIT_REPORTER_FAILURE, "Exception was thrown " +
                    s"$failureCount time(s) from Reporter thread.")
              } else {
                logWarning(s"Reporter thread fails $failureCount time(s) in a row.", e)
              }
          }

.....

代码很简单,ApplicationMaster这边会起一个线程,没有异常的情况下,会周期性的(spark.yarn.scheduler.heartbeat.interval-ms 3秒)执行allocator.allocateResources(), 如果有executor挂了不足我们所申请的数目,就补申请几个,没有缺的情况下还继续执行这个代码,那估计就相当于ApplicationMaster与ResourceManager之间的心跳了。

讲到这里我们在回过头来看那两个参数就有点眉目了,java的线程数虽然不直接等价于CPU的线程数,但本质上还是参与了cpu时间片的竞争而已。在Yarn这一层对核数这个理解就比较简单了,我们先获取CPU个数*单个CPU的逻辑核数得到某台机器总线程数,然后NodeManager就根据这个数值来完成Container的分配,包括我们所说的ApplicationMaster及Spark的Executor,分配一个就减去这个Container的核数,单个Container的核数越大,参与CPU竞争的总Container个数就越小。对于cluster模式而言,Driver和ApplicationMaster独立存在于一个进程,线程比client模式下更加繁忙,遇到类似GC的问题,极有可能使我们与ResourceManager心跳线程无法获得执行机会,超过它认为的时间,am就失联了。所以我们设置spark.driver.cores的意义就来了,我们增大这个数值,就有可能可以减少该计算机节点上分配其他Container,物理资源就不会那么吃紧,这种场景的发生概率也会相应降低。

总结

spark.driver.cores在生产环境下在合理的范围内可以适当调整的大些,应该是有助于其处理能力的提升的,通过挤兑别人的方式。。不然我也想不出这两参数有啥用了。。

后记

咨询了下组里yarn的同事,说启用的cgroup之后应该这些参数的作用应该就更加明显和明确了。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,236评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,867评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,715评论 0 340
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,899评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,895评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,733评论 1 283
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,085评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,722评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,025评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,696评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,816评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,447评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,057评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,009评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,254评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,204评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,561评论 2 343

推荐阅读更多精彩内容

  • Spark提供了一个全面、统一的框架用于管理各种有着不同性质(文本数据、图表数据等)的数据集和数据源(批量数据或实...
    Abelsun阅读 2,490评论 0 2
  • YarnYarn产生背景:Yarn直接来自于MR1.0MR1.0 问题:采用的是master slave结构,ma...
    时待吾阅读 5,544评论 2 23
  • 1. 名词解释: 作业相关的名词解释 Application:Spark Application的概念和Hadoo...
    Java旅行者阅读 4,318评论 2 9
  • “感情哪有什么公平可言 你字斟句酌的短信也只值得他一眼扫过 他的一句晚安就能让你辗转反侧 你再多的热情也给不了他想...
    祖宗祖宗1阅读 381评论 0 0
  • 一、图形上下文 一个CGContextRef类型的属性,用于: 保存绘图信息、绘图状态 决定绘制的输出目标 类型:...
    Coulson_Wang阅读 231评论 0 0