1. Spark存储体系组件关系解释
BlockInfoManger 主要提供读写锁控制,层级仅仅位于BlockManger之下,通常Spark读写操作都先调用BlockManger,然后咨询BlockInfoManger是否存在锁竞争,然后才会调用DiskStore和MemStore,进而调用DiskBlockManger来确定数据与位置映射,或者调用 MemoryManger来确定内存池的软边界和内存使用申请。
1.1 Driver 与 Executor 与 SparkEnv 与 BlockManger 组件关系:
Driver与 Executor 组件各自拥有任务执行的SparkEnv环境,而每一个SparkEnv 中都有一个BlockManger负责存储服务,作为高层抽象,BlockManger 之间需要通过 RPCEnv,ShuffleClient,及BlocakTransferService相互通讯。
1.1 BlockInfoManger 与 BlockInfo 共享锁和排它锁读写控制关系:
val NO_WRITER: Long = -1
val NON_TASK_WRITER: Long = -1024
* The task attempt id of the task which currently holds the write lock for this block, or
* [[BlockInfo.NON_TASK_WRITER]] if the write lock is held by non-task code, or
* [[BlockInfo.NO_WRITER]] if this block is not locked for writing.
def writerTask: Long = _writerTask
def writerTask_=(t: Long): Unit = {
_writerTask = t
private[this] val infos = new mutable.HashMap[BlockId, BlockInfo]
*Tracks the set of blocks that each task has locked for writing.
private[this] val writeLocksByTask = new mutable.HashMap[TaskAttemptId, mutable.Set[BlockId]]
with mutable.MultiMap[TaskAttemptId, BlockId]
*Tracks the set of blocks that each task has locked for reading, along with the number of times
*that a block has been locked (since our read locks are re-entrant).
private[this] val readLocksByTask =
new mutable.HashMap[TaskAttemptId, ConcurrentHashMultiset[BlockId]]
1.3 DiskBlockManager 与 DiskStore 组件关系:
以下是DiskStore的抽象写操作,需要传入FileOutputStream => Unit高阶函数:
def put(blockId: BlockId)(writeFunc: FileOutputStream => Unit): Unit = { if (contains(blockId)) { throw new IllegalStateException(s"Block $blockId is already present in the disk store") } logDebug(s"Attempting to put block $blockId") val startTime = System.currentTimeMillis val file = diskManager.getFile(blockId) val fileOutputStream = new FileOutputStream(file) var threwException: Boolean = true try { writeFunc(fileOutputStream) threwException = false } finally { try { Closeables.close(fileOutputStream, threwException) } finally { if (threwException) { remove(blockId) } } } val finishTime = System.currentTimeMillis logDebug("Block %s stored as %s file on disk in %d ms".format( file.getName, Utils.bytesToString(file.length()), finishTime - startTime)) }
def getBytes(blockId: BlockId): ChunkedByteBuffer = { val file = diskManager.getFile(blockId.name) val channel = new RandomAccessFile(file, "r").getChannel Utils.tryWithSafeFinally { * For small files, directly read rather than memory map if (file.length < minMemoryMapBytes) { val buf = ByteBuffer.allocate(file.length.toInt) channel.position(0) while (buf.remaining() != 0) { if (channel.read(buf) == -1) { throw new IOException("Reached EOF before filling buffer\n" + s"offset=0\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}") } } buf.flip() new ChunkedByteBuffer(buf) } else { new ChunkedByteBuffer(channel.map(MapMode.READ_ONLY, 0, file.length)) } } { channel.close() } }
1.3 MemManager 与 MemStore 与 MemoryPool 组件关系:
- 内存池模型 :逻辑上分为堆内存和堆外内存,然后堆内存(或堆外内存)内部又分为StorageMemoryPool和ExecutionMemoryPool。
- MemManager是抽象的,定义了内存管理器的接口规范,方便以后扩展,比如:老版的StaticMemoryManager和新版的UnifiedMemoryManager.
- MemStore 依赖于UnifiedMemoryManager进行内存的申请和软边界变化或内存释放。
- MemStore 内部同时负责存储真实的对象,比如内部成员变量:entries ,建立了内存中的BlockId与MemoryEntry(Block的内存的形式)之间的映射。
- MemStore 内部的“占座”行为,如:内部变量offHeapUnrollMemoryMap 和onHeapUnrollMemoryMap。
1.4 BlockManagerMaster 与 BlockManager 组件关系:
- BlockManagerMaster的作用就是对存在于Dirver或Executor上的BlockManger进行统一管理,这简直是代理行为,因为他持有BlockManagerMasterEndpointREf,进而和BlockManagerMasterEndpoint进行通讯。
2. Spark存储体系组件BlockTransferServic传输服务
3. 总结
秦凯新 20181031 凌晨