架构简介
BlockManager 是一个嵌入在 spark 中的 key-value型分布式存储系统,是为 spark 量身打造的,BlockManager 在一个 spark 应用中作为一个本地缓存运行在所有的节点上, 包括所有 driver 和 executor上。 BlockManager 对本地和远程提供一致的 get 和set 数据块接口, BlockManager 本身使用不同的存储方式来存储这些数据, 包括 memory, disk, off-heap。
上面是一个整体的架构图, BlockManagerMaster拥有BlockManagerMasterEndpoint 的actor和所有BlockManagerSlaveEndpoint的ref, 可以通过这些引用和 slave 通信,executor 节点上的BlockManagerMaster 则拥有BlockManagerMasterEndpoint的ref和自身BlockManagerSlaveEndpoint的actor。可以通过 Master的引用注册自己。在master 和 slave 可以正常的通信之后, 就可以根据设计的交互协议进行交互, 整个分布式缓存系统也就运转起来了。下面我们主要分析driver端的blockManager,executor端的类似。
初始化
driver端的blockManager的初始化在创建sparkContext的时候进行。
SparkEnv:
SparkContext:
通信
从1.6开始,spark就已经使用netty替代akka,但是会发现底层实现虽然不一样了,但是接口封装的和之前akka的形式还是类似的。blockMageger的通信模块我分为两块理解,一个是blockTransferService,这个是用来传输block内容的服务,还有一个是master和slave的actor,它们主要是用来传递一些命令。
以上是BlockManagerMasterEndpoint部分的命令类别。包含注册,更新block信息,获取block信息等等的命令。
存储
blockManager的存储分为memory disk
private[spark] val memoryStore =new MemoryStore(conf,blockInfoManager, serializerManager, memoryManager, this)
private[spark] val diskStore = new DiskStore(conf,diskBlockManager,securityManager)
上面BlockManager的里面的两个类,通过这两个类来管理自己的block的存储。进去它们的实现部分,内存部分是有hashmap来实现的,而disk则是在本地写文件的形式。这里不展开。
获取block内容流程
getLocalValues(blockId): blockManager首先会去本地获取,从memoryStore 和DiskStore获取, 具体实现在:getLocalValues(blockId: BlockId): Option[BlockResult]
getRemoteValues[T](blockId):本地若是不存在,则要通过远程获取,首先master.getLocationsAndStatus(blockId) 发送命令到driver端的master端,master接受到之后context.reply(getLocationsAndStatus(blockId)),返回block相关的信息,slave接收到后 blockTransferService.fetchBlockSync,通过blockTransferService下载block的内容
更新block信息
和获取block信息类似,现在本地保存,然后reportBlockStatus(blockId, putBlockStatus),