[SPARK][CORE] 面试问题之 Shuffle reader 的细枝末节 (上)

欢迎关注微信公众号“Tim在路上”
之前我们已经了解了shuffle writer的详细过程,那么生成文件后会发生什么呢?以及它们是如何被读取呢?读取是内存的操作吗?这些问题也随之产生,那么今天我们将先来了解了shuffle reader的细枝末节。

在文章Spark Shuffle概述中我们已经知道,在ShuffleManager中不仅定义了getWriter来获取map writer的实现方式, 同时还定义了getReader来获取读取shuffle文件的实现方式。 在Spark中调用有两个调用getReader的抽象类的重要实现,分别是ShuffledRDD和ShuffleRowRDD。前者是与RDD API交互,后面一个是DataSet Api的交互实现。在Spark 3.0后其核心已经变成了Spark SQL,所以我们重点从ShuffleRowRDD调用getReader开始讲起。

从ShuffleRowRDD开始

ShuffleRowRDD主要是被ShuffleExchangeExec调用。这里简单介绍下ShuffleExchangeExec操作算子。它主要负责两件事:首先,准备ShuffleDependency,它根据父节点所需的分区方案对子节点的输出行进行分区。其次,添加一个ShuffleRowRDD并指定准备好的ShuffleDependency作为此RDD的依赖项。


2927.png
class ShuffledRowRDD(
    var dependency: ShuffleDependency[Int, InternalRow, InternalRow],
    metrics: Map[String, SQLMetric],
    partitionSpecs: Array[ShufflePartitionSpec])
  extends RDD[InternalRow](dependency.rdd.context,Nil)

ShuffleRowRDD继承自RDD[InternalRow], 同时内部维护着三个参数,分别是dependency,metrics和partitionSpecs。dependency封装着shuffleIdshuffleHandlenumPartitions 可以基于其判断出shuffleWriter采用了哪种方式。partitionSpecs定义了分区规范的类型。

目前在spark 3.2版本中partitionSpecs的实现类主要有以下四个:

  • CoalescedPartitionSpec用于coalesce shuffle partitions 逻辑规则
  • PartialReducerPartitionSpec参与了 skew join 优化
  • PartialMapperPartitionSpec用于本地随机读取器
  • CoalescedMapperPartitionSpec用于优化本地随机读取器

不同类型的分区规范其实质是代表不同的随机读取的参数。我们都知道在Spark Shuffle中getReader仅有且唯一的一个实现方式, 即BlockStoreShuffleReader 的实现。但是不同的分区规范意味将给共享的reader器传递不同的参数, 下面是ShuffleRowRDD中的简化代码:

// ShuffleRowRDD
override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
  val tempMetrics = context.taskMetrics().createTempShuffleReadMetrics()
  // `SQLShuffleReadMetricsReporter` will update its own metrics for SQL exchange operator,
  // as well as the `tempMetrics` for basic shuffle metrics.
  val sqlMetricsReporter = new SQLShuffleReadMetricsReporter(tempMetrics, metrics)
  val reader = split.asInstanceOf[ShuffledRowRDDPartition].spec match {
    // CoalescedPartitionSpec会读取map task为所有reducer所产生的shuffle file
    case CoalescedPartitionSpec(startReducerIndex, endReducerIndex, _) =>
      SparkEnv.get.shuffleManager.getReader(
        dependency.shuffleHandle,
        startReducerIndex,
        endReducerIndex,
        context,
        sqlMetricsReporter)
   // PartialReducerPartitionSpec 读取map task为一个reducer产生的部分数据
    case PartialReducerPartitionSpec(reducerIndex, startMapIndex, endMapIndex, _) =>
      SparkEnv.get.shuffleManager.getReader(
        dependency.shuffleHandle,
        startMapIndex,
        endMapIndex,
        reducerIndex,
        reducerIndex + 1,
        context,
        sqlMetricsReporter)
   // PartialMapperPartitionSpec读取shuffle map文件的部分
   case PartialMapperPartitionSpec(mapIndex, startReducerIndex, endReducerIndex) =>
        SparkEnv.get.shuffleManager.getReader(
          dependency.shuffleHandle,
          mapIndex,
          mapIndex + 1,
          startReducerIndex,
          endReducerIndex,
          context,
          sqlMetricsReporter)
...
    reader.read().asInstanceOf[Iterator[Product2[Int, InternalRow]]].map(_._2)
  }

其实从上面传的参数中就可以看出点端倪CoalescedPartitionSpec(startReducerIndex,endReducer-Index) 读取map task为所有reducer所产生的shuffle file;PartialReducerPartitionSpec(startMap-Index, endMapIndex,reducerIndex,reducerIndex + 1) 可以看出每次读取只会为一个reducer读取部分数据。

从上面代码可以看出ShuffleRowRDD 使用 read() 方法遍历 shuffle 数据并将其返回给客户端,那么接下来我们就详细的看下getReader是如何实现的?

ShuffleReader调用前的准备

SortShuffleManager是ShuffleManager的唯一实现,里面也实现getReader方法,那么就让我们从getReader开始。

override def getReader[K, C](
    handle: ShuffleHandle,
    startMapIndex: Int,
    endMapIndex: Int,
    startPartition: Int,
    endPartition: Int,
    context: TaskContext,
    metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {
  val baseShuffleHandle = handle.asInstanceOf[BaseShuffleHandle[K, _, C]]
  val (blocksByAddress, canEnableBatchFetch) =
    // 是否开启了push-based shuffle, 后续再分享,这里先跳过
    if (baseShuffleHandle.dependency.shuffleMergeEnabled) {
      val res = SparkEnv.get.mapOutputTracker.getPushBasedShuffleMapSizesByExecutorId(
        handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)
      (res.iter, res.enableBatchFetch)
    } else {
      // [1] 使用mapOutputTracker获取shuffle块的位置
      val address = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(
        handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)
      (address, true)
    }
  // [2] 创建一个BlockStoreShuffleReader实例,该实例将负责将shuffle文件从mapper传递到 reducer 任务
  new BlockStoreShuffleReader(
    handle.asInstanceOf[BaseShuffleHandle[K, _, C]], blocksByAddress, context, metrics,
    shouldBatchFetch =
      canEnableBatchFetch &&canUseBatchFetch(startPartition, endPartition, context))
}

可以看到getReader主要做了两件事:

  • [1] 使用mapOutputTracker获取shuffle块的位置
  • [2] 创建一个BlockStoreShuffleReader实例,该实例将负责将shuffle文件从mapper传递到reducer 任务

那么Spark中如何保存和获取shuffle块的位置呢?

在spark中有两种mapOutputTracker,两种mapOutputTracker 都是在创建SparkEnv时创建。

其中第一个是MapOutputTrackerMaster,它驻留在驱动程序中并跟踪每个阶段的map output输出, 并与DAGScheduler进行通信。

另一个是MapOutputTrackerWorker,位于执行器上,它负责从MapOutputTrackerMaster获取shuffle 元数据信息。

MapOutputTrackerMaster:

  1. DAGScheduler在创建 shuffle map 阶段时会调用registerShuffle方法,从下面的代码可以看出在创建ShuffleMapStage会调用registerShuffle,其实质是在向 shuffleStatuses 映射器中放入shuffleid, 并为其值创建一个新的new ShuffleStatus(numMaps)。
def createShuffleMapStage[K, V, C](
    shuffleDep: ShuffleDependency[K, V, C], jobId: Int): ShuffleMapStage = {
  val rdd = shuffleDep.rdd
  ...
  stageIdToStage(id) = stage
  shuffleIdToMapStage(shuffleDep.shuffleId) = stage
  updateJobIdStageIdMaps(jobId, stage)

  if (!mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
    // 在创建ShuffleMapStage会调用registerShuffle
    mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length,
      shuffleDep.partitioner.numPartitions)
  }
  stage
}

def registerShuffle(shuffleId: Int, numMaps: Int, numReduces: Int): Unit = {
    if (pushBasedShuffleEnabled) {
      if (shuffleStatuses.put(shuffleId, new ShuffleStatus(numMaps, numReduces)).isDefined) {
        throw new IllegalArgumentException("Shuffle ID " + shuffleId + " registered twice")
      }
    } else {
      // 可以看到其实质是在向 shuffleStatuses 放入shuffleid, 创建ShuffleStatus
      if (shuffleStatuses.put(shuffleId, new ShuffleStatus(numMaps)).isDefined) {
        throw new IllegalArgumentException("Shuffle ID " + shuffleId + " registered twice")
      }
    }
  }

  1. 到目前位置master tracker存放了一个shuffleid, 表明DAG中存在一个shuffle, 但还是不知道map output file的具体位置。
// DAGScheduler中
private[scheduler] def handleTaskCompletion(event: CompletionEvent): Unit = {

  case smt: ShuffleMapTask =>
     val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
     ...
     mapOutputTracker.registerMapOutput(
        shuffleStage.shuffleDep.shuffleId, smt.partitionId, status)
  }

def registerMapOutput(shuffleId: Int, mapIndex: Int, status: MapStatus): Unit = {
    shuffleStatuses(shuffleId).addMapOutput(mapIndex, status)
}

从上面代码可以看出,在每次 shuffle map 阶段的任务终止时,DAGScheduler都会向MapOutputTrackerMaster发送状态更新。跟踪器将有关特定 shuffle 文件的位置和大小的信息添加到在注册步骤中初始化 的shuffleStatuses map中。


3tled.png

MapOutputTrackerWorker:

当worker tracker 没有缓存shuffle信息, 这时就必须发送GetMapOutputStatuses消息来从master tracker获取它。

再回过头来看看,在getReader中通过mapOutputTracker获取shuffle块的位置的方法。

// mapOutTracker
private def getMapSizesByExecutorIdImpl(
    shuffleId: Int,
    startMapIndex: Int,
    endMapIndex: Int,
    startPartition: Int,
    endPartition: Int,
    useMergeResult: Boolean): MapSizesByExecutorId = {
  logDebug(s"Fetching outputs for shuffle$shuffleId")
  // [1] 获取mapOutputStatuses
  val (mapOutputStatuses, mergedOutputStatuses) = getStatuses(shuffleId, conf,
    // EnableBatchFetch can be set to false during stage retry when the
    // shuffleDependency.shuffleMergeEnabled is set to false, and Driver
    // has already collected the mergedStatus for its shuffle dependency.
    // In this case, boolean check helps to insure that the unnecessary
    // mergeStatus won't be fetched, thus mergedOutputStatuses won't be
    // passed to convertMapStatuses. See details in [SPARK-37023].
    if (useMergeResult)fetchMergeResultelse false)
  ...
}

从上面可以看出获取具体的map output 位置的实现在getStatuses方法中。下面我们来具体分析下:

private def getStatuses(
    shuffleId: Int,
    conf: SparkConf,
    canFetchMergeResult: Boolean): (Array[MapStatus], Array[MergeStatus]) = {
  // push-based shuffle 开启,获取MergeStatus, 现暂不考虑
  if (canFetchMergeResult) {
    ...
  } else {
    val statuses = mapStatuses.get(shuffleId).orNull
    // [1] 如果mapStatuses不包含statuses, 就向master tracker发送GetMapOutputStatuses消息
    if (statuses == null) {
      logInfo("Don't have map outputs for shuffle " + shuffleId + ", fetching them")
      val startTimeNs = System.nanoTime()
fetchingLock.withLock(shuffleId) {
        var fetchedStatuses =mapStatuses.get(shuffleId).orNull
        if (fetchedStatuses == null) {
          logInfo("Doing the fetch; tracker endpoint = " +trackerEndpoint)
          val fetchedBytes = askTracker[Array[Byte]](GetMapOutputStatuses(shuffleId))
          try {
            fetchedStatuses =
              MapOutputTracker.deserializeOutputStatuses[MapStatus](fetchedBytes, conf)
          } catch {
            ...
          }
          logInfo("Got the map output locations")
          mapStatuses.put(shuffleId, fetchedStatuses)
        }
        (fetchedStatuses, null)
      }
    // [2] 如果mapStatuses包含statuses, 直接返回
    } else {
      (statuses, null)
    }
  }
}

从getStatuses可以看出:

  • [1] 如果mapStatuses不包含statuses, 就向master tracker发送GetMapOutputStatuses消息
  • [2] 如果mapStatuses包含statuses, 直接返回
private[spark] sealed trait MapStatus extends ShuffleOutputStatus {
  def location: BlockManagerId

  def updateLocation(newLoc: BlockManagerId): Unit

  def getSizeForBlock(reduceId: Int): Long

  def mapId: Long
}

可见MapStatus中包含了location, mapId等信息。

最后,回到getReader方法中,通过SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId获取shuffle块信息后,再将其作为 shuffle 块的及其物理位置传递给BlockStoreShuffleReader。

那么接下来就我们再来分析下BlockStoreShuffleReader的实现

为避免冗长将BlockStoreShuffleReader放到下一讲进行分析。
欢迎关注微信公众号“Tim在路上”

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,923评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,154评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,775评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,960评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,976评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,972评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,893评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,709评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,159评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,400评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,552评论 1 346
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,265评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,876评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,528评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,701评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,552评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,451评论 2 352

推荐阅读更多精彩内容