问题:
1.RDD中基本所有的数据都是存储都在堆内存里,这部分数据是通过jvm中的GC管理的,进行Spark操作的时候可能会出现资源不一致的问题,当我们长时间不使用某个变量,变量会被gc回收,但是Spark显示的时候显示不出来这一部分数据,所以说Spark无法精确的显示内存信息。
2.发生RDD数据丢失的情况的时候,可以根据lineage重新计算RDD的数据,但是如果依赖关系是宽依赖,重算的消耗比较大,可以使用checkpoint将数据提前写入磁盘,读写磁盘的过程消耗宝贵的IO资源并且写入磁盘之后,依赖关系会消失。
Spark速度非常快的原因之一,就是在不同操作中可以在内存中持久化或缓存数据集。当持久化某个RDD后,每一个节点都将把计算的分片结果保存在内存中,并在对此RDD或衍生出的RDD进行的其他动作中重用。这使得后续的动作变得更加迅速。RDD相关的持久化和缓存,是Spark最重要的特征之一。可以说,缓存是Spark构建迭代式算法和快速交互式查询的关键。如果一个有持久化数据的节点发生故障,Spark 会在需要用到缓存的数据时重算丢失的数据分区。
持久化缓存使用的是堆外内存,直接由操作系统管理,不受GC的控制,并且堆外内存空间要比堆内存空间大3倍。使用起来也比较简单。当数据丢失之后Spark首先会去缓存中查找数据,没有缓存就检查checkpoint,如果也没有就只能重新计算了。
val sumed: RDD[(String, Int)] = tuples.reduceByKey(_+_).cache()
需要注意的是cache是transformation函数,所以需要一个action算子来触发。一般来说缓存shuffle之后的数据,以为shuffle的代价比较大,通常shuffle之后紧接一个cache。
持久化等级:
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(false, false, true, false)
参数(按顺序)
useDisk是否使用磁盘
useMemory是否使用内存
useoffHeap 是否使用堆外内存
deserialized是否使用反序列化
replication 副本的数量
如果使用cache函数缓存,默认使用MEMORY_ONLY,如果需要更改持久化等级,可以使用prisist函数设置持久化等级。
缓存的RDD什么时候去释放呢?当我们调用persist缓存一个RDD时,会调用registerRDDForCleanup(this),这就是将本身的RDD注册到一个弱引用中。当这个RDD变为不可达时,会自动将该RDD对象插入到referenceQueue中,等到下次GC时就会走doCleanupRDD分支。为了让出内存,除了手动unpersist之外,MetadataCleaner的SPARK_CONTEXT会定期清理persistentRdds中过期的数据,其实与unpersist产生的作用是一样的。一旦清理了,那这个缓存的RDD就没有强引用了。