spark streaming checkpoint详解

spark streaming提供了两种数据的checkpoint:

  • metadata checkpoint用以恢复spark streaming 的运行状态,存储媒介是org.apache.spark.streaming.Checkpoint,其中记录了org.apache.spark.streaming.StreamingContext的主要内容,包括:
    • val master = ssc.sc.master
    • val framework = ssc.sc.appName
    • val jars = ssc.sc.jars
    • val graph = ssc.graph //DStreamGraph
    • val checkpointDir = ssc.checkpointDir
    • val checkpointDuration = ssc.checkpointDuration
    • val pendingTimes = ssc.scheduler.getPendingTimes().toArray
    • val sparkConfPairs = ssc.conf.getAll
  • DStream data checkpoint 存储了运行时生成的rdd的数据内容

metadata checkpoint

使用checkpoint有两种方法:

  • 1.显示调用checkpoint方法
val ssc: StreamingContext=null
ssc.checkpoint(checkPointPath)
  • 2.创建StreamingContext 的选择从checkponit恢复
val ssc = StreamingContext.getOrCreate(checkpointPath, () => createContext())

两种方式都可以使checkpoint生效,区别就是是否从checkpoint恢复,那么如果不需要从checkponit恢复数据,什么情况下还要进行checkpoint呢?

checkpoint可以切断rdd的依赖

比如我们使用了一个stateDstream,存储了部分状态数据在spark内存中,当前批次的RDD的生成是依赖于前一个批次的RDD,当流作业运行很长时间时这个依赖链将无穷大,那么spark需要保存所有运行过的RDD在内存中,因此必须取消RDD的依赖,而checkpoint就有这个作用;
Rdd类提供了一个方法:org.apache.spark.rdd.RDD#computeOrReadCheckpoint,即对于已经checkpoint的rdd,可以不再经过计算就可以得到结果;

通过metadata checkpoint恢复流作业

先介绍一下几个类的关系:

  • org.apache.spark.streaming.DStreamGraph 存储了流作业的stream关系
  • org.apache.spark.streaming.scheduler.JobScheduler 负责Job的运行
  • org.apache.spark.streaming.scheduler.JobGenerator 根据DStreamGraph和时间生成Job作业,并提交给JobScheduler

DStreamGraph恢复

如果从checkponit恢复,那么StreamingContext可以要求graph对于rdd数据进行恢复:

 private[streaming] val graph: DStreamGraph = {
    if (isCheckpointPresent) {
      _cp.graph.setContext(this)
      _cp.graph.restoreCheckpointData()
      _cp.graph
    } else {
      require(_batchDur != null, "Batch duration for StreamingContext cannot be null")
      val newGraph = new DStreamGraph()
      newGraph.setBatchDuration(_batchDur)
      newGraph
    }
  }

JobScheduler和JobGenerator的恢复

JobScheduler是流作业启动后重新创建的,而JobGenerator是再JobScheduler中创建出来的,他们的恢复是通过DstreamGraph来重新构建的:

 /** Restarts the generator based on the information in checkpoint */
  private def restart() {
    // If manual clock is being used for testing, then
    // either set the manual clock to the last checkpointed time,
    // or if the property is defined set it to that time
    if (clock.isInstanceOf[ManualClock]) {
      val lastTime = ssc.initialCheckpoint.checkpointTime.milliseconds
      val jumpTime = ssc.sc.conf.getLong("spark.streaming.manualClock.jump", 0)
      clock.asInstanceOf[ManualClock].setTime(lastTime + jumpTime)
    }

    val batchDuration = ssc.graph.batchDuration

    // Batches when the master was down, that is,
    // between the checkpoint and current restart time
    val checkpointTime = ssc.initialCheckpoint.checkpointTime
    val restartTime = new Time(timer.getRestartTime(graph.zeroTime.milliseconds))
    val downTimes = checkpointTime.until(restartTime, batchDuration)
    logInfo("Batches during down time (" + downTimes.size + " batches): "
      + downTimes.mkString(", "))

    // Batches that were unprocessed before failure
    val pendingTimes = ssc.initialCheckpoint.pendingTimes.sorted(Time.ordering)
    logInfo("Batches pending processing (" + pendingTimes.length + " batches): " +
      pendingTimes.mkString(", "))
    // Reschedule jobs for these times
    val timesToReschedule = (pendingTimes ++ downTimes).filter { _ < restartTime }
      .distinct.sorted(Time.ordering)
    logInfo("Batches to reschedule (" + timesToReschedule.length + " batches): " +
      timesToReschedule.mkString(", "))
    timesToReschedule.foreach { time =>
      // Allocate the related blocks when recovering from failure, because some blocks that were
      // added but not allocated, are dangling in the queue after recovering, we have to allocate
      // those blocks to the next batch, which is the batch they were supposed to go.
      jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
      jobScheduler.submitJobSet(JobSet(time, graph.generateJobs(time)))
    }

    // Restart the timer
    timer.start(restartTime.milliseconds)
    logInfo("Restarted JobGenerator at " + restartTime)
  }

我们可以看到,恢复的batch包含pendingTimes 和downTimes,downTimes是checkpoint time到程序的启动时间范围内的batch,那么为什么还要考虑pendingTimes呢?
因为在两种情况下会进行checkpoint:

  • 1.batch生成,即对应的batch时间到达时,生成job后
  • 2.batch完成时,即当batch结束时

第一种情况,会把最新批次的batchTime作为checkpoint的时间戳,这种情况下可能之前的batch执行时间大于interval time,那么会导致记录的时候上一个batch并未执行结束,即属于pendingTimes,那么如果读到了这种checkpoint文件,那么需要恢复pendingTimes的记录;

第二种情况下,spark使用了最新的时间戳作为checkpoint文件名称,pendingTimes和downTimes的内容是可能重复的,因此选择了distinct操作

DStream checkpoint

DStream checkpoint的使用分为两种情况:

  • 1.默认的情况下,输入数据源的checkpoint,如DirectKafkaInputDStream,FileInputDStream,自己实现了checkpointData,当开启metadata的checkpoint后,自动生效
  • 2.当使用stateDStream时,需要对DStream显式调用checkpoint(interval: Duration)方法

因此stateDStream的checkpoint周期和metadata的checkpoint的周期可能是不一致的;

普通DStream的checkpoint的周期和metadata的checkpoint的周期是一致的。

DStream checkpoint恢复数据

DStreamGraph恢复过程中完成Dstream的恢复,_cp.graph.restoreCheckpointData()
实现如下:

 def restoreCheckpointData() {
    logInfo("Restoring checkpoint data")
    this.synchronized {
      outputStreams.foreach(_.restoreCheckpointData())
    }
    logInfo("Restored checkpoint data")
  }
  
  private[streaming] def restoreCheckpointData() {
    if (!restoredFromCheckpointData) {
      // Create RDDs from the checkpoint data
      logInfo("Restoring checkpoint data")
      checkpointData.restore()
      dependencies.foreach(_.restoreCheckpointData())
      restoredFromCheckpointData = true
      logInfo("Restored checkpoint data")
    }
  }

如上所示,DirectKafkaInputDStream和FileInputDStream自己实现了checkpointData,因此可以自动恢复checkpoint数据;
而stateStream等需要显式的调用checkpoint,使checkpoint生效

  def checkpoint(interval: Duration): DStream[T] = {
    if (isInitialized) {
      throw new UnsupportedOperationException(
        "Cannot change checkpoint interval of a DStream after streaming context has started")
    }
    persist()
    checkpointDuration = interval
    this
  }

我们注意到,这里调用了persist()方法,因此开启checkpoint后,当前的DStream中生成的RDD会自动调用persist()方法,使数据缓存在内存中,等清除的时候再删除;

在Dstream checkpoint过程调用persist的原因

stateStream每个batch的计算是需要依赖之前的上个批次的RDD,因此,当开启checkpoint时默认后边的计算是需要依赖于前一个批次的计算结果,进行persist后可以快速的进行计算;这也是为什么当开启checkpoint后,在spark的storage页面上多出来一些被chache的RDD

Clear Dstream checkpoint

我们知道当开启Dstream checkpoint时,每次进行checkpoint时,都需要记录大量的数据到磁盘,同时,会在内存中缓存之前批次的数据;这个数据的必须进行清理,所以什么时候清理,怎么清理checkpoint数据很重要

 private[streaming] def clearMetadata(time: Time) {
    val unpersistData = ssc.conf.getBoolean("spark.streaming.unpersist", true)
    val oldRDDs = generatedRDDs.filter(_._1 <= (time - rememberDuration))
    logDebug("Clearing references to old RDDs: [" +
      oldRDDs.map(x => s"${x._1} -> ${x._2.id}").mkString(", ") + "]")
    generatedRDDs --= oldRDDs.keys
    if (unpersistData) {
      logDebug(s"Unpersisting old RDDs: ${oldRDDs.values.map(_.id).mkString(", ")}")
      oldRDDs.values.foreach { rdd =>
        rdd.unpersist()
        // Explicitly remove blocks of BlockRDD
        rdd match {
          case b: BlockRDD[_] =>
            logInfo(s"Removing blocks of RDD $b of time $time")
            b.removeBlocks()
          case _ =>
        }
      }
    }
    logDebug(s"Cleared ${oldRDDs.size} RDDs that were older than " +
      s"${time - rememberDuration}: ${oldRDDs.keys.mkString(", ")}")
    dependencies.foreach(_.clearMetadata(time))
  }

我们可以看到,当一个batch结束的时候,会clear相关的metadata,主要clear过期的rdd,而是否过期依赖于参数rememberDuration;

 /**
   * Initialize the DStream by setting the "zero" time, based on which
   * the validity of future times is calculated. This method also recursively initializes
   * its parent DStreams.
   */
  private[streaming] def initialize(time: Time) {
    //...

    // Set the minimum value of the rememberDuration if not already set
    var minRememberDuration = slideDuration
    if (checkpointDuration != null && minRememberDuration <= checkpointDuration) {
      // times 2 just to be sure that the latest checkpoint is not forgotten (#paranoia)
      minRememberDuration = checkpointDuration * 2
    }
    if (rememberDuration == null || rememberDuration < minRememberDuration) {
      rememberDuration = minRememberDuration
    }

    // Initialize the dependencies
    dependencies.foreach(_.initialize(zeroTime))
  }

结合其他代码,关于minRememberDuration的生成可以得出以下结论:

  • stateStream的checkpointDuration是每个Dstream自己设置的,checkpointDuration是是客户端调用checkpoint(interval: Duration)指定的;
  • 其他DStream使用slideDuration,即batch的interval;
  • minRememberDuration选择了2倍的checkpointDuration或者interval

如果checkpointDuration设置为5,当batch-100执行结束的时候,会删除batch-(100-5*2=90)的数据,当batch-100的metadata的checkpoint结束的时候,会删除rdd-90的checkponit数据;

我认为cache两个checkpointDuration周期的数据,是很大的浪费,完全可以只cache一个checkpointDuration的数据;

checkpoint是否可以保证流的exactly once

当程序重启时,保证数据不丢是个很重要的问题,基于之前的分析,可以分为两种情况讨论:

  • 1.如果流作业是无状态的,即不包含state stream的流作业,在从checkpoint中恢复作业的过程可以看出:未完成的Time(batch)都进行重新生成job,重新运算;以DirectKafkaInputDStream为例,对于已经生成的kafkaRDD,可以通过checkpoint恢复,后续依赖它的DStream的rdd,都通过这些恢复的RDD计算;

  • 2.对于有状态的流作业,即包含state stream的作业,可以恢复stateStream上次进行checkpoint的时候生成的数据,如果当前运行的batch是checkpointTime+3的作业,那么,checkpointTime+3的生成需要依赖,checkpointTime+1,checkpointTime+2的KafkaRDD和checkpointTime RDD,这些数据都可以通过chekpoint进行恢复,所以数据不会丢失;

所以,在spark streaming内部是可以保证exactly once的语义的;但是,对于OutPutDStream而言,可能失败前已经输出了一部分数据,那么无法避免这个批次的数据进行重新计算输出,所以只能实现Atleast once语义!
注意:本次分析使用的代码,基于spark2.4

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,163评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,301评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,089评论 0 352
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,093评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,110评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,079评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,005评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,840评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,278评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,497评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,667评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,394评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,980评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,628评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,796评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,649评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,548评论 2 352

推荐阅读更多精彩内容