Spark SQL Catalyst优化器

记录一下个人对sparkSql的catalyst这个函数式的可扩展的查询优化器的理解,目录如下,

0. Overview
1. Catalyst工作流程
2. Parser模块
3. Analyzer模块
4. Optimizer模块
5. SparkPlanner模块
6. Job UI
7. Reference

Overview

Spark SQL的核心是Catalyst优化器,是以一种新颖的方式利用Scala的的模式匹配和quasiquotes机制来构建的可扩展查询优化器。

sparkSql pipeline

sparkSql的catalyst优化器是整个sparkSql pipeline的中间核心部分,其执行策略主要两方向,

  1. 基于规则优化/Rule Based Optimizer/RBO
    • 一种经验式、启发式优化思路
    • 对于核心优化算子join有点力不从心,如两张表执行join,到底使用broadcaseHashJoin还是sortMergeJoin,目前sparkSql是通过手工设定参数来确定的,如果一个表的数据量小于某个阈值(默认10M?)就使用broadcastHashJoin
      • nestedLoopsJoin,P,Q双表两个大循环, O(M*N)
      • sortMergeJoin是P,Q双表排序后互相游标
      • broadcastHashJoin,PQ双表中小表放入内存hash表,大表遍历O(1)方式取小表内容
  2. 基于代价优化/Cost Based Optimizer/CBO
    • 针对每个join评估当前两张表使用每种join策略的代价,根据代价估算确定一种代价最小的方案
    • 不同physical plans输入到代价模型(目前是统计),调整join顺序,减少中间shuffle数据集大小,达到最优输出

Catalyst工作流程

  • Parser,利用ANTLR将sparkSql字符串解析为抽象语法树AST,称为unresolved logical plan/ULP
  • Analyzer,借助于数据元数据catalog将ULP解析为logical plan/LP
  • Optimizer,根据各种RBO,CBO优化策略得到optimized logical plan/OLP,主要是对Logical Plan进行剪枝,合并等操作,进而删除掉一些无用计算,或对一些计算的多个步骤进行合并

other

Optimizer是catalyst工作最后阶段了,后面生成physical plan以及执行,主要是由sparkSql来完成。

  • SparkPlanner
    • 优化后的逻辑执行计划OLP依然是逻辑的,并不能被spark系统理解,此时需要将OLP转换成physical plan
    • 从逻辑计划/OLP生成一个或多个物理执行计划,基于成本模型cost model从中选择一个
  • Code generation
    • 生成Java bytecode然后在每一台机器上执行,形成RDD graph/DAG

Parser模块

将sparkSql字符串切分成一个一个token,再根据一定语义规则解析为一个抽象语法树/AST。Parser模块目前基本都使用第三方类库ANTLR来实现,比如Hive,presto,sparkSql等。

parser切词

Spark 1.x版本使用的是Scala原生的Parser Combinator构建词法和语法分析器,而Spark 2.x版本使用的是第三方语法解析器工具ANTLR4。

Spark2.x SQL语句的解析采用的是ANTLR4,ANTLR4根据语法文件SqlBase.g4自动解析生成两个Java类:词法解析器SqlBaseLexer和语法解析器SqlBaseParser。

SqlBaseLexer和SqlBaseParser都是使用ANTLR4自动生成的Java类。使用这两个解析器将SQL字符串语句解析成了ANTLR4的ParseTree语法树结构。然后在parsePlan过程中,使用AstBuilder.scala将ParseTree转换成catalyst表达式逻辑计划LogicalPlan。


Analyzer模块

通过解析后ULP有了基本骨架,但是系统对表的字段信息是不知道的。如sum,select,join,where还有score,people都表示什么含义,此时需要基本的元数据信息schema catalog来表达这些token。最重要的元数据信息就是,

  • 表的schema信息,主要包括表的基本定义(表名、列名、数据类型)、表的数据格式(json、text、parquet、压缩格式等)、表的物理位置
  • 基本函数信息,主要是指类信息

Analyzer会再次遍历整个AST,对树上的每个节点进行数据类型绑定以及函数绑定,比如people词素会根据元数据表信息解析为包含age、id以及name三列的表,people.age会被解析为数据类型为int的变量,sum会被解析为特定的聚合函数,

词义注入
//org.apache.spark.sql.catalyst.analysis.Analyzer.scala
  lazy val batches: Seq[Batch] = Seq( //不同Batch代表不同的解析策略
    Batch("Substitution", fixedPoint,
      CTESubstitution,
      WindowsSubstitution,
      EliminateUnions,
      new SubstituteUnresolvedOrdinals(conf)),
    Batch("Resolution", fixedPoint,
      ResolveTableValuedFunctions ::
      ResolveRelations ::  //通过catalog解析表或列基本数据类型,命名等信息
      ResolveReferences :: //解析从子节点的操作生成的属性,一般是别名引起的,比如people.age
      ResolveCreateNamedStruct ::
      ResolveDeserializer ::
      ResolveNewInstance ::
      ResolveUpCast ::
      ResolveGroupingAnalytics ::
      ResolvePivot ::
      ResolveOrdinalInOrderByAndGroupBy ::
      ResolveMissingReferences ::
      ExtractGenerator ::
      ResolveGenerate ::
      ResolveFunctions :: //解析基本函数,如max,min,agg
      ResolveAliases ::
      ResolveSubquery :: //解析AST中的字查询信息
      ResolveWindowOrder ::
      ResolveWindowFrame ::
      ResolveNaturalAndUsingJoin ::
      ExtractWindowExpressions ::
      GlobalAggregates :: //解析全局的聚合函数,比如select sum(score) from table
      ResolveAggregateFunctions ::
      TimeWindowing ::
      ResolveInlineTables ::
      TypeCoercion.typeCoercionRules ++
      extendedResolutionRules : _*),
    Batch("Nondeterministic", Once,
      PullOutNondeterministic),
    Batch("UDF", Once,
      HandleNullInputsForUDF),
    Batch("FixNullability", Once,
      FixNullability),
    Batch("Cleanup", fixedPoint,
      CleanupAliases)
  )

Optimizer模块

Optimizer是catalyst的核心,分为RBO和CBO两种。
RBO的优化策略就是对语法树进行一次遍历,模式匹配能够满足特定规则的节点,再进行相应的等价转换,即将一棵树等价地转换为另一棵树。SQL中经典的常见优化规则有,

  • 谓词下推(predicate pushdown)
  • 常量累加(constant folding)
  • 列值裁剪(column pruning)
  • Limits合并(combine limits)
由下往上走,从join后再filter优化为filter再join
从`100+80`优化为`180`,避免每一条record都需要执行一次`100+80`的操作
剪裁不需要的字段,特别是嵌套里面的不需要字段。如只需people.age,不需要people.address,那么可以将address字段丢弃
//@see http://blog.csdn.net/oopsoom/article/details/38121259
//org.apache.spark.sql.catalyst.optimizer.Optimizer.scala
  def batches: Seq[Batch] = {
    // Technically some of the rules in Finish Analysis are not optimizer rules and belong more
    // in the analyzer, because they are needed for correctness (e.g. ComputeCurrentTime).
    // However, because we also use the analyzer to canonicalized queries (for view definition),
    // we do not eliminate subqueries or compute current time in the analyzer.
    Batch("Finish Analysis", Once,
      EliminateSubqueryAliases,
      ReplaceExpressions,
      ComputeCurrentTime,
      GetCurrentDatabase(sessionCatalog),
      RewriteDistinctAggregates) ::
    //////////////////////////////////////////////////////////////////////////////////////////
    // Optimizer rules start here
    //////////////////////////////////////////////////////////////////////////////////////////
    // - Do the first call of CombineUnions before starting the major Optimizer rules,
    //   since it can reduce the number of iteration and the other rules could add/move
    //   extra operators between two adjacent Union operators.
    // - Call CombineUnions again in Batch("Operator Optimizations"),
    //   since the other rules might make two separate Unions operators adjacent.
    Batch("Union", Once,
      CombineUnions) ::
    Batch("Subquery", Once,
      OptimizeSubqueries) ::
    Batch("Replace Operators", fixedPoint,
      ReplaceIntersectWithSemiJoin,
      ReplaceExceptWithAntiJoin,
      ReplaceDistinctWithAggregate) ::
    Batch("Aggregate", fixedPoint,
      RemoveLiteralFromGroupExpressions,
      RemoveRepetitionFromGroupExpressions) ::
    Batch("Operator Optimizations", fixedPoint,
      // Operator push down
      PushProjectionThroughUnion,
      ReorderJoin,
      EliminateOuterJoin,
      PushPredicateThroughJoin, //谓词下推之一
      PushDownPredicate, //谓词下推之一
      LimitPushDown,
      ColumnPruning, //列值剪裁,常用于聚合操作,join左右孩子操作,合并相邻project列
      InferFiltersFromConstraints,
      // Operator combine
      CollapseRepartition,
      CollapseProject,
      CollapseWindow,
      CombineFilters, //谓词下推之一,合并两个相邻的Filter。合并2个节点,就可以减少树的深度从而减少重复执行过滤的代价
      CombineLimits, //合并Limits
      CombineUnions,
      // Constant folding and strength reduction
      NullPropagation,
      FoldablePropagation,
      OptimizeIn(conf),
      ConstantFolding, //常量累加之一
      ReorderAssociativeOperator,
      LikeSimplification,
      BooleanSimplification, //常量累加之一,布尔表达式的提前短路
      SimplifyConditionals,
      RemoveDispensableExpressions,
      SimplifyBinaryComparison,
      PruneFilters,
      EliminateSorts,
      SimplifyCasts,
      SimplifyCaseConversionExpressions,
      RewriteCorrelatedScalarSubquery,
      EliminateSerialization,
      RemoveRedundantAliases,
      RemoveRedundantProject) ::
    Batch("Check Cartesian Products", Once,
      CheckCartesianProducts(conf)) ::
    Batch("Decimal Optimizations", fixedPoint,
      DecimalAggregates) ::
    Batch("Typed Filter Optimization", fixedPoint,
      CombineTypedFilters) ::
    Batch("LocalRelation", fixedPoint,
      ConvertToLocalRelation,
      PropagateEmptyRelation) ::
    Batch("OptimizeCodegen", Once,
      OptimizeCodegen(conf)) ::
    Batch("RewriteSubquery", Once,
      RewritePredicateSubquery,
      CollapseProject) :: Nil
  }

SparkPlanner模块

至此,OLP已经得到了比较完善的优化,然而此时OLP依然没有办法真正执行,它们只是逻辑上可行,实际上spark并不知道如何去执行这个OLP。

  • 比如join只是一个抽象概念,代表两个表根据相同的id进行合并,然而具体怎么实现这个合并,逻辑执行计划并没有说明
optimized logical plan -> physical plan

此时就需要将左边的OLP转换为physical plan物理执行计划,将逻辑上可行的执行计划变为spark可以真正执行的计划。

  • 比如join算子,spark根据不同场景为该算子制定了不同的算法策略,有broadcastHashJoin、shuffleHashJoin以及sortMergeJoin,物理执行计划实际上就是在这些具体实现中挑选一个耗时最小的算法实现,这个过程涉及到cost model/CBO
CBO off
CBO on

CBO中常见的优化是join换位,以便尽量减少中间shuffle数据集大小,达到最优输出。


Job UI

sp.prepare.PrepareController
  • WholeStageCodegen,将多个operators合并成一个java函数,从而提高执行速度
  • Project,投影/只取所需列
  • Exchange,stage间隔,产生了shuffle

Reference

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,029评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,395评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,570评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,535评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,650评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,850评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,006评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,747评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,207评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,536评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,683评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,342评论 4 330
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,964评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,772评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,004评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,401评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,566评论 2 349

推荐阅读更多精彩内容