Spark SQL内核剖析之Parser

Spark SQL 架构

Spark SQL 的整体架构如下图所示
Spark SQL Catalyst

从上图可见,无论是直接使用 SQL 语句还是使用 DataFrame,都会经过如下步骤转换成 DAG 对 RDD 的操作

  • Parser 解析 SQL,生成 Unresolved Logical Plan
  • 由 Analyzer 结合 Catalog 信息生成 Resolved Logical Plan
  • Optimizer根据预先定义好的规则对 Resolved Logical Plan 进行优化并生成 Optimized Logical Plan
  • Query Planner 将 Optimized Logical Plan 转换成多个 Physical Plan
  • CBO 根据 Cost Model 算出每个 Physical Plan 的代价并选取代价最小的 Physical Plan 作为最终的 Physical Plan
  • Spark 以 DAG 的方法执行上述 Physical Plan
  • 在执行 DAG 的过程中,Adaptive Execution 根据运行时信息动态调整执行计划从而提高执行效率

Parser

Spark SQL 使用 Antlr 进行记法和语法解析,并生成 UnresolvedPlan。
当用户使用 SparkSession.sql(sqlText : String) 提交 SQL 时,SparkSession 最终会调用 SparkSqlParser 的 parsePlan 方法。该方法分两步

  • 使用 Antlr 生成的 SqlBaseLexer 对 SQL 进行词法分析,生成 CommonTokenStream
  • 使用 Antlr 生成的 SqlBaseParser 进行语法分析,得到 LogicalPlan
    现在两张表,对其进行关联查询如下:
CREATE TABLE score (
  id INT,
  math_score INT,
  english_score INT
)
CREATE TABLE people (
  id INT,
  age INT,
  name INT
)
//关联查询
SELECT sum(v)
FROM (
  SELECT score.id,
    100 + 80 + score.math_score + score.english_score AS v
  FROM people
  JOIN score
  ON people.id = score.id
  AND people.age > 10
) tmp

在sql方法里运行,执行流程如下:

 def sql(sqlText: String): DataFrame = {
    val tracker = new QueryPlanningTracker
    val plan = tracker.measurePhase(QueryPlanningTracker.PARSING) {
      sessionState.sqlParser.parsePlan(sqlText)
    }
    Dataset.ofRows(self, plan, tracker)
  }

SparkSession 最终会调用 SparkSqlParser 的 parsePlan 方法,具体流程如下:

// 先看SessionState,是通过instantiateSessionState方法初始化
lazy val sessionState: SessionState = {
    parentSessionState
      .map(_.clone(this))
      .getOrElse {
        val state = SparkSession.instantiateSessionState(
          SparkSession.sessionStateClassName(sparkContext.conf),
          self)
        initialSessionOptions.foreach { case (k, v) => state.conf.setConfString(k, v)}
        state
      }
}
// 看下sessionStateClassName()的实现
private def sessionStateClassName(conf: SparkConf): String = {
    conf.get(CATALOG_IMPLEMENTATION) match {
      case "hive" => HIVE_SESSION_STATE_BUILDER_CLASS_NAME
      case "in-memory" => classOf[SessionStateBuilder].getCanonicalName
    }
  }
// 这里我们考虑CATALOG_IMPLEMENTATION为hive这种情况。
// 看下instantiateSessionState()的实现
private def instantiateSessionState(
      className: String,
      sparkSession: SparkSession): SessionState = {
    try {
      // invoke `new [Hive]SessionStateBuilder(SparkSession, Option[SessionState])`
      val clazz = Utils.classForName(className)
      val ctor = clazz.getConstructors.head
      ctor.newInstance(sparkSession, None).asInstanceOf[BaseSessionStateBuilder].build()
    } catch {
      case NonFatal(e) =>
        throw new IllegalArgumentException(s"Error while instantiating '$className':", e)
    }
  }
// className是org.apache.spark.sql.hive.HiveSessionStateBuilder,通过反射调用它的build()方法
// 如下为build()的实现
 def build(): SessionState = {
    new SessionState(
      session.sharedState,
      conf,
      experimentalMethods,
      functionRegistry,
      udfRegistration,
      () => catalog,
      sqlParser,
      () => analyzer,
      () => optimizer,
      planner,
      streamingQueryManager,
      listenerManager,
      () => resourceLoader,
      createQueryExecution,
      createClone)
  }
}
// 看下sqlParser,它的值为
 protected lazy val sqlParser: ParserInterface = {
    extensions.buildParser(session, new SparkSqlParser(conf))
  }

接着我们来看parsePlan()方法

override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { parser =>
    astBuilder.visitSingleStatement(parser.singleStatement()) match {
      case plan: LogicalPlan => plan
      case _ =>
        val position = Origin(None, None)
        throw new ParseException(Option(sqlText), "Unsupported SQL statement", position, position)
    }
  }
// 在parse()方法里,进行了语法和词法分析
protected def parse[T](command: String)(toResult: SqlBaseParser => T): T = {
    logDebug(s"Parsing command: $command")
    // SqlBaseLexer 对 SQL 进行词法分析
    val lexer = new SqlBaseLexer(new UpperCaseCharStream(CharStreams.fromString(command)))
    lexer.removeErrorListeners()
    lexer.addErrorListener(ParseErrorListener)
    lexer.legacy_setops_precedence_enbled = SQLConf.get.setOpsPrecedenceEnforced
    lexer.ansi = SQLConf.get.ansiParserEnabled
    val tokenStream = new CommonTokenStream(lexer)
    //SqlBaseParser 进行语法分析
    val parser = new SqlBaseParser(tokenStream)
    parser.addParseListener(PostProcessor)
    parser.removeErrorListeners()
    parser.addErrorListener(ParseErrorListener)
    parser.legacy_setops_precedence_enbled = SQLConf.get.setOpsPrecedenceEnforced
    parser.ansi = SQLConf.get.ansiParserEnabled
    try {
        // first, try parsing with potentially faster SLL mode
        parser.getInterpreter.setPredictionMode(PredictionMode.SLL)
        toResult(parser)
    }
      catch {
        case e: ParseCancellationException =>
          // if we fail, parse with LL mode
          tokenStream.seek(0) // rewind input stream
          parser.reset()

          // Try Again.
          parser.getInterpreter.setPredictionMode(PredictionMode.LL)
          toResult(parser)
    }
    ···
  }

生成的 UnresolvedPlan 如下图所示。
Spark SQL Parser

从上图可见

  • 查询涉及的两张表,被解析成了两个 UnresolvedRelation,也即只知道这是两张表,却并不知道它们是 EXTERNAL TABLE 还是 MANAGED TABLE,也不知道它们的数据存在哪儿,更不知道它们的表结构如何
  • sum(v) 的结果未命名
  • Project 部分只知道是选择出了属性,却并不知道这些属性属于哪张表,更不知道其数据类型
  • Filter 部分也不知道数据类型
    Spark SQL 解析出的 UnresolvedPlan 如下所示
== Parsed Logical Plan ==
'Project [unresolvedalias('sum('v), None)]
+- 'SubqueryAlias tmp
   +- 'Project ['score.id, (((100 + 80) + 'score.math_score) + 'score.english_score) AS v#493]
      +- 'Filter (('people.id = 'score.id) && ('people.age > 10))
         +- 'Join Inner
            :- 'UnresolvedRelation `people`
            +- 'UnresolvedRelation `score`
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,589评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,615评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,933评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,976评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,999评论 6 393
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,775评论 1 307
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,474评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,359评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,854评论 1 317
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,007评论 3 338
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,146评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,826评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,484评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,029评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,153评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,420评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,107评论 2 356

推荐阅读更多精彩内容