一.checkpoint的基本使用
Checkpoint 可以还原药水。辅助 Spark 应用从故障中恢复。SparkStreaming 宕机恢复,
适合调度器有自动重试功能的。对于 SparkCore 则适合那些计算链条超级长或者计算耗时的
关键点进行 Checkpoint,便于故障恢复。
Checkpoint 和 persist 从根本上不一样:
- 1,Cache or persist
Cache or persist 保存了 RDD 的血统关系,假如有部分 cache 的数据丢失可以根据血缘关
系重新生成。 - 2,Checkpoint
会将 RDD 数据写到 hdfs 这种安全的文件系统里,并且抛弃了 RDD 血缘关系的记录。即使 persist 存储到了磁盘里面,在 driver 停掉之后会被删除,而 checkpoint 可以被下次启动使
用。 - 3.Checkpoint 基本使用
对于 SparkStreaming 任务,请参考源码例子 RecoverableNetworkWordCount
对 SparkCore:
DoCheckpoint
val sc = new SparkContext(confspark)
sc.setCheckpointDir("/test/checkpoint")
val textrdd=sc.textFile("/agent/test.txt")
textrdd.checkpoint()
val count = textrdd.count()
val file = textr
Recover
val count =
sc.checkpointFile("/test/checkpoint/7ce1511a-c25c-4ba7-9846-ee702e5c470d/rdd-1")
.count()
println(count)
二,Checkpoint 的初始化源码
- 1,设置 Checkpoint 目录
sc.setCheckpointDir("checkpointDirectory") - 2,调用 Checkpoint 方法,构建 checkpointData
mapped.checkpoint()
checkpointData = Some(new ReliableRDDCheckpointData(this))