由一条SQL分析SparkSQL执行过程(三)

对于下面一段SQL

SELECT a.uid,b.name,SUM(clk_pv) AS clk_pv 
FROM log  a
JOIN user b ON  a.uid = b.uid
WHERE a.fr = 'android'
GROUP BY a.uid,b.name

在由一条SQL分析SparkSQL执行过程(二)中,我们分析到Spark如何封装SessionState,使得用户只需要通过SparkSQL入口SparkSession就能很容易的做分布式运算。
其大致的思路是:

  1. SparkSession是一个伴生对象。在SparkSession对象中包含一个builder,用户通过Builder配置自己的SparkSession
  2. SparkSession类初始化需传入参数SessionState。Builder在创建SparkSession实例时会通过读取配置,反射加载的方式创建SessionState。
  3. SessionState的初始化过程,是解析器,分析器,CataLog,优化器,Planner,查询执行的创建过程
  4. 后续的SQL操作实际上是调用SessionState的几个组件做对应操作。

在由一条SQL分析SparkSQL执行过程(一)中,我们分析了SQL执行的过程实际是对解析器解析的AST(抽象语法树)遍历使用规则,将一棵抽象语法树转化为另外一棵树的过程。

那么Spark如何实现的呢?

1. 解析器

image.png

对于上图,
首先从类的功能和结构上,我们根据代码追根溯源,

  1. SparkSession底层初始化SessionState。
  2. SessionState通过SessionState的Builder构造。
  3. SessionStateBuidler是BaseSessionStateBuilder的子类,只对其中一个方法重写
  4. 在BaseSessionStateBuilder中完成解析器,分析器等组件的构造
    则还是上部分我们的结论,但是在解析器是如何构造的呢?

在BaseSessionStateBuilder中,我们看到

  /**
   * Session extensions defined in the [[SparkSession]].
   */
  protected def extensions: SparkSessionExtensions = session.extensions

  protected lazy val sqlParser: ParserInterface = {
    extensions.buildParser(session, new SparkSqlParser(conf))
  }
  1. 用户在创建SparkSession时,通过SparkSession的Builder的withExtensions可以指明extensions。
  2. extensions 是一个SparkSessionExtensions类型
    在Spark源码中,有这样一个例子说明如何构造外部的扩展
   SparkSession.builder()
   .master("...")
   .conf("...", true)
   .withExtensions { extensions =>
       extensions.injectResolutionRule { session =>
         ...
       }
      extensions.injectParser { (session, parser) =>
        ...
       }
     }
     .getOrCreate()
  }}}

在SparkSessionExtensions中构造解析器的方式如下:
1). 声明一个函数类型(Type),将SparkSession和当前解析接口转化为另一个解析接口
2). 构造一个空的可变的类型Buffer
3). 将外部的解析规则委托给底层解析器,并返回ParserBuilder类型

//声明一个函数类型(Type)
type ParserBuilder = (SparkSession, ParserInterface) => ParserInterface
//构造一个可变的类型Buffer
 private[this] val parserBuilders = mutable.Buffer.empty[ParserBuilder]
//合并外部的解析规则和自有的规则,
 private[sql] def buildParser(
      session: SparkSession,
      initial: ParserInterface): ParserInterface = {
    parserBuilders.foldLeft(initial) { (parser, builder) =>
      builder(session, parser)
    }
  }
def injectParser(builder: ParserBuilder): Unit = {
    parserBuilders += builder
  }

这段逻辑写的是这么绕。。。只能贴一下注释,以及注释的谷歌翻译

Inject a custom parser into the [[SparkSession]]. Note that the builder is passed a session and an initial parser. The latter allows for a user to create a partial parser and to delegate to the underlying parser for completeness. If a user injects more parsers, then the parsers are stacked on top of each other.
将自定义分析器注入到[[SparkSession]]中。请注意,构建器会传递一个会话和一个初始解析器。后者允许用户创建一个部分解析器并为了完整性委托给底层解析器。如果用户注入更多的解析器,那么解析器会堆叠在一起。

  1. 如果用户不添加外部的解析器,相当于new SparkSqlParser()
class SparkSqlParser(conf: SQLConf) extends AbstractSqlParser {
  val astBuilder = new SparkSqlAstBuilder(conf)

  private val substitutor = new VariableSubstitution(conf)

  protected override def parse[T](command: String)(toResult: SqlBaseParser => T): T = {
    super.parse(substitutor.substitute(command))(toResult)
  }
}

实际是通过SparkSqlAstBuilder 构造一个astBuilder.并重写父类AbstractSqlParser 的parse方法。父类是一个ParseInferface类型。

  1. SparkSqlAstBuilder 是将ANTLR ParseTree 转化为LogicalPlan/Expression/TableIdentifier.

  2. SparkSqlAstBuilder 继承自AstBuilder包含常见的命令解析如create Database,show Database 等,这是因为AstBuilder继承自SqlBaseBaseVisitor。SqlBaseBaseVisitor实现了数据库常见的操作。

总结
SparkSQL是通过AntlrV4这个开源解析框架解析的http://www.antlr.org/.
在使用的时候,做了几层抽象和封装。

  • 构造这模式抽象AstBuilder,将AntlrV4的SQL语法实现细节封装
  • 封装SparksqlParser ,并使用构造这模式,封装SparkSqlAstbuilder继承AstBuilder,那么AntlrV4的特性就都能被SparkSqlParser使用。
  • 为了兼容用户自定义的解析器,解析器定定一个为ParseInferface接口类型。用户可以通过SparkSession注入自己的解析器,Spark底层会将用户的解析器和SPark的解析器做合并和统一,并保证完整性。

回到最开始的SQL:

sessionState.sqlParser.parsePlan(sqlText)

实际调用的是SparkSqlParser父类AbstractSqlParser(继承ParserInterface)的方法:

  override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { parser =>
    astBuilder.visitSingleStatement(parser.singleStatement()) match {
      case plan: LogicalPlan => plan
      case _ =>
        val position = Origin(None, None)
        throw new ParseException(Option(sqlText), "Unsupported SQL statement", position, position)
    }
  }

实际使用的是Ast的visitSingleStatement方法解析单条SQL语句。

2. 分析器

image.png

语法规则千千万,怎么有效的组织起来呢?观察Spark的代码,我们发现就会像这样的问题:

  1. 规则检查是对解析器解析的逻辑计划做检查(包括CataLog检查)
  2. 逻辑计划是一颗树,按照递归的思维,每个节点都是一个逻辑计划,需进心规则检查
  3. 待验证的规则是可以穷举的,比如关系,子查询,CTE查询
  4. 规则检测的过程,就是对指定的规则列表,递归遍历逻辑计划验证是否满足规则条件。
    所以就有下面的抽象:
    一批规则---->规则<--- 具体规则
    Batch-------->Rule<----------ResolveRelations(例子)

规则检查的过程
规则执行器RuleExecutor:待检查的一批规则以及规则检查运行的策略
Strangy(Once,fixedPoint)

一个分析器,就是一个RuleExecutor,包括一批规则以及执行规则的策略。

一份不完整的规则列表


image.png

分析器
分析器是按照指定的规则对逻辑计划做检查。Spark的做法是将规则封装成Batch,将规则的检查封装成规则执行器,一个规则执行器包括待检查的逻辑计划和检查的规则Batch。一旦调用Analysis方法,就会执行规则执行器的execute方法根据Batch规则对逻辑计划做检查。

3. 优化器

优化器和分析器的思路是一样的,只不过,这时候规则是按照预先设定的优化规则对解析和分析过的逻辑计划做进一步变换。比如子查询公用等。
一份不完整的优化规则列表


image.png

4. Planner

image.png

Planner 是一个转化器,将优化后的逻辑计划转换为物理计划。
因此,我们有以下的结论

  1. 首先定一个物理计划SparkPlan.物理几乎是从逻辑计划转化而来,所以物理计划也是一棵树。所以SparkPlan是目标的物理执行计划。
  2. 再次需有一个转化器,就是SparkPlaner,负责将逻辑计划转化为物理计划
  3. 转化的思路是:对逻辑计划树递归,通过Scala模式匹配的方式找到物理执行计划,再返回最好的物理计划(目前是返回第一个)。
  4. 转化的过程叫做Strategy。

Spark 在SparkPlanner中定义的一组策略

  def strategies: Seq[Strategy] =
    experimentalMethods.extraStrategies ++
      extraPlanningStrategies ++ (
      FileSourceStrategy ::
      DataSourceStrategy(conf) ::
      SpecialLimits ::
      Aggregation ::
      JoinSelection ::
      InMemoryScans ::
      BasicOperators :: Nil)
  1. SparkPlanner 本身是SparkStrategies的子类。SparkStrategies类定义了所有的逻辑计划到物理计划的策略。比如聚合操作,投影,连接等。
  2. 一个SparkStrategy 又是GenericStrategy[SparkPlan] 的子类。也就是说一个strategy有两个动作,planLater和apply
    planLater可以用来执行,apply将逻辑计划转成物理计划。
case class PlanLater(plan: LogicalPlan) extends LeafExecNode {

  override def output: Seq[Attribute] = plan.output

  protected override def doExecute(): RDD[InternalRow] = {
    throw new UnsupportedOperationException()
  }
}

举一个Limit的例子:

  1. 将逻辑计划转化 物理计划
 object SpecialLimits extends Strategy {
    override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
      case logical.ReturnAnswer(rootPlan) => rootPlan match {
        case logical.Limit(IntegerLiteral(limit), logical.Sort(order, true, child)) =>
          execution.TakeOrderedAndProjectExec(limit, order, child.output, planLater(child)) :: Nil
        case logical.Limit(
            IntegerLiteral(limit),
            logical.Project(projectList, logical.Sort(order, true, child))) =>
          execution.TakeOrderedAndProjectExec(
            limit, order, projectList, planLater(child)) :: Nil
        case logical.Limit(IntegerLiteral(limit), child) =>
          execution.CollectLimitExec(limit, planLater(child)) :: Nil
        case other => planLater(other) :: Nil
      }
      case logical.Limit(IntegerLiteral(limit), logical.Sort(order, true, child)) =>
        execution.TakeOrderedAndProjectExec(limit, order, child.output, planLater(child)) :: Nil
      case logical.Limit(
          IntegerLiteral(limit), logical.Project(projectList, logical.Sort(order, true, child))) =>
        execution.TakeOrderedAndProjectExec(
          limit, order, projectList, planLater(child)) :: Nil
      case _ => Nil
    }
  }

  1. 匹配到对应的物理计划类,转化为执行RDD运算doExecute
case class TakeOrderedAndProjectExec(
    limit: Int,
    sortOrder: Seq[SortOrder],
    projectList: Seq[NamedExpression],
    child: SparkPlan) extends UnaryExecNode {

  override def output: Seq[Attribute] = {
    projectList.map(_.toAttribute)
  }

  override def executeCollect(): Array[InternalRow] = {
    val ord = new LazilyGeneratedOrdering(sortOrder, child.output)
    val data = child.execute().map(_.copy()).takeOrdered(limit)(ord)
    if (projectList != child.output) {
      val proj = UnsafeProjection.create(projectList, child.output)
      data.map(r => proj(r).copy())
    } else {
      data
    }
  }

  private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)

  protected override def doExecute(): RDD[InternalRow] = {
    val ord = new LazilyGeneratedOrdering(sortOrder, child.output)
    val localTopK: RDD[InternalRow] = {
      child.execute().map(_.copy()).mapPartitions { iter =>
        org.apache.spark.util.collection.Utils.takeOrdered(iter, limit)(ord)
      }
    }
    val shuffled = new ShuffledRowRDD(
      ShuffleExchange.prepareShuffleDependency(
        localTopK, child.output, SinglePartition, serializer))
    shuffled.mapPartitions { iter =>
      val topK = org.apache.spark.util.collection.Utils.takeOrdered(iter.map(_.copy()), limit)(ord)
      if (projectList != child.output) {
        val proj = UnsafeProjection.create(projectList, child.output)
        topK.map(r => proj(r))
      } else {
        topK
      }
    }
  }

一旦查询执行调用executePlan.execute方法时,就会触发执行对应物理计划的doExecute,执行RDD运算。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,717评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,501评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,311评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,417评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,500评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,538评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,557评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,310评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,759评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,065评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,233评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,909评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,548评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,172评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,420评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,103评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,098评论 2 352

推荐阅读更多精彩内容