1.spark_shuffle_分区数
spark_sql aqe 优化
SparkSQL中,基于SQL分析或者DSL分析,执行Job时,如果产生Shuffle,默认分区数:200
【实际生产环境中必须合理设置shuffle时分区数目】
> spark.sql.shuffle.partitions
【代码中设置】
spark = SparkSession.builder \
.appName("RetailAnalysis") \
.master("local[4]") \
.config("spark.sql.shuffle.partitions", "4") \
.getOrCreate()
注意:
在Spark 3之前,每个应用都需要通过测试,合理设置分区数目。
【从Spark 3 开始,SparkSQL底层实现自适应调整性能优化】
当Job运行时,自动依据Shuffle时数据量,合理设置Shuffle时分区数。
【官方文档】
https://spark.apache.org/docs/3.1.2/sql-performance-tuning.html#coalescing-post-shuffle-partitions
【参数】
参数1:表示是否启用自适应调整机制
spark.sql.adaptive.enabled=true
参数2:表示Shuffle后分区数目
spark.sql.adaptive.coalescePartitions.enabled=true
其他参数:
最小分区数目
spark.sql.adaptive.coalescePartitions.minPartitionNum
初始化分区数目
spark.sql.adaptive.coalescePartitions.initialPartitionNum
每个分区数据处理数据量
spark.sql.adaptive.advisoryPartitionSizeInBytes=64m
2_spark项目调优方案
问题: 问题:Hive、Spark做了哪些优化?/ 项目中做了哪些优化?
我将项目内的spark调优分为两个
pyspark 开发调优
1.使用高性能的rdd算子
如 mappartition 和 foreachpartition
2.使用窄依赖算子替代宽依赖的算子
如 降低分区 使用coalesce代替repartition
3.适当增加和减少分区
数据量大,分区少,调大分区:读取数据
数量量小,分区多,降低分区:聚合之后
4.适当使用 缓存机制 及时释放内存
RDD或者DataFrame或者TmpView使用多次:先缓存persist
后续代码中不再需要时,就释放缓存
5.尽量使用map端聚合的算子
列如 reducebykey agg..bykey 代替 groupbykey
6.当大表join小表时 使用广播join
-
Sql优化 遵循谓词下推思想
将需要过滤的数据或者无用的数据在进行复杂操作之前提前进行过滤
列如
需要过滤:where/having:能用where就不用having ,where是分组前 having 是分组后,where前不能加聚合,having 后边可以加聚合
row_nubmer实现去重:减少计算的数据量
举例
无用数据:订单表1000万,商品表100万,需求将两张表进行join
假设已经 卖出去60w 种商品
- 订单表:tb_order:oid、pid
- 商品表:tb_goods:pid、pname
分析 商品表内的一百万条数据 在订单表中不一定全有, 只有卖出去的部分才需要join
所以 1_找出所有已经卖出的商品单号 表A 60w
select distinct pid from tb_order
2_筛选出卖出商品的详细信息 表B 60w
select
pid,pname
from tb_goods B join A on A.pid = B.pid
3_step3:用订单表和B进行关联
select
a.oid,
a.pid,
B.pname
from tb_order a
join B on a.pid = B.pid
-
结构优化 文件存储类型、分区结构化
分区表:采用静态分区
select count(*) from table1 where dt = '2021-11-09'; --走分区裁剪过滤查询
-- 不支持join中动态分区裁剪
select
*
from table1 a join table2 b on a.dt = b.dt
and a.dt = '2021-11-09' and b.dt = '2021-11-09';
- 合适文件类型:parquet、orc【索引】
- 合适压缩类型:snappy、lz4、lzo
Spark 3.0新特性
-
动态分区裁剪(Dynamic Partition Pruning)
默认的分区裁剪只有在单表查询过滤时才有效
-
开启动态分区裁剪:自动在Join时对两边表的数据根据条件进行查询过滤,将过滤后的结果再进行join
spark.sql.optimizer.dynamicPartitionPruning.enabled=true
开启动态分区裁剪之后,过滤条件都会先执行 不论是on 还是 where 还是 主副表的条件都会执行
-
自适应查询执行(Adaptive Query Execution)
基于CBO优化器引擎:实现最小代价的数据处理
自动根据统计信息设置Reducer【ShuffleRead】的数量来避免内存和I/O资源的浪费
自动选择更优的join策略来提高连接查询性能
-
自动优化join数据来避免不平衡查询造成的数据倾斜,将数据倾斜的数据自动重分区
spark.sql.adaptive.enabled=true
cbo 将 catalyst 优化器优化后的逻辑计划 转换为物理计划
-
加速器感知调度(Accelerator-aware Scheduling)(不重要)
- 添加原生的 GPU 调度支持,该方案填补了 Spark 在 GPU 资源的任务调度方面的空白
- 有机地融合了大数据处理和 AI 应用,扩展了 Spark 在深度学习、信号处理和各大数据应用的应用场景
项目总结问题
问题1: 数据导出不满足原子性
场景:使用sqoop将报表数据导出到mysql中时,有一部分map task 失败了,导致数据只导出了一部分
需求,我们希望如果 要失败都失败,mysql里面一点数据也没有,要成功都成功
解决 :在sqoop导入的时候添加一个参数 --staging-table
原理 :sqoop将这张表的数据导入到一张临时表中,如果成功了,就将数据从临时表中放到目标表中,如果失败了就删除目标表
问题2 数据量不一致的情况
场景 hive表中的数据量于数据源oracle中的表数据量不一致
原因, sqoop在导入hive中 如果以textfile格式存储的话,会默认将\t\n等特殊字符当作 换行符生成文本
解决 ,不以textfile格式进行存储 (avro orc格式)
问题3 spark的数据倾斜
现象 :部分task 执行时间过长
定位:从4040监控中看到某个Stage中的Task运行时间远高于别的Task
rdd.groupByKey.reduceByKey.sortByKey.join
解决
提高Shuffle过程中的并行度:增加分区个数
选用带有分区内聚合的算子:带有Map端聚合的算子,减少Shuffle Read的数据量
将小表数据进行广播,实现广播Join:类似于Map Join
采样抽取倾斜的数据,单独Join,最后Union合并:类似于Hive中Skew Join
增加随机前缀:增加随机的前缀,实现随机分区
自定义分区器:默认的是HashPartition
小表扩大N倍,大表增加1 ~ N的随机前缀再做Join
问题4小文件问题
- 每个Task会产生一个结果文件
- Task个数根据分区个数来决定
- 分区多,每个分区的数据少
- 调整分区个数:coalesce
问题4:ThriftServer资源不足,GC问题
本质 ThriftServer本身是一个spark 应用 项目中所有SQL计算都是提交给ThriftServer,计算结果都会返回给 Driver
会导致 Driver 内存不足
解决 给Driver 分配更多的内存
start-thriftserver.sh \--name sparksql-thrift-server \--master yarn \--deploy-mode client \--driver-memory 1g \--hiveconf hive.server2.thrift.http.port=10001 \--num-executors 3 \--executor-memory 1g \--conf spark.sql.shuffle.partitions=2
问题5:ThriftServer单点故障问题