SparkStreaming优雅关闭剖析

简介

在前面的文章中,总结了SparkStreaming入门级的文章,了解到SparkStreaming是一种微批处理的"实时"流技术,在实际场景中,当我们使用SparkStreaming开发好功能并通过测试之后部署到生产环境,那么之后就会7*24不间断执行的,除非出现异常退出。当然SparkStreaming提供了checkpoint和WAL机制能够保证我们的程序再次启动时候不会出现数据丢失的情况。但是需求并不是一成不变的,相信读者们都经历过需求不断迭代的情况,当我们需要迭代逻辑的时候,那么我们如何停止线上正在运行的程序呢?本文将为读者们详细介绍一些关于SparkStreaming优雅关闭的手段。接下来我们将针对以下几个问题进行展开讲解:

  1. 为什么需要优雅关闭?
  2. 什么时候触发关闭?
  3. 采用什么策略关闭?

1.为什么需要优雅关闭

基于前面提到的,当我们的场景需要保证数据准确,不允许数据丢失,那么这个时候我们就得考虑优雅关闭了。说到关闭,那么非优雅关闭就是通过kill -9 processId的方式或者yarn -kill applicationId的方式进行暴力关闭,为什么说这种方式是属于暴力关闭呢?由于Spark Streaming是基于micro-batch机制工作的,按照间隔时间生成RDD,如果在间隔期间执行了暴力关闭,那么就会导致这段时间的数据丢失,虽然提供了checkpoin机制,可以使程序启动的时候进行恢复,但是当出现程序发生变更的场景,必须要删除掉checkpoint,因此这里就会有丢失的风险。

因此我们需要优雅关闭,将剩余未处理的数据或者正在处理的数据能够全部执行完成后,这样才不会出现数据丢失的情况。

2.什么时候触发关闭

既然我们知道了需要优雅关闭,那么就需要知道什么会触发关闭,这样才能有针对性的策略实现优雅关闭。

首先我们先来了解一下整体流程:

  1. 首先StreamContext在做初始化的时候,会增加Shutdown hook方法 ,放入到一个钩子队列中,并设置优先级为51
  2. 当程序jvm退出时,会启动一个线程从钩子队列中按照优先级取出执行,然后就会执行Shutdown钩子方法
  3. 当执行Shutdown钩子方法时,首先会将receiver进行关闭,即不再接收数据
  4. 然后停止生成BatchRDD
  5. 等待task全部完成,停止Executor
  6. 最后释放所有资源,即整个关闭流程结束

接下来看源码的具体实现

StreamingContext.scala:调用start方法会调用ShutdownHookManager注册stopOnShutdown函数

def start(): Unit = synchronized {
    state match {
      case INITIALIZED =>
        startSite.set(DStream.getCreationSite())
        ......
        /**
         * StreamContext启动时会增加Shutdown钩子函数,优先级为51
         */
        shutdownHookRef = ShutdownHookManager.addShutdownHook(
          StreamingContext.SHUTDOWN_HOOK_PRIORITY)(() => stopOnShutdown())
       ....
      case ACTIVE =>
        logWarning("StreamingContext has already been started")
      case STOPPED =>
        throw new IllegalStateException("StreamingContext has already been stopped")
    }
  }

ShutdownHookManager.scala:在增加钩子函数的时候底层调用了SparkShutdownHookManager内部类

def addShutdownHook(priority: Int)(hook: () => Unit): AnyRef = {
    shutdownHooks.add(priority, hook)
} 
private lazy val shutdownHooks = {
    val manager = new SparkShutdownHookManager()
    manager.install()
    manager
  }

private [util] class SparkShutdownHookManager {
  def install(): Unit = {
    val hookTask = new Runnable() {
      override def run(): Unit = runAll()
    }
    org.apache.hadoop.util.ShutdownHookManager.get().addShutdownHook(
      hookTask, FileSystem.SHUTDOWN_HOOK_PRIORITY + 30)
  }

  /**
   * jvm退出的时候会开启一个线程按照优先级逐个调用钩子函数
   */
  def runAll(): Unit = {
    shuttingDown = true
    var nextHook: SparkShutdownHook = null
    while ({ nextHook = hooks.synchronized { hooks.poll() }; nextHook != null }) {
      Try(Utils.logUncaughtExceptions(nextHook.run()))
    }
  }

  def add(priority: Int, hook: () => Unit): AnyRef = {
    hooks.synchronized {
      if (shuttingDown) {
        throw new IllegalStateException("Shutdown hooks cannot be modified during shutdown.")
      }
      val hookRef = new SparkShutdownHook(priority, hook)
      hooks.add(hookRef)
      hookRef
    }
  }
}

private class SparkShutdownHook(private val priority: Int, hook: () => Unit)
  extends Comparable[SparkShutdownHook] {
  //这里真正调用注册的函数
  def run(): Unit = hook()
}

那么接下来看下真正执行关闭的逻辑,即StreamingContext#stopOnShutdown方法

 private def stopOnShutdown(): Unit = {
    val stopGracefully = conf.getBoolean("spark.streaming.stopGracefullyOnShutdown", false)
    stop(stopSparkContext = false, stopGracefully = stopGracefully)
  }
 def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = {
    synchronized {
      state match {
        case ACTIVE =>
          //调度相关的关闭
          Utils.tryLogNonFatalError {
            scheduler.stop(stopGracefully)
          }
         
          //监控
          Utils.tryLogNonFatalError {
            env.metricsSystem.removeSource(streamingSource)
          }
          
          //ui
          Utils.tryLogNonFatalError {
            uiTab.foreach(_.detach())
          }
          Utils.tryLogNonFatalError {
            unregisterProgressListener()
          }
          StreamingContext.setActiveContext(null)
          //设置状态为停止
          state = STOPPED
      }
    }
    if (shutdownHookRefToRemove != null) {
      ShutdownHookManager.removeShutdownHook(shutdownHookRefToRemove)
    }
     // a user might stop(stopSparkContext = false) and then call stop(stopSparkContext = true).
    if (stopSparkContext) sc.stop()
  }

可以看到这里有一个spark.streaming.stopGracefullyOnShutdown参数来传给底层的stop方法,即调用Jobscheduler#stop方法

JobScheduler#stop

 def stop(processAllReceivedData: Boolean): Unit = synchronized {
    //1.首先停止接收数据
    if (receiverTracker != null) {
      receiverTracker.stop(processAllReceivedData)
    }

    if (executorAllocationManager != null) {
      executorAllocationManager.foreach(_.stop())
    }

    //2.停止生成BatchRdd,处理剩余的数据
    jobGenerator.stop(processAllReceivedData)

    //3.停止Exectuor
    jobExecutor.shutdown()

    val terminated = if (processAllReceivedData) {
      jobExecutor.awaitTermination(1, TimeUnit.HOURS)  // just a very large period of time
    } else {
      jobExecutor.awaitTermination(2, TimeUnit.SECONDS)
    }
    if (!terminated) {
      jobExecutor.shutdownNow()
    }
  
    // Stop everything else
    listenerBus.stop()
    eventLoop.stop()
    eventLoop = null
    logInfo("Stopped JobScheduler")
  }

3.采用什么策略关闭?

3.1 配置策略

根据刚才梳理的触发关闭流程中,其实可以通过配置spark.streaming.stopGracefullyOnShutdown=true来实现优雅关闭,但是需要发送 SIGTERM 信号给driver端,这里有两种方案

方案一,具体步骤如下:

  1. 通过Spark UI找到driver所在节点。

  2. 登录driver节点,执行 ps -ef |grep java |grep ApplicationMaster命令找到对应的pid

  3. 执行**kill -SIGTERM ** 发送SIGTERM信号

  4. 当spark driver收到该信号时,在日志中会有以下信息

    ERROR yarn.ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM
    INFO streaming.StreamingContext: Invoking stop(stopGracefully=true) from shutdown hook
    INFO streaming.StreamingContext: StreamingContext stopped successfully
    INFO spark.SparkContext: Invoking stop() from shutdown hook
    INFO spark.SparkContext: Successfully stopped SparkContext
    INFO util.ShutdownHookManager: Shutdown hook called
    

    注意:

    这里有一个坑,默认情况下在yarn模式下,spark.yarn.maxAppAttempts参数值和yarn.resourcemanager.am.max-attempts是同一个值,即为2。当通过Kill命令杀掉AM时,Yarn会自动重新启动一个AM,因此需要再发送一次Kill命令。当然也可以通过spark-submit命令提交的时候指定spark.yarn.maxAppAttempts=1这个配置参数;但这里也会有容灾风险,比如出现网络问题的时候,这里就无法自动重启了,程序就会以失败而告终。

方案二:通过yarn application -kill < applicationid >命令来kill掉job(不建议使用)

该命令会发送SIGTERM信号给container,同时也会立即发送 SIGKILL 命令。虽然可以通过yarn.nodemanager.sleep-delay-before-sigkill.ms参数来调整SIGTERM和SIGKILL之间的间隔,但是好像没什么作用。具体日志信息如下:

ERROR yarn.ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM
INFO streaming.StreamingContext: Invoking stop(stopGracefully=true) from shutdown hook

3.2 标记策略

该种策略通过借助于三方系统来标记状态, 一种方法是将标记HDFS文件,如果标记文件存在,则调用scc.stop(true,true);或者是借助于redis的key是否存在等方式

val checkIntervalMillis = 60000
var isStopped = false

while (! isStopped) {
    isStopped = ssc.awaitTerminationOrTimeout(checkIntervalMillis)
    checkShutdownMarker
    if (!isStopped && stopFlag) {
        ssc.stop(true, true)
    }
}

def checkShutdownMarker = {
    if (!stopFlag) {
        val fs = FileSystem.get(new Configuration())
        stopFlag = fs.exists(new Path(shutdownMarker))
    }

3.3 服务策略

即提供一个restful服务,暴露出一个接口提供关闭功能。

def httpServer(port:Int,ssc:StreamingContext)={
    val server = new Server(port)
    val context = new ContextHandler()
    context.setContextPath("/shutdown")
    context.setHandler( new CloseStreamHandler(ssc) )
    server.setHandler(context)
    server.start()
}
class CloseStreamHandler(ssc:StreamingContext) extends AbstractHandler {
    override def handle(s: String, baseRequest: Request, req: HttpServletRequest, response: HttpServletResponse): Unit ={
      ssc.stop(true,true)
      response.setContentType("text/html; charset=utf-8");
      response.setStatus(HttpServletResponse.SC_OK);
      val out = response.getWriter();
      baseRequest.setHandled(true);
    }
  }
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,723评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,003评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,512评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,825评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,874评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,841评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,812评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,582评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,033评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,309评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,450评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,158评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,789评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,409评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,609评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,440评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,357评论 2 352