Spark 2.1 and 2.2 SQL Physical Execution Strategies: Join Source Code Analysis and How the Join Types Differ

1. object ExtractEquiJoinKeys

This is an extractor object used for pattern matching. The official comment reads:

A pattern that finds joins with equality conditions that can be evaluated using equi-join. Null-safe equality will be transformed into equality as joining key (replace null with default value).

So what is null-safe equality? There is a case class EqualNullSafe, documented as:

expr1 FUNC expr2 - Returns same result as the EQUAL(=) operator for non-null operands, but returns true if both are null, false if one of the them is null.

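In other words, besides comparing ordinary values for equality, it treats the two sides as equal when both are null and as unequal when only one of them is null. The source shows how this becomes a join key: each side is wrapped in Coalesce together with Literal.default of its data type, so a null key is replaced by the default value of the type (0 for numeric types, for example) and rows with null keys end up with the same key. The original EqualNullSafe predicate is not removed by the filterNot that builds otherPredicates, so it stays in the residual condition and is checked again after the keys match.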
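The semantics described above can be sketched in plain Scala. This is a minimal model rather than Spark code: Option[Int] stands in for a nullable integer column, and sqlEq, nullSafeEq and coalesceKey are hypothetical helper names. It also shows why the residual predicate matters: after coalescing, a null key and a real 0 produce the same join key.

```scala
object NullSafeEqDemo {
  // Model a nullable SQL integer as Option[Int]: None stands for NULL.
  type SqlInt = Option[Int]

  // SQL `=`: the result is NULL (None here) when either operand is NULL.
  def sqlEq(l: SqlInt, r: SqlInt): Option[Boolean] =
    for (a <- l; b <- r) yield a == b

  // SQL `<=>`: never NULL; true when both sides are NULL, false when only one is.
  def nullSafeEq(l: SqlInt, r: SqlInt): Boolean = l == r

  // Stand-in for Coalesce(Seq(e, Literal.default(IntegerType))): NULL becomes 0.
  def coalesceKey(e: SqlInt): Int = e.getOrElse(0)

  def main(args: Array[String]): Unit = {
    println(sqlEq(None, None))          // NULL = NULL is NULL
    println(nullSafeEq(None, None))     // NULL <=> NULL is true
    println(nullSafeEq(Some(0), None))  // 0 <=> NULL is false
    // The coalesced keys collide, so the original <=> predicate must still be applied.
    println(coalesceKey(Some(0)) == coalesceKey(None))
  }
}
```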

Source:

/**
 * A pattern that finds joins with equality conditions that can be evaluated using equi-join.
 *
 * Null-safe equality will be transformed into equality as joining key (replace null with default
 * value).
 */
object ExtractEquiJoinKeys extends Logging with PredicateHelper {
  /** (joinType, leftKeys, rightKeys, condition, leftChild, rightChild) */
  type ReturnType =
    (JoinType, Seq[Expression], Seq[Expression], Option[Expression], LogicalPlan, LogicalPlan)

  def unapply(plan: LogicalPlan): Option[ReturnType] = plan match {
    case join @ Join(left, right, joinType, condition) =>
      logDebug(s"Considering join on: $condition")
      // Find equi-join predicates that can be evaluated before the join, and thus can be used
      // as join keys.
      val predicates = condition.map(splitConjunctivePredicates).getOrElse(Nil)
      val joinKeys = predicates.flatMap {
        case EqualTo(l, r) if l.references.isEmpty || r.references.isEmpty => None
        case EqualTo(l, r) if canEvaluate(l, left) && canEvaluate(r, right) => Some((l, r))
        case EqualTo(l, r) if canEvaluate(l, right) && canEvaluate(r, left) => Some((r, l))
        // Replace null with default value for joining key, then those rows with null in it could
        // be joined together
        case EqualNullSafe(l, r) if canEvaluate(l, left) && canEvaluate(r, right) =>
          Some((Coalesce(Seq(l, Literal.default(l.dataType))),
            Coalesce(Seq(r, Literal.default(r.dataType)))))
        case EqualNullSafe(l, r) if canEvaluate(l, right) && canEvaluate(r, left) =>
          Some((Coalesce(Seq(r, Literal.default(r.dataType))),
            Coalesce(Seq(l, Literal.default(l.dataType)))))
        case other => None
      }
      val otherPredicates = predicates.filterNot {
        case EqualTo(l, r) if l.references.isEmpty || r.references.isEmpty => false
        case EqualTo(l, r) =>
          canEvaluate(l, left) && canEvaluate(r, right) ||
            canEvaluate(l, right) && canEvaluate(r, left)
        case other => false
      }

      if (joinKeys.nonEmpty) {
        val (leftKeys, rightKeys) = joinKeys.unzip
        logDebug(s"leftKeys:$leftKeys | rightKeys:$rightKeys")
        Some((joinType, leftKeys, rightKeys, otherPredicates.reduceOption(And), left, right))
      } else {
        None
      }
    case _ => None
  }
}

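The code first collects all conjuncts of the join condition (an And is split recursively into its left and right sides) and then builds two sequences, joinKeys and otherPredicates. joinKeys holds a (leftKey, rightKey) pair for every equality in which one side can be evaluated on the left child and the other side on the right child; equalities in which either side references no attributes, such as a comparison with a literal, are skipped. otherPredicates holds the remaining predicates. Note that the filterNot only removes EqualTo predicates that became keys, so EqualNullSafe predicates are kept there. So what is canEvaluate()? Its source is: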

/**
 * Returns true if `expr` can be evaluated using only the output of `plan`.  This method
 * can be used to determine when it is acceptable to move expression evaluation within a query
 * plan.
 *
 * For example consider a join between two relations R(a, b) and S(c, d).
 *
 * - `canEvaluate(EqualTo(a,b), R)` returns `true`
 * - `canEvaluate(EqualTo(a,c), R)` returns `false`
 * - `canEvaluate(Literal(1), R)` returns `true` as literals CAN be evaluated on any plan
 */
protected def canEvaluate(expr: Expression, plan: LogicalPlan): Boolean =
  expr.references.subsetOf(plan.outputSet)

That is, every attribute referenced by the expression must be contained in the output attributes of the plan.
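The subset check can be reproduced with ordinary Scala sets. The sketch below is a simplification under stated assumptions: Expr and Plan are hypothetical stand-ins that carry only column names, whereas Spark works with AttributeSet. The relations R(a, b) and S(c, d) are the ones from the comment above.

```scala
object CanEvaluateDemo {
  // Hypothetical stand-ins: an expression is reduced to the column names it references,
  // a plan to the column names it outputs.
  final case class Expr(references: Set[String])
  final case class Plan(outputSet: Set[String])

  // Same shape as the Spark method: all referenced columns must come from the plan.
  def canEvaluate(expr: Expr, plan: Plan): Boolean =
    expr.references.subsetOf(plan.outputSet)

  val r: Plan = Plan(Set("a", "b"))
  val s: Plan = Plan(Set("c", "d"))

  val aEqualsB: Expr = Expr(Set("a", "b"))      // models EqualTo(a, b)
  val aEqualsC: Expr = Expr(Set("a", "c"))      // models EqualTo(a, c)
  val literal: Expr = Expr(Set.empty[String])   // models Literal(1): no references
}
```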

Joins that match the ExtractEquiJoinKeys pattern are then handled by the equi-join cases of the physical join strategy.
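To make the key extraction concrete, here is a small self-contained model of it. The expression classes (Col, Lit, EqualTo, GreaterThan, And) are a hypothetical miniature of Catalyst's expression tree, child plans are reduced to sets of column names, and the null-safe cases are left out for brevity. The logic follows the listing above.

```scala
object ExtractKeysDemo {
  sealed trait Expr { def refs: Set[String] }
  final case class Col(name: String) extends Expr { def refs: Set[String] = Set(name) }
  final case class Lit(value: Int) extends Expr { def refs: Set[String] = Set.empty }
  final case class EqualTo(l: Expr, r: Expr) extends Expr { def refs: Set[String] = l.refs ++ r.refs }
  final case class GreaterThan(l: Expr, r: Expr) extends Expr { def refs: Set[String] = l.refs ++ r.refs }
  final case class And(l: Expr, r: Expr) extends Expr { def refs: Set[String] = l.refs ++ r.refs }

  // Flatten nested ANDs into a list of conjuncts, like splitConjunctivePredicates.
  def splitConjuncts(e: Expr): Seq[Expr] = e match {
    case And(l, r) => splitConjuncts(l) ++ splitConjuncts(r)
    case other     => Seq(other)
  }

  def canEvaluate(e: Expr, output: Set[String]): Boolean = e.refs.subsetOf(output)

  private def bothReferenceColumns(l: Expr, r: Expr): Boolean =
    l.refs.nonEmpty && r.refs.nonEmpty

  // Returns (key pairs oriented as (leftKey, rightKey), remaining predicates).
  def extract(
      cond: Expr,
      left: Set[String],
      right: Set[String]): (Seq[(Expr, Expr)], Seq[Expr]) = {
    val predicates = splitConjuncts(cond)
    val keys = predicates.collect {
      case EqualTo(l, r) if bothReferenceColumns(l, r) &&
          canEvaluate(l, left) && canEvaluate(r, right) => (l, r)
      case EqualTo(l, r) if bothReferenceColumns(l, r) &&
          canEvaluate(l, right) && canEvaluate(r, left) => (r, l) // swap into (left, right)
    }
    val others = predicates.filterNot {
      case EqualTo(l, r) if l.refs.isEmpty || r.refs.isEmpty => false
      case EqualTo(l, r) =>
        (canEvaluate(l, left) && canEvaluate(r, right)) ||
          (canEvaluate(l, right) && canEvaluate(r, left))
      case _ => false
    }
    (keys, others)
  }
}
```

For a condition such as c = a AND b > 1 AND a = 5 over R(a, b) and S(c, d), this yields the single key pair (a, c), with b > 1 and a = 5 left over as the residual condition.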

2. object JoinSelection

Source:

  /**
   * Select the proper physical plan for join based on joining keys and size of logical plan.
   *
   * At first, uses the [[ExtractEquiJoinKeys]] pattern to find joins where at least some of the
   * predicates can be evaluated by matching join keys. If found,  Join implementations are chosen
   * with the following precedence:
   *
   * - Broadcast: if one side of the join has an estimated physical size that is smaller than the
   *     user-configurable [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]] threshold
   *     or if that side has an explicit broadcast hint (e.g. the user applied the
   *     [[org.apache.spark.sql.functions.broadcast()]] function to a DataFrame), then that side
   *     of the join will be broadcasted and the other side will be streamed, with no shuffling
   *     performed. If both sides of the join are eligible to be broadcasted then the
   * - Shuffle hash join: if the average size of a single partition is small enough to build a hash
   *     table.
   * - Sort merge: if the matching join keys are sortable.
   *
   * If there is no joining keys, Join implementations are chosen with the following precedence:
   * - BroadcastNestedLoopJoin: if one side of the join could be broadcasted
   * - CartesianProduct: for Inner join
   * - BroadcastNestedLoopJoin
   */
  object JoinSelection extends Strategy with PredicateHelper {
    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
      // --- BroadcastHashJoin --------------------------------------------------------------------
      case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
        if canBuildRight(joinType) && canBroadcast(right) =>
        Seq(joins.BroadcastHashJoinExec(
          leftKeys, rightKeys, joinType, BuildRight, condition, planLater(left), planLater(right)))

      case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
        if canBuildLeft(joinType) && canBroadcast(left) =>
        Seq(joins.BroadcastHashJoinExec(
          leftKeys, rightKeys, joinType, BuildLeft, condition, planLater(left), planLater(right)))

      // --- ShuffledHashJoin ---------------------------------------------------------------------
      case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
         if !conf.preferSortMergeJoin && canBuildRight(joinType) && canBuildLocalHashMap(right)
           && muchSmaller(right, left) ||
           !RowOrdering.isOrderable(leftKeys) =>
        Seq(joins.ShuffledHashJoinExec(
          leftKeys, rightKeys, joinType, BuildRight, condition, planLater(left), planLater(right)))

      case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
         if !conf.preferSortMergeJoin && canBuildLeft(joinType) && canBuildLocalHashMap(left)
           && muchSmaller(left, right) ||
           !RowOrdering.isOrderable(leftKeys) =>
        Seq(joins.ShuffledHashJoinExec(
          leftKeys, rightKeys, joinType, BuildLeft, condition, planLater(left), planLater(right)))

      // --- SortMergeJoin ------------------------------------------------------------
      case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
        if RowOrdering.isOrderable(leftKeys) =>
        joins.SortMergeJoinExec(
          leftKeys, rightKeys, joinType, condition, planLater(left), planLater(right)) :: Nil

      // --- Without joining keys ------------------------------------------------------------

      // Pick BroadcastNestedLoopJoin if one side could be broadcasted
      case j @ logical.Join(left, right, joinType, condition)
          if canBuildRight(joinType) && canBroadcast(right) =>
        joins.BroadcastNestedLoopJoinExec(
          planLater(left), planLater(right), BuildRight, joinType, condition) :: Nil
      case j @ logical.Join(left, right, joinType, condition)
          if canBuildLeft(joinType) && canBroadcast(left) =>
        joins.BroadcastNestedLoopJoinExec(
          planLater(left), planLater(right), BuildLeft, joinType, condition) :: Nil

      // Pick CartesianProduct for InnerJoin
      case logical.Join(left, right, _: InnerLike, condition) =>
        joins.CartesianProductExec(planLater(left), planLater(right), condition) :: Nil

      case logical.Join(left, right, joinType, condition) =>
        val buildSide =
          if (right.stats(conf).sizeInBytes <= left.stats(conf).sizeInBytes) {
            BuildRight
          } else {
            BuildLeft
          }
        // This join could be very slow or OOM
        joins.BroadcastNestedLoopJoinExec(
          planLater(left), planLater(right), buildSide, joinType, condition) :: Nil

      // --- Cases where this strategy does not apply ---------------------------------------------
      case _ => Nil
    }
  }

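Start with the comment. The strategy first uses ExtractEquiJoinKeys to find joins in which at least some of the predicates can be evaluated by matching join keys. If there are any, it chooses among three implementations based on those keys and the plan sizes: broadcast hash join, shuffled hash join, and the most common one, sort merge join. If there are no join keys, there are two implementations: BroadcastNestedLoopJoin and CartesianProduct.

Now for the source. When join keys are available, how does the strategy decide which join to use?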

2.1 Broadcast Join

Two methods are involved: canBroadcast() and canBuildX (canBuildLeft or canBuildRight).

/**
 * Matches a plan whose output should be small enough to be used in broadcast join.
 */
private def canBroadcast(plan: LogicalPlan): Boolean = {
  plan.stats(conf).hints.isBroadcastable.getOrElse(false) ||
    (plan.stats(conf).sizeInBytes >= 0 &&
      plan.stats(conf).sizeInBytes <= conf.autoBroadcastJoinThreshold)
}

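As shown, canBroadcast() reads AUTO_BROADCASTJOIN_THRESHOLD (spark.sql.autoBroadcastJoinThreshold) from the configuration. Setting it to -1 disables size-based broadcasting, because no non-negative sizeInBytes can be less than or equal to -1; an explicit broadcast hint still applies.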
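The effect of the threshold can be checked with a small model. Stats is a hypothetical stand-in for the plan statistics, and the 10 MB value is used here as the assumed default of spark.sql.autoBroadcastJoinThreshold; adjust it if your deployment differs.

```scala
object CanBroadcastDemo {
  // Hypothetical stand-in for the statistics of a logical plan.
  final case class Stats(sizeInBytes: BigInt, broadcastHint: Boolean = false)

  // Same shape as the Spark method: a hint wins, otherwise compare against the threshold.
  def canBroadcast(stats: Stats, autoBroadcastJoinThreshold: Long): Boolean =
    stats.broadcastHint ||
      (stats.sizeInBytes >= BigInt(0) &&
        stats.sizeInBytes <= BigInt(autoBroadcastJoinThreshold))

  // Assumed default threshold: 10 MB.
  val tenMb: Long = 10L * 1024 * 1024

  def main(args: Array[String]): Unit = {
    println(canBroadcast(Stats(BigInt(1024)), tenMb))                      // small table
    println(canBroadcast(Stats(BigInt(0)), -1L))                           // threshold disabled
    println(canBroadcast(Stats(BigInt(tenMb) * 100, broadcastHint = true), -1L)) // hint only
  }
}
```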

private def canBuildRight(joinType: JoinType): Boolean = joinType match {
  case _: InnerLike | LeftOuter | LeftSemi | LeftAnti => true
  case j: ExistenceJoin => true
  case _ => false
}

private def canBuildLeft(joinType: JoinType): Boolean = joinType match {
  case _: InnerLike | RightOuter => true
  case _ => false
}

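As shown, canBuildRight() and canBuildLeft() return true when the join type allows that side to be the build side, which means the other side is the one whose rows drive the join. For example, LeftOuter must emit every left row, so only the right side can be built.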

2.2 Shuffled Hash Join

This involves canBuildLocalHashMap(), muchSmaller(), and the configuration entry PREFER_SORTMERGEJOIN, which is documented as:

When true, prefer sort merge join over shuffle hash join.

The source of canBuildLocalHashMap() is:

/**
 * Matches a plan whose single partition should be small enough to build a hash table.
 *
 * Note: this assume that the number of partition is fixed, requires additional work if it's
 * dynamic.
 */
private def canBuildLocalHashMap(plan: LogicalPlan): Boolean = {
  plan.stats(conf).sizeInBytes < conf.autoBroadcastJoinThreshold * conf.numShufflePartitions
}

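This method involves two configuration entries. One is AUTO_BROADCASTJOIN_THRESHOLD, which was already used for broadcast join and is very useful for query tuning. The other is SHUFFLE_PARTITIONS, the number of partitions used when shuffling for joins or aggregations; if it is not set, it defaults to 200.

The source of muchSmaller():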
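As a worked example of this check: assuming a 10 MB broadcast threshold and 200 shuffle partitions, the limit is 10,485,760 × 200 = 2,097,152,000 bytes, which is 2000 MB. The sketch below reproduces the comparison; the parameter names are illustrative.

```scala
object LocalHashMapDemo {
  // Same comparison as canBuildLocalHashMap, with the two settings passed in explicitly.
  def canBuildLocalHashMap(
      sizeInBytes: BigInt,
      autoBroadcastJoinThreshold: Long,
      numShufflePartitions: Int): Boolean =
    sizeInBytes < BigInt(autoBroadcastJoinThreshold) * BigInt(numShufflePartitions)

  val tenMb: Long = 10L * 1024 * 1024

  // Upper bound (exclusive) under the assumed settings: 2,097,152,000 bytes.
  val limit: BigInt = BigInt(tenMb) * BigInt(200)
}
```

One consequence visible from the arithmetic: when the threshold is set to -1, the product is negative and the check can never succeed.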

/**
 * Returns whether plan a is much smaller (3X) than plan b.
 *
 * The cost to build hash map is higher than sorting, we should only build hash map on a table
 * that is much smaller than other one. Since we does not have the statistic for number of rows,
 * use the size of bytes here as estimation.
 */
private def muchSmaller(a: LogicalPlan, b: LogicalPlan): Boolean = {
  a.stats(conf).sizeInBytes * 3 <= b.stats(conf).sizeInBytes
}

So "much smaller" means that a is at most one third the size of b.

2.3 Sort Merge Join

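When the data types of all collected join keys are orderable, Sort Merge Join can be used. If they are not, the !RowOrdering.isOrderable(leftKeys) alternative in the ShuffledHashJoin guard matches first, so the join is planned as a ShuffledHashJoin instead.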
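Putting the three equi-join branches together, the precedence can be sketched as a single function. This is a simplified model under stated assumptions: only four join types are modeled, each side is reduced to a byte size, the result is a descriptive string, and Conf, Side and select are hypothetical names. The defaults (10 MB, 200 partitions, preferSortMergeJoin = true) are assumptions about a stock configuration.

```scala
object JoinSelectionDemo {
  sealed trait JoinType
  case object Inner extends JoinType
  case object LeftOuter extends JoinType
  case object RightOuter extends JoinType
  case object FullOuter extends JoinType

  final case class Conf(
      threshold: Long = 10L * 1024 * 1024,
      shufflePartitions: Int = 200,
      preferSortMergeJoin: Boolean = true)
  final case class Side(sizeInBytes: BigInt)

  def canBuildRight(t: JoinType): Boolean = t == Inner || t == LeftOuter
  def canBuildLeft(t: JoinType): Boolean = t == Inner || t == RightOuter
  def canBroadcast(s: Side, c: Conf): Boolean =
    s.sizeInBytes >= BigInt(0) && s.sizeInBytes <= BigInt(c.threshold)
  def canBuildLocalHashMap(s: Side, c: Conf): Boolean =
    s.sizeInBytes < BigInt(c.threshold) * BigInt(c.shufflePartitions)
  def muchSmaller(a: Side, b: Side): Boolean = a.sizeInBytes * BigInt(3) <= b.sizeInBytes

  // Mirrors the order of the equi-join cases: broadcast, then shuffled hash, then sort merge.
  def select(
      t: JoinType,
      left: Side,
      right: Side,
      c: Conf,
      keysOrderable: Boolean = true): String =
    if (canBuildRight(t) && canBroadcast(right, c)) "BroadcastHashJoin(BuildRight)"
    else if (canBuildLeft(t) && canBroadcast(left, c)) "BroadcastHashJoin(BuildLeft)"
    else if ((!c.preferSortMergeJoin && canBuildRight(t) &&
        canBuildLocalHashMap(right, c) && muchSmaller(right, left)) || !keysOrderable)
      "ShuffledHashJoin(BuildRight)"
    else if (!c.preferSortMergeJoin && canBuildLeft(t) &&
        canBuildLocalHashMap(left, c) && muchSmaller(left, right))
      "ShuffledHashJoin(BuildLeft)"
    else "SortMergeJoin"

  val mb: BigInt = BigInt(1024 * 1024)
}
```

With these defaults, a 100 MB table joined to a 1 GB table is planned as a sort merge join; it becomes a shuffled hash join only when preferSortMergeJoin is turned off.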

3. BroadcastHashJoinExec

case class BroadcastHashJoinExec(
    leftKeys: Seq[Expression],
    rightKeys: Seq[Expression],
    joinType: JoinType,
    buildSide: BuildSide,
    condition: Option[Expression],
    left: SparkPlan,
    right: SparkPlan)
  extends BinaryExecNode with HashJoin with CodegenSupport {

  protected override def doExecute(): RDD[InternalRow] = {
    val numOutputRows = longMetric("numOutputRows")

    val broadcastRelation = buildPlan.executeBroadcast[HashedRelation]()
    streamedPlan.execute().mapPartitions { streamedIter =>
      val hashed = broadcastRelation.value.asReadOnlyCopy()
      TaskContext.get().taskMetrics().incPeakExecutionMemory(hashed.estimatedSize)
      join(streamedIter, hashed, numOutputRows)
    }
  }
}

// join() from the HashJoin trait, inherited by BroadcastHashJoinExec
protected def join(
    streamedIter: Iterator[InternalRow],
    hashed: HashedRelation,
    numOutputRows: SQLMetric): Iterator[InternalRow] = {

  val joinedIter = joinType match {
    case _: InnerLike =>
      innerJoin(streamedIter, hashed)
    case LeftOuter | RightOuter =>
      outerJoin(streamedIter, hashed)
    case LeftSemi =>
      semiJoin(streamedIter, hashed)
    case LeftAnti =>
      antiJoin(streamedIter, hashed)
    case j: ExistenceJoin =>
      existenceJoin(streamedIter, hashed)
    case x =>
      throw new IllegalArgumentException(
        s"BroadcastHashJoin should not take $x as the JoinType")
  }

  val resultProj = createResultProjection
  joinedIter.map { r =>
    numOutputRows += 1
    resultProj(r)
  }
}

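Of the two snippets above, the first is the definition of BroadcastHashJoinExec with its main method, and the second is the join() method of HashJoin, which it extends. HashJoin has several paired members named (buildXXX, streamedXXX), for example (buildPlan, streamedPlan); they are not listed here, but you can find them in the source. buildXXX refers to the small table that is hashed or broadcast, and streamedXXX to the large table. "Streamed" means that its rows are iterated over and processed one at a time (this is my own understanding).

So here, after buildPlan has been broadcast, streamedPlan.execute() returns an RDD[InternalRow], and mapPartitions joins each of its partitions against the broadcast small table.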
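The same flow can be imitated with plain collections. In this sketch, which is not Spark code, rows are (key, payload) tuples, a Seq of Seqs stands in for the partitions of the streamed RDD, an immutable Map plays the role of the broadcast HashedRelation, and only an inner join is handled. All names are illustrative.

```scala
object BroadcastHashJoinDemo {
  type Row = (Int, String) // (join key, payload)

  // Inner equi-join: `build` is the small side that would be broadcast,
  // `streamedPartitions` models the partitions of the large side.
  def broadcastHashJoin(
      streamedPartitions: Seq[Seq[Row]],
      build: Seq[Row]): Seq[Seq[(Int, String, String)]] = {
    // Built once from ALL build-side rows; every partition probes the same table.
    val hashed: Map[Int, Seq[String]] =
      build.groupBy(_._1).map { case (k, rows) => k -> rows.map(_._2) }

    // Counterpart of mapPartitions: stream the rows of each partition past the table.
    streamedPartitions.map { partition =>
      for {
        (key, streamedValue) <- partition
        buildValue <- hashed.getOrElse(key, Seq.empty)
      } yield (key, streamedValue, buildValue)
    }
  }
}
```

The streamed side is never repartitioned here, which matches the "no shuffling" remark in the JoinSelection comment.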

4. ShuffledHashJoinExec

case class ShuffledHashJoinExec(
    leftKeys: Seq[Expression],
    rightKeys: Seq[Expression],
    joinType: JoinType,
    buildSide: BuildSide,
    condition: Option[Expression],
    left: SparkPlan,
    right: SparkPlan)
  extends BinaryExecNode with HashJoin {

  private def buildHashedRelation(iter: Iterator[InternalRow]): HashedRelation = {
    val buildDataSize = longMetric("buildDataSize")
    val buildTime = longMetric("buildTime")
    val start = System.nanoTime()
    val context = TaskContext.get()
    val relation = HashedRelation(iter, buildKeys, taskMemoryManager = context.taskMemoryManager())
    buildTime += (System.nanoTime() - start) / 1000000
    buildDataSize += relation.estimatedSize
    // This relation is usually used until the end of task.
    context.addTaskCompletionListener(_ => relation.close())
    relation
  }

  protected override def doExecute(): RDD[InternalRow] = {
    val numOutputRows = longMetric("numOutputRows")
    streamedPlan.execute().zipPartitions(buildPlan.execute()) { (streamIter, buildIter) =>
      val hashed = buildHashedRelation(buildIter)
      join(streamIter, hashed, numOutputRows)
    }
  }
}

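ShuffledHashJoin and BroadcastJoin differ in how the hash table is built. The latter relies on a HashedRelation produced by broadcasting. The former calls zipPartitions, which combines two RDDs that have the same number of partitions and passes the iterators of each pair of partitions to the function, here (streamIter, buildIter); a HashedRelation is then built from buildIter. This means that the HashedRelation of BroadcastJoin contains all the data of the small table, while the HashedRelation of ShuffledHashJoin contains only the part of the small table that falls into the same partition as the corresponding rows of the large table.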
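The difference can be seen in a collection-based sketch of the shuffled variant. It is a model under assumptions, not Spark code: the shuffle is approximated by routing each row to partition key mod n, zip on two Seqs stands in for zipPartitions, and only an inner join is handled.

```scala
object ShuffledHashJoinDemo {
  type Row = (Int, String) // (join key, payload)

  // Hypothetical shuffle: route each row to partition (key mod numPartitions).
  def shuffle(rows: Seq[Row], numPartitions: Int): Seq[Seq[Row]] = {
    val byPartition = rows.groupBy(r => Math.floorMod(r._1, numPartitions))
    (0 until numPartitions).map(p => byPartition.getOrElse(p, Seq.empty))
  }

  def shuffledHashJoin(
      streamed: Seq[Row],
      build: Seq[Row],
      numPartitions: Int): Seq[Seq[(Int, String, String)]] =
    // Both sides use the same partitioning, so matching keys meet in the same pair.
    shuffle(streamed, numPartitions).zip(shuffle(build, numPartitions)).map {
      case (streamIter, buildIter) =>
        // Per-partition hash table: holds only the build rows routed to this partition.
        val hashed = buildIter.groupBy(_._1)
        for {
          (key, streamedValue) <- streamIter
          (_, buildValue) <- hashed.getOrElse(key, Seq.empty)
        } yield (key, streamedValue, buildValue)
    }
}
```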

5. SortMergeJoinExec

case class SortMergeJoinExec(
    leftKeys: Seq[Expression],
    rightKeys: Seq[Expression],
    joinType: JoinType,
    condition: Option[Expression],
    left: SparkPlan,
    right: SparkPlan) extends BinaryExecNode with CodegenSupport {

  protected override def doExecute(): RDD[InternalRow] = {
    val numOutputRows = longMetric("numOutputRows")
    val spillThreshold = getSpillThreshold
    left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) =>
      ...
      ...
    }
  }
}

As shown, this also combines the two RDDs with zipPartitions and then performs the join by iterating over each pair of partitions.
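The body elided in the listing is not reproduced here. As an illustration of the general technique only, the following sketch merges one pair of partitions, assuming both inputs are already sorted by key and restricting itself to an inner join. It is a textbook merge join over in-memory vectors, not the Spark implementation.

```scala
object SortMergeJoinDemo {
  type Row = (Int, String) // (join key, payload)

  // Inner equi-join of two inputs that are ALREADY sorted by key.
  def sortMergeJoin(left: Vector[Row], right: Vector[Row]): Vector[(Int, String, String)] = {
    val out = Vector.newBuilder[(Int, String, String)]
    var i = 0
    var j = 0
    while (i < left.length && j < right.length) {
      val lk = left(i)._1
      val rk = right(j)._1
      if (lk < rk) i += 1       // left key too small: advance left
      else if (lk > rk) j += 1  // right key too small: advance right
      else {
        // Find the run of equal keys on the right, then pair it with every matching left row.
        var jEnd = j
        while (jEnd < right.length && right(jEnd)._1 == lk) jEnd += 1
        while (i < left.length && left(i)._1 == lk) {
          var k = j
          while (k < jEnd) {
            out += ((lk, left(i)._2, right(k)._2))
            k += 1
          }
          i += 1
        }
        j = jEnd
      }
    }
    out.result()
  }
}
```

Because both cursors only move forward, each input is scanned once apart from the repeated passes over a run of duplicate keys on the right.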
