[Spark SQL] Source Code Walkthrough: the Optimizer

Preface

From the previous posts we know that the overall Spark SQL execution flow looks like this:

  • sqlText is parsed by SqlParser into an Unresolved LogicalPlan;
  • the analyzer module binds it against the catalog, producing a Resolved LogicalPlan;
  • the optimizer module optimizes the Resolved LogicalPlan, producing an Optimized LogicalPlan;
  • the planner converts the LogicalPlan into a PhysicalPlan (SparkPlan);
  • prepareForExecution() turns the PhysicalPlan into an executable physical plan;
  • execute() runs the executable physical plan.

The optimizer module in detail

The optimizer, and every module after it, runs only once an action is triggered. Its job is to turn a Resolved LogicalPlan into an Optimized LogicalPlan.

The optimizer applies rules distilled from years of hard-won SQL tuning experience to the syntax tree: predicate pushdown, column pruning, constant folding, and so on. The mechanism closely mirrors the Analyzer: Optimizer likewise extends RuleExecutor and defines many optimization Rules:

def batches: Seq[Batch] = {
    // Technically some of the rules in Finish Analysis are not optimizer rules and belong more
    // in the analyzer, because they are needed for correctness (e.g. ComputeCurrentTime).
    // However, because we also use the analyzer to canonicalized queries (for view definition),
    // we do not eliminate subqueries or compute current time in the analyzer.
    Batch("Finish Analysis", Once,
      EliminateSubqueryAliases,
      EliminateView,
      ReplaceExpressions,
      ComputeCurrentTime,
      GetCurrentDatabase(sessionCatalog),
      RewriteDistinctAggregates,
      ReplaceDeduplicateWithAggregate) ::
    //////////////////////////////////////////////////////////////////////////////////////////
    // Optimizer rules start here
    //////////////////////////////////////////////////////////////////////////////////////////
    // - Do the first call of CombineUnions before starting the major Optimizer rules,
    //   since it can reduce the number of iteration and the other rules could add/move
    //   extra operators between two adjacent Union operators.
    // - Call CombineUnions again in Batch("Operator Optimizations"),
    //   since the other rules might make two separate Unions operators adjacent.
    Batch("Union", Once,
      CombineUnions) ::
    Batch("Pullup Correlated Expressions", Once,
      PullupCorrelatedPredicates) ::
    Batch("Subquery", Once,
      OptimizeSubqueries) ::
    Batch("Replace Operators", fixedPoint,
      ReplaceIntersectWithSemiJoin,
      ReplaceExceptWithAntiJoin,
      ReplaceDistinctWithAggregate) :: // replace Distinct with Aggregate
    Batch("Aggregate", fixedPoint,
      RemoveLiteralFromGroupExpressions,
      RemoveRepetitionFromGroupExpressions) ::
    Batch("Operator Optimizations", fixedPoint, Seq(
      // Operator push down
      PushProjectionThroughUnion, // push projections down through Union
      ReorderJoin(conf),
      EliminateOuterJoin(conf),
      PushPredicateThroughJoin,
      PushDownPredicate,
      LimitPushDown(conf),
      ColumnPruning, // column pruning
      InferFiltersFromConstraints(conf),
      // Operator combine
      CollapseRepartition,
      CollapseProject,
      CollapseWindow,
      CombineFilters, // merge adjacent Filters
      CombineLimits, // merge adjacent Limits
      CombineUnions,
      // Constant folding and strength reduction
      NullPropagation(conf), // propagate and simplify nulls
      FoldablePropagation,
      OptimizeIn(conf), // optimize IN: rewrite literal lists to InSet
      ConstantFolding, // constant folding: evaluate foldable expressions up front
      ReorderAssociativeOperator,
      LikeSimplification, // simplify trivial LIKE patterns
      BooleanSimplification,
      SimplifyConditionals,
      RemoveDispensableExpressions,
      SimplifyBinaryComparison,
      PruneFilters(conf),
      EliminateSorts,
      SimplifyCasts,
      SimplifyCaseConversionExpressions,
      RewriteCorrelatedScalarSubquery,
      EliminateSerialization,
      RemoveRedundantAliases,
      RemoveRedundantProject,
      SimplifyCreateStructOps,
      SimplifyCreateArrayOps,
      SimplifyCreateMapOps) ++
      extendedOperatorOptimizationRules: _*) ::
    Batch("Check Cartesian Products", Once,
      CheckCartesianProducts(conf)) ::
    Batch("Join Reorder", Once,
      CostBasedJoinReorder(conf)) ::
    Batch("Decimal Optimizations", fixedPoint, // decimal precision optimizations
      DecimalAggregates(conf)) ::
    Batch("Object Expressions Optimization", fixedPoint,
      EliminateMapObjects,
      CombineTypedFilters) ::
    Batch("LocalRelation", fixedPoint,
      ConvertToLocalRelation,
      PropagateEmptyRelation) ::
    Batch("OptimizeCodegen", Once,
      OptimizeCodegen(conf)) ::
    Batch("RewriteSubquery", Once,
      RewritePredicateSubquery,
      CollapseProject) :: Nil
  }
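A few of the rules above can be illustrated in miniature. The following is a minimal sketch, not Spark's real classes: a toy expression tree (the names `Lit`, `Attr`, `Add`, and `ToyConstantFolding` are all made up here) showing the kind of bottom-up rewrite that a rule like ConstantFolding performs.

```scala
// Toy expression tree -- illustrative only, not Spark's Expression hierarchy.
sealed trait Expr
case class Lit(v: Int) extends Expr            // a literal constant
case class Attr(name: String) extends Expr     // a column reference
case class Add(l: Expr, r: Expr) extends Expr  // l + r

object ToyConstantFolding {
  // Fold bottom-up: optimize the children first, then the node itself.
  def fold(e: Expr): Expr = e match {
    case Add(l, r) =>
      (fold(l), fold(r)) match {
        case (Lit(a), Lit(b)) => Lit(a + b)    // e.g. 1 + 2  =>  3
        case (fl, fr)         => Add(fl, fr)   // children folded, keep the Add
      }
    case other => other                        // literals and attributes are left alone
  }
}
```

For example, `fold(Add(Attr("a"), Add(Lit(1), Lit(2))))` returns `Add(Attr("a"), Lit(3))` — the same shape of rewrite that turns `SELECT a + (1 + 2)` into `SELECT a + 3` before any row is read.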

Batches are executed exactly as in the Analyzer: RuleExecutor's execute method walks through them one by one, so that mechanism is not analyzed again here.
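The fixed-point iteration that execute performs can be sketched as follows. This is a simplified stand-in (all names here are illustrative) that pretends a plan is just a list of operator names; the real RuleExecutor works on a TreeNode-based plan and additionally tracks per-rule metrics.

```scala
// A toy stand-in for org.apache.spark.sql.catalyst.rules.RuleExecutor.
object ToyRuleExecutor {
  type Plan = List[String] // stand-in for a TreeNode-based logical plan

  sealed trait Strategy { def maxIterations: Int }
  case object Once extends Strategy { val maxIterations = 1 }
  case class FixedPoint(maxIterations: Int) extends Strategy

  case class Batch(name: String, strategy: Strategy, rules: (Plan => Plan)*)

  def execute(batches: Seq[Batch], plan: Plan): Plan =
    batches.foldLeft(plan) { (cur, batch) =>
      var result = cur
      var iteration = 1
      var continue = true
      while (continue) {
        // One pass: apply every rule of the batch in order.
        val afterBatch = batch.rules.foldLeft(result)((p, rule) => rule(p))
        // Stop at a fixed point (no change) or when maxIterations is hit.
        if (afterBatch == result || iteration >= batch.strategy.maxIterations)
          continue = false
        result = afterBatch
        iteration += 1
      }
      result
    }
}
```

The FixedPoint strategy is why a rule can keep firing until nothing changes, and why CombineUnions appears both in its own Once batch and again inside the fixed-point "Operator Optimizations" batch: other rules in that batch may make two Union operators adjacent again.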

