1.EliminateOuterJoin 优化规则的应用场景
问题:为啥需要消除外链接即 out join
解答:消除 out join 可以提高执行效率。因为 inner join 只保留左表和右表可以关联到的数据,left join 需要保留左表全表的数据,right join 需要保留右表全表的数据,full join 左右表数据都需要保留,所以四种 join 在数据处理上的效率:inner join > left join = right join > full join
2.EliminateOuterJoin 源码解析
private def buildNewJoinType(filter: Filter, join: Join): JoinType = {
val conditions = splitConjunctivePredicates(filter.condition) ++ filter.constraints
val leftConditions = conditions.filter(_.references.subsetOf(join.left.outputSet))
val rightConditions = conditions.filter(_.references.subsetOf(join.right.outputSet))
lazy val leftHasNonNullPredicate = leftConditions.exists(canFilterOutNull)
lazy val rightHasNonNullPredicate = rightConditions.exists(canFilterOutNull)
join.joinType match {
case RightOuter if leftHasNonNullPredicate => Inner // 1.right outer类型,且join的左表有过滤操作,则转换为inner类型
case LeftOuter if rightHasNonNullPredicate => Inner // 2.left outer类型,且join的右表有过滤操作,则转换为inner类型
case FullOuter if leftHasNonNullPredicate && rightHasNonNullPredicate => Inner // 3.full outer类型,且join的左右表都有过滤操作,则转换为inner类型
case FullOuter if leftHasNonNullPredicate => LeftOuter // 4.full outer类型,且join的左表有过滤操作,则转换为left outer类型
case FullOuter if rightHasNonNullPredicate => RightOuter // 5.full outer类型,且join的右表有过滤操作,则转换为right outer类型
case o => o
}
}
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
// 匹配上Filter并且其子节点为Join的LogicalPlan
case f @ Filter(condition, j @ Join(_, _, RightOuter | LeftOuter | FullOuter, _, _)) =>
val newJoinType = buildNewJoinType(f, j)
// 如果相等,则不符合优化条件
if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType)) // 如果不相等,则改变 JoinType
}
}
问题 1:为啥 left out join 的右边有过滤条件,则转换为 inner?
解答:left join 的特点是右表没有对应的数据时补 null。如下所示,现在右表有个条件 a<1,这说明右表为 null 都会被 a<1 给过滤掉,此时和 inner join 是等价的。
spark-sql> explain extended SELECT* FROM employees LEFT JOIN departments ON employees.dept_id = departments.dept_id where departments.dept_id < 200;
== Parsed Logical Plan ==
'Project [*]
+- 'Filter ('departments.dept_id < 200)
+- 'Join LeftOuter, ('employees.dept_id = 'departments.dept_id)
:- 'UnresolvedRelation [employees], [], false
+- 'UnresolvedRelation [departments], [], false
== Analyzed Logical Plan ==
emp_id: int, emp_name: string, dept_id: int, dept_id: int, dept_name: string, location_id: int
Project [emp_id#102, emp_name#103, dept_id#104, dept_id#105, dept_name#106, location_id#107]
+- Filter (dept_id#105 < 200)
+- Join LeftOuter, (dept_id#104 = dept_id#105)
:- SubqueryAlias spark_catalog.tpcds_text_varchar_5.employees
: +- HiveTableRelation [`tpcds_text_varchar_5`.`employees`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [emp_id#102, emp_name#103, dept_id#104], Partition Cols: []]
+- SubqueryAlias spark_catalog.tpcds_text_varchar_5.departments
+- HiveTableRelation [`tpcds_text_varchar_5`.`departments`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [dept_id#105, dept_name#106, location_id#107], Partition Cols: []]
== Optimized Logical Plan ==
Join Inner, (dept_id#104 = dept_id#105)
:- Filter ((dept_id#104 < 200) AND isnotnull(dept_id#104))
: +- HiveTableRelation [`tpcds_text_varchar_5`.`employees`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [emp_id#102, emp_name#103, dept_id#104], Partition Cols: []]
+- Filter (isnotnull(dept_id#105) AND (dept_id#105 < 200))
+- HiveTableRelation [`tpcds_text_varchar_5`.`departments`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [dept_id#105, dept_name#106, location_id#107], Partition Cols: []]
== Physical Plan ==
*(2) BroadcastHashJoin [dept_id#104], [dept_id#105], Inner, BuildLeft, false
:- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[2, int, false] as bigint)),false), [id=#138]
: +- *(1) Filter ((dept_id#104 < 200) AND isnotnull(dept_id#104))
: +- Scan hive tpcds_text_varchar_5.employees [emp_id#102, emp_name#103, dept_id#104], HiveTableRelation [`tpcds_text_varchar_5`.`employees`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [emp_id#102, emp_name#103, dept_id#104], Partition Cols: []]
+- *(2) Filter (isnotnull(dept_id#105) AND (dept_id#105 < 200))
+- Scan hive tpcds_text_varchar_5.departments [dept_id#105, dept_name#106, location_id#107], HiveTableRelation [`tpcds_text_varchar_5`.`departments`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [dept_id#105, dept_name#106, location_id#107], Partition Cols: []]
问题 2:为啥规则 EliminateOuterJoin 需要在谓语下推 PushDownPredicates 之前执行?
// Optimizer
def defaultBatches: Seq[Batch] = {
val operatorOptimizationRuleSet =
Seq(
// Operator push down
PushProjectionThroughUnion,
ReorderJoin,
EliminateOuterJoin, // 消除外链接
PushDownPredicates, // 谓语下推
// 省略...
}
解答:谓词下推是指尽量将过滤条件更贴近数据源,使得查询过程可以跳过无关的数据。因为 EliminateOuterJoin 需要根据原始过滤条件的位置,进行 out join 转换,如果先执行谓语下推 PushDownPredicates,会影响前者,所以消除外链接的规则需要在谓语下推规则之前执行。