9.2 spark内存管理之 UnifiedMemoryManager

图片来源 https://www.ibm.com/developerworks/cn/analytics/library/ba-cn-apache-spark-memory-management/index.html

1. 内存的分配

如果我们选择了这种UnifiedMemoryManager来管理一个1G内存的Executor, 那么实际上每个部分的内存是多少呢? 下面这个图是1.5的, 1.6对比例进行了微调


已经看过的内存管理图
  1. 系统会预留300MB内存
  2. 在剩余的700MB内存中, 有75%会被分配到storage区域, 用于存储和计算. 这一部分是525MB, 这个比例可以使用spark.memory.fraction 来调整. 另外一种static memory manager的参数和这边不同, 要注意内存管理模型和参数是配套的
  3. 剩下的部分就是Other区域, 用于进行spark自身, 以及broadcast的一些数据的使用

2. 存储和计算内存的动态分配

内存

这张图概括了统一内存管理模型的基本约束

  • Storage部分抢占到Executor的空间随时可能被释放
  • Executor抢占到的Storage部分不会被主动释放, 只能等待使用结束
  • 当有剩余空间时, 大家相安无事, 都没有空间了, 只好把数据向磁盘写入, 而不是去占用预留空间. 预留空间主要为了应对spark自身内存不足的场景

3. 内部对象和方法

申请执行用内存

 /**
    为当前的Task申请numBytes数量的内存, 返回0代表分配失败
   这个请求是可能阻塞的, 直到找到足够的内存页
   每个人物最多可以从内存池中分配到 1/2N , N是当前活跃的任务数
   如果之前有任务已经获得了大量的内存, 那么就只好把数据刷到磁盘
   */
  override private[memory] def acquireExecutionMemory(
      numBytes: Long,
      taskAttemptId: Long,
      memoryMode: MemoryMode): Long = synchronized {
    assertInvariant()
    assert(numBytes >= 0)
    memoryMode match {
      case MemoryMode.ON_HEAP =>

        /**
         通过释放掉storage抢占到的用于cache的内存, 将Executor内存池容量扩大
         
         当为一个任务申请内存时, 这里需要做大量的尝试性分配
         因为每次尝试申请, 都可能在释放掉内存后,
         接着被storage用来缓存一个大对象
        */
        def maybeGrowExecutionPool(extraMemoryNeeded: Long): Unit = {
          if (extraMemoryNeeded > 0) {
            // 当前内存池空了, 尝试去抢占storage部分, free的部分可以直接抢占
            //  如果当前状态时storage抢占了executor的内存, 则立即释放它抢占的部分
            val memoryReclaimableFromStorage =
              math.max(storageMemoryPool.memoryFree, storageMemoryPool.poolSize - storageRegionSize)
            if (memoryReclaimableFromStorage > 0) {
              // 申请满足需求的内存
              val spaceToReclaim = storageMemoryPool.freeSpaceToShrinkPool(
                math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
              storageMemoryPool.decrementPoolSize(spaceToReclaim)
              onHeapExecutionMemoryPool.incrementPoolSize(spaceToReclaim)
            }
          }
        }

        /**
           计算当前能用到的最大内存, 很容易理解
           如果storage没有满, 那么就最大内存是尽可能多的抢占
           如果storage已经满了, 按照默认配置, executor可以拿到50%的内存空间
         */
        def computeMaxExecutionPoolSize(): Long = {
          maxMemory - math.min(storageMemoryUsed, storageRegionSize)
        }

        onHeapExecutionMemoryPool.acquireMemory(
          numBytes, taskAttemptId, maybeGrowExecutionPool, computeMaxExecutionPoolSize)

      case MemoryMode.OFF_HEAP =>
        //  off-heap是由tachyon管理的, 逻辑在独立的项目里
        offHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
    }
  }

申请存储用内存

  override def acquireStorageMemory(
      blockId: BlockId,
      numBytes: Long,
      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized {
    assertInvariant()
    assert(numBytes >= 0)
    if (numBytes > maxStorageMemory) {
      // 这里会报一个错误, fast-fail机制
      logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " +
        s"memory limit ($maxStorageMemory bytes)")
      return false
    }
    if (numBytes > storageMemoryPool.memoryFree) {
      // 当内存不足时, 尝试抢占executor的空闲内存, 并把对应的blocck保存到evctedBlocks里去, 为Executor可能的索取做准备
      val memoryBorrowedFromExecution = Math.min(onHeapExecutionMemoryPool.memoryFree, numBytes)
      onHeapExecutionMemoryPool.decrementPoolSize(memoryBorrowedFromExecution)
      storageMemoryPool.incrementPoolSize(memoryBorrowedFromExecution)
    }
    storageMemoryPool.acquireMemory(blockId, numBytes, evictedBlocks)
  
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 228,303评论 6 531
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 98,478评论 3 415
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 176,230评论 0 373
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 62,936评论 1 309
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 71,688评论 6 409
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 55,174评论 1 323
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 43,243评论 3 441
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 42,402评论 0 288
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 48,932评论 1 334
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 40,771评论 3 354
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 42,971评论 1 369
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 38,514评论 5 359
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 44,209评论 3 347
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 34,631评论 0 26
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 35,863评论 1 283
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 51,640评论 3 391
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 47,949评论 2 373

推荐阅读更多精彩内容