77、Spark SQL之核心源码深度剖析(DataFrame lazy特性、Optimizer优化策略等)

入口,org.apache.spark.sql包下的SQLContext类
首先看sql()方法
看sql()方法之前,先看下SQLConf

protected[sql] lazy val conf: SQLConf = new SQLConf
  /**
    * 如果要给Spark SQL设置一些参数,那么要使用SQLContext.setConf()方法,底层是会将配置信息放入SQLConf对象中的
    */
  def setConf(props: Properties): Unit = conf.setConf(props)

然后来看sql()方法

/**
    * 使用Spark执行一条SQL查询语句,将结果作为DataFrame返回,SQL解析使用的方言,可以
    * 通过spark.sql.dialect参数,来进行设置
    */
  def sql(sqlText: String): DataFrame = {
    // 首先,查看我们通过SQLContext.setConf()方法设置的参数,Spark.sql.dialect,
    // 如果是sql方言,就进入接下来的执行,如果不是sql,就直接报错
    // 这里的conf就是SQLConf
    if (conf.dialect == "sql") {
      // SQLContext的sql()方法正式进入执行阶段,Spark SQL也是有lazy特性的,其实,调用sql()去执行一条SQL语句的时候
      // 默认只会调用SqlParser组件针对SQL语句生成一个Unresolved LogicPlan,然后,将Unresolved LogicPlan和SQLXontext
      // 自身的实例(this),封装为一个DataFrame,返回DataFrame给用户,其中仅仅封装了SQL语句的Unresolved LogicPlan


      // 在用户拿到了封装了Unresolved LogicPlan的DataFrame之后,可以执行一些show()、select().show()、groupBy().show()
      // 或者拿到DataFrame对应的RDD,执行一系列transformation操作,最后执行一个Action后
      // 才会去触发Spark SQL后续的SQL执行流程,包括Analyzer、Optimizer、SparkPlan、execute PysicalPlan

      // 首先看parseSql()方法,传入SQL语句,调用SqlParser解析SQL,获取Unresolved LogicPlan

      // parseSql(),总结一下,就是调用了SqlParser的apply()方法,即由SqlParser将SQL语句通过内部的各种select、insert这种词法、语法解析器
      // 来进行解析,然后将SQL语句的各个部分,组装成一个LogicalPlan,但是这里的LogicalPlan,只是一颗语法树,还不知道自己具体执行计划的时候,
      // 数据从哪里来,所以,叫做UnResolved LogicalPlan,解析了SQL,拿到了UnResolved LogicalPlan,会封装成一个DataFrame,返回给用户,
      // 用户此时就开始用DataFrame执行各种操作了
      DataFrame(this, parseSql(sqlText))
    } else {
      sys.error(s"Unsupported SQL dialect: ${conf.dialect}")
    }
  }

接下来看看parseSql()方法

  // sqlParser 实际上是SparkSQLParser的实例,SparkSQLParser里面,又封装了catalyst的SqlParser
  @transient
  protected[sql] val sqlParser = {
    val fallback = new catalyst.SqlParser
    new SparkSQLParser(fallback(_))
  }

  protected[sql] def parseSql(sql: String): LogicalPlan = {
    // parseSql()方法,是SqlParser执行的入口,实际上,会调用SqlParser的apply()方法,来获取一个对SQL语句解析后的LogicalPlan
    ddlParser(sql, false).getOrElse(sqlParser(sql))
  }

parseSql()会调SqlParser的apply()方法
SqlParser这个类在org.apache.spark.sql.catalyst包下,其继承了AbstractSparkSQLParser 类

class SqlParser extends AbstractSparkSQLParser {

parseSql()会调SqlParser的apply()方法,会调AbstractSparkSQLParser的apply()方法

private[sql] abstract class AbstractSparkSQLParser
  extends StandardTokenParsers with PackratParsers {

  /**
    * 实际上,调用SqlParser的apply()方法,将SQL解析成LogicalPlan时,会调用到SqlParser的父类,AbstractSparkSQLParser
    * 的apply()方法
    */
  def apply(input: String): LogicalPlan = {
    // Initialize the Keywords.
    lexical.initialize(reservedWords)
    // 这个代码的意思,就是用lexical.Scanner,针对SQL语句,来进行语法检查、分析,满足语法检查结果的话,就使用SQL解析器
    // 针对SQL进行解析,包括词法解析(将SQL语句解析成一个个短语,token)、语法解析,最后生成一个Unresolved LogicalPlan
    // 该LogicalPlan仅仅针对SQL语句本身生成,纯语法,不设计任何关联的数据源等等信息
    phrase(start)(new lexical.Scanner(input)) match {
      case Success(plan, _) => plan
      case failureOrError => sys.error(failureOrError.toString)
    }
  }
}

看看lexical

/**
  * 用SqlLexical,对SQL语句,执行一个检查,如果满足检查的话,那么才去分析,
  * 否则,说明SQL语句本身的语法,是有问题的
  */

class SqlLexical extends StdLexical {
  case class FloatLit(chars: String) extends Token {
    override def toString = chars
  }
}

接着看看SqlParser的相关代码

/**
    * 从这里可以看出来,Spark SQL是支持两种主要的SQL语法的,包括select语句和insert语句
    */
  protected lazy val start: Parser[LogicalPlan] =
    ( (select | ("(" ~> select <~ ")")) *
      ( UNION ~ ALL        ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Union(q1, q2) }
      | INTERSECT          ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Intersect(q1, q2) }
      | EXCEPT             ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Except(q1, q2)}
      | UNION ~ DISTINCT.? ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) }
      )
    | insert
    )

  /**
    * 这里,就是对select语句执行解析,语句里面可以包括FROM,WHERE,GROUP,HAVING,LIMIT
    */
  protected lazy val select: Parser[LogicalPlan] =
    SELECT ~> DISTINCT.? ~
      repsep(projection, ",") ~
      (FROM   ~> relations).? ~
      (WHERE  ~> expression).? ~
      (GROUP  ~  BY ~> rep1sep(expression, ",")).? ~
      (HAVING ~> expression).? ~
      sortType.? ~
      (LIMIT  ~> expression).? ^^ {
        case d ~ p ~ r ~ f ~ g ~ h ~ o ~ l  =>
          val base = r.getOrElse(NoRelation)
          val withFilter = f.map(Filter(_, base)).getOrElse(base)
          val withProjection = g
            .map(Aggregate(_, assignAliases(p), withFilter))
            .getOrElse(Project(assignAliases(p), withFilter))
          val withDistinct = d.map(_ => Distinct(withProjection)).getOrElse(withProjection)
          val withHaving = h.map(Filter(_, withDistinct)).getOrElse(withDistinct)
          val withOrder = o.map(_(withHaving)).getOrElse(withHaving)
          val withLimit = l.map(Limit(_, withOrder)).getOrElse(withOrder)
          withLimit
      }

  /**
    * 解析INSERT、OVERWRITE这种语法
    */
  protected lazy val insert: Parser[LogicalPlan] =
    INSERT ~> (OVERWRITE ^^^ true | INTO ^^^ false) ~ (TABLE ~> relation) ~ select ^^ {
      case o ~ r ~ s => InsertIntoTable(r, Map.empty[String, Option[String]], s, o)
    }

  /**
    * 查那些列
    */
  protected lazy val projection: Parser[Expression] =
    expression ~ (AS.? ~> ident.?) ^^ {
      case e ~ a => a.fold(e)(Alias(e, _)())
    }

  // Based very loosely on the MySQL Grammar.
  // http://dev.mysql.com/doc/refman/5.0/en/join.html
  /**
    * 会将你的SQL语句里面解析出来的各种token,或者TreeNode,给关联起来,最后组成一颗语法树
    * 语法树封装在LogicalPlan中,但是要注意,此时的LogicalPlan,还是Unresolved LogicalPlan
    */

  protected lazy val relations: Parser[LogicalPlan] =
    ( relation ~ rep1("," ~> relation) ^^ {
        case r1 ~ joins => joins.foldLeft(r1) { case(lhs, r) => Join(lhs, r, Inner, None) } }
    | relation
    )

接下看再看看relations的Join

/**
  * 通过这个Join, left: LogicalPlan,right: LogicalPlan
  * 意思就是说,将SQL语句的各个部分,通过Spark SQL的规则,组合拼装成一个语法树
  */
case class Join(
  left: LogicalPlan,
  right: LogicalPlan,
  joinType: JoinType,
  condition: Option[Expression]) extends BinaryNode {
}

接下来看真真执行sql

  /**
    * 实际上,在后面操作DataFrame的时候,在实际真正要执行SQL语句,对数据进行查询,返回结果的时候,会触发SQLContext的executePlan()方法的执行,
    * 该方法,实际上会返回一个QueryExecution,这个QueryExecution实际上,会触发整个后续的流程
    */
  protected[sql] def executeSql(sql: String): this.QueryExecution = executePlan(parseSql(sql))

  protected[sql] def executePlan(plan: LogicalPlan) = new this.QueryExecution(plan)

看这个QueryExecution类
用一个Unresolved LogicalPlan去构造一个QueryExecution的实例对象,那么SQL语句的执行就会一步步触发

@DeveloperApi
  protected[sql] class QueryExecution(val logical: LogicalPlan) {
    def assertAnalyzed(): Unit = checkAnalysis(analyzed)

    // 用一个Unresolved LogicalPlan去构造一个QueryExecution的实例对象,那么SQL语句的执行就会一步步触发

    // Analyzer的apply()方法执行结束后,得到一个Resolved LogicalPlan
    lazy val analyzed: LogicalPlan = analyzer(logical)
    // 会通过CacheManager 执行一个缓存的操作,用一个cacheManager,调用其useCachedData()方法,就是说,如果之前已经缓存过这个执行计划
    // 又再次执行的话,那么,其实可以使用缓存中的数据
    lazy val withCachedData: LogicalPlan = {
      assertAnalyzed
      cacheManager.useCachedData(analyzed)
    }
    // 调用Optimizer的apply()方法,针对Resolved LogicalPlan 调用Optimizer,进行优化,获得Optimized LogicalPlan,获得优化后的逻辑执行计划
    lazy val optimizedPlan: LogicalPlan = optimizer(withCachedData)

    // TODO: Don't just pick the first one...
    // 用SparkPlanner,对Optimizer生成的Optimized LogicalPlan,创建一个SparkPlan
    lazy val sparkPlan: SparkPlan = {
      SparkPlan.currentContext.set(self)
      planner(optimizedPlan).next()
    }
    // executedPlan should not be used to initialize any SparkPlan. It should be
    // only used for execution.
    // 使用SparkPlan 生成一个可以执行的SparkPlan,此时就是PhysicalPlan,物理执行计划,直接可以执行了,已经绑定到了数据源,而且
    // 知道对各个表的join,如何进行join,包括join的时候,spark内部会对小表进行广播
    lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)

    /** Internal version of the RDD. Avoids copies and has no schema */
    // 最后一步,调用SparkPlan(封装了PhysicalPlan的SparkPlan)的executor()方法,execute()方法,实际上就会去执行物理执行计划
    // execute()方法返回的是RDD[Row],就是一个元素类型为Row的RDD
    lazy val toRdd: RDD[Row] = executedPlan.execute()

    protected def stringOrError[A](f: => A): String =
      try f.toString catch { case e: Throwable => e.toString }

    def simpleString: String =
      s"""== Physical Plan ==
         |${stringOrError(executedPlan)}
      """.stripMargin.trim

    override def toString: String =
      // TODO previously will output RDD details by run (${stringOrError(toRdd.toDebugString)})
      // however, the `toRdd` will cause the real execution, which is not what we want.
      // We need to think about how to avoid the side effect.
      s"""== Parsed Logical Plan ==
         |${stringOrError(logical)}
         |== Analyzed Logical Plan ==
         |${stringOrError(analyzed)}
         |== Optimized Logical Plan ==
         |${stringOrError(optimizedPlan)}
         |== Physical Plan ==
         |${stringOrError(executedPlan)}
         |Code Generation: ${stringOrError(executedPlan.codegenEnabled)}
         |== RDD ==
      """.stripMargin.trim
  }

首先看analyzer

 /**
    * Analyzer的所在地,QueryExecution实际执行SQL语句的时候,第一步,就是用之前SqlParser解析出来的纯逻辑的封装了语法树的Unresolved LogicalPlan
    * 调用Analyzer的apply()方法,将Unresolved LogicalPlan生成一个Resolved LogicalPlan
    */
  @transient
  protected[sql] lazy val analyzer: Analyzer =
    new Analyzer(catalog, functionRegistry, caseSensitive = true) {
      override val extendedResolutionRules =
        ExtractPythonUdfs ::
        sources.PreInsertCastAndRename ::
        Nil
    }

Analyzer这个类在org.apache.spark.sql.catalyst.analysis包下

/**
  * Analyzer的父类是RuleExecutor,调用Analyzer的apply()方法,实际上会调用RuleExecutor的apply()方法,并传入一个Unresolved LogicalPlan
  */
class Analyzer(catalog: Catalog,
               registry: FunctionRegistry,
               caseSensitive: Boolean,
               maxIterations: Int = 100)
  extends RuleExecutor[LogicalPlan] with HiveTypeCoercion {

看RuleExecutor类的apply()方法

 /**
    * 调用这个apply()方法,做了一些事情,总重要的一件事情,就是将这个LogicalPlan与它要查询的数据源绑定起来,从而让
    * Unresolved LogicalPlan 变成一个Resolved LogicalPlan
    */
  def apply(plan: TreeType): TreeType = {
    var curPlan = plan

    batches.foreach { batch =>
      val batchStartPlan = curPlan
      var iteration = 1
      var lastPlan = curPlan
      var continue = true
}

接着看Optimizer,不关心apply()方法,关注它的优化逻辑
Optimizer在org.apache.spark.sql.catalyst.optimizer

object DefaultOptimizer extends Optimizer {
  // 这里的batches是非常重要的,这里封装了每一个Spark SQL版本中,可以对逻辑执行计划执行的优化策略,在这里,重点理解Optimizer的各种优化策略
  // 这样,才清楚,Spark SQL 内部是如何对我们写的SQL语句进行优化的,我们可以再编写SQL语句的时候,直接用优化策略建议的方式来编写SQL语句,传递给
  // Spark SQL执行的SQL语句,本身就已经是最优的,这样,就可以避免在执行SQL解析的时候,进行大量的Spark SQL内部的优化,这样,在某种程度上,也可以提升性能
  //
  val batches =
    Batch("Combine Limits", FixedPoint(100),
      CombineLimits) ::
      // CombineLimits,就是合并limit语句,比如,你的SQL语句中,有多个limit子句,那么在这里会进行合并,取一个并集就可以了,这样的话
      // 在后面SQL执行的时候,limit就执行一次就好,所以,我们再写SQL的时候,尽量就写一个limit
    Batch("ConstantFolding", FixedPoint(100),
      NullPropagation,// 针对NULL的优化,尽量避免出现null的情况,否则,null是很容易产生数据倾斜的
      ConstantFolding,// 针对常量的优化,在这里直接计算可以获得的常量,所以我们自己对可能出现的常量尽量直接给出
      LikeSimplification, // like的简化优化
      BooleanSimplification,// boolean的简化优化
      SimplifyFilters,
      SimplifyCasts,
      SimplifyCaseConversionExpressions,
      OptimizeIn) ::
    Batch("Decimal Optimizations", FixedPoint(100),
      DecimalAggregates) ::
    Batch("Filter Pushdown", FixedPoint(100),
      UnionPushdown, // 将union下推,意思是和filter pushdown ,就是说将union where这种子句,下推到子查询中进行,尽量早的执行union操作和where
                    // 操作,避免在外层查询,针对大量的数据,执行where操作
      CombineFilters, // 合并filter,就是合并where子句,比如子查询中有针对某个字段的where子句,外层查询中,也有针对同样一个字段的where子句,
                      // 那么此时是可以合并where子句的,只保留一个即可,取并集即可,所以自己写SQL的时候哦,也要注意where的使用,如果针对一个字段,写一次就好
      PushPredicateThroughProject,
      PushPredicateThroughJoin,
      PushPredicateThroughGenerate,
      ColumnPruning) :: // ColumnPruning 列剪裁,就是针对你要查询的列进行剪裁,自己写SQL,如果表中有n个字段,但是你只需要查询一个字段, 那么就用select x from 不要使用select * from
    Batch("LocalRelation", FixedPoint(100),
      ConvertToLocalRelation) :: Nil
}

接下来看看SparkPlanner

 /**
    * 用一些策略,比如说DataSourceStrategy,针对逻辑执行计划,执行进一步的具体化和物化
    */
  protected[sql] class SparkPlanner extends SparkStrategies {
    val sparkContext: SparkContext = self.sparkContext

    val sqlContext: SQLContext = self

    def codegenEnabled = self.conf.codegenEnabled

    def numPartitions = self.conf.numShufflePartitions

    def strategies: Seq[Strategy] =
      experimental.extraStrategies ++ (
      DataSourceStrategy ::
      DDLStrategy ::
      TakeOrdered ::
      HashAggregation ::
      LeftSemiJoin ::
      HashJoin ::
      InMemoryScans ::
      ParquetOperations ::
      BasicOperators ::
      CartesianProduct ::
      BroadcastNestedLoopJoin :: Nil)
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,589评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,615评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,933评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,976评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,999评论 6 393
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,775评论 1 307
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,474评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,359评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,854评论 1 317
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,007评论 3 338
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,146评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,826评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,484评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,029评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,153评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,420评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,107评论 2 356

推荐阅读更多精彩内容