-
Why does Spark SQL need an Analyzer?
- The abstract syntax tree produced by ANTLR, the Unresolved LogicalPlan, is only a data structure; it carries no information about the underlying data
- The Analyzer phase transforms the Unresolved LogicalPlan using predefined Rules together with information such as the SessionCatalog, binding data sources and data types
-
What does Spark SQL's Analyzer actually do?
- The Analyzer module binds the Unresolved LogicalPlan against the catalog metadata and ultimately turns it into a Resolved LogicalPlan, as the sketch below illustrates
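A minimal way to observe this (a sketch, assuming a local SparkSession named spark; the temp view people is created here purely for illustration): compare the parsed plan with the analyzed plan of the same query.
```
import spark.implicits._

// Register a temp view so the query below can be analyzed.
Seq(("alice", 30)).toDF("name", "age").createOrReplaceTempView("people")

val qe = spark.sql("SELECT name FROM people").queryExecution
// Parsed but unresolved: prints an 'UnresolvedRelation with no schema information.
println(qe.logical)
// After the Analyzer: the relation is bound to the view and the column carries its type.
println(qe.analyzed)
```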
How Spark SQL implements the Analyzer in detail
-
Rule in Spark SQL
Every node of the abstract syntax tree that Spark SQL parses out is a TreeNode, and a Rule is a single rewrite rule that acts on TreeNode nodes to perform a transform operation
-
Rule is defined as follows; the apply method, implemented by each subclass, defines how that Rule performs its transform
```
abstract class Rule[TreeType <: TreeNode[_]] extends Logging {

  /** Name for this rule, automatically inferred based on class name. */
  val ruleName: String = {
    val className = getClass.getName
    if (className endsWith "$") className.dropRight(1) else className
  }

  def apply(plan: TreeType): TreeType
}
```
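For intuition, here is a hypothetical Rule subclass (CombineAdjacentFilters is an invented name, though Spark's optimizer ships a similar CombineFilters rule): it merges two adjacent Filter operators into one by AND-ing their predicates.
```
import org.apache.spark.sql.catalyst.expressions.And
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule

// Hypothetical example rule: collapse Filter(Filter(child)) into a single Filter.
object CombineAdjacentFilters extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
    case Filter(outerCondition, Filter(innerCondition, child)) =>
      // Keep both predicates by combining them with And.
      Filter(And(innerCondition, outerCondition), child)
  }
}
```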
-
Rules defined by Spark SQL
- Spark SQL defines many Batch rules in src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
```
lazy val batches: Seq[Batch] = Seq(
  Batch("Hints", fixedPoint,
    new ResolveHints.ResolveBroadcastHints(conf),
    ResolveHints.ResolveCoalesceHints,
    ResolveHints.RemoveAllHints),
  Batch("Simple Sanity Check", Once,
    LookupFunctions),
  Batch("Substitution", fixedPoint,
    CTESubstitution,
    WindowsSubstitution,
    EliminateUnions,
    new SubstituteUnresolvedOrdinals(conf)),
  Batch("Resolution", fixedPoint,
    ResolveTableValuedFunctions ::
      ResolveRelations ::
      ResolveReferences ::
      ResolveCreateNamedStruct ::
      ResolveDeserializer ::
      ResolveNewInstance ::
      ResolveUpCast ::
      ResolveGroupingAnalytics ::
      ResolvePivot ::
      ResolveOrdinalInOrderByAndGroupBy ::
      ResolveAggAliasInGroupBy ::
      ResolveMissingReferences ::
      ExtractGenerator ::
      ResolveGenerate ::
      ResolveFunctions ::
      ResolveAliases ::
      ResolveSubquery ::
      ResolveSubqueryColumnAliases ::
      ResolveWindowOrder ::
      ResolveWindowFrame ::
      ResolveNaturalAndUsingJoin ::
      ResolveOutputRelation ::
      ExtractWindowExpressions ::
      GlobalAggregates ::
      ResolveAggregateFunctions ::
      TimeWindowing ::
      ResolveInlineTables(conf) ::
      ResolveHigherOrderFunctions(catalog) ::
      ResolveLambdaVariables(conf) ::
      ResolveTimeZone(conf) ::
      ResolveRandomSeed ::
      TypeCoercion.typeCoercionRules(conf) ++
      extendedResolutionRules : _*),
  Batch("Post-Hoc Resolution", Once, postHocResolutionRules: _*),
  Batch("View", Once,
    AliasViewChild(conf)),
  Batch("Nondeterministic", Once,
    PullOutNondeterministic),
  Batch("UDF", Once,
    HandleNullInputsForUDF),
  Batch("FixNullability", Once,
    FixNullability),
  Batch("Subquery", Once,
    UpdateOuterReferences),
  Batch("Cleanup", fixedPoint,
    CleanupAliases)
)
```
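The Batch machinery itself is easy to sketch in isolation (MyExecutor and the two placeholder rules below are hypothetical): Once runs a batch a single time, while FixedPoint(n) reruns it until the plan stops changing or n iterations are reached.
```
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}

// Placeholder rules so the sketch is self-contained; real rules would rewrite the plan.
object MyCleanupRule extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan
}
object MyResolutionRule extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan
}

// Hypothetical executor showing the two batch strategies.
object MyExecutor extends RuleExecutor[LogicalPlan] {
  val batches = Seq(
    Batch("My Cleanup", Once, MyCleanupRule),                  // runs exactly once
    Batch("My Resolution", FixedPoint(100), MyResolutionRule)  // reruns until stable, at most 100 times
  )
}
```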
-
The Analyzer calls the execute method of its parent class, which iterates over the batches defined above and applies each Rule to the logical plan to carry out the transform
```
def execute(plan: TreeType): TreeType = {
  var curPlan = plan
  val queryExecutionMetrics = RuleExecutor.queryExecutionMeter
  val planChangeLogger = new PlanChangeLogger()

  batches.foreach { batch =>
    val batchStartPlan = curPlan
    var iteration = 1
    var lastPlan = curPlan
    var continue = true

    // Run until fix point (or the max number of iterations as specified in the strategy).
    while (continue) {
      curPlan = batch.rules.foldLeft(curPlan) {
        case (plan, rule) =>
          val startTime = System.nanoTime()
          val result = rule(plan)
          val runTime = System.nanoTime() - startTime

          if (!result.fastEquals(plan)) {
            queryExecutionMetrics.incNumEffectiveExecution(rule.ruleName)
            queryExecutionMetrics.incTimeEffectiveExecutionBy(rule.ruleName, runTime)
            planChangeLogger.log(rule.ruleName, plan, result)
          }
          queryExecutionMetrics.incExecutionTimeBy(rule.ruleName, runTime)
          queryExecutionMetrics.incNumExecution(rule.ruleName)

          // Run the structural integrity checker against the plan after each rule.
          if (!isPlanIntegral(result)) {
            val message = s"After applying rule ${rule.ruleName} in batch ${batch.name}, " +
              "the structural integrity of the plan is broken."
            throw new TreeNodeException(result, message, null)
          }

          result
      }
      iteration += 1
      if (iteration > batch.strategy.maxIterations) {
        // Only log if this is a rule that is supposed to run more than once.
        if (iteration != 2) {
          val message = s"Max iterations (${iteration - 1}) reached for batch ${batch.name}"
          if (Utils.isTesting) {
            throw new TreeNodeException(curPlan, message, null)
          } else {
            logWarning(message)
          }
        }
        continue = false
      }

      if (curPlan.fastEquals(lastPlan)) {
        logTrace(
          s"Fixed point reached for batch ${batch.name} after ${iteration - 1} iterations.")
        continue = false
      }
      lastPlan = curPlan
    }

    if (!batchStartPlan.fastEquals(curPlan)) {
      logDebug(
        s"""
          |=== Result of Batch ${batch.name} ===
          |${sideBySide(batchStartPlan.treeString, curPlan.treeString).mkString("\n")}
        """.stripMargin)
    } else {
      logTrace(s"Batch ${batch.name} has no effect.")
    }
  }

  curPlan
}
```
- Batches, and the rules within each batch, execute sequentially. After each pass over a batch, execute checks whether the batch has run maxIterations times and whether the plan changed during the pass; once maxIterations is reached or the plan no longer changes, the batch stops executing.
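Stripped of metrics, logging, and integrity checks, the per-batch fixed-point loop reduces to roughly the following (a simplified sketch, not the actual Spark code):
```
// Simplified sketch of one batch's fixed-point loop: fold every rule over the
// plan, then repeat until the plan stops changing or maxIterations is exceeded.
def runBatchToFixedPoint[T](plan: T, rules: Seq[T => T], maxIterations: Int): T = {
  var curPlan = plan
  var lastPlan = curPlan
  var iteration = 1
  var continue = true
  while (continue) {
    curPlan = rules.foldLeft(curPlan)((p, rule) => rule(p))
    iteration += 1
    if (iteration > maxIterations) continue = false  // hit the iteration cap
    if (curPlan == lastPlan) continue = false        // fixed point reached
    lastPlan = curPlan
  }
  curPlan
}
```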
-
How Spark SQL binds data information to the logical plan through the Catalog
- ResolveRelations replaces UnresolvedRelation nodes with resolved relations
- ResolveRelations extends Rule; it is declared as object ResolveRelations extends Rule[LogicalPlan]
- The apply() method of ResolveRelations defines the conversion from UnresolvedRelation to a resolved relation
```
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
  case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if child.resolved =>
    EliminateSubqueryAliases(lookupTableFromCatalog(u)) match {
      case v: View =>
        u.failAnalysis(s"Inserting into a view is not allowed. View: ${v.desc.identifier}.")
      case other => i.copy(table = other)
    }
  case u: UnresolvedRelation => resolveRelation(u)
}
```
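One observable consequence of this rule (a sketch, assuming a SparkSession named spark): a query over a relation that no catalog knows about fails during analysis.
```
import org.apache.spark.sql.AnalysisException

try {
  // Analysis runs eagerly inside spark.sql, so the failure surfaces here.
  spark.sql("SELECT * FROM no_such_table")
} catch {
  // Expected to report something like "Table or view not found: no_such_table".
  case e: AnalysisException => println(e.getMessage)
}
```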
-
resolveOperatorsUp is invoked in curried (block-argument) style
```
{
  case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if child.resolved =>
    EliminateSubqueryAliases(lookupTableFromCatalog(u)) match {
      case v: View =>
        u.failAnalysis(s"Inserting into a view is not allowed. View: ${v.desc.identifier}.")
      case other => i.copy(table = other)
    }
  case u: UnresolvedRelation => resolveRelation(u)
}
```
The entire brace block after resolveOperatorsUp is passed into the resolveOperatorsUp method as a single argument, as the sketch below makes explicit
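In other words, the braces define an anonymous PartialFunction; naming it makes the call shape explicit (a sketch with only the UnresolvedRelation case kept, assuming it runs inside ResolveRelations where plan and resolveRelation are in scope):
```
// The brace block is just an anonymous PartialFunction[LogicalPlan, LogicalPlan].
val resolveUnresolvedRelations: PartialFunction[LogicalPlan, LogicalPlan] = {
  case u: UnresolvedRelation => resolveRelation(u)
}
// Passing it by name is equivalent to the inline brace syntax above.
plan.resolveOperatorsUp(resolveUnresolvedRelations)
```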
-
The implementation of resolveOperatorsUp
```
def resolveOperatorsUp(rule: PartialFunction[LogicalPlan, LogicalPlan]): LogicalPlan = {
  if (!analyzed) {
    AnalysisHelper.allowInvokingTransformsInAnalyzer {
      /* Recurse first: call resolveOperatorsUp on every child with the same
         partial function `rule`, so the rule is applied bottom-up to every
         LogicalPlan node in the logical operator tree. */
      val afterRuleOnChildren = mapChildren(_.resolveOperatorsUp(rule))
      if (self fastEquals afterRuleOnChildren) {
        CurrentOrigin.withOrigin(origin) {
          /* rule.applyOrElse(self, identity[LogicalPlan]) applies the partial
             function to this LogicalPlan. A partial function pattern-matches its
             argument and only processes values it matches; in the rule above, an
             UnresolvedRelation node is handled by calling resolveRelation(u). */
          rule.applyOrElse(self, identity[LogicalPlan])
        }
      } else {
        CurrentOrigin.withOrigin(origin) {
          rule.applyOrElse(afterRuleOnChildren, identity[LogicalPlan])
        }
      }
    }
  } else {
    self
  }
}
```
It first checks whether this plan has already been analyzed. It then calls mapChildren, passing in resolveOperatorsUp itself, so this is a recursive call that processes the children before the node itself. If the LogicalPlan returned after processing the children equals the current one (the children were unchanged, for instance at a leaf node), the rule is applied to the node itself; otherwise the rule is applied to the rebuilt plan returned from the children.
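The same bottom-up pattern on a toy tree makes the recursion order concrete (a self-contained sketch, not Spark code):
```
// Toy tree mirroring resolveOperatorsUp's shape: transform the children first,
// then apply the partial function to the (possibly rebuilt) node itself.
sealed trait Node
case class Leaf(value: Int) extends Node
case class Branch(left: Node, right: Node) extends Node

def transformUp(node: Node)(rule: PartialFunction[Node, Node]): Node = {
  val afterChildren = node match {
    case Branch(l, r) => Branch(transformUp(l)(rule), transformUp(r)(rule))
    case leaf         => leaf
  }
  // Only nodes the partial function matches are rewritten; the rest pass through.
  rule.applyOrElse(afterChildren, identity[Node])
}

// Usage: increment every Leaf; Branch nodes fall through to identity.
val tree = Branch(Leaf(1), Branch(Leaf(2), Leaf(3)))
val result = transformUp(tree) { case Leaf(v) => Leaf(v + 1) }
// result == Branch(Leaf(2), Branch(Leaf(3), Leaf(4)))
```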
If the matched node is an UnresolvedRelation, the resolveRelation method is called to resolve it.
After resolveRelation returns, the logical plan is bound to real metadata: it may be fetched directly from the globalTempViewManager, directly from tempTables, or looked up through the externalCatalog. The sketch below mirrors that lookup order.
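A hedged mirror of that lookup order (every name below is a hypothetical stand-in, not Spark's actual SessionCatalog code):
```
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import scala.collection.mutable

// Hypothetical sketch: global temp views are consulted first (when the database
// is the global temp database), then local temp views, then the external catalog.
class CatalogLookupSketch(
    globalTempDb: String,
    globalTempViews: mutable.Map[String, LogicalPlan],
    tempViews: mutable.Map[String, LogicalPlan],
    externalCatalogLookup: (String, String) => LogicalPlan,
    currentDb: String) {

  def lookup(db: Option[String], table: String): LogicalPlan = db match {
    case Some(d) if d == globalTempDb =>
      globalTempViews.getOrElse(table, sys.error(s"global temp view not found: $table"))
    case None if tempViews.contains(table) =>
      tempViews(table)
    case other =>
      externalCatalogLookup(other.getOrElse(currentDb), table)
  }
}
```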
-