RDD缓存方式
- RDD通过persist方法或cache方法可以将前面的计算结果缓存,但是并不是这两个方法被调用时立即缓存,而是触发后面的action时,该RDD将会被缓存在计算节点的内存中,并供后面重用。
- 源码展示
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def cache(): this.type = persist()
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
缓存之间的区别
- 通过查看源码发现cache最终也是调用了persist方法,默认的存储级别都是仅在内存存储一份。persist的存储级别共12中。Spark的存储级别还有好多种,存储级别在object StorageLevel中定义的。
- 缓存有可能丢失,或者存储于内存的数据由于内存不足而被删除,RDD的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于RDD的一系列转换,丢失的数据会被重算,由于RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition。
- object StorageLevel源码展示:
object StorageLevel {
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(false, false, true, false)
......
}
面试回答缓存的区别
1)cache和persist都是用于将一个RDD进行缓存的,这样在之后使用的过程中就不需要重新计算了,可以大大节省程序运行时间;
2)cache只有一个默认的缓存级别MEMORY_ONLY,cache调用了persist,而persist可以根据情况设置其它的缓存级别;
3)executor执行的时候,默认60%做cache(),40%做task操作,persist是最根本的函数,最底层的函数。
Spark中的cache后面能不能接其他算子,它是不是action操作?
答:cache可以接其他算子,但是接了算子之后,起不到缓存应有的效果,因为会重新触发cache。
cache不是action操作。
Spark为什么要持久化,一般什么场景下要进行persist操作?
答:为什么要进行持久化?
- spark所有复杂一点的算法都会有persist身影,spark默认数据放在内存,spark很多内容都是放在内存的,非常适合高速迭代,1000个步骤只有第一个输入数据,中间不产生临时数据,但分布式系统风险很高,所以容易出错,就要容错,RDD出错或者分片可以根据血统算出来,如果没有对父RDD进行persist 或者cache的化,就需要重头做。为了数据安全和保证高效率,所以需要对其进行持久化。
- 以下场景会使用persist
1)某个步骤计算非常耗时,需要进行persist持久化;
2)计算链条非常长,重新恢复要算很多步骤,最好使用persist;
3)checkpoint所在的RDD要持久化persist,lazy(懒加载)级别,框架发现有checnkpoint,checkpoint时单独触发一个job,需要重算一遍,checkpoint前要持久化,写个RDD.cache或者RDD.persist,将结果保存起来,再写checkpoint操作,这样执行起来会非常快,不需要重新计算RDD链条了。checkpoint之前一定会进行persist。
4)shuffle之后为什么要persist?--shuffle要进性网络传输,风险很大,数据丢失重来,恢复代价很大;
5)shuffle之前进行persist,框架默认将数据持久化到磁盘,这个是框架自动做的。
Attention Please--文章来自互联网资料整理,如有雷同,纯属李小李抄袭,如有侵权请联系删除 From 李小李