当一条 sql 语句被 SparkSqlParser 解析为一个 unresolved logicalPlan 后,接下来就会使用 Analyzer 进行 resolve。所谓的 resolve 也就是在未解析的 db、table、function、partition 等对应的 node 上应用一条条 Rule(规则)来替换为新的 node,应用 Rule 的过程中往往会访问 catalog 来获取相应的信息。
先来看看在解析过程中涉及到的几个主要类,以便为之后的详细分析做好铺垫。
一、主要类
上图(省略了诸多成员,方法)列举了解析一个 unresolved logicalPlan 时涉及的主要类及其之间的关系,其中 Analyzer 是解析的入口,其定义如下:
class Analyzer(
catalog: SessionCatalog,
conf: SQLConf,
maxIterations: Int)
extends RuleExecutor[LogicalPlan] with CheckAnalysis
先来看看几个主要的相关的类
1.1、SessionCatalog
SparkSession 使用的 catalog,是 spark 与底层 megastore(比如 Hive Metastore)的桥梁,并管理着 SparkSession 的临时表、view 以及函数。由于会有并发访问,该类是线程安全的。
如上图中该类的构造函数,该类借助 ExternalCatalog、GlobalTempViewManager、FunctionRegistry、FunctionResourceLoader 等类型的成员实现对 db、table、partition、function 的 CURD 等功能
1.1.1、ExternalCatalog
catalog 接口,包含 functions、partitions、tables 和 databases。仅适用于非临时的项目,线程安全。这是一个用来与外部系统交互的 external catalog(比如与 Hive Megastore 交互的实现是 HiveExternalCatalog
,你也可以实现自己的 meta store 及相应的 ExternalCatalog)。当 database 不存在的时候,要抛出 NoSuchDatabaseException
。主要包含以下几类方法:
- database 相关:checkExists、create、alter、list、drop、alter、
use database
- table 相关:checkExists、create、alter、list、drop、alter schema、
rename
- partition 相关:load(加载数据到一个 partition)、load 动态分区、create、drop、rename、alter、get、list
- function 相关:create、drop、alter、rename、checkExists、list
1.1.2、GlobalTempViewManager
一个线程安全的全局的 temp views 的 manager,提供对其原子的操作,比如 create、update、remove 等。注意,view 的名字是大小写敏感的。其包含对于 temp view 的方法:
- get
- create
- update
- remove
- rename
- list
- clear
1.1.3、FunctionRegistry
Analyzer 用来查找 UDF 的 catalog,线程安全并且对 db name 大小写敏感。包含 function 相关的方法:
- register
- create or replace
- look up
- list
- drop
- checkExists
- clear
1.1.4、FunctionResourceLoader
用来加载一个函数要使用的资源
1.2、RuleExecutor
定义了一个 rules 执行框架,即怎么把一批批规则应用在一个 plan 上得到一个新的 plan。具体是怎么做的,会在下面详细展开。
1.3、SQLConf
用来 get、set SQL 相关的配置、参数。其伴生 object 包含了 spark sql 的所有参数及其类型、说明、默认值。而 class SQLConf 提供了这些参数、配置的 getter、setter 方法。
1.4、CheckAnalysis
用于对 plan 做一些解析,如果解析失败则抛出用户层面的错误
二、如何解析
整个解析过程就是 Analyzer 通过继承或者包含实例的方式将这些类串起来,去 catalog 中查询信息并应用一系列规则来将一个 unresolved logicalplan 最终转变为一个新的 resolved plan 的过程。
2.1、规则是如何执行的?
2.1.1、Rule
在说明规则是如何执行之前,先说明什么是规则?规则均继承了 abstract class Rule
,包含了一个 name 方法及 def apply(plan: TreeType): TreeType
方法,调用 apply 方法将一个 plan 转换成一个新的 plan,这个新的 plan 往往与原来的 plan 有一些不同,也有可能与执行规则前相同。
2.1.2、RuleExecutor
要把一个 unresolved logicalPlan 解析为一个 resolved logicalPlan,需要执行大量规则。那么,这么多规则是如何组织的?执行顺序是怎么样的?这些问题都能在 RuleExecutor 类中找到答案。
2.1.2.1、Batch
类 RuleExecutor 看名字就知道是用来 execute rule 的。在其内部定义了一个 Batch 类,用来表示 a batch of rules
,即一组同类的不定长规则:
case class Batch(name: String,
strategy: Strategy,
rules: Rule[TreeType]*)
其中,strategy: Strategy
即规则的执行策略,表示 Batch 最大执行次数。 如果执行了 maxIterations 次之前达到收敛点(在这里是执行规则后 plan 没有变化),也将停止,不再继续执行 Batch。而每个 Batch 的 maxIterations 都是经验值。
RuleExecutor 包含了一个 protected def batches: Seq[Batch]
方法,用来获取一系列 Batch,这些 Batch 都会在 execute 中执行。所有继承 RuleExecutor(Analyzer 和 Optimizer)都必须实现该方法,即提供自己的 Seq[Batch]。如果需要新增规则,只需要新增 Batch 或者再某个 Batch 中新增规则即可。整体的框架不用动。
2.1.2.2、RuleExecutor#execute
让我们来看看 Batch 和 rule 具体是怎么执行的,即 RuleExecutor#execute(plan: TreeType): TreeType
的逻辑:
有几个关键点:
- Batch 都是连续执行的
- Batch 中的 rules 也是连续执行的
- 当 Batch 执行的次数达到其规定的最大执行次数或执行该 Batch 并未修改 plan,则不再继续运行该 batch
三、Analyzer 的 Seq[Batch]
Analyzer 的 Seq[Batch] 如下:
lazy val batches: Seq[Batch] = Seq(
Batch("Hints", fixedPoint,
new ResolveHints.ResolveBroadcastHints(conf),
ResolveHints.RemoveAllHints),
Batch("Simple Sanity Check", Once,
LookupFunctions),
Batch("Substitution", fixedPoint,
CTESubstitution,
WindowsSubstitution,
EliminateUnions,
new SubstituteUnresolvedOrdinals(conf)),
Batch("Resolution", fixedPoint,
ResolveTableValuedFunctions ::
ResolveRelations ::
ResolveReferences ::
ResolveCreateNamedStruct ::
ResolveDeserializer ::
ResolveNewInstance ::
ResolveUpCast ::
ResolveGroupingAnalytics ::
ResolvePivot ::
ResolveOrdinalInOrderByAndGroupBy ::
ResolveAggAliasInGroupBy ::
ResolveMissingReferences ::
ExtractGenerator ::
ResolveGenerate ::
ResolveFunctions ::
ResolveAliases ::
ResolveSubquery ::
ResolveSubqueryColumnAliases ::
ResolveWindowOrder ::
ResolveWindowFrame ::
ResolveNaturalAndUsingJoin ::
ExtractWindowExpressions ::
GlobalAggregates ::
ResolveAggregateFunctions ::
TimeWindowing ::
ResolveInlineTables(conf) ::
ResolveTimeZone(conf) ::
TypeCoercion.typeCoercionRules(conf) ++
extendedResolutionRules : _*),
Batch("Post-Hoc Resolution", Once, postHocResolutionRules: _*),
Batch("View", Once,
AliasViewChild(conf)),
Batch("Nondeterministic", Once,
PullOutNondeterministic),
Batch("UDF", Once,
HandleNullInputsForUDF),
Batch("FixNullability", Once,
FixNullability),
Batch("Subquery", Once,
UpdateOuterReferences),
Batch("Cleanup", fixedPoint,
CleanupAliases)
)