1 体系
2 配置
-
资源分配
- num-executors:executor的个数
- executor-cores:cpu core 的两倍
- executor-memory:每个executor的内存大小
- driver-memory:driver的内存大小
-
并行度
- spark.default.parallelism
- spark.sql.partitions
- repartition(num)
-
内存使用
- spark.storage.memoryFraction:用于cache的内存比例
- spark.shuffle.memoryFraction:shffule阶段的缓存占内存比例
3 代码
- 不要重复创建RDD
- 重复使用的RDD进行cache
- 使用高性能算子
- mapPartition代替map
- foreachPartition代替foreach
- 用reduceByKey代替groupByKey
- filter以后使用coalesce减少小任务
- 广播大变量:sc.broadcast
4 数据
- 序列化
- 使用KryoSerializer代替Java序列化
- 文件格式
- 使用parquet文件格式,列式存储,读取效率高
5 倾斜
-
聚合(xxByKey)
-
造成倾斜的Key数量小且不重要
- 抽样+过滤
-
造成倾斜的Key数量多且重要
- 增加并行度
- 局部聚合+全局聚合给每个Key加上前缀,聚合
- 对上步聚合结果的Key去前缀,聚合
-
-
连接
-
小表连接大表
- 将reduce join 转成map join
- 使用广播变量将小表数据进行广播
- SparkSQL设置spark.sql.autoBroadcastJoinThreshold,默认10m
-
大表连接大表
-
造成倾斜的Key不多
- 对RDD1进行sample找出造成倾斜的Key
- 分别对RDD1和RDD2进行filter将其分成skewRDD1和commonRDD1以及skewRDD1和commonRDD2
- 然后对skewRDD1的key添加随机前缀n,对skewRDD2进行n倍扩容,然后join,再对结果的key进行前缀移除得到joinRDD1
- 将commonRDD1和commonRDD2进行连接,得到joinRDD2
joinRDD1.union(joinRDD2)
-
造成倾斜的Key多
- 对RDD1进行随机前缀n的添加
- 对RDD2进行n倍扩容
- 然后进行连接
- 进行随机前缀的移除处理得到结果
-
-