The core of Spark SQL is Catalyst, which processes a query through the following modules:
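- Parser module
Uses ANTLR to parse the Spark SQL string into an abstract syntax tree (AST), i.e. an unresolved logical plan (ULP).
- Analyzer module
Uses the metadata in the catalog to resolve the ULP into a logical plan (LP).
- Optimizer module
Applies rule-based (RBO) and cost-based (CBO) optimization strategies to produce the optimized logical plan (OLP). Typical optimizations include:
  - Predicate pushdown
  - Constant folding
  - Column pruning
  - Combine limits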
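- SparkPlanner module: optimized logical plan -> physical plan
The OLP is still a logical plan, and Spark cannot execute it directly, so it must be converted into a physical plan. The planner generates one or more physical plans from the OLP and is meant to pick one using a cost model. In the Spark 2.x code, however, `QueryExecution` simply takes the first plan the planner returns.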
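- Code generation
Generates Java code, compiled to bytecode, that runs on every machine and forms the RDD graph (DAG). WholeStageCodegen fuses multiple operators into a single Java function, which speeds up execution.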
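Every Optimizer rule listed above is a rewrite of a plan tree into an equivalent, cheaper tree. The following is a toy model, not Spark's API. `Expr`, `Plan`, `Scan`, `Filter`, `Project` and the rule functions are hypothetical names made up for illustration. It sketches three of the rewrites: constant folding, merging adjacent filters (in the spirit of `CombineFilters`), and pushing a filter below a projection (in the spirit of `PushDownPredicate`).

```scala
// Toy model of Catalyst-style rule-based optimization.
// Hypothetical simplification: these types and functions are NOT Spark's
// TreeNode/LogicalPlan/Rule API, only an illustration of the idea.

sealed trait Expr
case class Lit(v: Int) extends Expr
case class Col(name: String) extends Expr
case class Add(l: Expr, r: Expr) extends Expr
case class Gt(l: Expr, r: Expr) extends Expr
case class And(l: Expr, r: Expr) extends Expr

sealed trait Plan
case class Scan(table: String) extends Plan
case class Filter(cond: Expr, child: Plan) extends Plan
case class Project(cols: Seq[String], child: Plan) extends Plan

// Constant folding: evaluate additions of literals bottom-up.
def foldExpr(e: Expr): Expr = e match {
  case Add(l, r) =>
    (foldExpr(l), foldExpr(r)) match {
      case (Lit(a), Lit(b)) => Lit(a + b)
      case (fl, fr)         => Add(fl, fr)
    }
  case Gt(l, r)  => Gt(foldExpr(l), foldExpr(r))
  case And(l, r) => And(foldExpr(l), foldExpr(r))
  case leaf      => leaf
}

def constantFolding(p: Plan): Plan = p match {
  case Filter(c, child)     => Filter(foldExpr(c), constantFolding(child))
  case Project(cols, child) => Project(cols, constantFolding(child))
  case s: Scan              => s
}

// Combine filters: Filter(outer, Filter(inner, x)) => Filter(inner AND outer, x),
// so rows are filtered by one operator instead of two.
def combineFilters(p: Plan): Plan = p match {
  case Filter(outer, Filter(inner, child)) => combineFilters(Filter(And(inner, outer), child))
  case Filter(c, child)                    => Filter(c, combineFilters(child))
  case Project(cols, child)                => Project(cols, combineFilters(child))
  case s: Scan                             => s
}

// Predicate pushdown: move a Filter below a Project. In this toy model a Project
// only selects existing columns, so the filter is equally valid underneath it.
def pushDownPredicate(p: Plan): Plan = p match {
  case Filter(c, Project(cols, child)) => Project(cols, pushDownPredicate(Filter(c, child)))
  case Filter(c, child)                => Filter(c, pushDownPredicate(child))
  case Project(cols, child)            => Project(cols, pushDownPredicate(child))
  case s: Scan                         => s
}

// Two stacked filters above a projection:
// Filter(a > 1 + 2, Filter(b > 0, Project([a, b], Scan(t))))
val plan: Plan =
  Filter(Gt(Col("a"), Add(Lit(1), Lit(2))),
    Filter(Gt(Col("b"), Lit(0)),
      Project(Seq("a", "b"), Scan("t"))))

val rules = Seq[Plan => Plan](constantFolding, combineFilters, pushDownPredicate)
val optimized = rules.foldLeft(plan)((current, rule) => rule(current))
println(optimized)
// Project(List(a, b),Filter(And(Gt(Col(b),Lit(0)),Gt(Col(a),Lit(3))),Scan(t)))
```

Running the three rules in sequence does three things. It folds `1 + 2` into `3`, merges the two filters into a single `And`, and moves the merged filter beneath the projection. On a real Spark session, `df.explain(true)` prints the parsed, analyzed, optimized and physical plans, so you can see the effect of the actual rules there.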
```scala
// @see http://blog.csdn.net/oopsoom/article/details/38121259
// org.apache.spark.sql.catalyst.optimizer.Optimizer.scala
def batches: Seq[Batch] = {
  // Technically some of the rules in Finish Analysis are not optimizer rules and belong more
  // in the analyzer, because they are needed for correctness (e.g. ComputeCurrentTime).
  // However, because we also use the analyzer to canonicalized queries (for view definition),
  // we do not eliminate subqueries or compute current time in the analyzer.
  Batch("Finish Analysis", Once,
    EliminateSubqueryAliases,
    ReplaceExpressions,
    ComputeCurrentTime,
    GetCurrentDatabase(sessionCatalog),
    RewriteDistinctAggregates) ::
  //////////////////////////////////////////////////////////////////////////////////////////
  // Optimizer rules start here
  //////////////////////////////////////////////////////////////////////////////////////////
  // - Do the first call of CombineUnions before starting the major Optimizer rules,
  //   since it can reduce the number of iteration and the other rules could add/move
  //   extra operators between two adjacent Union operators.
  // - Call CombineUnions again in Batch("Operator Optimizations"),
  //   since the other rules might make two separate Unions operators adjacent.
  Batch("Union", Once,
    CombineUnions) ::
  Batch("Subquery", Once,
    OptimizeSubqueries) ::
  Batch("Replace Operators", fixedPoint,
    ReplaceIntersectWithSemiJoin,
    ReplaceExceptWithAntiJoin,
    ReplaceDistinctWithAggregate) ::
  Batch("Aggregate", fixedPoint,
    RemoveLiteralFromGroupExpressions,
    RemoveRepetitionFromGroupExpressions) ::
  Batch("Operator Optimizations", fixedPoint,
    // Operator push down
    PushProjectionThroughUnion,
    ReorderJoin,
    EliminateOuterJoin,
    PushPredicateThroughJoin, // one of the predicate-pushdown rules
    PushDownPredicate,        // one of the predicate-pushdown rules
    LimitPushDown,
    ColumnPruning,            // column pruning; typically applied to aggregations, to the left/right children of a join, and to merging the columns of adjacent Projects
    InferFiltersFromConstraints,
    // Operator combine
    CollapseRepartition,
    CollapseProject,
    CollapseWindow,
    CombineFilters,           // one of the predicate-pushdown rules: merges two adjacent Filters; merging the two nodes reduces tree depth and avoids the cost of filtering twice
    CombineLimits,            // combines adjacent Limits
    CombineUnions,
    // Constant folding and strength reduction
    NullPropagation,
    FoldablePropagation,
    OptimizeIn(conf),
    ConstantFolding,          // one of the constant-folding rules
    ReorderAssociativeOperator,
    LikeSimplification,
    BooleanSimplification,    // one of the constant-folding rules: short-circuits boolean expressions early
    SimplifyConditionals,
    RemoveDispensableExpressions,
    SimplifyBinaryComparison,
    PruneFilters,
    EliminateSorts,
    SimplifyCasts,
    SimplifyCaseConversionExpressions,
    RewriteCorrelatedScalarSubquery,
    EliminateSerialization,
    RemoveRedundantAliases,
    RemoveRedundantProject) ::
  Batch("Check Cartesian Products", Once,
    CheckCartesianProducts(conf)) ::
  Batch("Decimal Optimizations", fixedPoint,
    DecimalAggregates) ::
  Batch("Typed Filter Optimization", fixedPoint,
    CombineTypedFilters) ::
  Batch("LocalRelation", fixedPoint,
    ConvertToLocalRelation,
    PropagateEmptyRelation) ::
  Batch("OptimizeCodegen", Once,
    OptimizeCodegen(conf)) ::
  Batch("RewriteSubquery", Once,
    RewritePredicateSubquery,
    CollapseProject) :: Nil
}
```
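Each `Batch` in the listing pairs a name, an execution strategy (`Once` or `fixedPoint`) and a list of rules. The following is a hypothetical toy model, not Spark's `RuleExecutor`. It shows why the strategy matters: a `Once` batch makes a single pass over its rules, while a fixed-point batch re-runs them until the tree stops changing or an iteration cap is reached. In Spark, `fixedPoint` is a `FixedPoint` strategy whose cap is read from the SQL configuration (`spark.sql.optimizer.maxIterations`, default 100). The expression types and the rules `foldAdd` and `removeMulByOne` below are made up for illustration.

```scala
// Hypothetical, heavily simplified model of how Catalyst's RuleExecutor runs
// batches. The names Batch, Once and FixedPoint mirror the listing above, but
// this is NOT Spark's implementation.

sealed trait Expr
case class Lit(v: Int) extends Expr
case class Add(l: Expr, r: Expr) extends Expr
case class Mul(l: Expr, r: Expr) extends Expr

type Rule = Expr => Expr

sealed trait Strategy { def maxIterations: Int }
case object Once extends Strategy { val maxIterations = 1 }
case class FixedPoint(maxIterations: Int) extends Strategy

case class Batch(name: String, strategy: Strategy, rules: Rule*)

// Rewrite the children first, then try the rule on the rebuilt node (bottom-up).
def transformUp(e: Expr)(rule: PartialFunction[Expr, Expr]): Expr = {
  val rebuilt = e match {
    case Add(l, r) => Add(transformUp(l)(rule), transformUp(r)(rule))
    case Mul(l, r) => Mul(transformUp(l)(rule), transformUp(r)(rule))
    case leaf      => leaf
  }
  rule.applyOrElse(rebuilt, identity[Expr])
}

val foldAdd: Rule = e => transformUp(e) { case Add(Lit(a), Lit(b)) => Lit(a + b) }
val removeMulByOne: Rule = e => transformUp(e) { case Mul(x, Lit(1)) => x }

// Run every rule of the batch once per iteration; stop when the tree no longer
// changes (a fixed point) or when the strategy's iteration budget is used up.
def runBatch(batch: Batch, plan: Expr): Expr = {
  @annotation.tailrec
  def loop(current: Expr, iteration: Int): Expr = {
    val next = batch.rules.foldLeft(current)((p, rule) => rule(p))
    if (next == current || iteration >= batch.strategy.maxIterations) next
    else loop(next, iteration + 1)
  }
  loop(plan, 1)
}

def execute(plan: Expr, batches: Seq[Batch]): Expr =
  batches.foldLeft(plan)((p, batch) => runBatch(batch, p))

// 1 + (2 * 1): foldAdd can only fire after removeMulByOne has simplified the right side.
val expr: Expr = Add(Lit(1), Mul(Lit(2), Lit(1)))
println(execute(expr, Seq(Batch("once", Once, foldAdd, removeMulByOne))))            // Add(Lit(1),Lit(2))
println(execute(expr, Seq(Batch("fixed", FixedPoint(100), foldAdd, removeMulByOne)))) // Lit(3)
```

Under `Once`, `foldAdd` runs before `removeMulByOne` has exposed `1 + 2`, so the single pass ends at `Add(Lit(1),Lit(2))`. The fixed-point batch needs a second pass to reach `Lit(3)` and a third to confirm that nothing changes. The same interaction explains why the listing runs `CombineUnions` once in its own batch and again inside the fixed-point "Operator Optimizations" batch.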