Attention Please--文章来自互联网资料整理,如有雷同,纯属李小李抄袭,如有侵权请联系删除 From 李小李
- Spark RDD的容错机制可以从lineage和checkpoint两个方面进行分析
Spark RDD lineage容错
lineage概念介绍
- RDD只支持粗粒度转换,即只记录单个块上执行的单个操作。将创建RDD的一系列Lineage(即血统)记录下来,以便恢复丢失的分区。RDD的Lineage会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。
- 相比其它系统的细颗粒度的内存数据更新级别的备份或者LOG机制,RDD的Lineage记录的是粗颗粒度的特定数据Transformation操作(如filter、map、join等)行为。
- 当这个RDD的部分分区数据丢失时,它能够通过Lineage获取足够的信息来又一次运算和恢复丢失的数据分区。由于这样的粗颗粒的数据模型,限制了Spark的运用场合,所以Spark并不适用于全部高性能要求的场景,但同一时候相比细颗粒度的数据模型,也带来了性能的提升。
RDD依赖特性对lineage的影响(依赖关系的特性)
- RDD中依赖分为窄依赖和宽依赖
- 第一,窄依赖能够在某个计算节点上直接通过计算父RDD的某块数据计算得到子RDD相应的某块数据;宽依赖则要等到父RDD全部数据都计算完毕之后,并且父RDD的计算结果进行hash并传到相应节点上之后才干计算子RDD。
- 第二,数据丢失时,对于窄依赖仅仅须要又一次计算丢失的那一块数据来恢复。对于宽依赖则要将祖先RDD中的全部数据块全部又一次计算来恢复。所以在长“血统”链特别是有宽依赖的时候,须要在适当的时机设置数据检查点。
- 也是这两个特性要求对于不同依赖关系要採取不同的任务调度机制和容错恢复机制。
lineage容错原理
- 在容错机制中,假设一个节点死机了。并且运算窄依赖。则仅仅要把丢失的父RDD分区重算就可以,不依赖于其它节点。而宽依赖须要父RDD的全部分区都存在,重算就非常昂贵了。
- 能够这样理解开销的经济与否:在窄依赖中。在子RDD的分区丢失、重算父RDD分区时,父RDD相应分区的全部数据都是子RDD分区的数据,并不存在冗余计算。
在宽依赖情况下,丢失一个子RDD分区重算的每一个父RDD的每一个分区的全部数据并非都给丢失的子RDD分区用的,会有一部分数据相当于相应的是未丢失的子RDD分区中须要的数据,这样就会产生冗余计算开销,这也是宽依赖开销更大的原因。因此假设使用Checkpoint算子来做检查点,不仅要考虑Lineage是否足够长,也要考虑是否有宽依赖,对宽依赖加Checkpoint是最物有所值的。
Spark RDD checkpoint容错
checkpoint作用介绍
- checkpoint的意思就是建立检查点,类似于快照,例如在spark计算里面 计算流程DAG特别长,服务器需要将整个DAG计算完成得出结果,但是如果在这很长的计算流程中突然中间算出的数据丢失了,spark又会根据RDD的依赖关系从头到尾计算一遍,这样子就很费性能,当然我们可以将中间的计算结果通过cache或者persist放到内存或者磁盘中,但是这样也不能保证数据完全不会丢失,存储的这个内存出问题了或者磁盘坏了,也会导致spark从头再根据RDD计算一遍,所以就有了checkpoint,其中checkpoint的作用就是将DAG中比较重要的中间数据做一个检查点将结果存储到一个高可用的地方(通常这个地方就是HDFS里面)
RDD checkpoint 写入流程
checkpoint正确使用
val data = sc.textFile("/tmp/spark/1.data").cache() // 注意要cache
sc.setCheckpointDir("/tmp/spark/checkpoint")
data.checkpoint
data.count
RDD checkpoint 过程中会经过以下几个状态,
[ Initialized –> marked for checkpointing –> checkpointing in progress –> checkpointed ]
- 转换流程如下:
- data.checkpoint 这个函数调用中, 设置的目录中, 所有依赖的 RDD 都会被删除, 函数必须在 job 运行之前调用执行, 强烈建议 RDD 缓存 在内存中(又提到一次,千万要注意哟), 否则保存到文件的时候需要从头计算。初始化RDD的 checkpointData 变量为 ReliableRDDCheckpointData。 这时候标记为 Initialized 状态,
- 在所有 job action 的时候, runJob 方法中都会调用 rdd.doCheckpoint , 这个会向前递归调用所有的依赖的RDD, 看看需不需要 checkpoint 。 需要需要 checkpoint, 然后调用 checkpointData.get.checkpoint(), 里面标记 状态为 CheckpointingInProgress, 里面调用具体实现类的 ReliableRDDCheckpointData 的 doCheckpoint 方法,
- doCheckpoint -> writeRDDToCheckpointDirectory, 注意这里会把 job 再运行一次, 如果已经cache 了,就可以直接使用缓存中的 RDD 了, 就不需要重头计算一遍了(怎么又说了一遍), 这时候直接把RDD, 输出到 hdfs, 每个分区一个文件, 会先写到一个临时文件, 如果全部输出完,进行 rename , 如果输出失败,就回滚delete。
- 标记 状态为 Checkpointed, markCheckpointed 方法中清除所有的依赖, 怎么清除依赖的呢, 就是 吧RDD 变量的强引用 设置为 null, 垃圾回收了, 这个后面我们也知道,会触发 ContextCleaner 里面监听清除实际 BlockManager 缓存中的数据(会单独写一篇分析)
- 推荐博客 https://www.cnblogs.com/superhedantou/p/9004820.html 1,spark core中checkpoint分析
- https://blog.csdn.net/wt346326775/article/details/72870518?utm_source=blogxgwz6 Spark中CheckPoint操作
RDD checkpoint读取流程
- 在做完checkpoint后,获取原来RDD的依赖以及partitions数据都将从CheckpointRDD中获取。也就是说获取原来rdd中每个partition数据以及partitioner等对象,都将转移到CheckPointRDD中。
- 在CheckPointRDD的一个具体实现ReliableRDDCheckpintRDD中的compute方法中可以看到,将会从hdfs的checkpoint目录中恢复之前写入的partition数据。而partitioner对象(如果有)也会从之前写入hdfs的paritioner对象恢复。
- 总的来说,checkpoint读取过程是比较简单的。
RDD checkpoint写入流程的疑问
Q1:RDD中的数据是什么时候写入的?是在rdd调用checkpoint方法时候吗?
Q2:在做checkpoint的时候,具体写入了哪些数据到HDFS了?
Q3:在对RDD做完checkpoint以后,对做RDD的本省又做了哪些收尾工作?
Q4:实际过程中,使用RDD做checkpoint的时候需要注意什么问题?
弄清楚了以上四个问题,我想对checkpoint的写过程也就基本清楚了。接下来将一一回答上面提出的问题。
A1:首先看一下RDD中checkpoint方法,可以看到在该方法中是只是新建了一个ReliableRDDCheckpintData的对象,并没有做实际的写入工作。实际触发写入的时机是在runJob生成改RDD后,调用RDD的doCheckpoint方法来做的。
A2:在经历调用RDD.doCheckpoint → RDDCheckpintData.checkpoint → ReliableRDDCheckpintData.doCheckpoint → ReliableRDDCheckpintData.writeRDDToCheckpointDirectory后,在writeRDDToCheckpointDirectory方法中可以看到:将作为一个单独的任务(RunJob)将RDD中每个parition的数据依次写入到checkpoint目录(writePartitionToCheckpointFile),此外如果该RDD中的partitioner如果不为空,则也会将该对象序列化后存储到checkpoint目录。所以,在做checkpoint的时候,写入的hdfs中的数据主要包括:RDD中每个parition的实际数据,以及可能的partitioner对象(writePartitionerToCheckpointDir)。
A3:在写完checkpoint数据到hdfs以后,将会调用rdd的markCheckpoined方法,主要斩断该rdd的对上游的依赖,以及将paritions置空等操作。
A4:通过A1,A2可以知道,在RDD计算完毕后,会再次通过RunJob将每个partition数据保存到HDFS。这样RDD将会计算两次,所以为了避免此类情况,最好将RDD进行cache。即1.1中rdd的推荐使用方法如下:
sc.setCheckpointDir(checkpointDir.toString)
val rdd = sc.makeRDD(1 to 20, numSlices = 1)
rdd.cache()
rdd.checkpoint()
RDD checkpoint 与cache和persist的区别
- persist或者cache与checkpoint的区别在于,前者持久化只是将数据保存在BlockManager中但是其lineage是不变的,但是后者checkpoint执行完后,rdd已经没有依赖RDD,只有一个checkpointRDD,checkpoint之后,RDD的lineage就改变了。
- persist或者cache持久化的数据丢失的可能性更大,因为可能磁盘或内存被清理,但是checkpoint的数据通常保存到hdfs上,放在了高容错文件系统。