Spark Storage ④ - 存储执行类介绍(DiskBlockManager、DiskStore、MemoryStore)

本文为 Spark 2.0 源码分析笔记,某些实现可能与其他版本有所出入

这篇文章前半部分我们对直接在 Block 存取发挥重要作用的类进行介绍,主要是 DiskBlockManager、MemoryStore、DiskStore。后半部分以存取 Broadcast 来进一步加深对 Block 存取的理解。

DiskBlockManager

DiskBlockManager 主要用来创建并持有逻辑 blocks 与磁盘上的 blocks之间的映射,一个逻辑 block 通过 BlockId 映射到一个磁盘上的文件。

主要成员

  • localDirs: Array[File]:创建根据 spark.local.dir (备注①)指定的目录列表,这些目录下会创建子目录,这些子目录用来存放 Application 运行过程中产生的存放在磁盘上的中间数据,比如 cached RDD partition 对应的 block、Shuffle Write 产生的数据等,会根据文件名将 block 文件 hash 到不同的目录下
  • subDirs: Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir)):localDirs 代表的各个目录下的子目录,子目录个数由 spark.diskStore.subDirectories 指定,子目录用来存储具体的 block 对应的文件,会根据 block file 文件名先 hash 确定放在哪个 localDir,在 hash 决定放在该 localDir 的哪个子目录下(寻找该 block 文件也是通过这种方式)
  • shutdownHook = addShutdownHook():即关闭钩子,在进程结束时会递归删除 localDirs 下所有属于该 Application 的文件

主要方法

看了上面几个主要成员的介绍相信已经对逻辑 block 如何与磁盘文件映射已经有了大致了解。接下来看看几个主要的方法:

  • getFile(filename: String): File:通过文件名来查找 block 文件并获取文件句柄,先通过文件名 hash 到指定目录再查找
  • getFile(blockId: BlockId): File:通过 blockId 来查找 block 文件并获取文件句柄,事实上是通过调用 getFile(filename: String): File 来查找的
  • containsBlock(blockId: BlockId): Boolean:是否包含某个 blockId 对应的文件
  • getAllFiles(): Seq[File]:获取存储在磁盘上所有 block 文件的句柄,以列表的形式返回
  • getAllBlocks(): Seq[BlockId]:获取存储在磁盘上的所有 blockId
  • stop(): Unit:清理存储在磁盘上所有的 block 文件
  • createTempLocalBlock(): (TempLocalBlockId, File):产生一个唯一的 Block Id 和文件句柄用于存储本地中间结果
  • createTempShuffleBlock(): (TempShuffleBlockId, File):产生一个唯一的 Block Id 和文件句柄用于存储 shuffle 中间结果

如上述,DiskBlockManager 提供的方法主要是为了提供映射的方法,而并不会将现成的映射关系保存在某个成员中,这是需要明了的一点。DiskBlockManager 方法主要在需要创建或获取某个 block 对应的磁盘文件以及在 BlockManager 退出时要清理磁盘文件时被调用。


DiskStore

DiskStore 用来将 block 数据存储至磁盘,是直接的磁盘文件操作者。其封装了:

两个写方法

  • put(blockId: BlockId)(writeFunc: FileOutputStream => Unit): Unit:用文件输出流的方式写 block 数据至磁盘
  • putBytes(blockId: BlockId, bytes: ChunkedByteBuffer): Unit:以字节 buffer 的方式写 block 数据至磁盘

一个读方法

  • getBytes(blockId: BlockId): ChunkedByteBuffer:通过 block id 读取存储在磁盘上的 block 数据,以字节 buffer 的形式返回

两个查方法

  • getSize(blockId: BlockId): Long:通过 block id 获取存储在磁盘上的 block 数据的大小
  • contains(blockId: BlockId): Boolean:查询磁盘上是否包含某个 block id 的数据

一个删方法

  • remove(blockId: BlockId): Boolean:删除磁盘上某个 block id 的数据

需要说明的是,DiskStore 的各个方法中,通过 block id 或文件名来找到对应的 block 文件句柄是通过调用 DiskBlockManager 的方法来达成的


MemoryStore

MemoryStore 用来将没有序列化的 Java 对象数组和序列化的字节 buffer 存储至内存中。它的实现比 DiskStore 稍复杂,我们先来看看主要成员

先说明 MemoryEntry

private sealed trait MemoryEntry[T] {
  def size: Long
  def memoryMode: MemoryMode
  def classTag: ClassTag[T]
}

public enum MemoryMode {
  ON_HEAP,
  OFF_HEAP
}

代表 JVM 或对外内存的内存大小

主要成员

  • entries: LinkedHashMap[BlockId, MemoryEntry[_]]:保存每个 block id 及其存储在内存中的数据的大小及是保存在 JVM 内存中还是堆外内存中
  • unrollMemoryMap: mutable.HashMap[Long, Long]:保存每个 task 占用的用来存储 block 而占用的 JVM 内存
  • offHeapUnrollMemoryMap: mutable.HashMap[Long, Long]:保存每个 task 占用的用来存储 block 而占用的对外内存

以上几个成员主要描述了每个 block 占用了多少内存空间,每个 task 占用了多少内存空间以及它们占用的是 JVM 内存还是堆外内存。接下来看看几个重要的方法:

三个写方法

  • putBytes[T: ClassTag](blockId: BlockId, size: Long, memoryMode: MemoryMode, _bytes: () => ChunkedByteBuffer): Boolean:先检查是否还有空余内存来存储参数 size 这么大的 block,若有则将 block 以字节 buffer 形式存入;否则不存入,返回失败
  • putIteratorAsValues[T](blockId: BlockId, values: Iterator[T], classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long]:尝试将参数 blockId 对应的数据通过迭代器的方式写入内存。为避免由于空余内存不足以存放 block 数据而导致的 OOM。该方法会逐步展开迭代器来检查是否还有空余内存。如果迭代器顺利展开了,那么用来展开迭代器的内存直接转换为存储内存,而不用再去分配内存来存储该 block 数据。如果未能完全开展迭代器,则返回一个包含 block 数据的迭代器,其对应的数据是由多个局部块组合而成的 block 数据
  • putIteratorAsBytes[T](blockId: BlockId, values: Iterator[T], classTag: ClassTag[T], memoryMode: MemoryMode): Either[PartiallySerializedBlock[T], Long]:尝试将参数 blockId 对应的数据通过字节 buffer 的方式写入内存。为避免由于空余内存不足以存放 block 数据而导致的 OOM。该方法会逐步展开迭代器来检查是否还有空余内存。如果迭代器顺利展开了,那么用来展开迭代器的内存直接转换为存储内存,而不用再去分配内存来存储该 block 数据。如果未能完全开展迭代器,则返回一个包含 block 数据的迭代器,其对应的数据是由多个局部块组合而成的 block 数据

两个读方法

  • getBytes(blockId: BlockId): Option[ChunkedByteBuffer]:以字节 buffer 的形式获取参数 blockId 指定的 block 数据
  • getValues(blockId: BlockId): Option[Iterator[_]]:以迭代器的形式获取参数 blockId 指定的 block 数据

若干个查方法

  • getSize(blockId: BlockId): Long:获取 blockId 对应 block 占用的内存大小
  • contains(blockId: BlockId): Boolean:内存中是否包含某个 blockId 对应的 block 数据
  • currentUnrollMemory(): Long:当前所有 tasks 用于存储 blocks 占用的总内存
  • ...

两个删方法

  • remove(blockId: BlockId): Boolean:删除内存中 blockId 指定的 block 数据
  • clear(): Unit:清除 MemoryStore 中存储的所有 blocks 数据

从上面描述的 MemoryStore 的主要方法来看,其功能和 DiskStore 类似,但由于要考虑到 JVM 内存和堆外内存以及有可能内存不足以存储 block 数据等问题会变得更加复杂


备注说明

  • 备注①:设置 spark.local.dir 时可以设置多个目录,目录分别在不同磁盘上,可以增加整体 IO 带宽;也尽量让目录位于更快的磁盘上以获得更快的 IO 速度
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,590评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 86,808评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,151评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,779评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,773评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,656评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,022评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,678评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,038评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,659评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,756评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,411评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,005评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,973评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,053评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,495评论 2 343

推荐阅读更多精彩内容