1. 开发调优
1.1 避免创建重复的RDD
在开发一个Spark作业时,首先是基于某个数据源(比如Hive表或HDFS文件)创建一个初始的RDD;接着对这个RDD执行某个算子操作,然后得到下一个RDD。
对于同一份数据,只应该创建一个RDD,不能创建多个RDD来代表同一份数据。
1.2 尽可能复用同一个RDD
多个RDD的数据有重叠或者包含的情况,我们应该尽量复用一个RDD,这样可以尽可能地减少RDD的数量,从而尽可能减少算子执行的次数。
//rdd2中的数据仅仅是rdd1中的value值
JavaPairRDD<Long, String> rdd1 = ...
JavaRDD<String> rdd2 = rdd1.map(...)
// 分别对rdd1和rdd2执行了不同的算子操作。
rdd1.reduceByKey(...)
rdd2.map(...)
//应该改为:
JavaPairRDD<Long, String> rdd1 = ...
rdd1.reduceByKey(...)
rdd1.mapValue(...)
1.3 对多次使用的RDD进行持久化
Spark中对于一个RDD执行多次算子(函数操作)的默认原理是这样的:每次你对一个RDD执行一个算子操作时,都会重新从源头处计算一遍,计算出那个RDD来,然后再对这个RDD执行你的算子操作。这种方式的性能是很差的。
因此对于这种情况,我们的建议是:对多次使用的RDD进行持久化。
当存在多个 Action 操作或者依赖于多个 RDD 的时候, 可以在那之前缓存RDD. 如下:
val rdd = sc.textFile("path/to/file").Map(...).filter(...)
val rdd1 = rdd.Map(x => x+1)
val rdd2 = rdd.Map(x => x+100)
val rdd3 = rdd1.join(rdd2)
rdd3.count()
在这里 有2个 RDD 依赖于 rdd, 会形成如下的 DAG 图:
所以可以在 rdd 生成之后使用 cache 函数对 rdd 进行缓存,这次就不用再从头开始计算了.缓存之后过程如下:
除了 cache 函数外,缓存还可以使用 persist, cache 是使用的默认缓存选项,一般默认为Memory_only(内存中缓存), persist 则可以在缓存的时候选择任意一种缓存类型.事实上, cache 内部调用的是默认的 persist.
持久化的类型 如下:
持久化级别 | 含义解释 |
---|---|
MEMORY_ONLY | 使用未序列化的Java对象格式,将数据保存在内存中。如果内存不够存放所有的数据,则数据可能就不会进行持久化。那么下次对这个RDD执行算子操作时,那些没有被持久化的数据,需要从源头处重新计算一遍。这是默认的持久化策略,使用cache()方法时,实际就是使用的这种持久化策略。 |
MEMORY_AND_DISK | 使用未序列化的Java对象格式,优先尝试将数据保存在内存中。如果内存不够存放所有的数据,会将数据写入磁盘文件中,下次对这个RDD执行算子时,持久化在磁盘文件中的数据会被读取出来使用。 |
MEMORY_ONLY_SER | 基本含义同MEMORY_ONLY。唯一的区别是,会将RDD中的数据进行序列化,RDD的每个partition会被序列化成一个字节数组。这种方式更加节省内存,从而可以避免持久化的数据占用过多内存导致频繁GC。 |
MEMORY_AND_DISK_SER | 基本含义同MEMORY_AND_DISK。唯一的区别是,会将RDD中的数据进行序列化,RDD的每个partition会被序列化成一个字节数组。这种方式更加节省内存,从而可以避免持久化的数据占用过多内存导致频繁GC。 |
DISK_ONLY | 使用未序列化的Java对象格式,将数据全部写入磁盘文件中。 |
MEMORY_ONLY_2, MEMORY_AND_DISK_2, 等等. | 对于上述任意一种持久化策略,如果加上后缀_2,代表的是将每个持久化的数据,都复制一份副本,并将副本保存到其他节点上。这种基于副本的持久化机制主要用于进行容错。假如某个节点挂掉,节点的内存或磁盘中的持久化数据丢失了,那么后续对RDD计算时还可以使用该数据在其他节点上的副本。如果没有副本的话,就只能将这些数据从源头处重新计算一遍了。 |
是否进行序列化和磁盘写入, 需要充分考虑所分配到的内存资源和可接受的计算时间长短, 序列化会减少内存占用,但是反序列化会延长时间, 磁盘写入会延长时间, 但是会减少内存占用, 也许能提高计算速度.此外要认识到: cache 的 RDD 会一直占用内存, 当后期不需要再依赖于他的反复计算的时候, 可以使用 unpersist 释放掉.
1.4 尽量避免使用shuffle类算子
Spark作业运行过程中,最消耗性能的地方就是shuffle过程。
将分布在集群中多个节点上的同一个key,拉取到同一个节点上,进行聚合或join等操作。比如repartition、**ByKey、join等算子,都会触发shuffle操作。
shuffle过程中,各个节点上的相同key都会先写入本地磁盘文件中,然后其他节点需要通过网络传输拉取各个节点上的磁盘文件中的相同key。而且相同key都拉取到同一个节点进行聚合操作时,还有可能会因为一个节点上处理的key过多,导致内存不够存放,进而溢写到磁盘文件中。因此在shuffle过程中,可能会发生大量的磁盘文件读写的IO操作,以及数据的网络传输操作。磁盘IO和网络数据传输也是shuffle性能较差的主要原因。
1.5 使用map-side预聚合的shuffle操作
所谓的map-side预聚合,说的是在每个节点本地对相同的key进行一次聚合操作,类似于MapReduce中的本地combiner。map-side预聚合之后,每个节点本地就只会有一条相同的key,因为多条相同的key都被聚合起来了。其他节点在拉取所有节点上的相同key时,就会大大减少需要拉取的数据数量。
建议使用 reduceByKey 或者 aggregateByKey算子来替代掉groupByKey算子。因为reduceByKey和aggregateByKey算子都会使用用户自定义的函数对每个节点本地的相同key进行预聚合。而groupByKey算子是不会进行预聚合的,全量的数据会在集群的各个节点之间分发和传输,性能相对来说比较差。
1.6 使用高性能的算子
1.6.1 使用reduceByKey/aggregateByKey替代groupByKey
详情见 “使用map-side预聚合的shuffle操作”
1.6.2 使用mapPartitions替代普通map
mapPartitions类的算子,一次函数调用会处理一个partition所有的数据,而不是一次函数调用处理一条,性能相对来说会高一些。但是有的时候,使用mapPartitions会出现OOM(内存溢出)的问题。因为单次函数调用就要处理掉一个partition所有的数据。
所以使用这类操作时要慎重!
1.6.3 使用foreachPartitions替代foreach
原理类似于“使用mapPartitions替代map”
1.6.4 使用filter之后进行coalesce操作
通常对一个RDD执行 filte r算子过滤掉RDD中较多数据后(比如30%以上的数据),建议使用coalesce(重分区)算子,手动减少RDD的partition数量,将RDD中的数据压缩到更少的partition中去。此时如果
照常 进行后续的计算,其实每个task处理的partition中的数据量并不是很多,有一点资源浪费。在某些场景下,对于性能的提升会有一定的帮助。
1.6.5 使用repartitionAndSortWithinPartitions替代repartition与sort类操作
repartitionAndSortWithinPartitions是Spark官网推荐的一个算子,官方建议,如果需要在repartition重分区之后,还要进行排序,建议直接使用repartitionAndSortWithinPartitions算子。因为该算子可以一边进行重分区的shuffle操作,一边进行排序。shuffle与sort两个操作同时进行,比先shuffle再sort来说,性能可能是要高的。
1.7 广播大变量
在算子函数中使用到外部变量时,默认情况下,每个task都有一个变量副本。如果使用的外部变量比较大,建议使用Spark的广播功能,对该变量进行广播。广播后的变量,会保证每个Executor的内存中,只驻留一份变量副本,而Executor中的task执行时共享该Executor中的那份变量副本。这样的话,可以大大减少变量副本的数量,从而减少网络传输的性能开销,并减少对Executor内存的占用开销,降低GC的频率。
1.8 使用Kryo优化序列化性能
在Spark中,主要有三个地方涉及到了序列化:
- 在算子函数中使用到外部变量时,该变量会被序列化后进行网络传输(见“原则七:广播大变量”中的讲解)。
- 将自定义的类型作为RDD的泛型类型时(比如JavaRDD,Student是自定义类型),所有自定义类型对象,都会进行序列化。因此这种情况下,也要求自定义的类必须实现Serializable接口。
- 使用可序列化的持久化策略时(比如MEMORY_ONLY_SER),Spark会将RDD中的每个partition都序列化成一个大的字节数组。
对于这三种出现序列化的地方,我们可以使用 Kryo 序列化类库,来优化序列化和反序列化的性能。Spark默认使用的是Java的序列化机制,但是Spark同时支持使用Kryo序列化库。官方介绍,Kryo序列化机制比Java序列化机制,性能高10倍左右。
以下是使用Kryo的代码示例:
// 创建SparkConf对象。
val conf = new SparkConf().setMaster(...).setAppName(...)
// 设置序列化器为KryoSerializer。
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// 注册要序列化的自定义类型。
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
1.9 优化数据结构
Java中,有三种类型比较耗费内存:
- 对象,每个Java对象都有对象头、引用等额外的信息,因此比较占用内存空间。
- 字符串,每个字符串内部都有一个字符数组以及长度等额外信息。
- 集合类型,比如HashMap、LinkedList等,因为集合类型内部通常会使用一些内部类来封装集合元素,比如Map.Entry。
因此Spark官方建议,在Spark编码实现中,尽量使用字符串替代对象,使用原始类型(比如Int、Long)替代字符串,使用数组替代集合类型,这样尽可能地减少内存占用,从而降低GC频率,提升性能。
但是要做到该原则其实并不容易。因为我们同时要考虑到代码的可维护性,如果一个代码中,完全没有任何对象抽象,全部是字符串拼接的方式,那么对于后续的代码维护和修改,无疑是一场巨大的灾难。因此建议,在可能以及合适的情况下,使用占用内存较少的数据结构,但是前提是要保证代码的可维护性。
2. 资源调优
在开发完Spark作业之后,就该为作业配置合适的资源了。Spark的资源参数,基本都可以在spark-submit命令中作为参数设置。
资源参数设置的不合理,可能会导致没有充分利用集群资源,作业运行会极其缓慢;或者设置的资源过大,队列没有足够的资源来提供,进而导致各种异常。
因此我们必须对Spark作业的资源使用原理有一个清晰的认识,以及如何设置合适的参数值。
2.1 Spark作业基本运行原理
详细原理见上图。我们使用spark-submit提交一个Spark作业之后,这个作业就会启动一个对应的Driver进程。根据你使用的部署模式(deploy-mode)不同,Driver进程可能在本地启动,也可能在集群中某个工作节点上启动。Driver进程本身会根据我们设置的参数,占有一定数量的内存和CPU core。而Driver进程要做的第一件事情,就是向集群管理器(可以是Spark Standalone集群,也可以是YARN作为资源管理集群)申请运行Spark作业需要使用的资源,这里的资源指的就是Executor进程。YARN集群管理器会根据我们为Spark作业设置的资源参数,在各个工作节点上,启动一定数量的Executor进程,每个Executor进程都占有一定数量的内存和CPU core。
在申请到了作业执行所需的资源之后,Driver进程就会开始调度和执行我们编写的作业代码了。Driver进程会将我们编写的Spark作业代码分拆为多个stage,每个stage执行一部分代码片段,并为每个stage创建一批task,然后将这些task分配到各个Executor进程中执行。task是最小的计算单元,负责执行一模一样的计算逻辑(也就是我们自己编写的某个代码片段),只是每个task处理的数据不同而已。一个stage的所有task都执行完毕之后,会在各个节点本地的磁盘文件中写入计算中间结果,然后Driver就会调度运行下一个stage。下一个stage的task的输入数据就是上一个stage输出的中间结果。如此循环往复,直到将我们自己编写的代码逻辑全部执行完,并且计算完所有的数据,得到我们想要的结果为止。
Spark是根据shuffle类算子来进行stage的划分。如果我们的代码中执行了某个shuffle类算子(比如reduceByKey、join等),那么就会在该算子处,划分出一个stage界限来。可以大致理解为,shuffle算子执行之前的代码会被划分为一个stage,shuffle算子执行以及之后的代码会被划分为下一个stage。因此一个stage刚开始执行的时候,它的每个task可能都会从上一个stage的task所在的节点,去通过网络传输拉取需要自己处理的所有key,然后对拉取到的所有相同的key使用我们自己编写的算子函数执行聚合操作(比如reduceByKey()算子接收的函数)。这个过程就是shuffle。
当我们在代码中执行了cache/persist等持久化操作时,根据我们选择的持久化级别的不同,每个task计算出来的数据也会保存到Executor进程的内存或者所在节点的磁盘文件中。
因此Executor的内存主要分为三块:第一块是让task执行我们自己编写的代码时使用,默认是占Executor总内存的20%;第二块是让task通过shuffle过程拉取了上一个stage的task的输出后,进行聚合等操作时使用,默认也是占Executor总内存的20%;第三块是让RDD持久化时使用,默认占Executor总内存的60%。
task的执行速度是跟每个Executor进程的CPU core数量有直接关系的。一个CPU core同一时间只能执行一个线程。而每个Executor进程上分配到的多个task,都是以每个task一条线程的方式,多线程并发运行的。如果CPU core数量比较充足,而且分配到的task数量比较合理,那么通常来说,可以比较快速和高效地执行完这些task线程。
以上就是Spark作业的基本运行原理的说明,大家可以结合上图来理解。理解作业基本原理,是我们进行资源参数调优的基本前提。
2.2 资源参数调优
谓的Spark资源参数调优,其实主要就是对Spark运行过程中各个使用资源的地方,通过调节各种参数,来优化资源使用的效率,从而提升Spark作业的执行性能。以下参数就是Spark中主要的资源参数,每个参数都对应着作业运行原理中的某个部分,我们同时也给出了一个调优的参考值。
参数 | 说明 | 调优建议 |
---|---|---|
num-Executors | 该参数用于设置Spark作业总共要用多少个Executor进程来执行。这个参数非常重要,如果不设置的话,默认只会给你启动少量的Executor进程,此时你的Spark作业的运行速度是非常慢的。 | 每个Spark作业的运行一般设置50~100个左右的Executor进程比较合适。设置的太少,无法充分利用集群资源;设置的太多的话,大部分队列可能无法给予充分的资源。 |
Executor-memory | 该参数用于设置每个Executor进程的内存。Executor内存的大小,很多时候直接决定了Spark作业的性能,而且跟常见的JVM OOM异常,也有直接的关联。 | 每个Executor进程的内存设置 4G~8G 较为合适。具体的设置还是得根据不同部门的资源队列来定。可以看看自己团队的资源队列的最大内存限制是多少,num-Executors乘以Executor-memory,就代表了你的Spark作业申请到的总内存量。此外,如果你是跟团队里其他人共享这个资源队列,那么申请的总内存量最好不要超过资源队列最大总内存的1/3~1/2,避免你自己的Spark作业占用了队列所有的资源,导致别的同学的作业无法运行。 |
Executor-cores | 用于设置每个Executor进程的CPU core数量。这个参数决定了每个Executor并行执行Task线程的能力。每个core同一时间只能执行一个Task线程,因此每个Executor的core越多,越能够快速地执行完分配给自己的所有Task线程。 | Executor的CPU core数量设置为2~4个较为合适。同样得根据不同部门的资源队列来定,可以看看自己的资源队列的最大CPU core限制是多少,再依据设置的Executor数量,来决定每个Executor进程可以分配到几个CPU core。同样建议,如果是跟他人共享这个队列,那么num-Executors * Executor-cores不要超过队列总CPU core的1/3~1/2左右比较合适 |
driver-memory | 该参数用于设置Driver进程的内存。 | ** Driver的内存通常来说不设置,或者设置1G左右应该就够了**。唯一需要注意的一点是,如果需要使用collect算子将RDD的数据全部拉取到Driver上进行处理,那么必须确保Driver的内存足够大,否则会出现OOM内存溢出的问题。 |
spark.default. parallelism | 该参数用于设置每个stage的默认Task数量。这个参数极为重要,如果不设置可能会直接影响你的Spark作业性能。 | Spark作业的默认Task数量为500~1000个较合适。如果不去设置这个参数,那么就会导致Spark自己根据底层HDFS的block数量来设置Task的数量,默认是一个HDFS block对应一个Task。通常来说,Spark默认设置的数量是偏少的(比如几十个Task),如果Task数量偏少的话,就会导致你前面设置好的Executor的参数都前功尽弃。即无论你的Executor进程/内存/CPU有多大,但是Task只有几个,那么90%的Executor进程可能根本就没有Task执行,也就白白浪费了资源此Spark官网建议的设置原则是,设置该参数为num-Executors * Executor-cores的2~3倍较为合适,比如Executor的总CPU core数量为300个,那么设置1000个Task是可以的,可以充分地利用Spark集群的资源。 |
spark.storage.memoryFraction | 该参数用于设置RDD持久化数据在Executor内存中能占的比例,默认是0.6。也就是说,默认Executor 60%的内存,可以用来保存持久化的RDD数据。根据你选择的不同的持久化策略,如果内存不够时,可能数据就不会持久化,或者数据会写入磁盘。 | 如果Spark作业中,有较多的RDD持久化操作,该参数的值可以适当提高一些,保证持久化的数据能够容纳在内存中。避免内存不够缓存所有的数据,导致数据只能写入磁盘中,降低了性能。但是如果Spark作业中的shuffle类操作比较多,而持久化操作比较少,那么这个参数的值适当降低一些比较合适。此外,如果发现作业由于频繁的gc导致运行缓慢(通过Spark web ui可以观察到作业的gc耗时),意味着Task执行用户代码的内存不够用,那么同样建议调低这个参数的值。 |
spark.shuffle.memoryFraction | 该参数用于设置shuffle过程中一个Task拉取到上个stage的Task的输出后,进行聚合操作时能够使用的Executor内存的比例,默认20%。shuffle操作在进行聚合时,如果使用的内存超出20%的限制,多余的数据就会溢写到磁盘,此时会极大地降低性能。 | 如果Spark作业中的RDD持久化操作较少,shuffle操作较多时,建议降低持久化操作的内存占比,提高shuffle操作的内存占比比例,避免shuffle过程中数据过多时内存不够用,必须溢写到磁盘上,降低了性能。此外,如果发现作业由于频繁的gc导致运行缓慢,意味着Task执行用户代码的内存不够用,那么同样建议调低这个参数的值。 |
资源参数的调优,没有一个固定的值,需要根据自己的实际情况(包括Spark作业中的shuffle操作数量、RDD持久化操作数量以及Spark web ui中显示的作业gc情况),同时参考本篇文章中给出的原理以及调优建议,合理地设置上述参数。
以下是一份spark-submit命令的示例,大家可以参考一下,并根据自己的实际情况进行调节:
./bin/spark-submit \
--master yarn-cluster \
--num-executors 100 \
--executor-memory 6G \
--executor-cores 4 \
--driver-memory 1G \
--conf spark.default.parallelism=1000 \
--conf spark.storage.memoryFraction=0.5 \
--conf spark.shuffle.memoryFraction=0.3 \
在下篇 Spark 性能优化 高级篇中,我们会详细讲解数据倾斜调优以及Shuffle调优。
其他
单个Executor内存设置过大,可能导致严重的gc延迟,甚至hang住,64G是一个经验性的上限值
HDFS client的并发性较差,涉及到HDFS读取写入的操作,每个Executor上的task数不易超过5,即- -executor-cores小于等于5。
外部存储,如HDFS,考虑使用Avro、Parquet、 Thrift、 Protobuf等格式