Spark内存管理
- 概述:
作为一个 JVM 进程,Executor 的内存管理建立在 JVM 的内存管理之上,Spark 对 JVM 的堆内(On-heap)空间进行了更为详细的分配,以充分利用内存。同时,Spark 引入了堆外(Off-heap)内存,使之可以直接在工作节点的系统内存中开辟空间,进一步优化了内存的使用。堆内内存受到JVM统一管理,堆外内存是直接向操作系统进行内存的申请和释放。 - 内容:
- 堆内内存:
- 堆内内存的大小,由 Spark 应用程序启动时的 –executor-memory 或 spark.executor.memory 参数配置
- Executor 内运行的并发任务共享 JVM 堆内内存
- 根据功能类型又分为存储内存和运行内存
- 流程: GC自动分配与回收
申请内存流程如下:
1. Spark 在代码中 new 一个对象实例;
2. JVM 从堆内内存分配空间,创建对象并返回对象引用;
3. Spark 保存该对象的引用,记录该对象占用的内存。
释放内存流程如下:
1. Spark记录该对象释放的内存,删除该对象的引用;
2. 等待JVM的垃圾回收机制释放该对象占用的堆内内存。
- 堆外内存
- 这些内存直接受操作系统管理(而不是虚拟机)。这样做的结果就是能保持一个较小的堆,以减少垃圾收集对应用的影响。
- 减少了不必要的内存开销,以及频繁的 GC 扫描和回收,提升了处理性能
- 堆外内存可以被精确地申请和释放,且序列化的数据占用的空间可以被精确计算,所以相比堆内内存来说降低了管理的难度,也降低了误差。
- 在默认情况下堆外内存并不启用
- 堆内内存:
内存空间分配
-
静态内存管理:
- 存储内存、执行内存和其他内存的大小在 Spark 应用程序运行期间均为固定的,但用户可以应用程序启动前进行配置
- Storage内存和Execution内存都有预留空间,目的是防止OOM,因为Spark堆内内存大小的记录是不准确的,需要留出保险区域。
- 堆外内存占用的空间可以被精确计算,所以无需再设定保险区域。
-
统一内存管理:
- 统一内存管理机制与静态内存管理的区别在于存储内存和执行内存共享同一块空间,可以动态占用对方的空闲区域,
- 统一内存管理机制,Spark 在一定程度上提高了堆内和堆外内存资源的利用率,降低了开发者维护 Spark 内存的难度
- 规则:
- 设定基本的存储内存和执行内存区域(spark.storage.storageFraction 参数),该设定确定了双方各自拥有的空间的范围;
- 双方的空间都不足时,则存储到硬盘;若己方空间不足而对方空余时,可借用对方的空间;(存储空间不足是指不足以放下一个完整的 Block)
- 执行内存的空间被对方占用后,可让对方将占用的部分转存到硬盘,然后”归还”借用的空间;
- 存储内存的空间被对方占用后,无法让对方”归还”,因为需要考虑 Shuffle 过程中的很多因素,实现起来较为复杂。
存储内存管理
- RDD持久化机制:在对RDD持久化时,Spark 规定了MEMORY_ONLYMEMORY_AND_DISK等7种不同的存储级别
- RDD缓存过程:
- RDD 在缓存到存储内存之前,Partition 中的数据一般以迭代器(Iterator)的数据结构来访问,这是 Scala 语言中一种遍历数据集合的方法
- RDD 在缓存到存储内存之后,Partition 被转换成 Block,Record 在堆内或堆外存储内存中占用一块连续的空间
- 淘汰与落盘:
- 淘汰规则:
- 被淘汰的旧 Block 要与新 Block 的 MemoryMode 相同,即同属于堆外或堆内内存;
- 新旧 Block 不能属于同一个 RDD,避免循环淘汰;
- 旧 Block 所属 RDD 不能处于被读状态,避免引发一致性问题;
- 遍历 LinkedHashMap 中 Block,按照最近最少使用(LRU)的顺序淘汰,直到满足新 Block 所需的空间。其中 LRU 是 LinkedHashMap 的特性。
- 落盘流程: 如果其存储级别符合_useDisk 为 true 的条件,再根据其_deserialized 判断是否是非序列化的形式,若是则对其进行序列化,最后将数据存储到磁盘,在 Storage 模块中更新其信息。
- 淘汰规则:
执行内存管理
概述:
执行内存主要用来存储任务在执行 Shuffle 时占用的内存,Shuffle 是按照一定规则对 RDD 数据重新分区的过程-
Shuffle Write
- 若在 map 端选择普通的排序方式,会采用 ExternalSorter 进行外排,在内存中存储数据时主要占用堆内执行空间。
- 若在 map 端选择 Tungsten 的排序方式,则采用 ShuffleExternalSorter 直接对以序列化形式存储的数据排序,在内存中存储数据时可以占用堆外或堆内执行空间,取决于用户是否开启了堆外内存以及堆外执行内存是否足够。
-
Shuffle Read
- 在对 reduce 端的数据进行聚合时,要将数据交给 Aggregator 处理,在内存中存储数据时占用堆内执行空间。
- 如果需要进行最终结果排序,则要将再次将数据交给 ExternalSorter 处理,占用堆内执行空间。
-
Tungsten:
- Shuffle Write 阶段中用到的 Tungsten 是 Databricks 公司提出的对 Spark 优化内存和 CPU 使用的计划(钨丝计划),解决了一些 JVM 在性能上的限制和弊端。Spark 会根据 Shuffle 的情况来自动选择是否采用 Tungsten 排序
- Tungsten 采用的页式内存管理机制建立在 MemoryManager 之上,即 Tungsten 对执行内存的使用进行了一步的抽象,这样在 Shuffle 过程中无需关心数据具体存储在堆内还是堆外。每个内存页用一个 MemoryBlock 来定义,并用 Object obj 和 long offset 这两个变量统一标识一个内存页在系统内存中的地址。
- Spark 用 MemoryBlock 巧妙地将堆内和堆外内存页统一抽象封装,并用页表(pageTable)管理每个 Task 申请到的内存页。
Spark核心组件
数据存储和管理机制
- 数据存储:
- blockManager是整个Spark底层负责数据存储与管理的一个组件,Driver和Executor的所有数据都由对应的BlockManager进行管理。
- Driver上有BlockManagerMaster,负责对各个节点上的BlockManager内部管理的数据的元数据进行维护,比如block的增删改等操作,都会在这里维护好元数据的变更。
- 每个节点上都有一个BlockManager,BlockManager中有3个非常重要的组件:
- DiskStore:负责对磁盘数据进行读写;
- MemoryStore:负责对内存数据进行读写;
- BlockTransferService:负责建立BlockManager到远程其他节点的BlockManager的连接,负责对远程其他节点的BlockManager的数据进行读写;
Spark调优
常规性能调优
- 最优资源配置:尽量将任务分配的资源调节到可以使用的资源的最大限度。多多益善,有多少用多少;增加excuter个数,excuter核数,excuter内存
- RDD优化:
1.RDD复用:避免相同的算子和计算逻辑之下对RDD进行重复的计算
2.RDD持久化:持久化将公共RDD的数据缓存到内存/磁盘中,
3.RDD尽可能早做filter: 过滤掉不需要的数据,进而减少对内存的占用,从而提升Spark作业的运行效率。 - 并行度调节:在资源允许的前提下,并行度要设置的尽可能大,达到可以充分利用集群资源,task数量应该设置为Spark作业总CPU core数量的2~3倍
- 广播大变量: 对于较大的变量,应该使用广播变量,将task级别的变量广播为excuter级别,减少内存占用
- Kryo序列化:Spark默认使用Java序列化机制,可使用Kryo序列化机制,比Java序列化机制性能提高10倍左右
- 调节本地化等待时长: 数据本地化计算在当前资源不足情况下默认等待3秒,如果等待指定时间后仍然无法在指定节点运行,那么会自动降级,尝试将task分配到比较差的本地化级别所对应的节点上,如果当前级别仍然不行,那么继续降级。
算子调优
- 使用xxxpartitions替代xxx: 例如使用mappartitions替代map;foreachpartitions替代foreach
- fliter和coalesce配合使用: 我们可以在filter操作之后,使用coalesce算子针对每个partition的数据量各不相同的情况,压缩partition的数量,而且让每个partition的数据量尽量均匀紧凑,以便于后面的task进行计算操作,在某种程度上能够在一定程度上提升性能。
- repartition解决SparkSQL低并行度问题: 并行度的设置对于Spark SQL是不生效的,用户设置的并行度只对于Spark SQL以外的所有Spark的stage生效。由于Spark SQL所在stage的并行度无法手动设置,对于Spark SQL查询出来的RDD,立即使用repartition算子,去重新进行分区,这样可以重新分区为多个partition,从repartition之后的RDD操作,由于不再设计Spark SQL,因此stage的并行度就会等于你手动设置的值,
- reduceByKey本地聚合:map端会先对本地的数据进行combine操作,然后将数据写入给下个stage的每个task创建的文件中
Shuffle调优
- 调节map端缓冲区的大小: 如果shuffle的map端处理的数据量比较大,但是map端缓冲的大小是固定的,可能会出现map端缓冲数据频繁spill溢写到磁盘文件中的情况,使得性能非常低下,通过调节map端缓冲的大小,可以避免频繁的磁盘IO操作,进而提升Spark任务的整体性能。
- 调节reduce端拉取数据缓冲区大小: shuffle reduce task的buffer缓冲区大小决定了reduce task每次能够缓冲的数据量,也就是每次能够拉取的数据量,如果内存资源较为充足,适当增加拉取数据缓冲区的大小,可以减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。
- 调节reduce端拉取数据重试次数: reduce task拉取属于自己的数据时,如果因为网络异常等原因导致失败会自动进行重试。对于那些包含了特别耗时的shuffle操作的作业,默认为3,建议增加重试最大次数,以避免由于JVM的full gc或者网络不稳定等因素导致的数据拉取失败。
- 调节reduce端拉取数据等待间隔: reduce task拉取属于自己的数据时,如果因为网络异常等原因导致失败会自动进行重试,在一次失败后,会等待一定的时间间隔再进行重试,默认值为5s,可以通过加大间隔时长,以增加shuffle操作的稳定性。
- 调节sortshuffle排序操作阈值:如果shuffle reduce task的数量(默认200)小于某一阈值则shuffle write过程中不会进行排序操作,而是直接按照未经优化的HashShuffleManager的方式去写数据,但是最后会将每个task产生的所有临时磁盘文件都合并成一个文件,并会创建单独的索引文件
JVM调优
- 降低cache操作的内存占比:
- 静态内存管理: Storage占系统内存的60%,Execution占系统内存的20%,并且两者完全独立,如果发现gc太频繁,时间太长,就可以考虑调节Storage的内存占比,让task执行算子函数式,有更多的内存可以使用。
- 统一内存管理:Storage和Execution各占统一内存的50%,无需手动调节
- 调节Excuter堆外内存:Executor的堆外内存主要用于程序的共享库、Perm Space、线程Stack和一些Memory mapping等, 或者类C方式allocate object;Executor堆外内存上限默认大概为300多MB,在实际的生产环境下,对海量数据进行处理的时候,这里都会出现问题,导致Spark作业反复崩溃,无法运行,此时就会去调节这个参数,到至少1G,甚至于2G、4G。
- 调节连接等待时长: Spark作业运行过程中,Executor优先从自己本地关联的BlockManager中获取某份数据,如果本地BlockManager没有的话,会通过TransferService远程连接其他节点上Executor的BlockManager来获取数据。Executor的BlockManager在拉取数据的时候,无法建立连接,然后超过默认的连接等待时长60s后,宣告数据拉取失败,如果反复尝试都拉取不到数据,可能会导致Spark作业的崩溃
Spark处理数据倾斜
- 聚合元数据
- 过滤导致倾斜的key
- 提高shuffle操作中的reduce并行度
- 使用随机key实现双重聚合
- 将reduce join 转换为map join
- sample采样对倾斜的key单独进行join
- 使用随机数以及扩容进行join
Spark故障解决
- 控制reduce端缓冲大小以避免OOM
- JVM GC导致的shuffle文件拉取失败
- 解决各种序列化导致的报错
- 作为RDD的元素类型的自定义类,必须是可以序列化的;
- 算子函数里可以使用的外部的自定义变量,必须是可以序列化的;
- 不可以在RDD的元素类型、算子函数里使用第三方的不支持序列化的类型,例如Connection。
- 解决算子函数返回NULL导致的问题
- 返回特殊值,不返回NULL,例如“-1”;
- 在通过算子获取到了一个RDD之后,可以对这个RDD执行filter操作,进行数据过滤,将数值为-1的数据给过滤掉;
- 在使用完filter算子后,继续调用coalesce算子进行优化
- 解决YARN-CLIENT模式导致的网卡流量激增问题
- 解决YARN-CLUSTER模式的JVM栈内存溢出无法执行问题: YARN-client模式下,Driver是运行在本地机器上的,Spark使用的JVM的PermGen的配置,是本地机器上的spark-class文件,JVM永久代的大小是128MB,这个是没有问题的,但是在YARN-cluster模式下,Driver运行在YARN集群的某个节点上,使用的是没有经过配置的默认设置,PermGen永久代大小为82MB。
- 解决SparkSQL导致的JVM栈内存溢出: JVM栈内存溢出基本上就是由于调用的方法层级过多,产生了大量的,非常深的,超出了JVM栈深度限制的递归。根据实际的生产环境试验,一条sql语句的or关键字控制在100个以内,通常不会导致JVM栈内存溢出。
- 持久化与checkpoint的使用: