提高spark任务稳定性1 - Blacklist 机制

场景

一个 spark 应用的产生过程: 获取需求 -> 编写spark代码 -> 测试通过 -> 扔上平台调度。

往往应用会正常运行一段时间,突然有一天运行失败,或是失败了一次才运行成功。

从开发者的角度看,我的代码没问题,测试也通过了,之前一段都运行好好的,怎么突然就失败了呢?为什么我重新调度又能正常运行了,是不是你们平台不稳定?

是什么导致了上述问题?

分布式集群中,特别是高负载的情况下,就会引发很多意想不到的问题,例如:

  1. 坏盘/硬盘满将会 导致 /path/to/usercache 目录创建失败,一个stage中任务失败次数达到一定次数(spark.task.maxFailures)会导致整个job失败。
  2. executor 注册 external shuffle service 超时。
  3. executor 从 external shuffle service 获取数据超时,task 反复失败后导致了整 个stage 的失败。
  4. 环境依赖问题,例如 xxx 包不存在, xxx 包没有安装。
  5. dns 没有配置,网络不通。
  6. etc.

为什么 task 失败后还会被 schedular 重新调度在原来的 node 或是 executor上?

数据本地性(spark会优先把task调度在有相应数据的节点上)导致。

是否只能听天由命,每次失败后重新调度? 如果任务有SLA的限制怎么办?

介绍

spark 2.1 中增加了 blacklist 机制,当前(2.3.0)还是试验性质的功能,黑名单机制允许你设置 task 在 executor / node 上失败次数的阈值, 从而避免了一路走到黑的情况出现。 :)

相关参数

配置 默认值 描述
spark.blacklist.enabled false 是否开启黑名单机制
spark.blacklist.timeout 1h 对于被加入 application 黑名单的 executor/节点 ,多长时间后无条件的移出黑名单以运行新任务
spark.blacklist.task.maxTaskAttemptsPerExecutor 1 对于同一个 task 在某个 executor 中的失败重试阈值。达到阈值后,在执行这个 task 时,该 executor 将被加入黑名单
spark.blacklist.task.maxTaskAttemptsPerNode 2 对于同一个 task 在某个节点上的失败重试阈值。达到阈值后,在执行这个 task 时,该节点将被加入黑名单
spark.blacklist.stage.maxFailedTasksPerExecutor 2 一个 stage 中,不同的 task 在同一个 executor 的失败阈值。达到阈值后,在执行这个 stage 时该 executor 将会被加入黑名单
spark.blacklist.stage.maxFailedExecutorsPerNode 2 一个 stage 中,不同的 executor 加入黑名单的阈值。达到阈值后,在执行这个 stage 时该节点将会被加入黑名单
spark.blacklist.application.maxFailedTasksPerExecutor 2 在同一个 executor 中,不同的 task的失败阈值 。达到阈值后,在整个 appliction 运行期间,该 executor 都会被加入黑名单,加入时间超过 spark.blacklist.timeout 后,自动从黑名单中移除。值得注意的是,如果开启了 dynamic allocation,这些 executor 可能会由于空闲时间过长被回收。
spark.blacklist.application.maxFailedExecutorsPerNode 2 在一个节点中,不同 executor 加入 application 黑名单的阈值。达到这个阈值后,该节点会进入 application 黑名单,加入时间超过 spark.blacklist.timeout 后,自动从黑名单中移除。值得注意的是,如果开启了 dynamic allocation,该节点上的 executor 可能会由于空闲时间过长被回收。
spark.blacklist.killBlacklistedExecutors false 如果开启该配置,spark 会自动关闭并重启加入黑名单的 executor,如果整个节点都加入了黑名单,则该节点上的所有 executor 都会被关闭。
spark.blacklist.application.fetchFailure.enabled false 如果开启该配置,当发生 fetch failure时,立即将该 executor 加入到黑名单。要是开启了 external shuffle service,整个节点都会被加入黑名单。

实现细节

因为是实验性质的功能,所以代码可能会随时变动。

只贴出部分核心代码。

TaskSetBlacklist

黑名单账本:

//k:executor v:该executor上每个 task 的失败情况(task失败的次数和最近一次失败时间)
val execToFailures = new HashMap[String, ExecutorFailuresInTaskSet]()

//k:节点,v:该节点上有失败任务的 executor
private val nodeToExecsWithFailures = new HashMap[String, HashSet[String]]()
//k:节点, v:该节点上加入黑名单的 taskId
private val nodeToBlacklistedTaskIndexes = new HashMap[String, HashSet[Int]]()
  
//加入黑名单的 executor 
private val blacklistedExecs = new HashSet[String]()
//加入黑名单的 node
private val blacklistedNodes = new HashSet[String]()
// 判断 executor 是否加入了给定 task 的黑名单
def isExecutorBlacklistedForTask(executorId: String, index: Int): Boolean = {
    execToFailures.get(executorId).exists { execFailures =>
      execFailures.getNumTaskFailures(index) >= MAX_TASK_ATTEMPTS_PER_EXECUTOR
    }
}

//判断 node 是否加入了给定 task 的黑名单
def isNodeBlacklistedForTask(node: String, index: Int): Boolean = {
    nodeToBlacklistedTaskIndexes.get(node).exists(_.contains(index))
}

当有task失败时,TaskSetManager 会调用更新黑名单的操作:

  1. 根据 taskid 更新 excutor 上该 task 的失败次数和失败时间
  2. 判断 task 是否在该节点其他 executor 上有失败记录,如果有,将重试次数相加,如果 >= MAX_TASK_ATTEMPTS_PER_NODE ,则将该 node 加入这个 taskId 的黑名单
  3. 判断在这个stage中,一个executor中失败的任务次数是否 >= MAX_FAILURES_PER_EXEC_STAGE,如果是,则将该 executor 加入这个 stageId 的黑名单
  4. 判断在这个stage中,同一个 node 的 executor 的失败记录是否 >= MAX_FAILED_EXEC_PER_NODE_STAGE,如果是,则将该 node 加入这个 stageId 的黑名单

阈值参数:

  • MAX_TASK_ATTEMPTS_PER_EXECUTOR:每个 executor 上最大的任务重试次数
  • MAX_TASK_ATTEMPTS_PER_NODE:每个 node 上最大的任务重试次数
  • MAX_FAILURES_PER_EXEC_STAGE:一个 stage 中,每个executor 上最多任务失败次数
  • MAX_FAILED_EXEC_PER_NODE_STAGE:一个 stage 中,每个节点上 executor 的最多失败次数
  private[scheduler] def updateBlacklistForFailedTask(
      host: String,
      exec: String,
      index: Int,
      failureReason: String): Unit = {
    latestFailureReason = failureReason
    val execFailures = execToFailures.getOrElseUpdate(exec, new ExecutorFailuresInTaskSet(host))
    execFailures.updateWithFailure(index, clock.getTimeMillis())

    val execsWithFailuresOnNode = nodeToExecsWithFailures.getOrElseUpdate(host, new HashSet())
    execsWithFailuresOnNode += exec
    val failuresOnHost = execsWithFailuresOnNode.toIterator.flatMap { exec =>
      execToFailures.get(exec).map { failures =>
        failures.getNumTaskFailures(index)
      }
    }.sum
    if (failuresOnHost >= MAX_TASK_ATTEMPTS_PER_NODE) {
      nodeToBlacklistedTaskIndexes.getOrElseUpdate(host, new HashSet()) += index
    }

    if (execFailures.numUniqueTasksWithFailures >= MAX_FAILURES_PER_EXEC_STAGE) {
      if (blacklistedExecs.add(exec)) {
        logInfo(s"Blacklisting executor ${exec} for stage $stageId")
        val blacklistedExecutorsOnNode =
          execsWithFailuresOnNode.filter(blacklistedExecs.contains(_))
        if (blacklistedExecutorsOnNode.size >= MAX_FAILED_EXEC_PER_NODE_STAGE) {
          if (blacklistedNodes.add(host)) {
            logInfo(s"Blacklisting ${host} for stage $stageId")
          }
        }
      }
    }
  }

BlacklistTracker

实现原理和TaskSetBlacklist,下文就不再贴出黑名单判断,黑名单对象等代码。

TaskSetBlacklist 不同的是,在一个 taskSet 完全成功之前,BlacklistTracker 无法获取到任务失败的情况。

核心代码:

当一个 taskSet 执行成功时会调用以下代码,流程如下:

  1. 将每个 executor 上的 task 失败次数进行累计,如果 executor 最后一次 task 失败的时间超过 BLACKLIST_TIMEOUT_MILLIS,则移除该失败任务。
  2. 如果 executor 上失败次数大于等于设定的阈值并且不在黑名单中
    • executor 及其对应的到期时间加入到 application 的黑名单中,从executor失败列表中移除该 executor,并更新 nextExpiryTime,用于下次启动任务的时候判断黑名单是否已到期
    • 根据 spark.blacklist.killBlacklistedExecutors 判断是否要杀死 executor
    • 更新 node 上的 executor 失败次数
    • 如果一个节点上的 executor 的失败次数大于等于阈值并且不在黑名单中
      • node 及其对应的到期时间加入到 application 的黑名单中
      • 如果开启了 spark.blacklist.killBlacklistedExecutors,则将此 node 上的所有 executor 杀死
  • BLACKLIST_TIMEOUT_MILLIS:加入黑名单后的过期时间
  • MAX_FAILURES_PER_EXEC:每个executor上最多的task失败次数
  • MAX_FAILED_EXEC_PER_NODE: 每个节点上加入黑名单的executor的最大数量
def updateBlacklistForSuccessfulTaskSet(
      stageId: Int,
      stageAttemptId: Int,
      failuresByExec: HashMap[String, ExecutorFailuresInTaskSet]): Unit = {
    val now = clock.getTimeMillis()
    failuresByExec.foreach { case (exec, failuresInTaskSet) =>
      val appFailuresOnExecutor =
        executorIdToFailureList.getOrElseUpdate(exec, new ExecutorFailureList)
      appFailuresOnExecutor.addFailures(stageId, stageAttemptId, failuresInTaskSet)
      appFailuresOnExecutor.dropFailuresWithTimeoutBefore(now)
      val newTotal = appFailuresOnExecutor.numUniqueTaskFailures

      val expiryTimeForNewBlacklists = now + BLACKLIST_TIMEOUT_MILLIS
      if (newTotal >= MAX_FAILURES_PER_EXEC && !executorIdToBlacklistStatus.contains(exec)) {
        logInfo(s"Blacklisting executor id: $exec because it has $newTotal" +
          s" task failures in successful task sets")
        val node = failuresInTaskSet.node
        executorIdToBlacklistStatus.put(exec, BlacklistedExecutor(node, expiryTimeForNewBlacklists))
        listenerBus.post(SparkListenerExecutorBlacklisted(now, exec, newTotal))
        executorIdToFailureList.remove(exec)
        updateNextExpiryTime()
        killBlacklistedExecutor(exec)

        val blacklistedExecsOnNode = nodeToBlacklistedExecs.getOrElseUpdate(node, HashSet[String]())
        blacklistedExecsOnNode += exec
        if (blacklistedExecsOnNode.size >= MAX_FAILED_EXEC_PER_NODE &&
            !nodeIdToBlacklistExpiryTime.contains(node)) {
          logInfo(s"Blacklisting node $node because it has ${blacklistedExecsOnNode.size} " +
            s"executors blacklisted: ${blacklistedExecsOnNode}")
          nodeIdToBlacklistExpiryTime.put(node, expiryTimeForNewBlacklists)
          listenerBus.post(SparkListenerNodeBlacklisted(now, node, blacklistedExecsOnNode.size))
          _nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
          killExecutorsOnBlacklistedNode(node)
        }
      }
    }
  }

什么时候进行黑名单的判断

一个 stage 提交的调用链:

TaskSchedulerImpl.submitTasks ->
CoarseGrainedSchedulerBackend.reviveOffers ->
CoarseGrainedSchedulerBackend.makeOffers ->
TaskSchedulerImpl.resourceOffers ->
TaskSchedulerImpl.resourceOfferSingleTaskSet ->
CoarseGrainedSchedulerBackend.launchTasks

appliaction 级别的黑名单在 TaskSchedulerImpl.resourceOffers 中完成判断,stage/task 级别的黑名单在 TaskSchedulerImpl.resourceOfferSingleTaskSet 中完成判断。

如果所有的节点都被加入了黑名单?

如果将task的重试次数设置的比较高,有可能会出现这个问题,这个时候。将会中断这个 stage 的执行

TaskSchedulerImpl.resourceOffers

if (!launchedAnyTask) {
    taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
}

结语

简单的来说,对于一个 application ,提供了三种级别的黑名单可以用于 executor/node: task blacklist -> stage blacklist -> application blacklist

通过这些黑名单的设置可以避免由于 task 反复调度在有问题的 executor/node (坏盘,磁盘满了,shuffle fetch 失败,环境错误等)上,进而导致整个 Application 运行失败的情况。

tip: BlacklistTracker.updateBlacklistForFetchFailure 在当前版本(2.3.0)存在BUG SPARK-24021,将在 2.3.1 进行修复。如果打开了 spark.blacklist.application.fetchFailure.enabled 配置将会受到影响。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 221,052评论 6 513
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 94,267评论 3 397
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 167,518评论 0 360
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,457评论 1 296
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,474评论 6 397
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 52,143评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,728评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,650评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,184评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,269评论 3 339
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,415评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 36,083评论 5 348
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,775评论 3 333
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,257评论 0 23
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,389评论 1 271
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,777评论 3 375
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,419评论 2 359