记一次Spark Yarn Shuffle Service升级引发的血案

前言

Spark YarnShuffleService是作为Hadoop Yarn模块中NodeManager的辅助服务寄生在其进程内部,大家都知道可以通过这个外部服务来削减Executor自身在shuffle过程中的压力,且得益于这个服务的常驻特性,Shuffle Write的文件可以跟着这个服务走,就可以实现动态资源分配等Spark的高级特性。

当然,这个过程中我们把压力实则转移到了NodeManger上,NodeManager的如果有过重的GC问题,在响应Shuffle Client的时候就会有问题,很多Shuffle Client的频繁重试乃至FetchFailed相关的异常都基本和这个方面有关。另外,也可能给NodeManager带来OOM的风险,比如 YARN-7110

不得不说,Spark YarnShuffleService也是一个相对稳定的模块,高版本兼容低版本,低版本兼容高版本很多时候都没什么问题,因为这个模块几乎没有改动。此外由于这个服务的Jar包是需要放在NodeManager的ClassPath中,推动Hadoop的升级相对繁琐与困难,在兼容性测试通过的基础上,我们也不乐于去做这个推动者,以至于我们生产集群早起部署的Spark 2.1.2 based的分支光荣的承载的Spark 1.6.x~Spark 2.3.x。

凡事都有契机,

For users who enabled external shuffle service, this feature can only be worked when external shuffle service is newer than Spark 2.2.

这种情况就不得不升级了。

趁着夜黑风高,Hadoop Team上Capacity Scheduler/CGroup/NodeLabel/开超售,上2.9等等等等,我们这有个Spark 2.3.2的jar包,兄弟们拿去用吧。

问题来了(现象)

当然,这么大规模的升级动作,必然不能一蹴而就,经历的N次回滚之后,终于算是“尘埃落定”。当用户大爷们开始进场了,那么问题就来了。

1. NodeManager进程吃CPU

48核的机器,每个核60-80%的使用率,都给NodeManager进程吃了,还能不能好好跑Container了?
赶紧把NodeManager的jstack一打


吃cpu的线程

哇哦,IO类的操作这么吃CPU,什么情况?

撸下代码

  /** Number of threads used in the server thread pool. Default to 0, which is 2x#cores. */
  public int serverThreads() { return conf.getInt(SPARK_NETWORK_IO_SERVERTHREADS_KEY, 0); }

原来是Shuffle Server服务端用了96个线程

/**
   * Specifies an upper bound on the number of Netty threads that Spark requires by default.
   * In practice, only 2-4 cores should be required to transfer roughly 10 Gb/s, and each core
   * that we use will have an initial overhead of roughly 32 MB of off-heap memory, which comes
   * at a premium.
   *
   * Thus, this value should still retain maximum throughput and reduce wasted off-heap memory
   * allocation. It can be overridden by setting the number of serverThreads and clientThreads
   * manually in Spark's configuration.
   */
  private val MAX_DEFAULT_NETTY_THREADS = 8

根据官方描述2-4个线程基本就可以满足万兆网的数据传输服务了,保险起见那我们就把这个参数设置成8吧。

spark.shuffle.io.serverThreads=8

果然NodeManager的CPU消耗降下来了,CPU被限制在了600%-800%,但这几个CPU相当于还是爆满的。

2. ExecutorLostFailure

Spark启动Executor之后,首先由Executor端的CoarseGrainedExecutorBackend发送RegisterExecutor消息到Driver端的CoarseGrainedSchedulerBackend

override def onStart() {
    logInfo("Connecting to driver: " + driverUrl)
    rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
      // This is a very fast action so we can use "ThreadUtils.sameThread"
      driver = Some(ref)
      ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))
    }(ThreadUtils.sameThread).onComplete {
      // This is a very fast action so we can use "ThreadUtils.sameThread"
      case Success(msg) =>
        // Always receive `true`. Just ignore it
      case Failure(e) =>
        exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
    }(ThreadUtils.sameThread)
  }

CoarseGrainedSchedulerBackend收到这个消息之后, 塞进executorDataMap完成executor的注册,然后发送一个异步消息给CoarseGrainedExecutorBackend(就不管了),就就就就开始makeOffers, 下发task到这个executor了。

case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls) =>
        //此处删掉一些代码
          val data = new ExecutorData(executorRef, executorAddress, hostname,
            cores, cores, logUrls)
          // This must be synchronized because variables mutated
          // in this block are read when requesting executors
          CoarseGrainedSchedulerBackend.this.synchronized {
            executorDataMap.put(executorId, data)
            if (currentExecutorIdCounter < executorId.toInt) {
              currentExecutorIdCounter = executorId.toInt
            }
            if (numPendingExecutors > 0) {
              numPendingExecutors -= 1
              logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
            }
          }
          executorRef.send(RegisteredExecutor)
          // Note: some tests expect the reply to come after we put the executor in the map
          context.reply(true)
          listenerBus.post(
            SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
          makeOffers()
        }

CoarseGrainedExecutorBackend 接受到RegisteredExecutor消息后开始处理

case RegisteredExecutor =>
      logInfo("Successfully registered with driver")
      try {
        executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
      } catch {
        case NonFatal(e) =>
          exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
      }

这个过程其实才算Executor本尊的初始化过程,而且这个过程其实相对来说是比较“耗时”“容易出问题”的,一个是要完成block manager的初始化,其中需要完成Shuffle Client的初始化,如果是开启了External Shuffle Service服务,那就需要 registerWithExternalShuffleServer,实例化并注册Shuffle Client到Shuffle Server端(也就是NodeManager 7337端口),
这个分为两个过程,一个是创建Shuffle Client的实例,另一个是Shuffle Client基于spark.shuffle.registration.timeout 5000的超时间隔发一个RegisterExecutor的同步消息给Shuffle Server,然后基于spark.shuffle.registration.maxAttempts 3进行这个周期的重试,当这两个参数设置过大,或者Shuffle Client实例化的时间过长,就会阻塞整个Executor本尊的初始化过程,这个过程完成后才能启动Executor端的心跳上报机制,当上个过程不完成,心跳没上报,Driver端就无法更新Executor的最新状态。这个时候Task已然可能已经分发到这个Executor上,然后就会出现以下这类ExecutorLostFailure,往往Executor查看日志端也并没有task被分配到这个Executor上。

ExecutorLostFailure

往往Executor查看日志端也并没有task被分配到这个Executor上。基本上刷完下面的日志不成功连接,就会结果了这个Executor

2019-08-02 02:23:36,064 [129209] - ERROR [dispatcher-event-loop-0:Logging$class@91] - Failed to connect to external shuffle server, will retry 14 more times after waiting 5 seconds...
java.io.IOException: Failed to connect to hadoop3909.jd.163.org/10.196.68.54:7337
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:245)
    at org.apache.spark.network.client.TransportClientFactory.createUnmanagedClient(TransportClientFactory.java:201)
    at org.apache.spark.network.shuffle.ExternalShuffleClient.registerWithShuffleServer(ExternalShuffleClient.java:142)
    at org.apache.spark.storage.BlockManager$$anonfun$registerWithExternalShuffleServer$1.apply$mcVI$sp(BlockManager.scala:289)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
    at org.apache.spark.storage.BlockManager.registerWithExternalShuffleServer(BlockManager.scala:286)
    at org.apache.spark.storage.BlockManager.initialize(BlockManager.scala:260)
    at org.apache.spark.executor.Executor.<init>(Executor.scala:116)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(CoarseGrainedExecutorBackend.scala:83)
    at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:117)
    at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:205)
    at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:101)
    at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:221)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

这块总体上可以算是Spark本身的一个缺陷,在executor未真正初始化之前不应该将它算成一个可用的计算节点,不应该将任务下发到这个节点,明显的这边task处于无效的等待中,最后无谓的失败掉!

另外当开始Dynamic Executor Allocation特性和BlackList特性时,如果minExecutors值设置较小,如1,如果任务被分配到这个节点上,就可能出现cannot run anywhere due to node and executor blacklist样子的错误,导致整个Job 被杀掉,这问题就更严重了。两个增强Spark自身鲁棒性的功能,配合另一个反而更加容易让其挂掉,呵呵。

大量的FetchFailed

一种情况是,服务端主动或被动关闭连接,客户端收到RST信号

org.apache.spark.shuffle.FetchFailedException: Connection reset by peer

这种一般是由于服务端压力过大造成的,对于这类异常一般都是通过调整以下两个参数来调整客户端的重试次数来给客户端自己更多的机会,加大等待间隔来给服务端更多的喘息机会来消化自身的压力。

spark.shuffle.io.maxRetries 15
spark.shuffle.io.retryWait 6s

但是服务端的压力实在过大就需要从服务端入手了,盲目的调整这两个参数只会让你的任务更慢的失败而已。

另一种情况是,客户端判断有请求但通道中没有流量,这个时候超过网络超时的设置就会断开连接,

2019-08-02 01:31:57,119 [673532] - INFO  [Executor task launch worker for task 5038:Logging$class@54] - Code generated in 32.595886 ms
2019-08-02 01:46:55,672 [1572085] - ERROR [shuffle-client-6-4:TransportChannelHandler@144] - Connection to hadoop3816.jd.163.org/10.196.77.46:7337 has been quiet for 900000 ms while there are outstanding requests. Assuming connection is dead; please adjust spark.network.timeout if this is wrong.
2019-08-02 01:46:55,674 [1572087] - ERROR [shuffle-client-6-4:TransportResponseHandler@144] - Still have 4 requests outstanding when connection from hadoop3816.jd.163.org/10.196.77.46:7337 is closed
2019-08-02 01:46:55,675 [1572088] - ERROR [shuffle-client-6-4:OneForOneBlockFetcher$1@138] - Failed while starting block fetches
java.io.IOException: Connection from hadoop3816.jd.163.org/10.196.77.46:7337 closed

此时同一Executor上的别的task复用这个客户端时就会出现 connection * closed异常,注意日志第一行和第二行的时间间隔,这时候由于spark.network.timeout的设置,这个没有流量的通道其实闲置了近15分钟的时间,我们可以通过设置spark.shuffle.io.connectionTimeout=xxs来单独控制这段逻辑的超时时间,而不是用spark.network.timeout的统一设置。

什么原因(本质)

Shuffle 过程本质上是一个IO密集型的操作,但CPU消耗过大,其实才是真正的问题所在,再加上用户作业在整个过程中并不是一个变量,变量只有Hadoop相关的修改,及Spark Yarn ShuffleService的2.1-> 2.3的变化,抛开Hadoop先不说,先看下Spark自身的变化。
SPARK-15074,由于发现做一个大任务的时候花了大把的时间在shuffle fetch过程中,jstack信息显示主要的时间是花在了反复地读取index文件上,这和我们的场景很像,这个issue对应的PR中以entry的方式缓存index文件,而我们用的Spark 2.3通过SPARK-21501之后已经改成了以文件大小的方式缓存,默认的话只有100m的大小,对于集群上大部分shuffle作业,这点基本上不够看,等于没有缓存,所以大量的开销都花在了index文件的读取上。
通过调整spark.shuffle.service.index.cache.size=6144m,大大缩减这块的开销,减少了NodeManager CPU压力,缓解了Shuffle Client注册,连接、传输超时和断连的问题。

总结

  1. 不建议通过调整spark.network.timeout来调整所有spark网络超时相关的参数,影响面太广,不可控,建议通过不同的timeout参数控制不同的逻辑
  2. Spark 大超时参数可能影响Executor注册及Task调度的初始逻辑,这里有坑,详细见上面
  3. 要注意Spark Yarn Shuffle Service在Index文件缓存方式上的变化,2.1的时候通过entry个数来限制总量,当单entry过大时容易造成NodeManager的内存压力和OOM风险,2.3的 spark.shuffle.service.index.cache.size 100M太小,需要调整。
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,080评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,422评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,630评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,554评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,662评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,856评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,014评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,752评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,212评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,541评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,687评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,347评论 4 331
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,973评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,777评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,006评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,406评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,576评论 2 349

推荐阅读更多精彩内容