[SPARK][CORE] 面试问题之 BypassMergeSortShuffleWriter的细节

欢迎关注公众号 “Tim在路上”
BypassMergeSortShuffleWriter 就如其名,旁支的sort-baesd Shuffle, 他是采用Hash-style实现的Sort based Shuffle。在map阶段records会按分区写入不同的文件, 一个分区一个文件。然后链接这些分区文件形成一个output文件,并生成其index。reducer通过IndexShuffleBlockResolver 查找消费输出文件的不同分区。

BypassMergeSortShuffleWriter 中records是不会缓存在内存中,所有的records最终都会被flush到磁盘。

在写入时,BypassMergeSortShuffleWriter 会同时为所有的分区打开单独的序列化器和文件流,所以当reduce分区数量特别大的时候性能会非常低下。

ShuffleWriter 的调用是在ShuffleMapTask的runTask中进行调用,每个mapTask 都会调用一次runTask。

BypassMergeSortShuffleWriter 源码解析

首先,我们来回顾下ShuffleWriter的过程。Shuffle发生与宽依赖的stage间,由于stage内的计算采用pipeline的方式。shuffle发生的上一个stage为map节点,下游的stage为reduce阶段。而shuffle写的过程就发生在map阶段,shuffleWriter的调用主要是在ShuffleMapStage中,每个ShuffleMapStage包含多个ShuffleMapTask, mapTask个数和分区数相关。

这样每个ShuffleMapTask都会在其runTask调用下Writer接口,其并非直接调用到具体的执行类。而是在划分宽依赖时想ShuffleManage注册shuffle时,返回的ShuffleHandler决定的。

在ShuffleMapTask调用Writer时,是先调用了ShuffleWriteProcessor ,主要控制了ShuffleWriter的生命周期。下面我们看下ShuffleWriteProcessor 中的Write的实现:

// ShuffleWriteProcessor
def write(
    rdd: RDD[_],
    dep: ShuffleDependency[_, _, _],
    mapId: Long,
    context: TaskContext,
    partition: Partition): MapStatus = {
  var writer: ShuffleWriter[Any, Any] = null
  try {
    // [1] 通过SparkEnv获取ShuffleManager, 并通过dep的shuffleHandle, 获取对应的shuffleWriter的具体实现。
    val manager = SparkEnv.get.shuffleManager
    writer = manager.getWriter[Any, Any](
      dep.shuffleHandle,
      mapId,
      context,
      createMetricsReporter(context))
    // [2] 调用shuffleWriter的write方法, 并将当前rdd的迭代器传入
    writer.write(
      rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
    // [3] shuffleWriter结束后,返回mapStatus,或清空数据
    val mapStatus = writer.stop(success = true)
    // [4] 如果shuffleWriter执行成功,初始化push-based shuffle, 后面再细讲
    if (mapStatus.isDefined) {
      // Initiate shuffle push process if push based shuffle is enabled
      // The map task only takes care of converting the shuffle data file into multiple
      // block push requests. It delegates pushing the blocks to a different thread-pool -
      // ShuffleBlockPusher.BLOCK_PUSHER_POOL.
      if (dep.shuffleMergeEnabled && dep.getMergerLocs.nonEmpty && !dep.shuffleMergeFinalized) {
        manager.shuffleBlockResolver match {
          case resolver: IndexShuffleBlockResolver =>
            val dataFile = resolver.getDataFile(dep.shuffleId, mapId)
            new ShuffleBlockPusher(SparkEnv.get.conf)
              .initiateBlockPush(dataFile, writer.getPartitionLengths(), dep, partition.index)
          case _ =>
        }
      }
    }
    mapStatus.get
  }
...
}

ShuffleWriteProcessor 中主要做了三件事:

  • [1] 通过SparkEnv获取ShuffleManager, 并通过dep的shuffleHandle, 获取对应的shuffleWriter的具体实现。
  • [2] 调用shuffleWriter的write方法, 并将当前rdd的迭代器传入
  • [3] shuffleWriter结束后,返回mapStatus,或清空数据

可见每一个ShuffleMapTask执行结束后,就会返回一个mapStatus。Task 结果被封装成 CompletionEvent发送到Driver DAG Scheduler 。判断Task的类型是ShuffleMapTask会DagScheduler 会向 MapOutputTracker 注册 MapOutput status 信息。

那么map中的数据是如何通过BypassMergeSortShuffleWriter写入的?

// BypassMergeSortShuffleWriter
@Override
public void write(Iterator<Product2<K, V>> records) throws IOException {
  assert (partitionWriters == null);
  // [1] 创建处理mapTask所有分区数据commit提交writer
  ShuffleMapOutputWriter mapOutputWriter = shuffleExecutorComponents
      .createMapOutputWriter(shuffleId, mapId, numPartitions);
  try {
    // 如果没有数据,直接提交所有分区的commit, 并返回分区长度,获取mapStatus
    if (!records.hasNext()) {
      partitionLengths = mapOutputWriter.commitAllPartitions(
        ShuffleChecksumHelper.EMPTY_CHECKSUM_VALUE).getPartitionLengths();
      mapStatus = MapStatus$.MODULE$.apply(
        blockManager.shuffleServerId(), partitionLengths, mapId);
      return;
    }
    final SerializerInstance serInstance = serializer.newInstance();
    final long openStartTime = System.nanoTime();
    // [2] 为每个分区创建一个DiskBlockObjectWriter写入流和FileSegment文件段
    partitionWriters = new DiskBlockObjectWriter[numPartitions];
    partitionWriterSegments = new FileSegment[numPartitions];
    for (int i = 0; i < numPartitions; i++) {
      // [2.1] 每个分区创建个临时file和blockid, 并生成维护一个写入流
      final Tuple2<TempShuffleBlockId, File> tempShuffleBlockIdPlusFile =
          blockManager.diskBlockManager().createTempShuffleBlock();
      final File file = tempShuffleBlockIdPlusFile._2();
      final BlockId blockId = tempShuffleBlockIdPlusFile._1();
      DiskBlockObjectWriter writer =
        blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics);
      if (partitionChecksums.length > 0) {
        writer.setChecksum(partitionChecksums[i]);
      }
      partitionWriters[i] = writer;
    } 
    // Creating the file to write to and creating a disk writer both involve interacting with
    // the disk, and can take a long time in aggregate when we open many files, so should be
    // included in the shuffle write time.
    writeMetrics.incWriteTime(System.nanoTime() - openStartTime);
    // [3] 依次将records写入到对应分区的写入流中, 并提交
    while (records.hasNext()) {
      final Product2<K, V> record = records.next();
      final K key = record._1();
      partitionWriters[partitioner.getPartition(key)].write(key, record._2());
    }

    // [3.1]依次对每个分区提交和flush写入流
    for (int i = 0; i < numPartitions; i++) {
      try (DiskBlockObjectWriter writer = partitionWriters[i]) {
        partitionWriterSegments[i] = writer.commitAndGet();
      }
    }
    // [4] 遍历所有分区的FileSegement, 并将其链接为一个文件,同时会调用writeMetadataFileAndCommit,为其生成索引文件
    partitionLengths = writePartitionedData(mapOutputWriter);
    mapStatus = MapStatus$.MODULE$.apply(
      blockManager.shuffleServerId(), partitionLengths, mapId);
  } catch (Exception e) {
    try {
      mapOutputWriter.abort(e);
    } catch (Exception e2) {
logger.error("Failed to abort the writer after failing to write map output.", e2);
      e.addSuppressed(e2);
    }
    throw e;
  }
}

综上,Bypass的writer步骤有四步:

  • [1] 创建处理mapTask所有分区数据commit提交writer

  • [2] 为每个分区创建一个DiskBlockObjectWriter写入流和FileSegment文件段

    • [2.1] 每个分区创建个临时file和blockid, 并生成维护一个DiskBlockObjectWriter写入流
  • [3] 依次将records写入到对应分区的写入流中, 并提交

    • [3.1]依次对每个分区提交和flush写入流
  • [4] 遍历所有分区的FileSegement, 并将其链接为一个文件,同时会调用writeMetadataFileAndCommit,为其生成索引文件

所以说, Bypass在进行写入时会为每个MapTask都会生成reduce分区个FileSegement, 写入时会并发的为所有的分区都创建临时文件和维护一个io的写入流, 最终在链接为一个文件。所以如果分区数特别多的情况下,是会维护很多io流,所以Bypass限制了分区的阈值。另外通过源码发现Bypass在实现过程中并没有使用buffer, 而是直接将数据写入到流中,这也就是为什么Bypass不能处理mapSide的预聚合的算子。

那么BypassMergeSortShuffleWriter 属于sort-based Shuffle 到底有没有排序呢?

接下来,我们再看下Bypass是如何将分区的FileSegement, 并将其链接为一个文件, 我们就需要详细看下writePartitionedData是如何实现的。

private long[] writePartitionedData(ShuffleMapOutputWriter mapOutputWriter) throws IOException {
  // Track location of the partition starts in the output file
  if (partitionWriters != null) {
    final long writeStartTime = System.nanoTime();
    try {
      for (int i = 0; i < numPartitions; i++) {
        // [1] 获取每个分区的 fileSegement 临时文件,和writer写出流
        final File file = partitionWriterSegments[i].file();
        ShufflePartitionWriter writer = mapOutputWriter.getPartitionWriter(i);
        if (file.exists()) {
          if (transferToEnabled) {
            // Using WritableByteChannelWrapper to make resource closing consistent between
            // this implementation and UnsafeShuffleWriter.
            Optional<WritableByteChannelWrapper> maybeOutputChannel = writer.openChannelWrapper();
            if (maybeOutputChannel.isPresent()) {
              writePartitionedDataWithChannel(file, maybeOutputChannel.get());
            } else {
              writePartitionedDataWithStream(file, writer);
            }
          } else {
            // [2] 将fileSegement合并为一个文件
            writePartitionedDataWithStream(file, writer);
          }
          if (!file.delete()) {
logger.error("Unable to delete file for partition {}", i);
          }
        }
      }
    } finally {
      writeMetrics.incWriteTime(System.nanoTime() - writeStartTime);
    }
    partitionWriters = null;
  }
  // [3] 提交所有的分区,传入每个分区数据的长度, 调用 writeMetadataFileAndCommit生成索引文件,记录每个分区的偏移量
  return mapOutputWriter.commitAllPartitions(getChecksumValues(partitionChecksums))
    .getPartitionLengths();
}

writePartitionedData是如何实现,有三个步骤:

  • [1] 获取每个分区的 fileSegement 临时文件,和writer写出流
  • [2] 将fileSegement合并为一个文件
  • [3] 提交所有的分区,传入每个分区数据的长度, 调用 writeMetadataFileAndCommit生成索引文件,记录每个分区的偏移量
bypass.png

总结, BypassMergeSortShuffleWriter 的实现是hash-style的方式,其中没有sort, 没有buffer,每一个mapTask都会生成分区数量个FileSegment, 最后再合并为一个File, 最终根据分区的长度为其生成索引文件。所以BypassMergeSortShuffleWriter在分区数量比较小的情况下,性能是比较佳的。其最终每个task会生成2个文件, 所以最终的生成文件数也是2 * M个文件。

今天就先到这里,通过上面的介绍,我们也留下些面试题:

  1. BypassMergeSortShuffleWriter和HashShuffle有什么区别?
  2. 为什么不保留HashShuffleManage, 而是将其作为SortShuffleManager中的一个Writer实现?

欢迎关注公众号 “Tim在路上”

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,772评论 6 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,458评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,610评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,640评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,657评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,590评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,962评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,631评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,870评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,611评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,704评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,386评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,969评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,944评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,179评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,742评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,440评论 2 342

推荐阅读更多精彩内容