Spark-Core 计算基础核心(六) 概念及原理介绍

Spark内存管理

  • 概述:
    作为一个 JVM 进程,Executor 的内存管理建立在 JVM 的内存管理之上,Spark 对 JVM 的堆内(On-heap)空间进行了更为详细的分配,以充分利用内存。同时,Spark 引入了堆外(Off-heap)内存,使之可以直接在工作节点的系统内存中开辟空间,进一步优化了内存的使用。堆内内存受到JVM统一管理,堆外内存是直接向操作系统进行内存的申请和释放。
  • 内容:
    1. 堆内内存:
      1. 堆内内存的大小,由 Spark 应用程序启动时的 –executor-memory 或 spark.executor.memory 参数配置
      2. Executor 内运行的并发任务共享 JVM 堆内内存
      3. 根据功能类型又分为存储内存和运行内存
      4. 流程: GC自动分配与回收
        申请内存流程如下:
        1. Spark 在代码中 new 一个对象实例;
        2. JVM 从堆内内存分配空间,创建对象并返回对象引用;
        3. Spark 保存该对象的引用,记录该对象占用的内存。
        释放内存流程如下:
        1. Spark记录该对象释放的内存,删除该对象的引用;
        2. 等待JVM的垃圾回收机制释放该对象占用的堆内内存。
    2. 堆外内存
      1. 这些内存直接受操作系统管理(而不是虚拟机)。这样做的结果就是能保持一个较小的堆,以减少垃圾收集对应用的影响。
      2. 减少了不必要的内存开销,以及频繁的 GC 扫描和回收,提升了处理性能
      3. 堆外内存可以被精确地申请和释放,且序列化的数据占用的空间可以被精确计算,所以相比堆内内存来说降低了管理的难度,也降低了误差。
      4. 在默认情况下堆外内存并不启用

内存空间分配

  • 静态内存管理:

    1. 存储内存、执行内存和其他内存的大小在 Spark 应用程序运行期间均为固定的,但用户可以应用程序启动前进行配置
    2. Storage内存和Execution内存都有预留空间,目的是防止OOM,因为Spark堆内内存大小的记录是不准确的,需要留出保险区域。
    3. 堆外内存占用的空间可以被精确计算,所以无需再设定保险区域。
  • 统一内存管理:

    1. 统一内存管理机制与静态内存管理的区别在于存储内存和执行内存共享同一块空间,可以动态占用对方的空闲区域,
    2. 统一内存管理机制,Spark 在一定程度上提高了堆内和堆外内存资源的利用率,降低了开发者维护 Spark 内存的难度
    3. 规则:
      1. 设定基本的存储内存和执行内存区域(spark.storage.storageFraction 参数),该设定确定了双方各自拥有的空间的范围;
      2. 双方的空间都不足时,则存储到硬盘;若己方空间不足而对方空余时,可借用对方的空间;(存储空间不足是指不足以放下一个完整的 Block)
      3. 执行内存的空间被对方占用后,可让对方将占用的部分转存到硬盘,然后”归还”借用的空间;
      4. 存储内存的空间被对方占用后,无法让对方”归还”,因为需要考虑 Shuffle 过程中的很多因素,实现起来较为复杂。

存储内存管理

  1. RDD持久化机制:在对RDD持久化时,Spark 规定了MEMORY_ONLYMEMORY_AND_DISK等7种不同的存储级别
  2. RDD缓存过程:
    1. RDD 在缓存到存储内存之前,Partition 中的数据一般以迭代器(Iterator)的数据结构来访问,这是 Scala 语言中一种遍历数据集合的方法
    2. RDD 在缓存到存储内存之后,Partition 被转换成 Block,Record 在堆内或堆外存储内存中占用一块连续的空间
  3. 淘汰与落盘:
    1. 淘汰规则:
      1. 被淘汰的旧 Block 要与新 Block 的 MemoryMode 相同,即同属于堆外或堆内内存;
      2. 新旧 Block 不能属于同一个 RDD,避免循环淘汰;
      3. 旧 Block 所属 RDD 不能处于被读状态,避免引发一致性问题;
      4. 遍历 LinkedHashMap 中 Block,按照最近最少使用(LRU)的顺序淘汰,直到满足新 Block 所需的空间。其中 LRU 是 LinkedHashMap 的特性。
    2. 落盘流程: 如果其存储级别符合_useDisk 为 true 的条件,再根据其_deserialized 判断是否是非序列化的形式,若是则对其进行序列化,最后将数据存储到磁盘,在 Storage 模块中更新其信息。

执行内存管理

  • 概述:
    执行内存主要用来存储任务在执行 Shuffle 时占用的内存,Shuffle 是按照一定规则对 RDD 数据重新分区的过程

  • Shuffle Write

    1. 若在 map 端选择普通的排序方式,会采用 ExternalSorter 进行外排,在内存中存储数据时主要占用堆内执行空间。
    2. 若在 map 端选择 Tungsten 的排序方式,则采用 ShuffleExternalSorter 直接对以序列化形式存储的数据排序,在内存中存储数据时可以占用堆外或堆内执行空间,取决于用户是否开启了堆外内存以及堆外执行内存是否足够。
  • Shuffle Read

    1. 在对 reduce 端的数据进行聚合时,要将数据交给 Aggregator 处理,在内存中存储数据时占用堆内执行空间。
    2. 如果需要进行最终结果排序,则要将再次将数据交给 ExternalSorter 处理,占用堆内执行空间。
  • Tungsten:

    1. Shuffle Write 阶段中用到的 Tungsten 是 Databricks 公司提出的对 Spark 优化内存和 CPU 使用的计划(钨丝计划),解决了一些 JVM 在性能上的限制和弊端。Spark 会根据 Shuffle 的情况来自动选择是否采用 Tungsten 排序
    2. Tungsten 采用的页式内存管理机制建立在 MemoryManager 之上,即 Tungsten 对执行内存的使用进行了一步的抽象,这样在 Shuffle 过程中无需关心数据具体存储在堆内还是堆外。每个内存页用一个 MemoryBlock 来定义,并用 Object obj 和 long offset 这两个变量统一标识一个内存页在系统内存中的地址。
    3. Spark 用 MemoryBlock 巧妙地将堆内和堆外内存页统一抽象封装,并用页表(pageTable)管理每个 Task 申请到的内存页。

Spark核心组件

数据存储和管理机制

  • 数据存储:
    1. blockManager是整个Spark底层负责数据存储与管理的一个组件,Driver和Executor的所有数据都由对应的BlockManager进行管理。
    2. Driver上有BlockManagerMaster,负责对各个节点上的BlockManager内部管理的数据的元数据进行维护,比如block的增删改等操作,都会在这里维护好元数据的变更。
    3. 每个节点上都有一个BlockManager,BlockManager中有3个非常重要的组件:
      1. DiskStore:负责对磁盘数据进行读写;
      2. MemoryStore:负责对内存数据进行读写;
      3. BlockTransferService:负责建立BlockManager到远程其他节点的BlockManager的连接,负责对远程其他节点的BlockManager的数据进行读写;

Spark调优

常规性能调优

  1. 最优资源配置:尽量将任务分配的资源调节到可以使用的资源的最大限度。多多益善,有多少用多少;增加excuter个数,excuter核数,excuter内存
  2. RDD优化:
    1.RDD复用:避免相同的算子和计算逻辑之下对RDD进行重复的计算
    2.RDD持久化:持久化将公共RDD的数据缓存到内存/磁盘中,
    3.RDD尽可能早做filter: 过滤掉不需要的数据,进而减少对内存的占用,从而提升Spark作业的运行效率。
  3. 并行度调节:在资源允许的前提下,并行度要设置的尽可能大,达到可以充分利用集群资源,task数量应该设置为Spark作业总CPU core数量的2~3倍
  4. 广播大变量: 对于较大的变量,应该使用广播变量,将task级别的变量广播为excuter级别,减少内存占用
  5. Kryo序列化:Spark默认使用Java序列化机制,可使用Kryo序列化机制,比Java序列化机制性能提高10倍左右
  6. 调节本地化等待时长: 数据本地化计算在当前资源不足情况下默认等待3秒,如果等待指定时间后仍然无法在指定节点运行,那么会自动降级,尝试将task分配到比较差的本地化级别所对应的节点上,如果当前级别仍然不行,那么继续降级。

算子调优

  1. 使用xxxpartitions替代xxx: 例如使用mappartitions替代map;foreachpartitions替代foreach
  2. fliter和coalesce配合使用: 我们可以在filter操作之后,使用coalesce算子针对每个partition的数据量各不相同的情况,压缩partition的数量,而且让每个partition的数据量尽量均匀紧凑,以便于后面的task进行计算操作,在某种程度上能够在一定程度上提升性能。
  3. repartition解决SparkSQL低并行度问题: 并行度的设置对于Spark SQL是不生效的,用户设置的并行度只对于Spark SQL以外的所有Spark的stage生效。由于Spark SQL所在stage的并行度无法手动设置,对于Spark SQL查询出来的RDD,立即使用repartition算子,去重新进行分区,这样可以重新分区为多个partition,从repartition之后的RDD操作,由于不再设计Spark SQL,因此stage的并行度就会等于你手动设置的值,
  4. reduceByKey本地聚合:map端会先对本地的数据进行combine操作,然后将数据写入给下个stage的每个task创建的文件中

Shuffle调优

  1. 调节map端缓冲区的大小: 如果shuffle的map端处理的数据量比较大,但是map端缓冲的大小是固定的,可能会出现map端缓冲数据频繁spill溢写到磁盘文件中的情况,使得性能非常低下,通过调节map端缓冲的大小,可以避免频繁的磁盘IO操作,进而提升Spark任务的整体性能。
  2. 调节reduce端拉取数据缓冲区大小: shuffle reduce task的buffer缓冲区大小决定了reduce task每次能够缓冲的数据量,也就是每次能够拉取的数据量,如果内存资源较为充足,适当增加拉取数据缓冲区的大小,可以减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。
  3. 调节reduce端拉取数据重试次数: reduce task拉取属于自己的数据时,如果因为网络异常等原因导致失败会自动进行重试。对于那些包含了特别耗时的shuffle操作的作业,默认为3,建议增加重试最大次数,以避免由于JVM的full gc或者网络不稳定等因素导致的数据拉取失败。
  4. 调节reduce端拉取数据等待间隔: reduce task拉取属于自己的数据时,如果因为网络异常等原因导致失败会自动进行重试,在一次失败后,会等待一定的时间间隔再进行重试,默认值为5s,可以通过加大间隔时长,以增加shuffle操作的稳定性。
  5. 调节sortshuffle排序操作阈值:如果shuffle reduce task的数量(默认200)小于某一阈值则shuffle write过程中不会进行排序操作,而是直接按照未经优化的HashShuffleManager的方式去写数据,但是最后会将每个task产生的所有临时磁盘文件都合并成一个文件,并会创建单独的索引文件

JVM调优

  1. 降低cache操作的内存占比:
    1. 静态内存管理: Storage占系统内存的60%,Execution占系统内存的20%,并且两者完全独立,如果发现gc太频繁,时间太长,就可以考虑调节Storage的内存占比,让task执行算子函数式,有更多的内存可以使用。
    2. 统一内存管理:Storage和Execution各占统一内存的50%,无需手动调节
  2. 调节Excuter堆外内存:Executor的堆外内存主要用于程序的共享库、Perm Space、线程Stack和一些Memory mapping等, 或者类C方式allocate object;Executor堆外内存上限默认大概为300多MB,在实际的生产环境下,对海量数据进行处理的时候,这里都会出现问题,导致Spark作业反复崩溃,无法运行,此时就会去调节这个参数,到至少1G,甚至于2G、4G。
  3. 调节连接等待时长: Spark作业运行过程中,Executor优先从自己本地关联的BlockManager中获取某份数据,如果本地BlockManager没有的话,会通过TransferService远程连接其他节点上Executor的BlockManager来获取数据。Executor的BlockManager在拉取数据的时候,无法建立连接,然后超过默认的连接等待时长60s后,宣告数据拉取失败,如果反复尝试都拉取不到数据,可能会导致Spark作业的崩溃

Spark处理数据倾斜

  1. 聚合元数据
  2. 过滤导致倾斜的key
  3. 提高shuffle操作中的reduce并行度
  4. 使用随机key实现双重聚合
  5. 将reduce join 转换为map join
  6. sample采样对倾斜的key单独进行join
  7. 使用随机数以及扩容进行join

Spark故障解决

  1. 控制reduce端缓冲大小以避免OOM
  2. JVM GC导致的shuffle文件拉取失败
  3. 解决各种序列化导致的报错
    1. 作为RDD的元素类型的自定义类,必须是可以序列化的;
    2. 算子函数里可以使用的外部的自定义变量,必须是可以序列化的;
    3. 不可以在RDD的元素类型、算子函数里使用第三方的不支持序列化的类型,例如Connection。
  4. 解决算子函数返回NULL导致的问题
    1. 返回特殊值,不返回NULL,例如“-1”;
    2. 在通过算子获取到了一个RDD之后,可以对这个RDD执行filter操作,进行数据过滤,将数值为-1的数据给过滤掉;
    3. 在使用完filter算子后,继续调用coalesce算子进行优化
  5. 解决YARN-CLIENT模式导致的网卡流量激增问题
  6. 解决YARN-CLUSTER模式的JVM栈内存溢出无法执行问题: YARN-client模式下,Driver是运行在本地机器上的,Spark使用的JVM的PermGen的配置,是本地机器上的spark-class文件,JVM永久代的大小是128MB,这个是没有问题的,但是在YARN-cluster模式下,Driver运行在YARN集群的某个节点上,使用的是没有经过配置的默认设置,PermGen永久代大小为82MB。
  7. 解决SparkSQL导致的JVM栈内存溢出: JVM栈内存溢出基本上就是由于调用的方法层级过多,产生了大量的,非常深的,超出了JVM栈深度限制的递归。根据实际的生产环境试验,一条sql语句的or关键字控制在100个以内,通常不会导致JVM栈内存溢出。
  8. 持久化与checkpoint的使用:
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,504评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,434评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,089评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,378评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,472评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,506评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,519评论 3 413
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,292评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,738评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,022评论 2 329
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,194评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,873评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,536评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,162评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,413评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,075评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,080评论 2 352

推荐阅读更多精彩内容