Spark自建的逻辑内存管理器是怎么申请和释放内存的?

漫谈Spark内存管理(一)中,概述了Spark内存管理做的事情,并着重对unroll memory的概念做了解释及分析。本文继续讨论Spark Memory Manager的功能实现.

Spark的MemoryManager提供了一套逻辑上的内存申请和释放机制。spark1.6之后,UnifiedMemoryManager成为默认内存管理器,所以笔者以UnifiedMemoryManger为例分析spark内存管理器的具体实现。

1 存储内存管理

1.1 申请存储内存

Spark中的RDD Block,Broadcast Block都可能使用存储内存(也可能用磁盘)进行存储,存储之前必须先向MemoryManager申请所需要的内存空间。

UnifiedMemoryManger.acquireStorageMemory方法用于为block申请指定memory mode(onHeap或offHeap),指定memory size的存储内存空间:

UnifiedMemoryManger.acquireStorageMemory

1. 首先,根据申请的memorymode获取对应的执行内存池,存储内存池和最大存储内存(maxMemory);

2. 最大存储内存由UnifiedMemoryManager的两个方法提供:


In UnifiedMemoryManager

UnifiedMemoryManager的执行和存储内存是可以互借的,所以这里获取最大存储内存时,直接用最大内存减去已用执行内存。

3. 看看maxHeapMemory和maxOffHeapMemory是怎么得到的:

    a. maxHeapMemory由UnifiedMemoryManager.getMaxMemory方法计算得到:


UnifiedMemoryManager.getMaxMemory

    首先,获取系统内存大小,默认直接调用 java 的Runtime.getRuntime.maxMemory 方法获取当前 jvm 最大内存,这个最大内存的值可通过 jvm 参数-Xmx 配置,但是这个值并不等于-Xmx 指定的值,会稍小一些,不同 jvm 和操作系统可能不同。-Xmx 的值可由--driver-memory和--executor-memory 控制;

    然后, 预留一部分系统内存,默认的RESERVED_SYSTEM_MEMORY_BYTES 为300MB, 也就是说默认预留 450MB 系统内存。可用内存就是系统内存减去预留内存。

    最后,UnifiedMemoryManager 的 maxHeapMemory 就是可用内存乘以spark.memory.fraction.

    b. maxOffHeapMemory直接从配置中读取:

maxOffHeapMemory in MemoryManager

4. 当申请的内存比存储内存池的空闲内存大,则向执行内存池借内存,并调整执行内存池和存储内存池的_poolSize.

5. 最后调用存储内存池的 acquireMemory 方法申请内存。

6. 再看看 storagePool.acquireMemory 的实现:

这里的参数numBytesToFree就是要申请的内存大小减去存储内存池的空闲内存大小:

numBytesToFree
MemoryStore.evictBlocksToFreeSpace

如果存储内存池的当前空闲内存不够,则调用MemoryStore.evictBlocksToFreeSpace方法释放内存。该方法会按照LRU的顺序遍历memoryStore中存储的所有blocks,选择出可驱逐的blocks,可驱逐block需要满足条件:

    a. 使用的memorymode与要申请的memorymode相同

    b. 与申请内存的block不属于同一个rdd

    c. 没有正在被读取(not locked for reading)

    注意,因为MemoryStore.entries的类型为java.util.LinkedHashMap,并且accessOrder为true:

MemoryStore.entries

    所以,MemoryStore.evictBlocksToFreeSpace在遍历entries选择可驱逐块时,是按照LRU(Lease Recently Used)算法进行的。

    只有当可释放的内存总量大于需要释放的内存量时才会调用blockEvictionHandler.dropFromMemory从内存中删除选中的blocks. 需注意,因为evictBlocksToFreeSpace方法会调用memoryManager.synchronized,所以,同一时刻最多只有一个task在删除memoryStore中的block.

释放完内存后,如果空闲内存足够,则更新存储内存池的_memoryUsed变量。

从上面的分析可以看出,acquireStorageMemory会判断当前空闲存储内存是否足够,如果不够则会从执行内存池借空闲内容,如果还不够,则会按照LRU的顺序驱逐当前内存池中某些满足条件的blocks以释放内存。如果这两种措施都无法获取足够的空闲存储内存,则申请存储内存失败,acquireStorageMemory返回false. 如果申请成功,则更新存储内存池的_memoryUsed变量,表示这部分内存已被使用。

1.2 释放存储内存

UnifiedMemoryManger.releaseStorageMemory方法用于释放存储内存:

UnifiedMemoryManger.releaseStorageMemory

同样也是根据memory mode调用不同存储内存池的releaseMemory方法。onHeapStoreageMemoryPool和offHeapStorageMemoryPool都是StorageMemoryPool类的对象,下面看看StorageMemoryPool.releaseMemory方法的实现:

StorageMemoryPool.releaseMemory

很简单,就是更新存储内存池的_memoryUsed变量。

2 执行内存管理

2.1 申请执行内存

Spark中的shuffle,aggregate,join等操作都会使用执行内存,每个task在执行时会通过taskMemoryManager调用MemoryManager的acquireExecutionMemory方法申请需要的执行内存。UnifiedMemoryManger.acquireExecutionMemory方法做了以下步骤:

    1. 根据memory mode获取对应的执行内存池,存储内存池, 用于存储的内存大小, 最大内存:

UnifiedMemoryManger. acquireExecutionMemory

      其中maxHeapMemory,maxOffHeapMemory和上文介绍的是一样的,这点也体现了UnifiedMemoryManager的unified,哈哈。storageRegionSize是用于数据存储的内存大小,对于onHeap内存,storageRegionSize为:

onHeapStorageRegionSize

其中的maxMemory就是上文中介绍的UnifiedMemoryManager.getMaxMemory方法返回的值。对于offHeap内存, storageRegionSize为:


offHeapStorageRegionSize

这里的maxOffHeapMemory和上文提到的也是同一个。

    2. 定义用于增长执行内存池的maybeGrowExecutionPool方法,以及用于计算最大执行内存池大小的computeMaxExecutionPoolSize方法:

UnifiedMemoryManager.maybeGrowExecutionPool

    maybeGrowExecutionPool方法会做:

        a. 计算存储内存池的实际poolsize和用于数据存储的内存大小storageRegionSize之间的差值。因为在存储block(比如RDD block,broadcastblock)时,存储内存池可能从执行内存池借内存,所以它的实际poolsize可能大于配置的storageRegionSize.我们暂且称这个差值为delta.

        b. 比较存储内存池的空闲内存和delta,取更大的作为memoryReclaimableFromStorage.也就是说这里不仅会收回存储内存池从执行内存池借的内存,还会向存储内存池借空闲内存,即“回收+借”。

        c. 调用StorageMemoryPool.freeSpaceToShrinkPool方法,该方法会先从存储内存池的空闲内存中获取需要reclaim的内存,如果不够则会调用MemoryStore.evictBlocksToFreeSpace方法驱逐存储在内存中的某些block以释放内存。

        d. 最后,调整存储内存池和执行内存池的大小。

    computeMaxExecutionPoolSize方法用于计算调用maybeGrowPool之后,执行内存池的最大size,实现比较简单:

UnifiedMemoryManager.computeMaxExecutionPoolSize

    注意,如果storagePool.memoryUsed大于storageRegionSize,则说明在maybeGrowPool中调用freeSpaceToShrinkPool方法时未能成功释放delta大小的内存(或者是不需要释放那么多)。此时,computeMaxExecutionPoolSize方法返回的值会大于执行内存池的实际poolsize。

    3. 调用ExecutionMemoryPool.acquireMemory方法申请执行内存。

再来看看ExecutionMemoryPool.acquireMemory的实现:

ExecutionMemoryPool.acquireMemory

ExecutionMemoryPool.acquireMemory用了一个while循环不断尝试分配内存,只有分配成功的情况下才会退出循环。每次尝试会做:

1. 尝试从存储内存池回收内存,从而增长执行内存池的大小;

2. 获取执行内存池的最大容量maxPoolSize,累计分配给单个task的内存大小范围为[maxPoolSize/2*numActiveTasks,maxPoolSize/(numActiveTasks)];

3. 根据当前task已占用内存,申请的内存大小,可分配的内存大小范围,以及执行内存池的空闲内存大小,最终确定要分配的内存大小toGrant;

4. 如果分配toGrant内存之后task所占内存仍小于maxPoolSize/2*numActiveTasks,则调用lock.wait()等待其他task释放内存;

5. 如果toGrant在正常范围内,则更新指定task的已占用内存,并结束循环。

2.2 释放执行内存

ExecutionMemoryPool.releaseMemory

可以看到,释放执行内存最终是通过更新memoryForTask这个map来实现的。最后通过执行内存池的lock通知所有在请求执行内存时由于内存不足调用lock.wait()等待的任务线程。执行内存池的已用内存大小是从memoryForTask计算得到的:


ExecutionMemoryPool.memoryUsed

3 分析&总结

3.1 存储内存和执行内存申请过程的不同

基于上文中的源码分析,比较申请存储内存和执行内存的过程会发现,在申请执行内存时,spark可能会驱逐存储内存中的block以满足执行内存的需要;而申请存储内存时,只会从执行内存池借空闲内存(而且借的有可能包括执行内存向存储内存借的,所以也应该是 “回收+借”),并不会释放执行内存以满足存储内存的需要。也就是说,在Spark中,执行内存的优先级是更高的。笔者认为,这是因为执行内存用于shuffle,aggregation,join等操作中的各种map等数据结构,强行释放这些内存可能会导致task运行错误或失败,而存储内存主要用于存放缓存的RDD Block,Broadcast Block等数据,spark可以将这些block从内存移到磁盘存储或直接删除,在需要访问时可以根据lineage重新计算。

3.2 Spark内存管理器的功能

通过上文对spark memory manager各个方法的源码分析,可以看到spark的内存管理器自建了一套控制内存使用的方案,但这是一套逻辑上的内存管理方案。从实现角度上讲,就是维护了一系列的变量来记录和控制spark各个模块对内存(包括onHeap和offHeap内存)的使用。而真正向操作系统申请和释放物理内存的工作由JVM或Tungsten完成,Tungsten内存管理的核心内容是在TaskMemoryManager类中实现的,后续文章我们会详细讨论。

3.3 总结

本文以UnifiedMemoryManager为例,从源码角度分析了spark内存管理器如何将内存划分为存储和执行内存,详述了存储和执行内存的申请和释放过程。分析了存储和执行内存申请过程中的不同之处,总结了spark内存管理器的功能。

4 说明

    a. 本文spark源码版本为spark 2.4.0

    b. 水平有限,如有错误,望读者指出

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,692评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,482评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,995评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,223评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,245评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,208评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,091评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,929评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,346评论 1 311
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,570评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,739评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,437评论 5 344
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,037评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,677评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,833评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,760评论 2 369
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,647评论 2 354

推荐阅读更多精彩内容