当我看到Spark3.0版本对于SparkSQL性能优化之后,不由自主的选择去使用SparkSQL,在此分享下SparkSQL3.0新功能。对于SparkSQL提供的两种开发方式:DSL/SQL,我更喜欢SQL方式,SQL方式不仅开发效率高,而且DSL实现特别复杂的功能,个人感觉不如使用SparkCore借助灵活算子实现。
AQE(Adaptive Query Execution)
AQE是Spark SQL中的一种优化技术,它利用运行时统计信息来选择最有效的查询执行计划。AQE默认情况下是禁用的。Spark SQL可以使用spark.sql.adaptive.enabled的伞形配置来控制是否打开/关闭它。 从Spark 3.0开始,AQE具有三个主要功能,包括合并后shuffle分区,sort-merge join转broadcast join以及倾斜优化。
在此有个概念CBO(cost-based optimization)必须先熟悉,CBO框架翻译就是基于成本的优化,也是在Spark3.0版本才引入的,该框架收集并利用各种数据统计信息(如行数,不同值的数量,NULL 值,最大/最小值等)来帮助 Spark 选择更好的计划。这些基于成本的优化技术很好的例子就是选择正确的 Join 类型(broadcast hash join vs. sort merge join),在 hash join 的时候选择正确的连接顺序,或在多个 join 中调整 join 顺序。然而,过时的统计信息和不完善的基数估计可能导致次优查询计划。
1,Coalescing Post Shuffle Partitions
当spark.sql.adaptive.enabled和spark.sql.adaptive.coalescePartitions.enabled配置都是true的时候,这个特性会基于map输出统计的数据合并后shuffle分区数。这个特性简化了运行查询时对shuffle分区数的调优,你不需要设置适当的分区数来适配自己的数据集。一旦您通过spark.sql.adaptive.coalescePartitions.initialPartitionNum设置了足够大的初始shuffle分区数,Spark就可以在运行时选择合适的shuffle分区数。
当在 Spark 中运行sql来处理非常大的数据时,shuffle 通常对查询性能有非常重要的影响。Shuffle 是一个昂贵的操作符,因为它需要在网络中移动数据,因此数据是按照下游操作符所要求的方式重新分布的。
shuffle 的一个关键属性是分区的数量。分区的最佳数量取决于数据,但是数据大小可能在不同的阶段、不同的查询之间有很大的差异,这使得这个数字很难调优:如果分区数太少,那么每个分区处理的数据大小可能非常大,处理这些大分区的任务可能需要将数据溢写到磁盘(例如,涉及排序或聚合),从而减慢查询速度;如果分区数太多,那么每个分区处理的数据大小可能非常小,并且将有大量的网络数据获取来读取 shuffle 块,这也会由于低效的 I/O 模式而减慢查询速度。拥有大量的任务也会给 Spark 任务调度程序带来更多的负担。
要解决这个问题,我们可以在开始时设置相对较多的 shuffle 分区数,然后在运行时通过查看 shuffle 文件统计信息将相邻的小分区合并为较大的分区。此种思想就是所谓的Coalescing Post Shuffle Partitions。
2.Converting sort-merge join to broadcast join
当运行时统计的join操作任何一端的数据小于broadcast hash join阈值(spark.sql.autoBroadcastJoinThreshold默认10MB)时,AQE将sort-merge join转换为broadcast hash join。这不如首先规划一个broadcast hash join高效,但这比继续sort-merge join要好,这样我们可以节省join双方的排序,并在本地读取shuffle文件节省网络流量(如果spark.sql.adaptive.localShuffleReader=true)。
3.Optimizing Skew Join
数据倾斜会严重降低连接查询的性能。该特性通过将倾斜任务拆分(并在需要时复制)为大小大致相同的任务来动态处理sort-merge jion的倾斜。当spark.sql.adaptive.enabled和spark.sql.adaptive.skewJoin.enabled配置均为true的情况下,动态优化数据倾斜的join才会生效。