【Spark 精选】EliminateOuterJoin 优化规则详解

1.EliminateOuterJoin 优化规则的应用场景

问题:为啥需要消除外链接即 out join
解答:消除 out join 可以提高执行效率。因为 inner join 只保留左表和右表可以关联到的数据,left join 需要保留左表全表的数据,right join 需要保留右表全表的数据,full join 左右表数据都需要保留,所以四种 join 在数据处理上的效率:inner join > left join = right join > full join

2.EliminateOuterJoin 源码解析

  private def buildNewJoinType(filter: Filter, join: Join): JoinType = {
    val conditions = splitConjunctivePredicates(filter.condition) ++ filter.constraints
    val leftConditions = conditions.filter(_.references.subsetOf(join.left.outputSet))
    val rightConditions = conditions.filter(_.references.subsetOf(join.right.outputSet))

    lazy val leftHasNonNullPredicate = leftConditions.exists(canFilterOutNull)
    lazy val rightHasNonNullPredicate = rightConditions.exists(canFilterOutNull)

    join.joinType match {
      case RightOuter if leftHasNonNullPredicate => Inner   // 1.right outer类型,且join的左表有过滤操作,则转换为inner类型
      case LeftOuter if rightHasNonNullPredicate => Inner   // 2.left outer类型,且join的右表有过滤操作,则转换为inner类型
      case FullOuter if leftHasNonNullPredicate && rightHasNonNullPredicate => Inner   // 3.full outer类型,且join的左右表都有过滤操作,则转换为inner类型
      case FullOuter if leftHasNonNullPredicate => LeftOuter   // 4.full outer类型,且join的左表有过滤操作,则转换为left outer类型
      case FullOuter if rightHasNonNullPredicate => RightOuter   // 5.full outer类型,且join的右表有过滤操作,则转换为right outer类型
      case o => o
    }
  }

  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    // 匹配上Filter并且其子节点为Join的LogicalPlan
    case f @ Filter(condition, j @ Join(_, _, RightOuter | LeftOuter | FullOuter, _, _)) =>
      val newJoinType = buildNewJoinType(f, j)
      // 如果相等,则不符合优化条件
      if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))   // 如果不相等,则改变 JoinType
  }
}
EliminateOuterJoin执行流程.JPG

问题 1:为啥 left out join 的右边有过滤条件,则转换为 inner?
解答left join 的特点是右表没有对应的数据时补 null。如下所示,现在右表有个条件 a<1,这说明右表为 null 都会被 a<1 给过滤掉,此时和 inner join 是等价的。

spark-sql> explain extended SELECT* FROM employees LEFT JOIN departments ON employees.dept_id = departments.dept_id where departments.dept_id < 200;

== Parsed Logical Plan ==
'Project [*]
+- 'Filter ('departments.dept_id < 200)
   +- 'Join LeftOuter, ('employees.dept_id = 'departments.dept_id)
      :- 'UnresolvedRelation [employees], [], false
      +- 'UnresolvedRelation [departments], [], false

== Analyzed Logical Plan ==
emp_id: int, emp_name: string, dept_id: int, dept_id: int, dept_name: string, location_id: int
Project [emp_id#102, emp_name#103, dept_id#104, dept_id#105, dept_name#106, location_id#107]
+- Filter (dept_id#105 < 200)
   +- Join LeftOuter, (dept_id#104 = dept_id#105)
      :- SubqueryAlias spark_catalog.tpcds_text_varchar_5.employees
      :  +- HiveTableRelation [`tpcds_text_varchar_5`.`employees`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [emp_id#102, emp_name#103, dept_id#104], Partition Cols: []]
      +- SubqueryAlias spark_catalog.tpcds_text_varchar_5.departments
         +- HiveTableRelation [`tpcds_text_varchar_5`.`departments`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [dept_id#105, dept_name#106, location_id#107], Partition Cols: []]

== Optimized Logical Plan ==
Join Inner, (dept_id#104 = dept_id#105)
:- Filter ((dept_id#104 < 200) AND isnotnull(dept_id#104))
:  +- HiveTableRelation [`tpcds_text_varchar_5`.`employees`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [emp_id#102, emp_name#103, dept_id#104], Partition Cols: []]
+- Filter (isnotnull(dept_id#105) AND (dept_id#105 < 200))
   +- HiveTableRelation [`tpcds_text_varchar_5`.`departments`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [dept_id#105, dept_name#106, location_id#107], Partition Cols: []]

== Physical Plan ==
*(2) BroadcastHashJoin [dept_id#104], [dept_id#105], Inner, BuildLeft, false
:- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[2, int, false] as bigint)),false), [id=#138]
:  +- *(1) Filter ((dept_id#104 < 200) AND isnotnull(dept_id#104))
:     +- Scan hive tpcds_text_varchar_5.employees [emp_id#102, emp_name#103, dept_id#104], HiveTableRelation [`tpcds_text_varchar_5`.`employees`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [emp_id#102, emp_name#103, dept_id#104], Partition Cols: []]
+- *(2) Filter (isnotnull(dept_id#105) AND (dept_id#105 < 200))
   +- Scan hive tpcds_text_varchar_5.departments [dept_id#105, dept_name#106, location_id#107], HiveTableRelation [`tpcds_text_varchar_5`.`departments`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [dept_id#105, dept_name#106, location_id#107], Partition Cols: []]

问题 2:为啥规则 EliminateOuterJoin 需要在谓语下推 PushDownPredicates 之前执行?

  // Optimizer 
  def defaultBatches: Seq[Batch] = {
    val operatorOptimizationRuleSet =
      Seq(
        // Operator push down
        PushProjectionThroughUnion,
        ReorderJoin,
        EliminateOuterJoin,   // 消除外链接
        PushDownPredicates,   // 谓语下推
        // 省略...
  }      

解答:谓词下推是指尽量将过滤条件更贴近数据源,使得查询过程可以跳过无关的数据。因为 EliminateOuterJoin 需要根据原始过滤条件的位置,进行 out join 转换,如果先执行谓语下推 PushDownPredicates,会影响前者,所以消除外链接的规则需要在谓语下推规则之前执行。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,222评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,455评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,720评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,568评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,696评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,879评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,028评论 3 409
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,773评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,220评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,550评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,697评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,360评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,002评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,782评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,010评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,433评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,587评论 2 350

推荐阅读更多精彩内容