Spark SQL 2.3 Source Code Walkthrough - Preparations (6)

The physical plan produced in the previous chapter still has to pass through prepareForExecution, which performs some final preparation before execution. The code is as follows:

// executedPlan should not be used to initialize any SparkPlan. It should be
// only used for execution.
lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)
/**
  * Prepares a planned [[SparkPlan]] for execution by inserting shuffle operations and internal
  * row format conversions as needed.
  */
protected def prepareForExecution(plan: SparkPlan): SparkPlan = {
  preparations.foldLeft(plan) { case (sp, rule) => rule.apply(sp) }
}

The foldLeft above should look familiar by now; it is the same pattern we saw earlier: thread the plan through a sequence of rules, applying each one in turn.
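
For reference, here is a stripped-down model of the same fold. The Rule trait below is an illustrative stand-in for Spark's Rule[SparkPlan], not the real catalyst class:

trait Rule[T] { def apply(t: T): T }

// each rule maps a plan to a (possibly rewritten) plan; foldLeft threads the
// plan through every rule in order, exactly like prepareForExecution does
val upper: Rule[String]  = new Rule[String] { def apply(p: String): String = p.toUpperCase }
val suffix: Rule[String] = new Rule[String] { def apply(p: String): String = p + " [prepared]" }

val preparations: Seq[Rule[String]] = Seq(upper, suffix)
val executed = preparations.foldLeft("plan") { case (p, rule) => rule.apply(p) }
// executed == "PLAN [prepared]"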

Looking directly at preparations, the comment already explains it well:

/** A sequence of rules that will be applied in order to the physical plan before execution. */
protected def preparations: Seq[Rule[SparkPlan]] = Seq(
  python.ExtractPythonUDFs,
  PlanSubqueries(sparkSession),
  EnsureRequirements(sparkSession.sessionState.conf),
  CollapseCodegenStages(sparkSession.sessionState.conf),
  ReuseExchange(sparkSession.sessionState.conf),
  ReuseSubquery(sparkSession.sessionState.conf))

There are not many rules here: python.ExtractPythonUDFs pulls Python UDFs into dedicated operators, PlanSubqueries plans the physical subqueries, EnsureRequirements inserts the shuffles and sorts an operator's children need, CollapseCodegenStages fuses supported operators into whole-stage-codegen stages, and ReuseExchange/ReuseSubquery deduplicate identical exchanges and subquery results. EnsureRequirements is the most important of these, so let's walk through it:

The apply method is the entry point:

def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
  // TODO: remove this after we create a physical operator for `RepartitionByExpression`.
  case operator @ ShuffleExchangeExec(upper: HashPartitioning, child, _) =>
    child.outputPartitioning match {
      case lower: HashPartitioning if upper.semanticEquals(lower) => child
      case _ => operator
    }
  case operator: SparkPlan =>
    // run ensureDistributionAndOrdering (after reordering join predicates)
    ensureDistributionAndOrdering(reorderJoinPredicates(operator))
}
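
Note the first case: a ShuffleExchangeExec whose HashPartitioning is semantically equal to what its child already produces is redundant, so the exchange is collapsed into the child. A toy model of that collapse (the types here are illustrative stand-ins, not Spark's):

case class HashPart(keys: Seq[String], num: Int)

sealed trait Node { def outputPartitioning: Option[HashPart] }
case class Scan(part: Option[HashPart]) extends Node {
  def outputPartitioning: Option[HashPart] = part
}
case class Exchange(target: HashPart, child: Node) extends Node {
  def outputPartitioning: Option[HashPart] = Some(target)
}

// drop an exchange whose target partitioning the child already produces
def collapse(node: Node): Node = node match {
  case Exchange(target, child) if child.outputPartitioning.contains(target) => child
  case other => other
}

val scan = Scan(Some(HashPart(Seq("id"), 200)))
assert(collapse(Exchange(HashPart(Seq("id"), 200), scan)) == scan) // exchange removed

Every other operator is handed to ensureDistributionAndOrdering, which does the real work:
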
private def ensureDistributionAndOrdering(operator: SparkPlan): SparkPlan = {
  val requiredChildDistributions: Seq[Distribution] = operator.requiredChildDistribution
  val requiredChildOrderings: Seq[Seq[SortOrder]] = operator.requiredChildOrdering
  var children: Seq[SparkPlan] = operator.children
  assert(requiredChildDistributions.length == children.length)
  assert(requiredChildOrderings.length == children.length)

  // Ensure that the operator's children satisfy their output distribution requirements.
  // check whether each child's actual output distribution (its partitioning) satisfies the required distribution
  children = children.zip(requiredChildDistributions).map {
    // already satisfied: keep the child as-is
    case (child, distribution) if child.outputPartitioning.satisfies(distribution) =>
      child
    // broadcast distributions are handled separately
    case (child, BroadcastDistribution(mode)) =>
      BroadcastExchangeExec(mode, child)
    case (child, distribution) =>
      val numPartitions = distribution.requiredNumPartitions
        .getOrElse(defaultNumPreShufflePartitions)
      // insert a shuffle (effectively a repartition that changes the distribution) to satisfy the requirement
      ShuffleExchangeExec(distribution.createPartitioning(numPartitions), child)
  }

  // Get the indexes of children which have specified distribution requirements and need to have
  // same number of partitions.
  // keep only the children that actually specify a distribution requirement
  val childrenIndexes = requiredChildDistributions.zipWithIndex.filter {
    case (UnspecifiedDistribution, _) => false
    case (_: BroadcastDistribution, _) => false
    case _ => true
  }.map(_._2)
  // the distinct partition counts of those children
  val childrenNumPartitions =
    childrenIndexes.map(children(_).outputPartitioning.numPartitions).toSet

  if (childrenNumPartitions.size > 1) {
    // Get the number of partitions which is explicitly required by the distributions.
    // the number of partitions explicitly required by the children's distributions
    val requiredNumPartitions = {
      val numPartitionsSet = childrenIndexes.flatMap {
        index => requiredChildDistributions(index).requiredNumPartitions
      }.toSet
      assert(numPartitionsSet.size <= 1,
        s"$operator have incompatible requirements of the number of partitions for its children")
      numPartitionsSet.headOption
    }
    // the target number of partitions
    val targetNumPartitions = requiredNumPartitions.getOrElse(childrenNumPartitions.max)
    // unify every child with a specified distribution to targetNumPartitions partitions
    children = children.zip(requiredChildDistributions).zipWithIndex.map {
      case ((child, distribution), index) if childrenIndexes.contains(index) =>
        if (child.outputPartitioning.numPartitions == targetNumPartitions) {
          // already matches the target: keep as-is
          child
        } else {
          // does not match the target: shuffle
          val defaultPartitioning = distribution.createPartitioning(targetNumPartitions)
          child match {
            // If child is an exchange, we replace it with a new one having defaultPartitioning.
            case ShuffleExchangeExec(_, c, _) => ShuffleExchangeExec(defaultPartitioning, c)
            case _ => ShuffleExchangeExec(defaultPartitioning, child)
          }
        }

      case ((child, _), _) => child
    }
  }

  // add an ExchangeCoordinator if necessary; this is covered separately below
  // Now, we need to add ExchangeCoordinator if necessary.
  // Actually, it is not a good idea to add ExchangeCoordinators while we are adding Exchanges.
  // However, with the way that we plan the query, we do not have a place where we have a
  // global picture of all shuffle dependencies of a post-shuffle stage. So, we add coordinator
  // at here for now.
  // Once we finish https://issues.apache.org/jira/browse/SPARK-10665,
  // we can first add Exchanges and then add coordinator once we have a DAG of query fragments.
  children = withExchangeCoordinator(children, requiredChildDistributions)

  // if there is an ordering requirement, add a SortExec
  // Now that we've performed any necessary shuffles, add sorts to guarantee output orderings:
  children = children.zip(requiredChildOrderings).map { case (child, requiredOrdering) =>
    // If child.outputOrdering already satisfies the requiredOrdering, we do not need to sort.
    if (SortOrder.orderingSatisfies(child.outputOrdering, requiredOrdering)) {
      child
    } else {
      SortExec(requiredOrdering, global = false, child = child)
    }
  }

  operator.withNewChildren(children)
}
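
A join is a handy way to watch this method fire end to end: SortMergeJoinExec requires both children to be hash-clustered and sorted on the join keys, so EnsureRequirements must insert both exchanges and sorts. A small experiment, assuming a local Spark 2.3 session (the object name is mine):

import org.apache.spark.sql.SparkSession

object EnsureRequirementsDemo extends App {
  val spark = SparkSession.builder().master("local[*]").appName("demo").getOrCreate()
  import spark.implicits._

  // disable broadcast joins so the planner picks SortMergeJoinExec, whose
  // children require hash distribution and ordering on "id"
  spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

  val left = Seq((1, "a"), (2, "b")).toDF("id", "l")
  val right = Seq((1, "x"), (2, "y")).toDF("id", "r")

  // the executed plan should show Exchange hashpartitioning(id, 200)
  // and Sort [id ASC] under both sides of the join
  println(left.join(right, "id").queryExecution.executedPlan)

  spark.stop()
}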

The logic of withExchangeCoordinator is shown below; it coordinates the data distribution across multiple SparkPlans:

/**
 * Adds [[ExchangeCoordinator]] to [[ShuffleExchangeExec]]s if adaptive query execution is enabled
 * and partitioning schemes of these [[ShuffleExchangeExec]]s support [[ExchangeCoordinator]].
 */
private def withExchangeCoordinator(
    children: Seq[SparkPlan],
    requiredChildDistributions: Seq[Distribution]): Seq[SparkPlan] = {
  // decide whether an ExchangeCoordinator can be attached
  val supportsCoordinator =
    if (children.exists(_.isInstanceOf[ShuffleExchangeExec])) {
      // Right now, ExchangeCoordinator only support HashPartitionings.
      children.forall {
        // condition 1: a ShuffleExchangeExec child whose partitioning is HashPartitioning
        case e @ ShuffleExchangeExec(hash: HashPartitioning, _, _) => true
        case child =>
          child.outputPartitioning match {
            case hash: HashPartitioning => true
            case collection: PartitioningCollection =>
              collection.partitionings.forall(_.isInstanceOf[HashPartitioning])
            case _ => false
          }
      }
    } else {
      // In this case, although we do not have Exchange operators, we may still need to
      // shuffle data when we have more than one children because data generated by
      // these children may not be partitioned in the same way.
      // Please see the comment in withCoordinator for more details.
      // condition 2: every required distribution is ClusteredDistribution or HashClusteredDistribution
      val supportsDistribution = requiredChildDistributions.forall { dist =>
        dist.isInstanceOf[ClusteredDistribution] || dist.isInstanceOf[HashClusteredDistribution]
      }
      children.length > 1 && supportsDistribution
    }

  val withCoordinator =
    // adaptive execution is enabled and condition 1 or 2 holds
    if (adaptiveExecutionEnabled && supportsCoordinator) {
      val coordinator =
        new ExchangeCoordinator(
          children.length,
          targetPostShuffleInputSize,
          minNumPostShufflePartitions)
      children.zip(requiredChildDistributions).map {
        case (e: ShuffleExchangeExec, _) =>
          // condition 1: attach the coordinator directly
          // This child is an Exchange, we need to add the coordinator.
          e.copy(coordinator = Some(coordinator))
        case (child, distribution) =>
          // If this child is not an Exchange, we need to add an Exchange for now.
          // Ideally, we can try to avoid this Exchange. However, when we reach here,
          // there are at least two children operators (because if there is a single child
          // and we can avoid Exchange, supportsCoordinator will be false and we
          // will not reach here.). Although we can make two children have the same number of
          // post-shuffle partitions. Their numbers of pre-shuffle partitions may be different.
          // For example, let's say we have the following plan
          //         Join
          //         /  \
          //       Agg  Exchange
          //       /      \
          //    Exchange  t2
          //      /
          //     t1
          // In this case, because a post-shuffle partition can include multiple pre-shuffle
          // partitions, a HashPartitioning will not be strictly partitioned by the hashcodes
          // after shuffle. So, even we can use the child Exchange operator of the Join to
          // have a number of post-shuffle partitions that matches the number of partitions of
          // Agg, we cannot say these two children are partitioned in the same way.
          // Here is another case
          //         Join
          //         /  \
          //       Agg1  Agg2
          //       /      \
          //   Exchange1  Exchange2
          //       /       \
          //      t1       t2
          // In this case, two Aggs shuffle data with the same column of the join condition.
          // After we use ExchangeCoordinator, these two Aggs may not be partitioned in the same
          // way. Let's say that Agg1 and Agg2 both have 5 pre-shuffle partitions and 2
          // post-shuffle partitions. It is possible that Agg1 fetches those pre-shuffle
          // partitions by using a partitionStartIndices [0, 3]. However, Agg2 may fetch its
          // pre-shuffle partitions by using another partitionStartIndices [0, 4].
          // So, Agg1 and Agg2 are actually not co-partitioned.
          //
          // It will be great to introduce a new Partitioning to represent the post-shuffle
          // partitions when one post-shuffle partition includes multiple pre-shuffle partitions.
          // condition 2: even with equal partition counts the children may not be partitioned the same way, so add an exchange with the coordinator
          val targetPartitioning = distribution.createPartitioning(defaultNumPreShufflePartitions)
          assert(targetPartitioning.isInstanceOf[HashPartitioning])
          ShuffleExchangeExec(targetPartitioning, child, Some(coordinator))
      }
    } else {
      // If we do not need ExchangeCoordinator, the original children are returned.
      children
    }

  withCoordinator
}

As shown below, adaptiveExecutionEnabled defaults to false, so the ExchangeCoordinator is disabled by default. The next chapter will cover its execution logic in detail; for now it is enough to know it exists.

val ADAPTIVE_EXECUTION_ENABLED = buildConf("spark.sql.adaptive.enabled")
  .doc("When true, enable adaptive query execution.")
  .booleanConf
  .createWithDefault(false)
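
To experiment with the coordinator yourself, flip the flag at runtime. The second property below is Spark 2.3's knob for the target post-shuffle input size; both values are just examples:

// enable adaptive execution so withExchangeCoordinator attaches coordinators
spark.conf.set("spark.sql.adaptive.enabled", "true")
// target bytes per post-shuffle partition (64 MB here)
spark.conf.set("spark.sql.adaptive.shuffle.targetPostShuffleInputSize", "67108864")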

To summarize: Preparations compares each child's actual output distribution against the distribution its parent requires (by comparing partitionings), then inserts ShuffleExchangeExec, ExchangeCoordinator, and SortExec nodes as needed to make them match.

Here is a concrete example:

val df = spark.read.json("examples/src/main/resources/test.json")
df.createOrReplaceTempView("A")
spark.sql("SELECT A.B FROM A ORDER BY A.B").collectAsList()

Set a breakpoint and inspect the plan when execution reaches prepareForExecution:

[Screenshot: debugger view of the plan inside prepareForExecution]

The requiredChildDistribution here is an OrderedDistribution, but the child's outputPartitioning (UnknownPartitioning for a plain file scan) does not satisfy it, so the ShuffleExchangeExec branch is triggered.

The OrderedDistribution code is shown below; its createPartitioning produces a RangePartitioning:

case class OrderedDistribution(ordering: Seq[SortOrder]) extends Distribution {
  require(
    ordering != Nil,
    "The ordering expressions of an OrderedDistribution should not be Nil. " +
      "An AllTuples should be used to represent a distribution that only has " +
      "a single partition.")

  override def requiredNumPartitions: Option[Int] = None

  override def createPartitioning(numPartitions: Int): Partitioning = {
    RangePartitioning(ordering, numPartitions)
  }
}
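
As a quick sanity check, the RangePartitioning created this way does satisfy the OrderedDistribution that demanded it, which is exactly why the inserted exchange resolves the requirement. A sketch using catalyst's expression DSL (assuming Spark 2.3's catalyst classes are on the classpath; the DSL import details are my assumption):

import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.plans.physical.{OrderedDistribution, RangePartitioning}

// build an ordering on a string attribute "b", then check the round trip:
// OrderedDistribution -> createPartitioning -> satisfies
val ordering = Seq('b.string.asc)
val dist = OrderedDistribution(ordering)
val part = dist.createPartitioning(numPartitions = 200)

assert(part == RangePartitioning(ordering, 200))
assert(part.satisfies(dist)) // the new exchange meets the requirement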

Comparing the plan before and after prepareForExecution confirms the expectation:

Before:

Sort [B#6 ASC NULLS FIRST], true, 0
+- FileScan json [B#6] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/Users/neal/github/spark2.3/spark/examples/src/main/resources/test.json], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<B:string>

After:

*(2) Sort [B#6 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(B#6 ASC NULLS FIRST, 200)
   +- *(1) FileScan json [B#6] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/Users/neal/github/spark2.3/spark/examples/src/main/resources/test.json], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<B:string>
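
Both plans can also be printed without a debugger, straight from the QueryExecution object (assuming the session from the example above):

val qe = spark.sql("SELECT A.B FROM A ORDER BY A.B").queryExecution
println(qe.sparkPlan)    // the "before" plan
println(qe.executedPlan) // the "after" plan, with Exchange rangepartitioning added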