一.Spark 性能优化概述
首先笔者能力优先,使用Spark有一段时间,如下是笔者的工作经验的总结。
Spark任务运行图:
Spark的优化思路:
一般是从3个层面进行Spark程序的优化:
- 运行环境优化
- RDD算子优化
- 参数微调
二.运行环境优化
2.1 数据本地性
我们知道HDFS的数据文件存储在不同的datanode,一般数据副本数量是3,因为Spark计算的数据量比较大,如果数据不在本节点,需要通过网络去其它的datanode读取数据。
所以此时我们可以通过提高数据本地性,减少网络传输,来达到性能优化的目的。
- 计算和存储同节点(executor和HDFS的datanode、hbase的region server同节点)
- executor数目合适: 如果100个数据界定,3个计算节点,就有97份网络传递,所以此种情况可以适当增加计算节点。
- 适当增加数据副本数量
2.2 数据存储格式
推荐使用列式存储格式: parquet.
parquet存在如下优先:
- 相同数据类型的数据有很高压缩比
- Hive主要支持OCR、也支持parquet
三.RDD算子优化
3.1 尽可能复用同一个RDD
每创建一个RDD都会带来性能的开销,尽可能的对同一个RDD做算子操作,而不要频繁创建新的
RDD。
3.2 对多次使用的RDD进行持久化
如果RDD的算子特别多,需要频繁多次操作同一个RDD,最好的办法是将该RDD进行持久化,
四.参数微调
num-executors
参数说明:该参数用于设置每个Executor进程的内存。Executor内存的大小,很多时候直接决定了Spark作业的性能,而且跟常见的JVM OOM异常,也有直接的关联。executor-cores
参数说明:该参数用于设置每个Executor进程的CPU core数量。driver-memory
参数说明:该参数用于设置Driver进程的内存。spark.default.parallelism
参数说明:该参数用于设置每个stage的默认task数量。spark.storage.memoryFraction
参数说明:该参数用于设置RDD持久化数据在Executor内存中能占的比例,默认是0.6。spark.shuffle.memoryFraction
参数说明:该参数用于设置shuffle过程中一个task拉取到上个stage的task的输出后,进行聚合操作时能够使用的Executor内存的比例,默认是0.2。
资源参数参考示例:
./bin/spark-submit \
--master yarn-cluster \
--num-executors 100 \
--executor-memory 6G \
--executor-cores 4 \
--driver-memory 1G \
--conf spark.default.parallelism=1000 \
--conf spark.storage.memoryFraction=0.5 \
--conf spark.shuffle.memoryFraction=0.3 \
五.数据倾斜
绝大多数task执行得都非常快,但个别task执行极慢。比如,总共有1000个task,997个task都在1分钟之内执行完了,但是剩余两三个task却要一两个小时。这种情况很常见。
数据倾斜图例:
解决数据倾斜一般有如下几种常用方法:
使用Hive ETL预处理数据
先使用Hive进行预处理数据,也就是使用Hive先计算一层中间数据,Spark从中间层数据开始计算。过滤少数导致倾斜的key
如果发生导致倾斜的key非常少,可以将Spark任务拆分为包含 导致倾斜的key的任务和不包含key的任务。sample采样倾斜key单独进行join
通过采样,提前预估会发生数据倾斜的key,然后将一个join拆分为两个join,其中一个不包含该key,一个只包含该key,最后将结果集进行union。调整并行度
调整Shuffle并行度,数据打散广播小数据集
适用于一个大表,一个小表
不用join连接操作,而改用Broadcast变量与map模拟join操作,完全规避shuffle操作
spark.sql: spark.sql.autoBroadcastJoinThreshold=104857600增加随机前缀
对发生倾斜的RDD增加随机前缀
对另外一个RDD等量扩容
如果少量的key发生倾斜,可以先过滤出一个单独的RDD,对另外一个RDD同理吹,join之后再合并