【Spark 精选】SQL 执行计划详解

1.Spark SQL 执行流程

  • Parser 阶段:使用 Antlr4 对 sql 语句进行词法和语法的解析
  • Analyzer 阶段:利用 Catalog 信息将 Unresolved Logical Plan 解析成 Analyzed Logical plan
  • Optimizer 阶段:使用一些规格 Rule 将 Analyzed Logical Plan 解析成 Optimized Logical Plan
  • Planner 阶段:Logical Plan 转换成多个 Physical Plans,可以利用代价模型 cost model 选择最佳的 Physical Plan
image.png

2.案例分析

# stu 表
CREATE TABLE stu (
  id INT,
  name STRING,
  age INT
);
# stu 表中的数据
+------+--------+-------+
|id   |name   | age   |
+------+--------+-------+
|0     |John    |10    |
|1     |Mike    |11    |
|2     |Lisa    |12    |
+------+--------+-------+

# score 表
CREATE TABLE score (
  id INT,
  xueke STRING,
  score INT
);
# score 表中的数据
+------+--------+-------+
|id   |xueke   |score   |
+------+--------+-------+
|0     |Chinese    |80   |
|0     |Math       |100  |
|0     |English    |99   |
|1     |Chinese    |40   |
|1     |Math       |50   |
|1     |English    |60   |
|2     |Chinese    |70   |
|2     |Math       |80   |
|2     |English    |90   |
+------+--------+-------+

# 查看指定 sql 语句的 Parsed Logical Plan、Analyzed Logical Plan、Optimized Logical Plan 和 Physical Plan
EXPLAIN EXTENDED SELECT sum(v), name FROM 
  (SELECT stu.id, 100+10+score.score AS v, name FROM stu JOIN score 
   WHERE stu.id = score.id AND stu.age >= 11) AS tmp 
GROUP BY name;

== Parsed Logical Plan ==
'Aggregate ['name], [unresolvedalias('sum('v), None), 'name]
+- 'SubqueryAlias tmp
   +- 'Project ['stu.id, ((100 + 10) + 'score.score) AS v#97, 'name]
      +- 'Filter (('stu.id = 'score.id) AND ('stu.age >= 11))
         +- 'Join Inner
            :- 'UnresolvedRelation [stu], [], false
            +- 'UnresolvedRelation [score], [], false

== Analyzed Logical Plan ==
sum(v): bigint, name: string
Aggregate [name#104], [sum(v#97) AS sum(v)#110L, name#104]
+- SubqueryAlias tmp
   +- Project [id#103, ((100 + 10) + score#108) AS v#97, name#104]
      +- Filter ((id#103 = id#106) AND (age#105 >= 11))
         +- Join Inner
            :- SubqueryAlias spark_catalog.default.stu
            :  +- HiveTableRelation [`default`.`stu`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#103, name#104, age#105], Partition Cols: []]
            +- SubqueryAlias spark_catalog.default.score
               +- HiveTableRelation [`default`.`score`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#106, xueke#107, score#108], Partition Cols: []]

== Optimized Logical Plan ==
Aggregate [name#104], [sum(v#97) AS sum(v)#110L, name#104]
+- Project [(110 + score#108) AS v#97, name#104]
   +- Join Inner, (id#103 = id#106)
      :- Project [id#103, name#104]
      :  +- Filter ((isnotnull(age#105) AND (age#105 >= 11)) AND isnotnull(id#103))
      :     +- HiveTableRelation [`default`.`stu`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#103, name#104, age#105], Partition Cols: []]
      +- Project [id#106, score#108]
         +- Filter isnotnull(id#106)
            +- HiveTableRelation [`default`.`score`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#106, xueke#107, score#108], Partition Cols: []]

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[name#104], functions=[sum(v#97)], output=[sum(v)#110L, name#104])
   +- Exchange hashpartitioning(name#104, 200), ENSURE_REQUIREMENTS, [plan_id=248]
      +- HashAggregate(keys=[name#104], functions=[partial_sum(v#97)], output=[name#104, sum#112L])
         +- Project [(110 + score#108) AS v#97, name#104]
            +- SortMergeJoin [id#103], [id#106], Inner
               :- Sort [id#103 ASC NULLS FIRST], false, 0
               :  +- Exchange hashpartitioning(id#103, 200), ENSURE_REQUIREMENTS, [plan_id=240]
               :     +- Project [id#103, name#104]
               :        +- Filter ((isnotnull(age#105) AND (age#105 >= 11)) AND isnotnull(id#103))
               :           +- Scan hive default.stu [age#105, id#103, name#104], HiveTableRelation [`default`.`stu`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#103, name#104, age#105], Partition Cols: []]
               +- Sort [id#106 ASC NULLS FIRST], false, 0
                  +- Exchange hashpartitioning(id#106, 200), ENSURE_REQUIREMENTS, [plan_id=241]
                     +- Filter isnotnull(id#106)
                        +- Scan hive default.score [id#106, score#108], HiveTableRelation [`default`.`score`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#106, xueke#107, score#108], Partition Cols: []]

2.1 Parser 阶段

sql 语句如下所示,经过 Parser 解析后变成抽象语法树即 Parsed Logical Plan,其中叶子节点 UnresolvedRelation 表示该节点还没有被解析,而下一步在 Analyzer 阶段进行处理,并解析这些 UnresolvedRelation 节点。

== Parsed Logical Plan ==
'Aggregate ['name], [unresolvedalias('sum('v), None), 'name]
+- 'SubqueryAlias tmp
   +- 'Project ['stu.id, ((100 + 10) + 'score.score) AS v#97, 'name]
      +- 'Filter (('stu.id = 'score.id) AND ('stu.age >= 11))
         +- 'Join Inner
            :- 'UnresolvedRelation [stu], [], false
            +- 'UnresolvedRelation [score], [], false
image.JPG

上面树中的节点都是 LogicalPlan 类型的,即进行各种操作的 Operator,如下所示是部分 Operator

名称 功能描述
Project(projectList: Seq[NamedExpression], child: LogicalPlan) select 语句输出操作,其中 projectList 是输出对象,每个元素都是一个 expression
Filter(condition: Expression, child: LogicalPlan) 根据 condition 对 child 的输入 rows 进行过滤
Aggregate(groupingExpressions: Seq[Expression], aggregateExpressions: Seq[NamedExpression], child: LogicalPlan) 对 child 输出 rows 进行 aggregate 操作,例如 groupby
Join(left: LogicalPlan, right: LogicalPlan, joinType: JoinType, condition: Option[Expression]) left 和 right 的输出结果进行 join 操作
Union(children: Seq[LogicalPlan]) 将 children 计算结果进行 Union 联合
Distinct(child: LogicalPlan) 对 child 输出 rows 取重操作
Sort(order: Seq[SortOrder], global: Boolean, child: LogicalPlan) 对 child 的输出进行 sort 排序
SubqueryAlias(alias: Alias, child: LogicalPlan) 对 child 取别名
GlobalLimit(limitExpr: Expression, child: LogicalPlan) 对 child 输出的数据进行 Limit 限制
Window(windowExpressions: Seq[NamedExpression], partitionSpec: Seq[Expression], orderSpec: Seq[SortOrder], child: LogicalPlan) 输入 child 进行窗口操作,windowExpressions 表示窗口函数的列表,每个窗口函数都是一个 NamedExpression,指定了要计算的聚合操作;partitionSpec 是表达式的列表,用于指定窗口的分区方式;orderSpec 是排序规则的列表,用于指定窗口内行的排序方式

2.2 Analyzer 阶段

分析器会根据上下文和元数据信息,将 UnresolvedRelation 解析成 ResolvedRelation,并自动创建一个 SubqueryAlias 别名的子查询。当分析器解析 Hive 表时,会将 HiveTableRelation 作为 ResolvedRelation 的一部分,其中包含了 Hive 表的元数据信息,如表的名称、列的类型等。因此 UnresolvedRelation 就会自动变成 SubqueryAlias 别名信息,并包含 HiveTableRelation 即 Hive 表的元数据信息。

== Analyzed Logical Plan ==
sum(v): bigint, name: string
Aggregate [name#104], [sum(v#97) AS sum(v)#110L, name#104]
+- SubqueryAlias tmp
   +- Project [id#103, ((100 + 10) + score#108) AS v#97, name#104]
      +- Filter ((id#103 = id#106) AND (age#105 >= 11))
         +- Join Inner
            :- SubqueryAlias spark_catalog.default.stu
            :  +- HiveTableRelation [`default`.`stu`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#103, name#104, age#105], Partition Cols: []]
            +- SubqueryAlias spark_catalog.default.score
               +- HiveTableRelation [`default`.`score`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#106, xueke#107, score#108], Partition Cols: []]
image.png

2.3 Optimizer 阶段

部分逻辑优化的规则,如下所示。

名称 功能描述 案例
ConstantFolding 对常量表达式进行折叠 select 100+10+a from t 转换为 select 110+a from t
PushDownPredicate 将谓词下推到适当的位置,以尽早过滤掉不必要的行 例如 Filter 操作,select rand(), a from (select * from t) where a>1 转换为 select rand(), a from (select * from t where a>1)
ColumnPruning 去除不需要的列,即删除 child 无用的 output 字段 select a from (select a, b from t) 转换为 select a from (select a from t)
BooleanSimplification 对布尔表达式进行简化,主要是针对 where 语句中的 and/or 组合逻辑 true or a=b 转换为 true
SimplifyCasts 简化类型转换 cast 操作。如果 cast 前后数据类型没有变化,即可以删除 cast 操作 select cast(a as int) from t 转换为 select a from t
SimplifyCaseConversionExpressions 简化大小写转换操作,如果对字符串进行连续多次 Upper/Lower 操作,只需要进行最后一次操作即可 select lower(upper(lower(a))) as c from 转换成 select lower(a) as c from t;
CollapseProject 合并相邻的投影操作,将 Project 与子 Project 或者子 Aggregate 进行合并 select c+1 from (select a+b as c from t) 转换为 select a+b+1 as c+1 from t
CollapseRepartition 合并相邻的 Repartition 重新分区操作 Repartion(numPartitions, shuffle, Repartition(_, _, child)) 转换为 Repartion(numPartitions, shuffle, child)

左图 Analyzed Logical Plan 使用了如下规则,优化成右图 Optimized Logical Plan。

  • 投影消除 RemoveRedundantProjects:左图的 Project 节点中,(110 + score#108) AS v#97 是一个冗余的投影操作。因为没有其他使用 v#97 的表达式,优化器删除这个投影操作。
  • 谓词下推 PushDownPredicate:左图 Filter 节点中,age#105 >= 11 会被下推到右图靠近 HiveTableRelation 的 Filter 节点,age#105 >= 11isnotnull(id#103) 会被下推到右图靠近 HiveTableRelation 的 Filter 节点,以减少数据的读取量。
image.JPG
== Optimized Logical Plan ==
Aggregate [name#104], [sum(v#97) AS sum(v)#110L, name#104]
+- Project [(110 + score#108) AS v#97, name#104]
   +- Join Inner, (id#103 = id#106)
      :- Project [id#103, name#104]
      :  +- Filter ((isnotnull(age#105) AND (age#105 >= 11)) AND isnotnull(id#103))
      :     +- HiveTableRelation [`default`.`stu`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#103, name#104, age#105], Partition Cols: []]
      +- Project [id#106, score#108]
         +- Filter isnotnull(id#106)
            +- HiveTableRelation [`default`.`score`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#106, xueke#107, score#108], Partition Cols: []]

2.4 Planner 阶段

左图 Optimized Logical Plan 最终化成右图 Physical Plan,如下所示。

  • Aggregate 执行节点:左边执行计划中 Aggregate 节点 sum(v),转换成右边物理计划中 HashAggregate 分组 sum + Exchange hashpartitioning + HashAggregate 的汇总 sum
  • Join Inner 执行节点:左边执行计划中 Join Inner 节点转换成右边物理计划中 SortMergeJoin + Sort + Exchange hashpartitioning
image.jpg
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[name#104], functions=[sum(v#97)], output=[sum(v)#110L, name#104])
   +- Exchange hashpartitioning(name#104, 200), ENSURE_REQUIREMENTS, [plan_id=248]
      +- HashAggregate(keys=[name#104], functions=[partial_sum(v#97)], output=[name#104, sum#112L])
         +- Project [(110 + score#108) AS v#97, name#104]
            +- SortMergeJoin [id#103], [id#106], Inner
               :- Sort [id#103 ASC NULLS FIRST], false, 0
               :  +- Exchange hashpartitioning(id#103, 200), ENSURE_REQUIREMENTS, [plan_id=240]
               :     +- Project [id#103, name#104]
               :        +- Filter ((isnotnull(age#105) AND (age#105 >= 11)) AND isnotnull(id#103))
               :           +- Scan hive default.stu [age#105, id#103, name#104], HiveTableRelation [`default`.`stu`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#103, name#104, age#105], Partition Cols: []]
               +- Sort [id#106 ASC NULLS FIRST], false, 0
                  +- Exchange hashpartitioning(id#106, 200), ENSURE_REQUIREMENTS, [plan_id=241]
                     +- Filter isnotnull(id#106)
                        +- Scan hive default.score [id#106, score#108], HiveTableRelation [`default`.`score`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#106, xueke#107, score#108], Partition Cols: []]

说明:spark-sql 中,join 操作是根据各种条件选择不同的 join 策略,包括 BroadcastHashJoinSortMergeJoinShuffleHashJoin

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,332评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,508评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,812评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,607评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,728评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,919评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,071评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,802评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,256评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,576评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,712评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,389评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,032评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,798评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,026评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,473评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,606评论 2 350

推荐阅读更多精彩内容