介绍
我在学习 Spark checkpoint 时,发现网上的教程 只介绍了 某些使用场景,加上只说明 checkpoint 的作用,印象不深刻。通过源码来学习 一是印象更深刻,二是能够较全面的掌握 checkpoint 的功能 以及原理。
先简单了解一下 checkpoint 的功能:
Spark 在生产环境下经常会面临 Transformation 的 RDD 非常多(例如一个Job 中包含1万个RDD) 或者是具体的 Transformation 产生的 RDD 本身计算特别复杂和耗时(例如计算时常超过1个小时) , 可能业务比较复杂,此时我们必需考虑对计算结果的持久化。
可以采用 persists 把数据在内存 或磁盘中,但却不可靠,如果磁盘或内存会损坏,数据就会丢失。
所以就有了Checkpoint,Checkpoint 的作用是把 RDD 存储到一个高可用的地方(通常这个地方就是HDFS,HDFS会把文件复制多个复本 保存在其他节点上)
一个使用 checkpoint 的例子
先看一个 checkpoint 的使用例子:
def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("WordCount")
val sc = new SparkContext(sparkConf)
//通过 SparkContext 对 checkpoint 设置 hdfs目录,不设置会报错
sc.setCheckpointDir("hdfs://zyb01:9000/checkpoint")
val reduceRdd = sc.textFile(args(0)).flatMap(_.split(" "))
.map((_, 1)).reduceByKey(_ + _)
//对 reduceRdd 调用 checkpoint 把数据保存到 hdfs
reduceRdd.checkpoint()
//action
reduceRdd.saveAsTextFile(args(1))
}
源码分析
下面的源码分析,省略了一些不重要代码,只保留主流程代码
checkpoint 的初始化
1. 设置 checkpoint 目录
首先 设置一个checkpoint 目录,用来保存我们想保存的 RDD,我们看一下代码:
def setCheckpointDir(directory: String) {
checkpointDir = ... // 利用hadoop的api 创建了一个hdfs目录
}
2. 创建 ReliableRDDCheckpointData
ReliableRDDCheckpointData 主要实现了 保存 RDD 的功能。
在上面 例子 中,调用了 rdd 的 checkpoint 方法,看下此方法的代码:
def checkpoint(): Unit = RDDCheckpointData.synchronized {
checkpointData = new ReliableRDDCheckpointData(this)
}
创建了 ReliableRDDCheckpointData,看下其定义:
private[spark] class ReliableRDDCheckpointData[T: ClassTag](@transient private val rdd: RDD[T])
extends RDDCheckpointData[T](rdd) {
...
}
ReliableRDDCheckpointData 继承自 RDDCheckpointData,主要维护 checkpoint 时的状态,看下 RDDCheckpointData 定义:
private[spark] abstract class RDDCheckpointData[T: ClassTag](@transient private val rdd: RDD[T])
extends Serializable {
// RDD 的 checkpoint 状态,包括 Initialized(初始化)、
// CheckpointingInProgress(正在 checkpoint)、Checkpointed(初始化完毕)
protected var cpState = Initialized
...
}
RDDCheckpointData 是一个抽象类,维护了 rdd checkpoint 时的状态,初始化的状态是 Initialized,当开始 checkpoint 和 checkpoint 完成时,状态会同步更新。
开始checkpoint,即写入 RDD 到 hdfs
开始 checkpoint 的入口 是在 SparkContext 提交 Job 时(提交 Job 是发生在 RDD 调用 action 算子时),即 在 SparkContext 的 runJob:
def runJob[T, U: ClassTag](rdd: RDD[T], ...): Unit = {
...
rdd.doCheckpoint()
}
看下 RDD.scala 中的 doCheckpoint 方法:
private[spark] def doCheckpoint(): Unit = {
// 如果 checkpointData 被创建了,则先遍历所有父RDD 进行checkpoint,
// 然后 对自己 进行 checkpoint。 否则对所有父 RDD 进行 checkpoint
if (checkpointData.isDefined) {
// 如果配置了 "spark.checkpoint.checkpointAllMarkedAncestors" 为 true
// 即遍历rdd 的所有父依赖 都调用 doCheckpoint
if (checkpointAllMarkedAncestors) {
dependencies.foreach(_.rdd.doCheckpoint())
}
// 调用 ReliableRDDCheckpointData 的 checkpoint 方法
checkpointData.get.checkpoint()
} else {
dependencies.foreach(_.rdd.doCheckpoint())
}
}
先 遍历 依赖的 父RDD,进行 checkpoint,然后对自己进行checkpoint,最终都会调用 RDDCheckpointData 的 checkpoint() ,看下代码:
final def checkpoint(): Unit = {
RDDCheckpointData.synchronized {
if (cpState == Initialized) {
//如果是初始化状态,则视为 正在 checkpoint 状态
cpState = CheckpointingInProgress
} else {
return
}
}
//抽象方法,是由子类实现。实现保存 RDD 功能,并返回一个 newRDD
val newRDD = doCheckpoint()
// 更新 checkpoint 状态,并截断 RDD lineage
RDDCheckpointData.synchronized {
cpRDD = Some(newRDD)
//状态设为 初始化完毕 状态
cpState = Checkpointed
//(重点关注!)从外部传来的 RDD,并调用其markCheckpointed
// 主要把 rdd 的 dependencies_、partitions_ 等 置为 null,也就是断开父依赖
rdd.markCheckpointed()
}
}
在此方法中,首先更新了state 状态,然后调用了 doCheckpoint() 方法,此方法在子类 ReliableRDDCheckpointData 中实现,看下其实现代码:
protected override def doCheckpoint(): CheckpointRDD[T] = {
val newRDD = ReliableCheckpointRDD.writeRDDToCheckpointDirectory(rdd, cpDir)
newRDD
}
调用 ReliableCheckpointRDD 的 writeRDDToCheckpointDirectory 方法 创建了一个 RDD,并返回,转到 ReliableCheckpointRDD.scala ,看下 writeRDDToCheckpointDirectory 方法代码:
def writeRDDToCheckpointDirectory[T: ClassTag](
originalRDD: RDD[T],
checkpointDir: String,
blockSize: Int = -1): ReliableCheckpointRDD[T] = {
val sc = originalRDD.sparkContext
// Create the output path for the checkpoint
val checkpointDirPath = new Path(checkpointDir)
// 把 hadoop 的配置信息设为 广播变量,传给各个分区使用
val broadcastedConf = sc.broadcast(
new SerializableConfiguration(sc.hadoopConfiguration))
// TODO: This is expensive because it computes the RDD again unnecessarily (SPARK-8582)
// 向集群提交一个Job去执行 checkpoint 操作,将 RDD 序列化到 HDFS 目录上
sc.runJob(originalRDD,
writePartitionToCheckpointFile[T](checkpointDirPath.toString, broadcastedConf) _)
//把各个分区的 RDD 保存到 HDFS 目录上
if (originalRDD.partitioner.nonEmpty) {
writePartitionerToCheckpointDir(sc, originalRDD.partitioner.get, checkpointDirPath)
}
// 根据 原始的RDD,来创建 ReliableCheckpointRDD,最终 RDDCheckpointData 的 cpRDD 持有了此 newRDD。
val newRDD = new ReliableCheckpointRDD[T](
sc, checkpointDirPath.toString, originalRDD.partitioner)
newRDD
}
主要工作是 把配置信息设为 广播变量,给所有 partitioner 使用;然后又提交了一个 Job,完成把设置了 checkpoint 的 RDD 保存到 hdfs。并根据 checkpoint 的保存路径 又创建了一个
ReliableCheckpointRDD。为什么又再创建一个新的 RDD,这个新的 RDD 主要用来 负责以后 读取在文件系统上的 checkpoint 文件
再梳理下流程:
- 首先从 runJob 中调用 RDDCheckpointData 的 checkpoint 方法
- 在 checkpoint 中的 doCheckpoint() 由子类实现,完成RDD 保存到 hdfs 的过程;并创建了一个新的 RDD,此RDD 用来读取 hdfs 时用到
- 在 checkpoint 中 还清空了 RDD 的依赖链,下次直接从 hdfs 中读取即可,实现了 persist 同样的功能。
Checkpoint 写数据时序图:
读取 checkpoint 到 hdfs 中的 RDD
什么时候读取 已经 checkpoint 的 RDD 呢?是在执行 Task 的时候。
我们可以看一下ShuffleMapTask 里的计算方法runTask :
override def runTask(context: TaskContext): MapStatus = {
...
writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
}
这是spark真正调用计算方法的逻辑 runTask 调用 rdd.iterator() 去计算该 rdd 的 partition 的,我们来看 RDD 的iterator():
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
...
compute(split, context)
}
最终调用了 compute 抽象方法,此方法在上面提到的 ReliableCheckpointRDD 中实现:
override def compute(split: Partition, context: TaskContext): Iterator[T] = {
// 根据 保存 RDD 的路径等创建 file
val file = new Path(checkpointPath, ReliableCheckpointRDD.checkpointFileName(split.index))
// 从保存 RDD 的路径中读取 此RDD
ReliableCheckpointRDD.readCheckpointFile(file, broadcastedConf, context)
}
readCheckpointFile 实现了 从 hdfs 中读取 此RDD 的功能,并进行 反序列化不再往下追代码了。
Checkpoint 读取数据时序图:
总结
在进行 RDD 的 Checkpoint 的时候其所依赖的所有的 RDD 都会从计算链条中清空掉。
Checkpoint 改变了 RDD 的 Lineage。因为 所有依赖的 RDD 都被清掉,所以下次计算是直接从 checkpointed 的 RDD 开始的。
checkpoint 是另外启动一个 Job,并重新计算。而不是复用计算完的结果。因此建议在 checkpoint 之前进行 cache 或 persist 操作。
对 第3点 进行补充说明:一般在进行 checkpoint 方法调用前通常都要进行 persist 来把当前 RDD 的数据持久化到内存或者磁盘上,这是因为 checkpoint 是 Lazy 级别的,必须有 Job 的执行且在 Job执行后才会从后往前回溯哪个 RDD 进行了 checkpoint 标记,然后对标记了要进行 checkpoint 的 RDD 新启动一个 Job 执行具体的 Checkpoint 的过程。
定义的计算函数 也被序列化到 checkpoint 目录,当应用代码发生改变时,此时就没法从 checkpoint 恢复。这是一个坑,需要留意。