Spark sql analyzer 过程解析

  1. spark sql 为何要进行 analyzer ?

    • 通过 antlr 解析出来的抽象语法树 UnResolved LogicalPlan 仅仅是一种数据结构,不包含任何数据信息
    • Analyzer 阶段会使用事先定义好的 Rule 以及 SessionCatalog 等信息对 Unresolved LogicalPlan 进行 transform,绑定数据源以及数据类型
  2. spark sql 的 Analyzer 主要干了什么?

    • Analyzer模块将Unresolved LogicalPlan结合元数据catalog进行绑定,最终转化为Resolved LogicalPlan
spark sql 实现 analyzer 的具体细节
  1. Spark sql 中的 Rule

    • Spark sql 解析出来的抽象语法树,每个节点都是由 TreeNode 构成的,而 Rule 作为一条规则作用在 TreeNode 节点上进行 transform 操作

    • Rule 的定义如下,apple 方法由子类实现定义了每个 Rule 该如何 transform

        ```
        abstract class Rule[TreeType <: TreeNode[_]] extends Logging {
      
          /** Name for this rule, automatically inferred based on class name. */
          val ruleName: String = {
            val className = getClass.getName
            if (className endsWith "$") className.dropRight(1) else className
          }
      
          def apply(plan: TreeType): TreeType
        }
        ```
      
  2. spark sql 定义的 Rule

    • spark sql 在 src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala 定义了很多 Batch rule
       lazy val batches: Seq[Batch] = Seq(
          Batch("Hints", fixedPoint,
            new ResolveHints.ResolveBroadcastHints(conf),
            ResolveHints.ResolveCoalesceHints,
            ResolveHints.RemoveAllHints),
          Batch("Simple Sanity Check", Once,
            LookupFunctions),
          Batch("Substitution", fixedPoint,
            CTESubstitution,
            WindowsSubstitution,
            EliminateUnions,
            new SubstituteUnresolvedOrdinals(conf)),
          Batch("Resolution", fixedPoint,
            ResolveTableValuedFunctions ::
            ResolveRelations ::
            ResolveReferences ::
            ResolveCreateNamedStruct ::
            ResolveDeserializer ::
            ResolveNewInstance ::
            ResolveUpCast ::
            ResolveGroupingAnalytics ::
            ResolvePivot ::
            ResolveOrdinalInOrderByAndGroupBy ::
            ResolveAggAliasInGroupBy ::
            ResolveMissingReferences ::
            ExtractGenerator ::
            ResolveGenerate ::
            ResolveFunctions ::
            ResolveAliases ::
            ResolveSubquery ::
            ResolveSubqueryColumnAliases ::
            ResolveWindowOrder ::
            ResolveWindowFrame ::
            ResolveNaturalAndUsingJoin ::
            ResolveOutputRelation ::
            ExtractWindowExpressions ::
            GlobalAggregates ::
            ResolveAggregateFunctions ::
            TimeWindowing ::
            ResolveInlineTables(conf) ::
            ResolveHigherOrderFunctions(catalog) ::
            ResolveLambdaVariables(conf) ::
            ResolveTimeZone(conf) ::
            ResolveRandomSeed ::
            TypeCoercion.typeCoercionRules(conf) ++
            extendedResolutionRules : _*),
          Batch("Post-Hoc Resolution", Once, postHocResolutionRules: _*),
          Batch("View", Once,
            AliasViewChild(conf)),
          Batch("Nondeterministic", Once,
            PullOutNondeterministic),
          Batch("UDF", Once,
            HandleNullInputsForUDF),
          Batch("FixNullability", Once,
            FixNullability),
          Batch("Subquery", Once,
            UpdateOuterReferences),
          Batch("Cleanup", fixedPoint,
            CleanupAliases)
        )
      
  3. Analyzer 调用父类的 execute 方法,遍历上面定义的 batches,将 Rule 作用在 Logical plan 上实现 transform

     def execute(plan: TreeType): TreeType = {
        var curPlan = plan
        val queryExecutionMetrics = RuleExecutor.queryExecutionMeter
        val planChangeLogger = new PlanChangeLogger()
    
        batches.foreach { batch =>
          val batchStartPlan = curPlan
          var iteration = 1
          var lastPlan = curPlan
          var continue = true
    
          // Run until fix point (or the max number of iterations as specified in the strategy.
          while (continue) {
            curPlan = batch.rules.foldLeft(curPlan) {
              case (plan, rule) =>
                val startTime = System.nanoTime()
                val result = rule(plan)
                val runTime = System.nanoTime() - startTime
    
                if (!result.fastEquals(plan)) {
                  queryExecutionMetrics.incNumEffectiveExecution(rule.ruleName)
                  queryExecutionMetrics.incTimeEffectiveExecutionBy(rule.ruleName, runTime)
                  planChangeLogger.log(rule.ruleName, plan, result)
                }
                queryExecutionMetrics.incExecutionTimeBy(rule.ruleName, runTime)
                queryExecutionMetrics.incNumExecution(rule.ruleName)
    
                // Run the structural integrity checker against the plan after each rule.
                if (!isPlanIntegral(result)) {
                  val message = s"After applying rule ${rule.ruleName} in batch ${batch.name}, " +
                    "the structural integrity of the plan is broken."
                  throw new TreeNodeException(result, message, null)
                }
    
                result
            }
            iteration += 1
            if (iteration > batch.strategy.maxIterations) {
              // Only log if this is a rule that is supposed to run more than once.
              if (iteration != 2) {
                val message = s"Max iterations (${iteration - 1}) reached for batch ${batch.name}"
                if (Utils.isTesting) {
                  throw new TreeNodeException(curPlan, message, null)
                } else {
                  logWarning(message)
                }
              }
              continue = false
            }
    
            if (curPlan.fastEquals(lastPlan)) {
              logTrace(
                s"Fixed point reached for batch ${batch.name} after ${iteration - 1} iterations.")
              continue = false
            }
            lastPlan = curPlan
          }
    
          if (!batchStartPlan.fastEquals(curPlan)) {
            logDebug(
              s"""
                |=== Result of Batch ${batch.name} ===
                |${sideBySide(batchStartPlan.treeString, curPlan.treeString).mkString("\n")}
              """.stripMargin)
          } else {
            logTrace(s"Batch ${batch.name} has no effect.")
          }
        }
    
        curPlan
      }
    
    • batch 和里面的 rules 都是连续执行的,每执行完一个 batch 都判断此 batch 执行的次数是否达到 maxIterations 和执行此 batch 前后是否有变化,达到 maxIterations 或者执行 batch 前后无变化都不再执行此batch。
  4. 解析 spark sql 如何通过 Catolog 将数据信息绑定到 logical plan 上的

    • ResolveRelations 实现替换 UnresolvedRelation 为 resolveRelation
    • ResolveRelations 继承自 Rule,定义: object ResolveRelations extends Rule[LogicalPlan]
    • ResolveRelations 的 apply() 方法定义了 UnsolveRelation 到 ResolveRelation 的转换
      def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
            case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if child.resolved =>
              EliminateSubqueryAliases(lookupTableFromCatalog(u)) match {
                case v: View =>
                  u.failAnalysis(s"Inserting into a view is not allowed. View: ${v.desc.identifier}.")
                case other => i.copy(table = other)
              }
            case u: UnresolvedRelation => resolveRelation(u)
          }
      
      • resolveOperatorsUp 是一个柯理化调用

        {
        case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if child.resolved =>
                        EliminateSubqueryAliases(lookupTableFromCatalog(u)) match {
                          case v: View =>
                            u.failAnalysis(s"Inserting into a view is not allowed. View: ${v.desc.identifier}.")
                          case other => i.copy(table = other)
                        }
                      case u: UnresolvedRelation => resolveRelation(u)
                    }
        
        • resolveOperatorsUp 后面的整个代码块作为参数传入 resolveOperatorsUp 方法中

        • resolveOperatorsUp 的具体实现

          def resolveOperatorsUp(rule: PartialFunction[LogicalPlan, LogicalPlan]): LogicalPlan = {
              if (!analyzed) {
                AnalysisHelper.allowInvokingTransformsInAnalyzer {
                  /* apply方法中调用LogicalPlan 的resolveOperatorsUp方法并将后面的偏函数rule作为参数。自下而上的应用于逻辑算子树上的每一个LogicalPlan节点 */
                  val afterRuleOnChildren = mapChildren(_.resolveOperatorsUp(rule))
                  if (self fastEquals afterRuleOnChildren) {
                    CurrentOrigin.withOrigin(origin) {
                      /* rule.applyOrElse(self, identity[LogicalPlan]) 将偏函数应用到LogicalPlan。偏函数会对传入的参数进行模式匹配,只有匹配成功的参数才会进行处理。在rule偏函数中可以看出,如果LogicalPlan是UnresolvedRelation类型,则调用resolveRelation(u)方法。 */
                      rule.applyOrElse(self, identity[LogicalPlan])
                    }
                  } else {
                    CurrentOrigin.withOrigin(origin) {
                      rule.applyOrElse(afterRuleOnChildren, identity[LogicalPlan])
                    }
                  }
                }
              } else {
                self
              }
            }
          
        • 首先判断此 plan 是否已经被 analyzed 过,接着调用 mapChildren,并且传入的是resolveOperators 方法,其实就是一个递归调用,它会优先处理它的子节点,然后再处理自己,如果处理后的LogicalPlan 和当前的相等就说明他没有子节点了,则处理它自己,反之处理返回的 plan。

        • 如果匹配的是 UnresolvedRelation 结点,则调用 resolveRelation 方法进行解析,
          经过resolveRelation方法之后,返回的 logical plan 是已经和实际元数据绑定好的plan,可能是从globalTempViewManager直接获取的,可能是从tempTables直接获取,也可能是从externalCatalog 获取的元数据。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,657评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,662评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,143评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,732评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,837评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,036评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,126评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,868评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,315评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,641评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,773评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,470评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,126评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,859评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,095评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,584评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,676评论 2 351

推荐阅读更多精彩内容