Spark SQL Architecture
The overall architecture of Spark SQL is shown in the figure below. [Figure: Spark SQL Catalyst]
As the figure shows, whether a query is submitted as a SQL statement or built with the DataFrame API, it goes through the following steps before being executed as DAG operations on RDDs (each stage can be inspected from user code, as sketched after this list):
- The Parser parses the SQL text and produces an Unresolved Logical Plan
- The Analyzer combines the plan with Catalog information to produce a Resolved Logical Plan
- The Optimizer applies predefined rules to the Resolved Logical Plan and produces an Optimized Logical Plan
- The Query Planner converts the Optimized Logical Plan into one or more Physical Plans
- CBO computes the cost of each Physical Plan using the Cost Model and picks the cheapest one as the final Physical Plan
- Spark executes that Physical Plan as a DAG
- While the DAG runs, Adaptive Execution dynamically adjusts the execution plan based on runtime information to improve efficiency
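Each of these stages can be inspected from user code. A minimal sketch, assuming a local SparkSession named spark and an illustrative people temp view (the names and data are made up):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._
Seq((1, 15), (2, 8)).toDF("id", "age").createOrReplaceTempView("people")

val df = spark.sql("SELECT id FROM people WHERE age > 10")
df.explain(true)          // prints the parsed, analyzed, optimized and physical plans
val qe = df.queryExecution
println(qe.logical)       // Unresolved Logical Plan (Parser output)
println(qe.analyzed)      // Resolved Logical Plan (Analyzer output)
println(qe.optimizedPlan) // Optimized Logical Plan (Optimizer output)
println(qe.executedPlan)  // final Physical Plan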
Parser
Spark SQL uses ANTLR for lexical and syntactic analysis, producing an Unresolved Logical Plan.
When a user submits SQL via SparkSession.sql(sqlText: String), SparkSession ultimately calls SparkSqlParser's parsePlan method, which works in two steps:
- The ANTLR-generated SqlBaseLexer performs lexical analysis of the SQL and produces a CommonTokenStream
- The ANTLR-generated SqlBaseParser performs syntactic analysis and yields a LogicalPlan
Consider two tables and the following join query over them:
CREATE TABLE score (
  id INT,
  math_score INT,
  english_score INT
);

CREATE TABLE people (
  id INT,
  age INT,
  name STRING
);

-- Join query
SELECT sum(v)
FROM (
  SELECT score.id,
         100 + 80 + score.math_score + score.english_score AS v
  FROM people
  JOIN score
    ON people.id = score.id
   AND people.age > 10
) tmp;
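The example can be reproduced end to end with in-memory temp views instead of CREATE TABLE. A minimal sketch; the sample rows are invented for illustration:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

Seq((1, 90, 80), (2, 60, 70)).toDF("id", "math_score", "english_score")
  .createOrReplaceTempView("score")
Seq((1, 15, "alice"), (2, 8, "bob")).toDF("id", "age", "name")
  .createOrReplaceTempView("people")

spark.sql(
  """SELECT sum(v)
    |FROM (
    |  SELECT score.id,
    |         100 + 80 + score.math_score + score.english_score AS v
    |  FROM people JOIN score
    |    ON people.id = score.id AND people.age > 10
    |) tmp""".stripMargin).show()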
The query enters through the sql method, whose implementation is as follows:
def sql(sqlText: String): DataFrame = {
val tracker = new QueryPlanningTracker
val plan = tracker.measurePhase(QueryPlanningTracker.PARSING) {
sessionState.sqlParser.parsePlan(sqlText)
}
Dataset.ofRows(self, plan, tracker)
}
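The QueryPlanningTracker threaded through here records how long each planning phase takes. A small sketch (Spark 3.x; reuses the spark session from the sketches above, and assumes the tracker/phases accessors as found in that version):

val qe = spark.sql("SELECT 1").queryExecution
qe.tracker.phases.foreach { case (phase, summary) =>
  println(s"$phase: ${summary.durationMs} ms") // e.g. parsing, analysis, optimization, planning
}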
SparkSession ultimately calls SparkSqlParser's parsePlan method; the concrete flow is as follows:
// First, SessionState: it is initialized through the instantiateSessionState method
lazy val sessionState: SessionState = {
parentSessionState
.map(_.clone(this))
.getOrElse {
val state = SparkSession.instantiateSessionState(
SparkSession.sessionStateClassName(sparkContext.conf),
self)
initialSessionOptions.foreach { case (k, v) => state.conf.setConfString(k, v)}
state
}
}
// The implementation of sessionStateClassName()
private def sessionStateClassName(conf: SparkConf): String = {
conf.get(CATALOG_IMPLEMENTATION) match {
case "hive" => HIVE_SESSION_STATE_BUILDER_CLASS_NAME
case "in-memory" => classOf[SessionStateBuilder].getCanonicalName
}
}
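CATALOG_IMPLEMENTATION corresponds to the spark.sql.catalogImplementation setting, and SparkSession.Builder.enableHiveSupport() is the usual way to set it to "hive". A short sketch (requires the Hive classes on the classpath):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .enableHiveSupport() // sets spark.sql.catalogImplementation to "hive"
  .getOrCreate()
println(spark.conf.get("spark.sql.catalogImplementation")) // "hive"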
// Here we consider the case where CATALOG_IMPLEMENTATION is "hive".
// The implementation of instantiateSessionState()
private def instantiateSessionState(
className: String,
sparkSession: SparkSession): SessionState = {
try {
// invoke `new [Hive]SessionStateBuilder(SparkSession, Option[SessionState])`
val clazz = Utils.classForName(className)
val ctor = clazz.getConstructors.head
ctor.newInstance(sparkSession, None).asInstanceOf[BaseSessionStateBuilder].build()
} catch {
case NonFatal(e) =>
throw new IllegalArgumentException(s"Error while instantiating '$className':", e)
}
}
// className is org.apache.spark.sql.hive.HiveSessionStateBuilder; an instance is created via reflection and its build() method is called.
// The implementation of build():
def build(): SessionState = {
new SessionState(
session.sharedState,
conf,
experimentalMethods,
functionRegistry,
udfRegistration,
() => catalog,
sqlParser,
() => analyzer,
() => optimizer,
planner,
streamingQueryManager,
listenerManager,
() => resourceLoader,
createQueryExecution,
createClone)
}
// Finally, sqlParser; its value is
protected lazy val sqlParser: ParserInterface = {
extensions.buildParser(session, new SparkSqlParser(conf))
}
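The extensions.buildParser call is what lets user code wrap or replace the default SparkSqlParser through the SparkSessionExtensions API. A minimal sketch that simply hands back the default parser:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .withExtensions { ext =>
    ext.injectParser { (session, delegate) =>
      delegate // a real extension would return a ParserInterface wrapping `delegate`
    }
  }
  .getOrCreate()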
Next, let's look at the parsePlan() method:
override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { parser =>
astBuilder.visitSingleStatement(parser.singleStatement()) match {
case plan: LogicalPlan => plan
case _ =>
val position = Origin(None, None)
throw new ParseException(Option(sqlText), "Unsupported SQL statement", position, position)
}
}
// The parse() method performs the lexical and syntactic analysis
protected def parse[T](command: String)(toResult: SqlBaseParser => T): T = {
logDebug(s"Parsing command: $command")
// SqlBaseLexer performs lexical analysis of the SQL
val lexer = new SqlBaseLexer(new UpperCaseCharStream(CharStreams.fromString(command)))
lexer.removeErrorListeners()
lexer.addErrorListener(ParseErrorListener)
lexer.legacy_setops_precedence_enbled = SQLConf.get.setOpsPrecedenceEnforced
lexer.ansi = SQLConf.get.ansiParserEnabled
val tokenStream = new CommonTokenStream(lexer)
// SqlBaseParser performs syntactic analysis
val parser = new SqlBaseParser(tokenStream)
parser.addParseListener(PostProcessor)
parser.removeErrorListeners()
parser.addErrorListener(ParseErrorListener)
parser.legacy_setops_precedence_enbled = SQLConf.get.setOpsPrecedenceEnforced
parser.ansi = SQLConf.get.ansiParserEnabled
try {
// first, try parsing with potentially faster SLL mode
parser.getInterpreter.setPredictionMode(PredictionMode.SLL)
toResult(parser)
}
catch {
case e: ParseCancellationException =>
// if we fail, parse with LL mode
tokenStream.seek(0) // rewind input stream
parser.reset()
// Try Again.
parser.getInterpreter.setPredictionMode(PredictionMode.LL)
toResult(parser)
}
···
}
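SLL is ANTLR's faster prediction mode but can reject some inputs that full LL parsing accepts, hence the two-stage try/retry above. If LL mode also fails, a ParseException reaches the caller, which is easy to observe (a sketch, reusing the spark session from the sketches above):

import org.apache.spark.sql.catalyst.parser.ParseException

try {
  spark.sessionState.sqlParser.parsePlan("SELEC 1") // deliberately misspelled keyword
} catch {
  case e: ParseException => println(e.getMessage)
}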
The resulting Unresolved Logical Plan is shown in the figure below. [Figure: Spark SQL Parser]
As the figure shows:
- The two tables in the query are parsed into two UnresolvedRelation nodes: we know only that they are tables, not whether they are EXTERNAL or MANAGED tables, where their data lives, or what their schemas look like
- The result of sum(v) has not been given a name
- The Project node knows only which attributes are selected, not which table each attribute belongs to, let alone its data type
- Likewise, the Filter node does not know the data types involved
The Unresolved Logical Plan that Spark SQL parses out is shown below:
== Parsed Logical Plan ==
'Project [unresolvedalias('sum('v), None)]
+- 'SubqueryAlias tmp
+- 'Project ['score.id, (((100 + 80) + 'score.math_score) + 'score.english_score) AS v#493]
+- 'Filter (('people.id = 'score.id) && ('people.age > 10))
+- 'Join Inner
:- 'UnresolvedRelation `people`
+- 'UnresolvedRelation `score`
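The same text can be reproduced by handing the query string straight to the parser; the leading single quotes mark unresolved nodes. A sketch, reusing the spark session from the sketches above (the exact output varies slightly across Spark versions):

val parsed = spark.sessionState.sqlParser.parsePlan(
  "SELECT sum(v) FROM (SELECT score.id, 100 + 80 + score.math_score + score.english_score AS v " +
  "FROM people JOIN score ON people.id = score.id AND people.age > 10) tmp")
println(parsed.treeString)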