1. RDD 之痛:优化空间受限
RDD 的核心痛点是优化空间有限,它指的是 RDD 高阶算子中封装的函数对于 Spark 来说完全透明,因此 Spark 对于计算逻辑的优化无从下手。相比 RDD,DataFrame 是携带 Schema 的分布式数据集,只能封装结构化数据。DataFrame 的算子大多数都是普通的标量函数,以消费数据列为主。但是,DataFrame 更弱的表示能力和表达能力,反而为 Spark 引擎的内核优化打开了全新的空间。根据 DataFrame 简单的标量算子和明确的 Schema 定义,借助 Catalyst 优化器和 Tungsten,Spark SQL 有能力在运行时,构建起一套端到端的优化机制。这套机制运用启发式的规则与策略和运行时的执行信息,将原本次优、甚至是低效的查询计划转换为高效的执行计划,从而提升端到端的执行性能
2. Catalyst逻辑计划

- Catalyst 逻辑优化阶段分为两个环节:逻辑计划解析和逻辑计划优化。在逻辑计划解析中,Catalyst 把“Unresolved Logical Plan”转换为“Analyzed Logical Plan”;在逻辑计划优化中,Catalyst 基于一些既定的启发式规则(Heuristics Based Rules),把“Analyzed Logical Plan”转换为“Optimized Logical Plan”。
- 在逻辑计划解析环节,Catalyst 就是要结合 DataFrame 的 Schema 信息,来确认计划中的表名、字段名、字段类型与实际数据是否一致。完成确认之后,Catalyst 会生成“Analyzed Logical Plan”。
- 生成“Analyzed Logical Plan”之后,Catalyst 并不会止步于此,它会基于一套启发式的规则,把“Analyzed Logical Plan”转换为“Optimized Logical Plan”
Catalyst 的优化规则
- 谓词下推(Predicate Pushdown):“下推”指代的是把这些谓词沿着执行计划向下,推到离数据源最近的地方,从而在源头就减少数据扫描量
- 列剪裁(Column Pruning):列剪裁就是扫描数据源的时候,只读取那些与查询相关的字段(列式存储才行)
- 常量替换 (Constant Folding):掺杂了一些常量表达式,Catalyst 也会自动地用表达式的结果进行替换
Cache Manager 优化
这里的 Cache 指的就是我们常说的分布式数据缓存。想要对数据进行缓存,你可以调用 DataFrame 的.cache 或.persist,或是在 SQL 语句中使用“cache table”关键字。Cache Manager 其实很简单,它的主要职责是维护与缓存有关的信息。具体来说,Cache Manager 维护了一个 Mapping 映射字典,字典的 Key 是逻辑计划,Value 是对应的 Cache 元信息。当 Catalyst 尝试对逻辑计划做优化时,会先尝试对 Cache Manager 查找,看看当前的逻辑计划或是逻辑计划分支,是否已经被记录在 Cache Manager 的字典里。如果在字典中可以查到当前计划或是分支,Catalyst 就用 InMemoryRelation 节点来替换整个计划或是计划的一部分,从而充分利用已有的缓存数据做优化。
3. Catalyst物理计划
优化 Spark Plan

-
Catalyst 都有哪些 Join 策略?
结合 Joins 的实现机制和数据的分发方式,Catalyst 在运行时总共支持 5 种 Join 策略,分别是 Broadcast Hash Join(BHJ)、Shuffle Sort Merge Join(SMJ)、Shuffle Hash Join(SHJ)、Broadcast Nested Loop Join(BNLJ)和 Shuffle Cartesian Product Join(CPJ)。
5种Join策略及其含义
如果开发者不满足于 JoinSelection 默认的选择顺序,也就是 BHJ > SMJ > SHJ > BNLJ > CPJ,还可以通过在 SQL 或是 DSL 语句中引入 Join hints,来明确地指定 Join 策略,从而把自己的意愿凌驾于 Catalyst 之上。不过,需要我们注意的是,要想让指定的 Join 策略在运行时生效,查询语句也必须要满足其先决条件才行。
5种Join策略的先决条件
生成 Physical Plan
从 Spark Plan 到 Physical Plan 的转换,需要几组叫做 Preparation Rules 的规则。这些规则坚守最后一班岗,负责生成 Physical Plan。那么,这些规则都是什么,它们都做了哪些事情呢?

4. 钨丝计划:Tungsten(重要)
Unsafe Row:二进制数据结构
基于内存页的内存管理
如何理解 WSCG?
什么是火山迭代模型?
WSCG 的优势是什么?
WSCG 是如何在运行时动态生成代码的?
5. AQE的3个特性
为什么需要 AQE?
- 启发式的优化又叫 RBO(Rule Based Optimization,基于规则的优化),它往往基于一些规则和策略实现,如谓词下推、列剪枝,这些规则和策略来源于数据库领域已有的应用经验。也就是说,启发式的优化实际上算是一种经验主义。
- CBO 的特点是“实事求是”,基于数据表的统计信息(如表大小、数据列分布)来选择优化策略。如果在运行时数据分布发生动态变化,CBO 先前制定的执行计划并不会跟着调整、适配。
AQE是什么?
AQE 是 Spark SQL 的一种动态优化机制,在运行时,每当 Shuffle Map 阶段执行完毕,AQE 都会结合这个阶段的统计信息,基于既定的规则动态地调整、修正尚未执行的逻辑计划和物理计划,来完成对原始查询语句的运行时优化。
- AQE 赖以优化的统计信息与 CBO 不同,这些统计信息并不是关于某张表或是哪个列,而是 Shuffle Map 阶段输出的中间文件。每个 Map Task 都会输出以 data 为后缀的数据文件,还有以 index 为结尾的索引文件,这些文件统称为中间文件。每个 data 文件的大小、空文件数量与占比、每个 Reduce Task 对应的分区大小,所有这些基于中间文件的统计值构成了 AQE 进行优化的信息来源
-
结合 Spark SQL 端到端优化流程图我们可以看到,AQE 从运行时获取统计信息,在条件允许的情况下,优化决策会分别作用到逻辑计划和物理计划。
AQE 既定的规则和策略
Join 策略调整
DemoteBroadcastHashJoin 规则的作用,是把 Shuffle Joins 降级为 Broadcast Joins。需要注意的是,这个规则仅适用于 Shuffle Sort Merge Join 这种关联机制,其他机制如 Shuffle Hash Join、Shuffle Nested Loop Join 都不支持。
- spark.sql.autoBroadcastJoinThreshold 中间文件尺寸总和小于广播阈值
- spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin 空文件占比小于配置项
自动分区合并
在 Reduce 阶段,当 Reduce Task 从全网把数据分片拉回,AQE 按照分区编号的顺序,依次把小于目标尺寸的分区合并在一起
- spark.sql.adaptive.advisoryPartitionSizeInBytes,由开发者指定分区合并后的推荐尺寸。
- spark.sql.adaptive.coalescePartitions.minPartitionNum,分区合并后,分区数不能低于该值。
自动倾斜处理
在 Reduce 阶段,当 Reduce Task 所需处理的分区尺寸大于一定阈值时,利用 OptimizeSkewedJoin 策略,AQE 会把大分区拆成多个小分区。
- spark.sql.adaptive.skewJoin.skewedPartitionFactor,判定倾斜的膨胀系数
- spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes,判定倾斜的最低阈值
- spark.sql.adaptive.advisoryPartitionSizeInBytes,以字节为单位,定义拆分粒度
6. DPP特性
分区剪裁
相比于谓词下推,分区剪裁往往能更好地提升磁盘访问的 I/O 效率。谓词下推操作往往是根据文件注脚中的统计信息完成对文件的过滤,过滤效果取决于文件中内容的“纯度”。分区剪裁则不同,它的分区表可以把包含不同内容的文件,隔离到不同的文件系统目录下。这样一来,包含分区键的过滤条件能够以文件系统目录为粒度对磁盘文件进行过滤,从而大幅提升磁盘访问的 I/O 效率。
动态分区剪裁
动态分区剪裁运作的背后逻辑,是把维度表中的过滤条件,通过关联关系传导到事实表,来完成事实表的优化。在数据关联的场景中,开发者要想利用好动态分区剪裁特性,需要注意 3 点:
- 事实表必须是分区表,并且分区字段必须包含 Join Key
- 动态分区剪裁只支持等值 Joins,不支持大于、小于这种不等值关联关系
- 维度表过滤之后的数据集,必须要小于广播阈值,注意调整配置项 spark.sql.autoBroadcastJoinThreshold
7. Join Hints
NLJ 的工作原理(NLJ 算法的计算复杂度是 O(M * N))
NLJ 是采用“嵌套循环”的方式来实现关联的。也就是说,NLJ 会使用内、外两个嵌套的 for 循环依次扫描外表和内表中的数据记录,判断关联条件是否满足
SMJ 的工作原理(SMJ 算法的计算复杂度为 O(M + N))
SMJ 的思路是先排序、再归并。具体来说,就是参与 Join 的两张表先分别按照 Join Key 做升序排序。然后,SMJ 会使用两个独立的游标对排好序的两张表完成归并关联。
- 外表 Join Key 等于内表 Join Key,满足关联条件,把两边的数据记录拼接并输出,然后把外表的游标滑动到下一条记录
- 外表 Join Key 小于内表 Join Key,不满足关联条件,把外表的游标滑动到下一条记录
- 外表 Join Key 大于内表 Join Key,不满足关联条件,把内表的游标滑动到下一条记录
HJ 的工作原理(内表扫描的计算复杂度为O(1))
- HJ 的计算分为两个阶段,分别是 Build 阶段和 Probe 阶段。在 Build 阶段,基于内表,算法使用既定的哈希函数构建哈希表
-
在 Probe 阶段,算法遍历每一条数据记录,先是使用同样的哈希函数,以动态的方式(On The Fly)计算 Join Key 的哈希值。然后,用计算得到的哈希值去查询刚刚在 Build 阶段创建好的哈希表。如果查询失败,说明该条记录与维度表中的数据不存在关联关系;如果查询成功,则继续对比两边的 Join Key。如果 Join Key 一致,就把两边的记录进行拼接并输出,从而完成数据关联。
小结
分布式环境下的 Join

Spark 如何选择 Join 策略?
- 在等值数据关联中,Spark 会尝试按照 BHJ > SMJ > SHJ 的顺序依次选择 Join 策略。在这三种策略中,执行效率最高的是 BHJ,其次是 SHJ,再次是 SMJ。其中,SMJ 和 SHJ 策略支持所有连接类型,如全连接、Anti Join 等等。BHJ 尽管效率最高,但是有两个前提条件:一是连接类型不能是全连接(Full Outer Join);二是基表要足够小,可以放到广播变量里面去。
- SHJ 策略要想被选中必须要满足两个先决条件,这两个条件都是对数据尺寸的要求。首先,外表大小至少是内表的 3 倍。其次,内表数据分片的平均大小要小于广播变量阈值。第一个条件的动机很好理解,只有当内外表的尺寸悬殊到一定程度时,HJ 的优势才会比 SMJ 更显著。第二个限制的目的是,确保内表的每一个数据分片都能全部放进内存。
- 由于不等值 Join 只能使用 NLJ 来实现,因此 Spark SQL 可选的 Join 策略只剩下 BNLJ 和 CPJ。在同一种计算模式下,相比 Shuffle,广播的网络开销更小。显然,在两种策略的选择上,Spark SQL 一定会按照 BNLJ > CPJ 的顺序进行尝试。当然,BNLJ 生效的前提自然是内表小到可以放进广播变量。如果这个条件不成立,那么 Spark SQL 只好委曲求全,使用笨重的 CPJ 策略去完成关联计算。

8. 大表Join小表
案例 1:Join Key 远大于 Payload
先用 Hash Key 取代 Join Keys,再清除内表冗余数据。Hash Key 实际上是 Join Keys 拼接之后的哈希值。既然存在哈希运算,我们就必须要考虑哈希冲突的问题。
案例 2:过滤条件的 Selectivity 较高
AQE 允许 Spark SQL 在运行时动态地调整 Join 策略。我们刚好可以利用这个特性,把最初制定的 SMJ 策略转化为 BHJ 策略
案例 3:小表数据分布均匀
当参与 Join 的两张表尺寸相差悬殊且小表数据分布均匀的时候,SHJ 往往比 SMJ 的执行效率更高。这种情况下,我们不妨使用 Join Hints 来强制 Spark SQL 去选择 SHJ 策略进行关联计算
9. 大表Join大表
如何理解“分而治之”?
先把一个复杂任务拆解成多个简单任务,再合并多个简单任务的计算结果。首先,我们要根据两张表的尺寸大小区分出外表和内表。一般来说,内表是尺寸较小的那一方。然后,我们人为地在内表上添加过滤条件,把内表划分为多个不重复的完整子集。接着,我们让外表依次与这些子集做关联,得到部分计算结果。最后,再用 Union 操作把所有的部分结果合并到一起,得到完整的计算结果,这就是端到端的关联计算。
如何保证内表拆分的粒度足够细?
“分而治之”中一个关键的环节就是内表拆分,我们要求每一个子表的尺寸相对均匀,且都小到可以放进广播变量。拆分的关键在于拆分列的选取,为了让子表足够小,拆分列的基数(Cardinality)要足够大才行。
如何避免外表的重复扫描?
对于外表参与的每一个子关联,在逻辑上,我们完全可以只扫描那些与内表子表相关的外表数据,并不需要每次都扫描外表的全量数据。
数据分布均匀
- 两张表数据分布均匀。
- 内表所有数据分片,能够完全放入内存。
以 Task 为粒度解决数据倾斜
有了 AQE 的自动倾斜处理特性,在应对数据倾斜问题的时候,我们确实能够大幅节省开发成本。不过,天下没有免费的午餐,AQE 的倾斜处理是以 Task 为粒度的,这意味着原本 Executors 之间的负载倾斜并没有得到根本改善
以 Executor 为粒度解决数据倾斜
“两阶段 Shuffle”
如何理解“两阶段 Shuffle”?
“两阶段 Shuffle”指的是,通过“加盐、Shuffle、关联、聚合”与“去盐化、Shuffle、聚合”这两个阶段的计算过程,在不破坏原有关联关系的前提下,在集群范围内以 Executors 为粒度平衡计算负载 。
10. 补充
- Java Object 在对象存储上为什么会有比较大的开销?JVM 需要多少个字节才能存下字符串“abcd”? https://databricks.com/blog/2015/04/28/project-tungsten-bringing-spark-closer-to-bare-metal.html
- 对于复杂的业务逻辑,如果DSL和SQL都无法实现,除了UDF,其实还可以考虑用Script Transformation https://databricks.com/session_eu19/powering-custom-apps-at-facebook-using-spark-script-transformation
- 标量函数和标量算子
- 3 种 Join 实现方式和 2 种网络分发模式,明明应该有 6 种 Join 策略,为什么 Catalyst 没有支持 Broadcast Sort Merge Join 策略?
- 一个action对应一个job,但是经常一个action被拆分成了两三个相同的job(task数量可能会有不同),并且有时候好多job还是可以skip的。这又涉及了Spark的哪些优化机制呢?
- Block Nested-Loop Join(BNL) ,非等值连接?
- EnsureRequirements 规则?
- 针对排序操作,你认为 Tungsten 在数据结构方面有哪些改进呢?
- 你认为表达式代码生成(Expression Codegen)和全阶段代码生成(Whole Stage Codegen)有什么区别和联系呢?
- 如果事实表的join字段有好几个,其中只有一个是分区字段,那还能享受到DPP吗?
- DPP 的机制就是将经过过滤后的维度表广播到事实表进行裁剪,减少扫描数据,但 Spark 怎么才知道哪个是维度表哪个是事实表?
- 不等值 Join 可以强行用 Sort Merge Join 和 Hash Join 两种机制来实现吗?排序和构建hash table本身成了开销,在不等值的情况下,已经没有任何意义。
- 对于案例 1,我们的核心思路是用哈希值来替代超长的 Join Keys,除了用哈希值以外,你觉得还有其他的思路或是办法,去用较短的字符串来取代超长的 Join Keys 吗?
- 对于案例 3,假设 20GB 的小表存在数据倾斜,强行把 SMJ 转化为 SHJ 会抛 OOM 异常。这个时候,你认为还有可能继续优化吗?
- DPP机制的触发条件非常苛刻,怎么结合业务来使用DPP机制带来的优化好处,例如在业务中经常使用ID作为join 的条件,显然是不能作为分区的,那么是否可以取id的尾值等等的思路吧
- 两阶段 Shuffle,理解加盐解决了什么问题



