【k8s系列2】spark on k8s(kubernetes) DynamicResourceAllocation(DRA)

 随着大数据时代的到来,以及kubernetes的愈发火热,好多公司已经把spark应用从yarn迁移到k8s,当然也踩了不少的坑,    
 现在我们来分析一下spark on k8s的DynamicResourceAllocation这个坑

注意:该文基于spark 3.0.0分析

spark on yarn 中的DynamicResourceAllocation

spark on yarn对于DynamicResourceAllocation分配来说,从spark 1.2版本就已经开始支持了.
对于spark熟悉的人都知道,如果我们要开启DynamicResourceAllocation,就得有ExternalShuffleService服务,
对于yarn来说ExternalShuffleService是作为辅助服务开启的,具体配置如下:

<property>
   <name>yarn.nodemanager.aux-services</name>
   <value>spark_shuffle</value>
</property>

<property>
   <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
   <value>org.apache.spark.network.yarn.YarnShuffleService</value>
</property>

<property>
   <name>spark.shuffle.service.port</name>
   <value>7337</value>
</property>

重启nodeManager,这样在每个nodeManager节点就会启动一个YarnShuffleService,之后在spark应用中设置spark.dynamicAllocation.enabled 为true,这样就能达到运行时资源动态分配的效果

我们直接从CoarseGrainedExecutorBackend中SparkEnv创建开始说,每一个executor的启动,必然会经过CoarseGrainedExecutorBackend main方法,而main中就涉及到SparkEnv的创建

 val env = SparkEnv.createExecutorEnv(driverConf, arguments.executorId, arguments.bindAddress,
       arguments.hostname, arguments.cores, cfg.ioEncryptionKey, isLocal = false)

而sparkEnv的创建就涉及到BlockManager的创建。沿着代码往下走,最终

val blockTransferService =
     new NettyBlockTransferService(conf, securityManager, bindAddress, advertiseAddress,
       blockManagerPort, numUsableCores, blockManagerMaster.driverEndpoint)
val blockManager = new BlockManager(
     executorId,
     rpcEnv,
     blockManagerMaster,
     serializerManager,
     conf,
     memoryManager,
     mapOutputTracker,
     shuffleManager,
     blockTransferService,
     securityManager,
     externalShuffleClient)

在blockManager的initialize方法中,就会进行registerWithExternalShuffleServer

 // Register Executors' configuration with the local shuffle service, if one should exist.
   if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {
     registerWithExternalShuffleServer()
   }

如果我们开启了ExternalShuffleService,对于yarn就是YarnShuffleService,就会把当前的ExecutorShuffleInfo注册到host为shuffleServerId.host, port为shuffleServerId.port的ExternalShuffleService中,ExecutorShuffleInfo的信息如下:

val shuffleConfig = new ExecutorShuffleInfo(
     diskBlockManager.localDirsString,
     diskBlockManager.subDirsPerLocalDir,
     shuffleManager.getClass.getName)

这里我重点分析一下registerWithExternalShuffleServer的方法中的以下片段

// Synchronous and will throw an exception if we cannot connect.
       blockStoreClient.asInstanceOf[ExternalBlockStoreClient].registerWithShuffleServer(
         shuffleServerId.host, shuffleServerId.port, shuffleServerId.executorId, shuffleConfig)            

该代码中shuffleServerId来自于:

shuffleServerId = if (externalShuffleServiceEnabled) {
     logInfo(s"external shuffle service port = $externalShuffleServicePort")
     BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)
   } else {
     blockManagerId
   }

而blockTransferService.hostName 是我们在SparkEnv中创建的时候由advertiseAddress传过来的,
最终由CoarseGrainedExecutorBackend 主类参数hostname过来的,那到底怎么传过来的呢?
参照ExecutorRunnable的prepareCommand方法,

val commands = prefixEnv ++
     Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
     javaOpts ++
     Seq("org.apache.spark.executor.YarnCoarseGrainedExecutorBackend",
       "--driver-url", masterAddress,
       "--executor-id", executorId,
       "--hostname", hostname,
       "--cores", executorCores.toString,
       "--app-id", appId,
       "--resourceProfileId", resourceProfileId.toString) ++

而这个hostname的值最终由YarnAllocator的方法runAllocatedContainers

val executorHostname = container.getNodeId.getHost

传递过来的,也就是说我们最终获取到了yarn节点,也就是nodeManager的host
这样每个启动的executor,就向executor所在的nodeManager的YarnShuffleService注册了ExecutorShuffleInfo信息,这样对于开启了动态资源分配的
ExternalBlockStoreClient 来说fetchBlocksg过程就和未开启动态资源分配的NettyBlockTransferService大同小异了

spark on k8s(kubernetes) 中的DynamicResourceAllocation

参考之前的文章,我们知道在entrypoint中我们在启动executor的时候,我们传递了hostname参数

executor)
    shift 1
    CMD=(
      ${JAVA_HOME}/bin/java
      "${SPARK_EXECUTOR_JAVA_OPTS[@]}"
      -Xms$SPARK_EXECUTOR_MEMORY
      -Xmx$SPARK_EXECUTOR_MEMORY
      -cp "$SPARK_CLASSPATH:$SPARK_DIST_CLASSPATH"
      org.apache.spark.executor.CoarseGrainedExecutorBackend
      --driver-url $SPARK_DRIVER_URL
      --executor-id $SPARK_EXECUTOR_ID
      --cores $SPARK_EXECUTOR_CORES
      --app-id $SPARK_APPLICATION_ID
      --hostname $SPARK_EXECUTOR_POD_IP
    )

而SPARK_EXECUTOR_POD_IP是运行中的POD IP,参考BasicExecutorFeatureStep类片段:

Seq(new EnvVarBuilder()
          .withName(ENV_EXECUTOR_POD_IP)
          .withValueFrom(new EnvVarSourceBuilder()
            .withNewFieldRef("v1", "status.podIP")
            .build())
          .build())

这样按照以上流程的分析,即使我们在每个k8s节点开启ExternalShuffleService服务, 且pod挂载了持久化盘,
executor也不能向k8s节点ExternalShuffleService服务注册,因为我们注册的节点是POD IP,而不是节点IP,
当然spark社区早就提出了未开启external shuffle service的动态资源分配,且已经合并到master分支.
具体配置,可以参照如下:

spark.dynamicAllocation.enabled  true 
spark.dynamicAllocation.shuffleTracking.enabled  true
spark.dynamicAllocation.minExecutors  1
spark.dynamicAllocation.maxExecutors  4
spark.dynamicAllocation.executorIdleTimeout  60s
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,490评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,581评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,830评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,957评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,974评论 6 393
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,754评论 1 307
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,464评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,357评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,847评论 1 317
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,995评论 3 338
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,137评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,819评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,482评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,023评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,149评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,409评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,086评论 2 355