Spark checkpoint 过程

介绍

我在学习 Spark checkpoint 时,发现网上的教程 只介绍了 某些使用场景,加上只说明 checkpoint 的作用,印象不深刻。通过源码来学习 一是印象更深刻,二是能够较全面的掌握 checkpoint 的功能 以及原理。

先简单了解一下 checkpoint 的功能:

  1. Spark 在生产环境下经常会面临 Transformation 的 RDD 非常多(例如一个Job 中包含1万个RDD) 或者是具体的 Transformation 产生的 RDD 本身计算特别复杂和耗时(例如计算时常超过1个小时) , 可能业务比较复杂,此时我们必需考虑对计算结果的持久化。

  2. 可以采用 persists 把数据在内存 或磁盘中,但却不可靠,如果磁盘或内存会损坏,数据就会丢失。

  3. 所以就有了Checkpoint,Checkpoint 的作用是把 RDD 存储到一个高可用的地方(通常这个地方就是HDFS,HDFS会把文件复制多个复本 保存在其他节点上)

一个使用 checkpoint 的例子

先看一个 checkpoint 的使用例子:

    def main(args: Array[String]) {
        val sparkConf = new SparkConf().setAppName("WordCount")
        val sc = new SparkContext(sparkConf)
        //通过 SparkContext 对 checkpoint 设置 hdfs目录,不设置会报错
        sc.setCheckpointDir("hdfs://zyb01:9000/checkpoint")

        val reduceRdd = sc.textFile(args(0)).flatMap(_.split(" "))
          .map((_, 1)).reduceByKey(_ + _)

        //对 reduceRdd 调用 checkpoint 把数据保存到 hdfs 
        reduceRdd.checkpoint()

        //action
        reduceRdd.saveAsTextFile(args(1))
    }

源码分析

下面的源码分析,省略了一些不重要代码,只保留主流程代码

checkpoint 的初始化

1. 设置 checkpoint 目录

首先 设置一个checkpoint 目录,用来保存我们想保存的 RDD,我们看一下代码:

def setCheckpointDir(directory: String) {
    checkpointDir = ... // 利用hadoop的api 创建了一个hdfs目录
}
2. 创建 ReliableRDDCheckpointData

ReliableRDDCheckpointData 主要实现了 保存 RDD 的功能。

在上面 例子 中,调用了 rdd 的 checkpoint 方法,看下此方法的代码:

  def checkpoint(): Unit = RDDCheckpointData.synchronized {
    checkpointData = new ReliableRDDCheckpointData(this)
  }

创建了 ReliableRDDCheckpointData,看下其定义:

private[spark] class ReliableRDDCheckpointData[T: ClassTag](@transient private val rdd: RDD[T])
  extends RDDCheckpointData[T](rdd) {
  ...
}

ReliableRDDCheckpointData 继承自 RDDCheckpointData,主要维护 checkpoint 时的状态,看下 RDDCheckpointData 定义:

private[spark] abstract class RDDCheckpointData[T: ClassTag](@transient private val rdd: RDD[T])
  extends Serializable {

  // RDD 的 checkpoint 状态,包括 Initialized(初始化)、
  // CheckpointingInProgress(正在 checkpoint)、Checkpointed(初始化完毕)
  protected var cpState = Initialized
  
  ...
}

RDDCheckpointData 是一个抽象类,维护了 rdd checkpoint 时的状态,初始化的状态是 Initialized,当开始 checkpoint 和 checkpoint 完成时,状态会同步更新。

开始checkpoint,即写入 RDD 到 hdfs

开始 checkpoint 的入口 是在 SparkContext 提交 Job 时(提交 Job 是发生在 RDD 调用 action 算子时),即 在 SparkContext 的 runJob:

  def runJob[T, U: ClassTag](rdd: RDD[T], ...): Unit = {
    ...
    rdd.doCheckpoint()
  }

看下 RDD.scala 中的 doCheckpoint 方法:

  private[spark] def doCheckpoint(): Unit = {
    // 如果 checkpointData 被创建了,则先遍历所有父RDD 进行checkpoint,
    // 然后 对自己 进行 checkpoint。  否则对所有父 RDD 进行 checkpoint
    if (checkpointData.isDefined) {
      // 如果配置了 "spark.checkpoint.checkpointAllMarkedAncestors" 为 true
      // 即遍历rdd 的所有父依赖 都调用 doCheckpoint
      if (checkpointAllMarkedAncestors) {
        dependencies.foreach(_.rdd.doCheckpoint())
      }
      // 调用 ReliableRDDCheckpointData 的 checkpoint 方法
      checkpointData.get.checkpoint()
    } else {
      dependencies.foreach(_.rdd.doCheckpoint())
    }
  }

先 遍历 依赖的 父RDD,进行 checkpoint,然后对自己进行checkpoint,最终都会调用 RDDCheckpointData 的 checkpoint() ,看下代码:

  final def checkpoint(): Unit = {
    RDDCheckpointData.synchronized {
      if (cpState == Initialized) {
        //如果是初始化状态,则视为 正在 checkpoint 状态
        cpState = CheckpointingInProgress
      } else {
        return
      }
    }

    //抽象方法,是由子类实现。实现保存 RDD 功能,并返回一个 newRDD
    val newRDD = doCheckpoint()

    // 更新 checkpoint 状态,并截断 RDD lineage
    RDDCheckpointData.synchronized {
      cpRDD = Some(newRDD)
      
      //状态设为 初始化完毕 状态
      cpState = Checkpointed
      
      //(重点关注!)从外部传来的 RDD,并调用其markCheckpointed
      // 主要把 rdd 的 dependencies_、partitions_ 等 置为 null,也就是断开父依赖
      rdd.markCheckpointed()
    }
  }

在此方法中,首先更新了state 状态,然后调用了 doCheckpoint() 方法,此方法在子类 ReliableRDDCheckpointData 中实现,看下其实现代码:

  protected override def doCheckpoint(): CheckpointRDD[T] = {
    val newRDD = ReliableCheckpointRDD.writeRDDToCheckpointDirectory(rdd, cpDir)

    newRDD
  }

调用 ReliableCheckpointRDD 的 writeRDDToCheckpointDirectory 方法 创建了一个 RDD,并返回,转到 ReliableCheckpointRDD.scala ,看下 writeRDDToCheckpointDirectory 方法代码:

  def writeRDDToCheckpointDirectory[T: ClassTag](
      originalRDD: RDD[T],
      checkpointDir: String,
      blockSize: Int = -1): ReliableCheckpointRDD[T] = {

    val sc = originalRDD.sparkContext
    // Create the output path for the checkpoint
    val checkpointDirPath = new Path(checkpointDir)

    // 把 hadoop 的配置信息设为 广播变量,传给各个分区使用
    val broadcastedConf = sc.broadcast(
      new SerializableConfiguration(sc.hadoopConfiguration))
      
    // TODO: This is expensive because it computes the RDD again unnecessarily (SPARK-8582)
    // 向集群提交一个Job去执行 checkpoint 操作,将 RDD 序列化到 HDFS 目录上
    sc.runJob(originalRDD,
      writePartitionToCheckpointFile[T](checkpointDirPath.toString, broadcastedConf) _)

    //把各个分区的 RDD 保存到 HDFS 目录上
    if (originalRDD.partitioner.nonEmpty) {
      writePartitionerToCheckpointDir(sc, originalRDD.partitioner.get, checkpointDirPath)
    }

    // 根据 原始的RDD,来创建 ReliableCheckpointRDD,最终 RDDCheckpointData 的 cpRDD 持有了此 newRDD。
    val newRDD = new ReliableCheckpointRDD[T](
      sc, checkpointDirPath.toString, originalRDD.partitioner)

    newRDD
  }

主要工作是 把配置信息设为 广播变量,给所有 partitioner 使用;然后又提交了一个 Job,完成把设置了 checkpoint 的 RDD 保存到 hdfs。并根据 checkpoint 的保存路径 又创建了一个
ReliableCheckpointRDD。为什么又再创建一个新的 RDD,这个新的 RDD 主要用来 负责以后 读取在文件系统上的 checkpoint 文件

再梳理下流程:

  1. 首先从 runJob 中调用 RDDCheckpointData 的 checkpoint 方法
  2. 在 checkpoint 中的 doCheckpoint() 由子类实现,完成RDD 保存到 hdfs 的过程;并创建了一个新的 RDD,此RDD 用来读取 hdfs 时用到
  3. 在 checkpoint 中 还清空了 RDD 的依赖链,下次直接从 hdfs 中读取即可,实现了 persist 同样的功能。

Checkpoint 写数据时序图:


读取 checkpoint 到 hdfs 中的 RDD

什么时候读取 已经 checkpoint 的 RDD 呢?是在执行 Task 的时候。
我们可以看一下ShuffleMapTask 里的计算方法runTask :

override def runTask(context: TaskContext): MapStatus = {
  ...
  writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
}

这是spark真正调用计算方法的逻辑 runTask 调用 rdd.iterator() 去计算该 rdd 的 partition 的,我们来看 RDD 的iterator():

final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  ...
  compute(split, context)
}

最终调用了 compute 抽象方法,此方法在上面提到的 ReliableCheckpointRDD 中实现:

  override def compute(split: Partition, context: TaskContext): Iterator[T] = {
    // 根据 保存 RDD 的路径等创建 file 
    val file = new Path(checkpointPath, ReliableCheckpointRDD.checkpointFileName(split.index))
    // 从保存 RDD 的路径中读取 此RDD
    ReliableCheckpointRDD.readCheckpointFile(file, broadcastedConf, context)
  }

readCheckpointFile 实现了 从 hdfs 中读取 此RDD 的功能,并进行 反序列化不再往下追代码了。

Checkpoint 读取数据时序图:


总结

  1. 在进行 RDD 的 Checkpoint 的时候其所依赖的所有的 RDD 都会从计算链条中清空掉。

  2. Checkpoint 改变了 RDD 的 Lineage。因为 所有依赖的 RDD 都被清掉,所以下次计算是直接从 checkpointed 的 RDD 开始的。

  3. checkpoint 是另外启动一个 Job,并重新计算。而不是复用计算完的结果。因此建议在 checkpoint 之前进行 cache 或 persist 操作

  4. 对 第3点 进行补充说明:一般在进行 checkpoint 方法调用前通常都要进行 persist 来把当前 RDD 的数据持久化到内存或者磁盘上,这是因为 checkpoint 是 Lazy 级别的,必须有 Job 的执行且在 Job执行后才会从后往前回溯哪个 RDD 进行了 checkpoint 标记,然后对标记了要进行 checkpoint 的 RDD 新启动一个 Job 执行具体的 Checkpoint 的过程。

  5. 定义的计算函数 也被序列化到 checkpoint 目录,当应用代码发生改变时,此时就没法从 checkpoint 恢复。这是一个坑,需要留意。

参考

https://yq.aliyun.com/articles/74946?utm_campaign=wenzhang&utm_medium=article&utm_source=QQ-qun&201752&utm_content=m_19140

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,723评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,003评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,512评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,825评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,874评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,841评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,812评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,582评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,033评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,309评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,450评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,158评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,789评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,409评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,609评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,440评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,357评论 2 352

推荐阅读更多精彩内容