1、Catalyst源码解读之SqlParser

本篇文章基于Spark1.6.1源码解读Catalyst下的SqlParser

  1. spark sql中可以分三种sql语句
    第一种DDL语句,DDL语句包含以下三种操作,代码见DDLParser
createTable | describeTable | refreshTable

第二种是spark自身的sql语句,spark自身的sql语句包含以下六种操作,代码见SparkSQLParser

cache | uncache | set | show | desc | others

第三种是真正的SQL语句,如select语句,SQL语句包含以下三种操作,代码见SqlParser

start1 | insert | cte

以上这些用"|"分隔的操作会生成一个Parser[LogicalPlan],最终变成LogicalPlan

  1. 从熟悉的sqlContext.sql("....")方法开始,一步一步分析sql语句是怎样被解析生成LogicalPlan。
    第一步从SqlContext的sql方法开始,代码如下
def sql(sqlText: String): DataFrame = {  DataFrame(this, parseSql(sqlText))}

这里调用了parseSql(sqlText)方法,代码如下

protected[sql] def parseSql(sql: String): LogicalPlan = ddlParser.parse(sql, false)

第二步调用DDLParser的parse方法,代码如下

  def parse(input: String, exceptionOnError: Boolean): LogicalPlan = {
    try {
      parse(input)
    } catch {
      case ddlException: DDLException => throw ddlException
      case _ if !exceptionOnError => parseQuery(input)
      case x: Throwable => throw x
    }
  }

这里有两步操作,第一步是try语句中的parse(input)语句,他的作用是解析DDL语句,如果成功直接返回。否则看异常,异常中的语句不要忽略了,如果ddl语句解析失败调用parseQuery(input),那么parseQuery(input)是从哪里来的呢?他是在DDLParser实例化的时候传递进来的

class DDLParser(parseQuery: String => LogicalPlan) extends AbstractSparkSQLParser with DataTypeParser with Logging {
  1. 这里插入一个知识点
    parse()方法并不是DDLParser的方法,他是父类AbstractSparkSQLParser中的方法,接下来介绍的SparkSQLParser和SqlParser都继承自AbstractSparkSQLParser,看一下parse方法的代码
  def parse(input: String): LogicalPlan = synchronized {
    //初始化并加载关键词,关键词是在子类中定义的,比如DDLParser、SparkSQLParser、SqlParser这三个类中分别定义了自己的关键词
    //initLexical方法本身很简单,这里就不说了
    initLexical
    //phrase就是根据输入的语句(input)按照规则(start)来解析
    //start就是第1段中介绍的三种操作,start方法被子类重写
    //所以DDLParser中调用了父类的parse方法后会回调子类DDLParser中的start方法(或是变量,因为方法或函数也可以赋值给变量)
    phrase(start)(new lexical.Scanner(input)) match {
      case Success(plan, _) => plan
      case failureOrError => sys.error(failureOrError.toString)
    }
  }
  1. 回到SqlContext中看DDLParser实例化的代码
  @transient
  protected[sql] val sqlParser = new SparkSQLParser(getSQLDialect().parse(_))
  @transient
  protected[sql] val ddlParser = new DDLParser(sqlParser.parse(_))

在DDLParser实例化的时候传入了SparkSQLParser中的parse方法,parse方法就是SparkSQLParser父类AbstractSparkSQLParser中的方法,在第3段中介绍过。看到这里终于明白了第2段中的parseQuery(input)就是这里的parse方法。

  1. 通过parse方法的调用回调SparkSQLParser中的start变量 ,start变量代码如下
override protected lazy val start: Parser[LogicalPlan] =  cache | uncache | set | show | desc | others

这里有五种操作,前面四种对应了spark自身操作的sql语句,这里就不展开说明了,以后有时间再具体分析。如果Spark自身操作的sql没有匹配成功会调用others规则,others操作其实是调用了SqlParser中的parse方法,他是怎么被调用的呢?接着住下看,这个调用有点绕。首先看others变量,代码如下

  private lazy val others: Parser[LogicalPlan] =
    wholeInput ^^ {
      case input => fallback(input)
    }

这里回调 fallback(input)方法,fallback方法就在SparkSQLParser实例的时候传递进来的,我们看一下fallback方法是怎样产生的

  1. 回到SQLContext中看SparkSQLParser的实例化
  @transient
  protected[sql] val sqlParser = new SparkSQLParser(getSQLDialect().parse(_))

这里调用getSQLDialect()方法生成一个ParserDialect(方言),为什么需要方言呢,是为了区分spark sql 和hive sql。这里看DefaultParserDialect(ParserDialect的子类,默认是spark sql的方言)的parse方法,代码如下

 override def parse(sqlText: String): LogicalPlan = {
    sqlParser.parse(sqlText)
  }

看到这里sqlParser终于出现了,调用了SqlParser的parse方法(SqlParser父类AbstractSparkSQLParser中的parse方法)。这个parse方法就是传递给SparkSQLParser中的fallback(input)函数

  1. 这里重点看SqlParser是怎样解析sql语句的,根据前面介绍的内容知道调用SqlParser中的parse方法后,会回调SqlParser中的start变量,start变量代码如下
protected lazy val start: Parser[LogicalPlan] =  start1 | insert | cte

这里有三种操作,看一下start1 的代码

 protected lazy val start1: Parser[LogicalPlan] =
    (select | ("(" ~> select <~ ")")) *
    ( UNION ~ ALL        ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Union(q1, q2) }
    | INTERSECT          ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Intersect(q1, q2) }
    | EXCEPT             ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Except(q1, q2)}
    | UNION ~ DISTINCT.? ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) }
    )

这一堆是什么玩意儿,慢慢的来分析一下
select 会生成一个Parser,其他代码如下

 protected lazy val select: Parser[LogicalPlan] =
    SELECT ~> DISTINCT.? ~
      repsep(projection, ",") ~
      (FROM   ~> relations).? ~
      (WHERE  ~> expression).? ~
      (GROUP  ~  BY ~> rep1sep(expression, ",")).? ~
      (HAVING ~> expression).? ~
      sortType.? ~
      (LIMIT  ~> expression).? ^^ {
        case d ~ p ~ r ~ f ~ g ~ h ~ o ~ l =>
          val base = r.getOrElse(OneRowRelation)
          val withFilter = f.map(Filter(_, base)).getOrElse(base)
          val withProjection = g
            .map(Aggregate(_, p.map(UnresolvedAlias(_)), withFilter))
            .getOrElse(Project(p.map(UnresolvedAlias(_)), withFilter))
          val withDistinct = d.map(_ => Distinct(withProjection)).getOrElse(withProjection)
          val withHaving = h.map(Filter(_, withDistinct)).getOrElse(withDistinct)
          val withOrder = o.map(_(withHaving)).getOrElse(withHaving)
          val withLimit = l.map(Limit(_, withOrder)).getOrElse(withOrder)
          withLimit
      }

通过上面简单的一二十行代码就完成了sql语句的解析,太有魔力了。感慨一下scala语句强大的表达能力。
想看懂上面的代码,我们先来看一下那些符号>、、^^等是什么意思

|  左边算子和右边的算子只要有一个成功了,就返回succeed,类似or
~  左边的算子成功后,右边的算子对后续的输入也计算成功,就返回succeed
.?  如果p算子成功则返回则返回Some(x) 如果p算子失败,返回fails
^^^  如果左边的算子成功,取消左边算子的结果,返回右边算子。
~> 如果左边的算子和右边的算子都成功了,返回的结果中不包含左边的返回值。  
<~ 这个和~>操作符的意思相反,如果左边的算子和右边的算子都成功了,返回的结果中不包含右边的
^^{} 或者 ^^=> 变形连接符,意思是如果左边的算子成功了,用^^右边的算子函数作用于返回的结果

这些符号究竟是什么东西,又代表的是什么语法,其实就是Parser的一个个方法而已,原来还是scala的语法,差点被迷惑了。
这个语句就是根据关键字、操作符号、函数生成一个parser[LogicalPlan]类型的withLimit。
关键字代码如下

protected val ALL = Keyword("ALL")
  protected val AND = Keyword("AND")
  protected val APPROXIMATE = Keyword("APPROXIMATE")
  protected val AS = Keyword("AS")
  protected val ASC = Keyword("ASC")
  protected val BETWEEN = Keyword("BETWEEN")
  protected val BY = Keyword("BY")
  protected val CASE = Keyword("CASE")
  protected val CAST = Keyword("CAST")
  protected val DESC = Keyword("DESC")
  protected val DISTINCT = Keyword("DISTINCT")
  protected val ELSE = Keyword("ELSE")
  protected val END = Keyword("END")
  protected val EXCEPT = Keyword("EXCEPT")
  protected val FALSE = Keyword("FALSE")
  protected val FROM = Keyword("FROM")
  protected val FULL = Keyword("FULL")
  protected val GROUP = Keyword("GROUP")
  protected val HAVING = Keyword("HAVING")
  protected val IN = Keyword("IN")
  protected val INNER = Keyword("INNER")
  protected val INSERT = Keyword("INSERT")
  protected val INTERSECT = Keyword("INTERSECT")
  protected val INTERVAL = Keyword("INTERVAL")
  protected val INTO = Keyword("INTO")
  protected val IS = Keyword("IS")
  protected val JOIN = Keyword("JOIN")
  protected val LEFT = Keyword("LEFT")
  protected val LIKE = Keyword("LIKE")
  protected val LIMIT = Keyword("LIMIT")
  protected val NOT = Keyword("NOT")
  protected val NULL = Keyword("NULL")
  protected val ON = Keyword("ON")
  protected val OR = Keyword("OR")
  protected val ORDER = Keyword("ORDER")
  protected val SORT = Keyword("SORT")
  protected val OUTER = Keyword("OUTER")
  protected val OVERWRITE = Keyword("OVERWRITE")
  protected val REGEXP = Keyword("REGEXP")
  protected val RIGHT = Keyword("RIGHT")
  protected val RLIKE = Keyword("RLIKE")
  protected val SELECT = Keyword("SELECT")
  protected val SEMI = Keyword("SEMI")
  protected val TABLE = Keyword("TABLE")
  protected val THEN = Keyword("THEN")
  protected val TRUE = Keyword("TRUE")
  protected val UNION = Keyword("UNION")
  protected val WHEN = Keyword("WHEN")
  protected val WHERE = Keyword("WHERE")
  protected val WITH = Keyword("WITH")

根据关键词我们可以知道在写sql语句的时候哪些操作可以使用,哪些操作是不支持的

  1. withLimit:Parser[LogicalPlan]是怎么变成LogicalPlan的呢?



    Parser[LogicalPlan]继承自一个函数,最终返回ParseResult[T]类型,ParseResult[T]有两个子类,分别是Success和NoSuccess,代码如下

case class Success[+T](result: T, override val next: Input) extends ParseResult[T]
sealed abstract class NoSuccess(val msg: String, override val next: Input) extends ParseResult[Nothing] 

当sql解析成功后会返回Success。
再次看一下调用关系,最后调用的是start



接着看start的调用

def parse(input: String): LogicalPlan = synchronized {
    // Initialize the Keywords.
    // 初始化分词器的关键字
    initLexical
    phrase(start)(new lexical.Scanner(input)) match {
      case Success(plan, _) => plan
      case failureOrError => sys.error(failureOrError.toString)
    }
}

用模式匹配去匹配结果是ParseResult[LogicalPlan]的哪一个子类,如果是Success,看一下Success的代码

case class Success[+T](result: T, override val next: Input) extends ParseResult[T]

这里的T就是LogicalPlan

  1. 虽然还有很多其他操作,但解析的步骤都是一样的
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,332评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,508评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,812评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,607评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,728评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,919评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,071评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,802评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,256评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,576评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,712评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,389评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,032评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,798评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,026评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,473评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,606评论 2 350

推荐阅读更多精彩内容