Spark 原理篇:存储系统

Spark 存储系统服务对象

RDD 缓存

好处

1.通过截断 DAG,可以降低失败重试的计算开销
2.通过对缓存内容的访问,可以有效减少从头计算的次数,从整体上提升作业端到端的执行性能

Shuffle 中间文件

shuffle阶段

Map 阶段:Shuffle writer 按照 Reducer 的分区规则将中间数据写入本地磁盘;
Reduce 阶段:Shuffle reader 从各个节点下载数据分片,并根据需要进行聚合计算。
shufflewrite&shuffleread需要依托存储系统完成

广播变量

好处

在 Executors 进程范畴内保存全量数据
同一 Executors 内的所有计算任务,应用就能够以 Process local 的本地性级别,来共享广播变量中携带的全量数据了。

存储系统的基本组件

BlockManagerMaster(运行在Driver)

职责

与各个Executors中BlockManager通信汇总全局数据存储状态

BlockManager(运行在Executors)

职责

对外

Driver

BlockManager 与 Driver 端的 BlockManagerMaster 通信,不仅定期向 BlockManagerMaster 汇报本地数据元信息,还会不定时按需拉取全局数据存储状态。

不同 Executors

不同 Executors 的 BlockManager 之间也会以 Server/Client 模式跨节点推送和拉取数据块

管理过程

Spark 存储系统提供了两种存储抽象:MemoryStore 和 DiskStore。BlockManager 正是利用它们来分别管理数据在内存和磁盘中的存取。见下文解析

BlockManager(管理数据在内存和磁盘中的存取)

MemoryStore

职责

内存存取:管理广播变量,RDD内存缓存

存储形式

对象值(Object Values 反序列化)访问更快,和字节数组(Byte Array序列化)节约空间

透过 RDD 缓存看 MemoryStore

统一采用 MemoryEntry 数据抽象(两个实现类:DeserializedMemoryEntry 和 SerializedMemoryEntry)

DeserializedMemoryEntry : 原始对象值
SerializedMemoryEntry: 序列化之后的字节数组

数据块的存取结构

MemoryStore 能够借助一种高效的数据结构来统一存储与访问数据块:LinkedHashMap[BlockId, MemoryEntry],即 Key 为 BlockId,Value 是 MemoryEntry 的链式哈希字典,MemoryEntry 既可以是 DeserializedMemoryEntry,也可以是 SerializedMemoryEntry,有了这个字典,我们通过 BlockId 即可方便地查找和定位 MemoryEntry,实现数据块的快速存取。

数据块的存取过程

image.png

1.Unroll :通过调用 putIteratorAsValues 或是 putIteratorAsBytes 方法,把 RDD 迭代器展开为数据值,然后把这些数据值暂存到一个叫做 ValuesHolder 的数据结构里
2.转换,为了节省内存开销,我们可以在存储数据值的 ValuesHolder 上直接调用 toArray 或是 toByteBuffer 操作,把 ValuesHolder 转换为 MemoryEntry 数据结构。注意啦,这一步的转换不涉及内存拷贝,也不产生额外的内存开销,因此 Spark 官方把这一步叫做“从 Unroll memory 到 Storage memory 的 Transfer(转移)
3.第三步,这些包含 RDD 数据值的 MemoryEntry 和与之对应的 BlockId,会被一起存入 Key 为 BlockId、Value 是 MemoryEntry 引用的链式哈希字典中。因此,LinkedHashMap[BlockId, MemoryEntry]缓存的是关于数据存储的元数据,MemoryEntry 才是真正保存 RDD 数据实体的存储单元。换句话说,大面积占用内存的不是哈希字典,而是一个又一个的 MemoryEntry。
总的来说,RDD 数据分片、Block 和 MemoryEntry 三者之间是一一对应的,当所有的 RDD 数据分片都物化为 MemoryEntry,并且所有的(Block ID, MemoryEntry)对都记录到 LinkedHashMap 字典之后,RDD 就完成了数据缓存到内存的过程。
*这里,你可能会问:“如果内存空间不足以容纳整个 RDD 怎么办?”很简单,强行把大 RDD 塞进有限的内存空间肯定不是明智之举,所以 Spark 会按照 LRU 策略逐一清除字典中最近、最久未使用的 Block,以及其对应的 MemoryEntry。相比频繁的展开、物化、换页所带来的性能开销,缓存下来的部分数据对于 RDD 高效访问的贡献可以说微乎其微。

DiskStore

职责

磁盘存取:管理shuffle中间文件,RDD磁盘缓存

存储形式

DiskStore 中数据的存取本质上就是字节序列与磁盘文件之间的转换,它通过 putBytes 方法把字节序列存入磁盘文件,再通过 getBytes 方法将文件内容转换为数据块

透过 Shuffle 看 DiskStore

DiskStore 这个狡猾的家伙并没有亲自维护这些元数据,而是请了 DiskBlockManager 这个给力的帮手

DiskBlockManager

记录逻辑数据块 Block 与磁盘文件系统中物理文件的对应关系,每个 Block 都对应一个磁盘文件

DiskStore 与 DiskBlockManager 的交互过程

Spark 默认采用 SortShuffleManager 来管理 Stages 间的数据分发,在 Shuffle write 过程中,有 3 类结果文件:temp_shuffle_XXX、shuffle_XXX.data 和 shuffle_XXX.index。Data 文件存储分区数据,它是由 temp 文件合并而来的,而 index 文件记录 data 文件内不同分区的偏移地址。Shuffle 中间文件具体指的就是 data 文件和 index 文件,temp 文件作为暂存盘文件最终会被删除。在 Shuffle write 的不同阶段,Shuffle manager 通过 BlockManager 调用 DiskStore 的 putBytes 方法将数据块写入文件。文件由 DiskBlockManager 创建,文件名就是 putBytes 方法中的 Block ID,这些文件会以“temp_shuffle”或“shuffle”开头,保存在 spark.local.dir 目录下的子目录里。在 Shuffle read 阶段,Shuffle manager 再次通过 BlockManager 调用 DiskStore 的 getBytes 方法,读取 data 文件和 index 文件,将文件内容转化为数据块,最终这些数据块会通过网络分发到 Reducer 端进行聚合计算。

结合 RDD 数据存储到 MemoryStore 的过程,你能推演出通过 MemoryStore 通过 getValues/getBytes 方法去访问 RDD 缓存内容的过程吗?

1.getBytes/getValues 的实现都比较简单,都是先对LinkedHashMap加锁,通过blockId取出对应的MemoryEntry,然后通过模式匹配,getBytes负责处理序列化的SerializedMemoryEntry,并返回Option[ChunkedByteBuffer],ChunkedByteBuffer是一个只读字节缓冲区,物理上存储为多个块而不是单个连续数组;getValues负责处理对象值序列DeserializedMemoryEntry,返回一个Iterator

参考 RDD 缓存存储的过程,你能推演出广播变量存入 MemoryStore 的流程吗?

2.我先描述下调用链路:TorrentBroadcast#writeBlocks -> BlockManager#putBytes -> BlockManager#save 到这一步会判断存储级别,如果useMemory&&deserialized,会走这条链路:BlockManager#saveDeserializedValuesToMemoryStore -> MemoryStore#putIteratorAsValues -> MemoryStore#putIterator,这一步尝试将给定的块作为值或字节放入内存存储。 但是迭代器可能太大,无法具体化并存储在内存中。为了避免OOM异常,这里会逐渐展开迭代器,同时定期检查是否有足够的可用内存。如果块被成功物化,那么物化过程中使用的临时展开内存就被“转移”到存储内存中;再回到上面存储级别的判断,如果使用内存并且序列化,则走下面的调用链路:BlockManager#saveSerializedValuesToMemoryStore -> MemoryStore#putBytes,这里会测试MemoryStore中是否有足够的空间。如果空间足够,则创建ByteBuffer并将其放入MemoryStore。否则就不会创建ByteBuffer。最终会用SerializedMemoryEntry将 ByteBuffer 封装起来,放到老师在文中提到的LinkedHashMap。可惜极客时间评论没办法发图片,不然调用链路看起来会更直观。

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容