[SPARK][CORE] 面试问题之UnsafeShuffleWriter流程解析(下)

欢迎关注微信公众号“Tim在路上”
Unsafe Shuffle的实现在一定程度上是Tungsten内存管理优化的的主要应用场景。其实现过程实际上和SortShuffleWriter是类似的,但是其中维护和执行的数据结构是不一样的。

UnsafeShuffleWriter 源码解析

@Override
public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOException {
  // Keep track of success so we know if we encountered an exception
  // We do this rather than a standard try/catch/re-throw to handle
  // generic throwables.
  // [1] 使用success记录write是否成功,判断是write阶段的异常还是clean阶段
  boolean success = false;
  try {
    // [2] 遍历所有的数据插入ShuffleExternalSorter
    while (records.hasNext()) {
      insertRecordIntoSorter(records.next());
    }
    // [3] close排序器使所有数据写出到磁盘,并将多个溢写文件合并到一起
    closeAndWriteOutput();
    success = true;
  } finally {
    if (sorter != null) {
      try {
        // [4] 清除并释放资源
        sorter.cleanupResources();
      } catch (Exception e) {
        // Only throw this error if we won't be masking another
        // error.
        if (success) {
          throw e;
        } else {
logger.error("In addition to a failure during writing, we failed during " +
                       "cleanup.", e);
        }
      }
    }
  }
}

从上面的代码可以看出,UnsafeShuffleWriter的write过程如下:

  • [1] 使用success记录write是否成功,判断是write阶段的异常还是clean阶段
  • [2] 遍历所有的数据插入ShuffleExternalSorter
  • [3] close排序器使所有数据写出到磁盘,并将多个溢写文件合并到一起
  • [4] 清除并释放资源
// open()方法是在初始化UnsafeShuffleWriter调用的,其中会创建sorter, 并创建一个字节输出流,同时封装序列化流
private void open() throws SparkException {
  assert (sorter == null);
  sorter = new ShuffleExternalSorter(
    memoryManager,
    blockManager,
    taskContext,
    initialSortBufferSize,
    partitioner.numPartitions(),
    sparkConf,
    writeMetrics);
    // MyByteArrayOutputStream类是ByteArrayOutputStream的简单封装,只是将内部byte[]数组暴露出来】
    //【DEFAULT_INITIAL_SER_BUFFER_SIZE常量值是1024 * 1024,即缓冲区初始1MB大】
  serBuffer = new MyByteArrayOutputStream(DEFAULT_INITIAL_SER_BUFFER_SIZE);
  serOutputStream = serializer.serializeStream(serBuffer);
}

void insertRecordIntoSorter(Product2<K, V> record) throws IOException {
    assert(sorter != null);
    // [1] 获取record的key和partitionId
    final K key = record._1();
    final int partitionId = partitioner.getPartition(key);
    // [2] 将record序列化为二进制,并写的字节数组输出流serBuffer中
    serBuffer.reset();
    serOutputStream.writeKey(key, OBJECT_CLASS_TAG);
    serOutputStream.writeValue(record._2(), OBJECT_CLASS_TAG);
    serOutputStream.flush();

    final int serializedRecordSize = serBuffer.size();
    assert (serializedRecordSize > 0);
    // [3] 将其插入到ShuffleExternalSorter中
    sorter.insertRecord(
      serBuffer.getBuf(), Platform.BYTE_ARRAY_OFFSET, serializedRecordSize, partitionId);
  }

这一步是将record插入前的准备,现将序列化为二进制存储在内存中。

  • [1] 获取record的key和partitionId
  • [2] 将record序列化为二进制,并写的字节数组输出流serBuffer中
  • [3] 将序列化的二进制数组,分区id, length 作为参数插入到ShuffleExternalSorter中

那么数据在ShuffleExternalSorter中写入过程是怎么样呢?

public void insertRecord(Object recordBase, long recordOffset, int length, int partitionId)
  throws IOException {

  // [1] 判断inMemSorter中的记录是否到达了溢写阈值(默认是整数最大值),如果满足就先进行spill
  // for tests
  assert(inMemSorter != null);
  if (inMemSorter.numRecords() >= numElementsForSpillThreshold) {
logger.info("Spilling data because number of spilledRecords crossed the threshold " +
      numElementsForSpillThreshold);
    spill();
  }
  // [2] 检查inMemSorter是否有额外的空间插入,如果可以获取就扩充空间,否则进行溢写
  growPointerArrayIfNecessary();
  final int uaoSize = UnsafeAlignedOffset.getUaoSize();
  // Need 4 or 8 bytes to store the record length.
  final int required = length + uaoSize;
  // [3] 判断当前内存空间currentPage是否有足够的内存,如果不够就申请,申请不下来就需要spill
  acquireNewPageIfNecessary(required);

  assert(currentPage != null);
  // [4] 获取currentPage的base Object和recordAddress
  final Object base = currentPage.getBaseObject();
  final long recordAddress = taskMemoryManager.encodePageNumberAndOffset(currentPage, pageCursor);
  // [5] 根据base, pageCursor, 先向当前内存空间写长度值,并移动指针
  UnsafeAlignedOffset.putSize(base, pageCursor, length);
  pageCursor += uaoSize;
  // [6] 再写序列化之后的数据, 并移动指指
  Platform.copyMemory(recordBase, recordOffset, base, pageCursor, length);
  pageCursor += length;
  // [7] 将recordAddress和partitionId插入inMemSorter进行排序
  inMemSorter.insertRecord(recordAddress, partitionId);
}

从上面分析,数据插入ShuffleExternalSorter总共需要7步:

  • [1] 判断inMemSorter中的记录是否到达了溢写阈值(默认是整数最大值),如果满足就先进行spill
  • [2] 检查inMemSorter是否有额外的空间插入,如果可以获取就扩充空间,否则进行溢写
  • [3] 判断当前内存空间currentPage是否有足够的内存,如果不够就申请,申请不下来就需要spill
  • [4] 获取currentPage的base Object和recordAddress
  • [5] 先向当前内存空间写长度值,并移动指针
  • [6] 再写序列化之后的数据, 并移动指指
  • [7] 将recordAddress和partitionId插入inMemSorter进行排序

从上面的介绍可以看出在整个插入过程中,主要涉及ShuffleExternalSorterShuffleInMemorySorter 两个数据结构。我们来简单看了ShuffleExternalSorter 类。

final class ShuffleExternalSorter extends MemoryConsumer implements ShuffleChecksumSupport {

  private final int numPartitions;
  private final TaskMemoryManager taskMemoryManager;
  private final BlockManager blockManager;
  private final TaskContext taskContext;
  private final ShuffleWriteMetricsReporter writeMetrics;
  private final LinkedList<MemoryBlock> allocatedPages = new LinkedList<>();

  private final LinkedList<SpillInfo> spills = new LinkedList<>();

  /** Peak memory used by this sorter so far, in bytes. **/
  private long peakMemoryUsedBytes;

  // These variables are reset after spilling:
  @Nullable private ShuffleInMemorySorter inMemSorter;
  @Nullable private MemoryBlock currentPage = null;
  private long pageCursor = -1;
  ...
}

可见每个ShuffleExternalSorter 中封装着ShuffleInMemorySorter类。同时封装allocatedPages

、spills和currentPage。也就是说ShuffleExternalSorter使用MemoryBlock存储数据,每条记录包括长度信息和K-V Pair。

另外在 ShuffleInMemorySorter 中,通过LongArray 来存储数据,并实现了SortComparator

排序方法。其中LongArray 存储的record的位置信息,主要有分区id, page id 和offset。

ShuffleExternalSorter 使用MemoryBlock存储数据,每条记录包括长度信息和K-V Pair
ShuffleInMemorySorter 使用long数组存储每条记录对应的位置信息(page number + offset),以及其对应的PartitionId,共8 bytes
d.png

从上面的关于ShuffleExternalSorterShuffleInMemorySorter 可以看出,这里其实质上是使用Tungsten实现了类似于BytesToBytesMap的数据结构,不过将其数组部分LongArray用ShuffleInMemorySorter 进行了封装,其余拆分为ShuffleExternalSorter

ShuffleExternalSorter 将数据写入了当前的内存空间,将数据的recordAddress和partitionId写入了ShuffleInMemorySorter ,那么其具体是如何实现排序和数据的溢写的?

private void writeSortedFile(boolean isLastFile) {

  // [1] 将inMemSorter的数据排序,并返回ShuffleSorterIterator
  // This call performs the actual sort.
  final ShuffleInMemorySorter.ShuffleSorterIterator sortedRecords =
    inMemSorter.getSortedIterator();

  // If there are no sorted records, so we don't need to create an empty spill file.
  if (!sortedRecords.hasNext()) {
    return;
  }

  final ShuffleWriteMetricsReporter writeMetricsToUse;

  ...

  // [2] 创建缓存数据writeBuffer数组,为了避免DiskBlockObjectWriter的低效的写
  // Small writes to DiskBlockObjectWriter will be fairly inefficient. Since there doesn't seem to
  // be an API to directly transfer bytes from managed memory to the disk writer, we buffer
  // data through a byte array. This array does not need to be large enough to hold a single
  // record;
  final byte[] writeBuffer = new byte[diskWriteBufferSize];

  // Because this output will be read during shuffle, its compression codec must be controlled by
  // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
  // createTempShuffleBlock here; see SPARK-3426 for more details.
  final Tuple2<TempShuffleBlockId, File> spilledFileInfo =
    blockManager.diskBlockManager().createTempShuffleBlock();
  final File file = spilledFileInfo._2();
  final TempShuffleBlockId blockId = spilledFileInfo._1();
  final SpillInfo spillInfo = new SpillInfo(numPartitions, file, blockId);

  // Unfortunately, we need a serializer instance in order to construct a DiskBlockObjectWriter.
  // Our write path doesn't actually use this serializer (since we end up calling the `write()`
  // OutputStream methods), but DiskBlockObjectWriter still calls some methods on it. To work
  // around this, we pass a dummy no-op serializer.
  final SerializerInstance ser = DummySerializerInstance.INSTANCE;

  int currentPartition = -1;
  final FileSegment committedSegment;
  try (DiskBlockObjectWriter writer =
      blockManager.getDiskWriter(blockId, file, ser, fileBufferSizeBytes, writeMetricsToUse)) {

    final int uaoSize = UnsafeAlignedOffset.getUaoSize();
    // [3] 按分区遍历已经排好序的指针数据, 并未每个分区提交一个FileSegment,并记录分区的大小
    while (sortedRecords.hasNext()) {
      sortedRecords.loadNext();
      final int partition = sortedRecords.packedRecordPointer.getPartitionId();
      assert (partition >= currentPartition);
      if (partition != currentPartition) {
        // Switch to the new partition
        if (currentPartition != -1) {
          final FileSegment fileSegment = writer.commitAndGet();
          spillInfo.partitionLengths[currentPartition] = fileSegment.length();
        }
        currentPartition = partition;
        if (partitionChecksums.length > 0) {
          writer.setChecksum(partitionChecksums[currentPartition]);
        }
      }
      // [4] 取得数据的指针,再通过指针取得页号与偏移量
      final long recordPointer = sortedRecords.packedRecordPointer.getRecordPointer();
      final Object recordPage = taskMemoryManager.getPage(recordPointer);
      final long recordOffsetInPage = taskMemoryManager.getOffsetInPage(recordPointer);
      // [5] 取得数据前面存储的长度,然后让指针跳过它
      int dataRemaining = UnsafeAlignedOffset.getSize(recordPage, recordOffsetInPage);
      long recordReadPosition = recordOffsetInPage + uaoSize; // skip over record length
     // [6] 数据拷贝到上面创建的缓存中,通过缓存转到DiskBlockObjectWriter, 并写入数据,移动指针
      while (dataRemaining > 0) {
        final int toTransfer = Math.min(diskWriteBufferSize, dataRemaining);
        Platform.copyMemory(
          recordPage, recordReadPosition, writeBuffer, Platform.BYTE_ARRAY_OFFSET, toTransfer);
        writer.write(writeBuffer, 0, toTransfer);
        recordReadPosition += toTransfer;
        dataRemaining -= toTransfer;
      }
      writer.recordWritten();
    }

    committedSegment = writer.commitAndGet();
  }
  // If `writeSortedFile()` was called from `closeAndGetSpills()` and no records were inserted,
  // then the file might be empty. Note that it might be better to avoid calling
  // writeSortedFile() in that case.
  if (currentPartition != -1) {
    spillInfo.partitionLengths[currentPartition] = committedSegment.length();
    spills.add(spillInfo);
  }

  if (!isLastFile) {  // i.e. this is a spill file
    writeMetrics.incRecordsWritten(
      ((ShuffleWriteMetrics)writeMetricsToUse).recordsWritten());
    taskContext.taskMetrics().incDiskBytesSpilled(
      ((ShuffleWriteMetrics)writeMetricsToUse).bytesWritten());
  }
}

溢写排序文件总的来说分为两步:

首先是通过ShuffleInMemorySorter排序,获取对应分区的FileSegment和长度。写文件或溢写前根据数据的PartitionId信息,使用TimSort对ShuffleInMemorySorter的long数组排序,排序的结果为,PartitionId相同的聚集在一起,且PartitionId较小的排在前面,然后按分区写出FileSegment, 并记录每个分区的长度。

Unled.png

其次是基于排好序的指针执行数据的溢写操作。依次读取ShuffleInMemorySorter中long数组的元素,再根据page number和offset信息去ShuffleExternalSorter中读取K-V Pair写入文件, 溢写前先写入writeBuffer,然后在写入DiskBlockObjectWriter中。


itled.png

具体的步骤见下:

  • [1] 将inMemSorter的数据排序,并返回ShuffleSorterIterator
  • [2] 创建缓存数据writeBuffer数组,为了避免DiskBlockObjectWriter的低效的写
  • [3] 按分区遍历已经排好序的指针数据, 并未每个分区提交一个FileSegment,并记录分区的大小
  • [4] 取得数据的指针,再通过指针取得页号与偏移量
  • [5] 取得数据前面存储的长度,然后让指针跳过它
  • [6] 数据拷贝到上面创建的缓存writeBuffer中,通过缓存转到DiskBlockObjectWriter, 并写入数据,移动指针

最后我们看下,UnsafeShuffleWriter是如何将最后溢写的文件进行合并的?

// UnsafeShuffleWriter
void closeAndWriteOutput() throws IOException {
  assert(sorter != null);
  updatePeakMemoryUsed();
  serBuffer = null;
  serOutputStream = null;
  // [1] 关闭排序器,并将排序器中的数据全部溢写到磁盘,返回SpillInfo数组
  final SpillInfo[] spills = sorter.closeAndGetSpills();
  try {
    // [2] 将多个溢出文件合并在一起,根据溢出次数和 IO 压缩编解码器选择最快的合并策略
    partitionLengths = mergeSpills(spills);
  } finally {
    sorter = null;
    for (SpillInfo spill : spills) {
      if (spill.file.exists() && !spill.file.delete()) {
logger.error("Error while deleting spill file {}", spill.file.getPath());
      }
    }
  }
  mapStatus = MapStatus$.MODULE$.apply(
    blockManager.shuffleServerId(), partitionLengths, mapId);
}

private long[] mergeSpills(SpillInfo[] spills) throws IOException {
    long[] partitionLengths;
    // [1] 如果根本没有溢写文件,写一个空文件
    if (spills.length == 0) {
      final ShuffleMapOutputWriter mapWriter = shuffleExecutorComponents
          .createMapOutputWriter(shuffleId, mapId, partitioner.numPartitions());
      return mapWriter.commitAllPartitions(
        ShuffleChecksumHelper.EMPTY_CHECKSUM_VALUE).getPartitionLengths();
    // [2] 如果只有一个溢写文件,就直接将它写入输出文件中
    } else if (spills.length == 1) {
      // [2.1] 创建单个file的map output writer
      Optional<SingleSpillShuffleMapOutputWriter> maybeSingleFileWriter =
          shuffleExecutorComponents.createSingleFileMapOutputWriter(shuffleId, mapId);
      if (maybeSingleFileWriter.isPresent()) {
        // Here, we don't need to perform any metrics updates because the bytes written to this
        // output file would have already been counted as shuffle bytes written.
        partitionLengths = spills[0].partitionLengths;
        logger.debug("Merge shuffle spills for mapId {} with length {}", mapId,
            partitionLengths.length);
        maybeSingleFileWriter.get()
          .transferMapSpillFile(spills[0].file, partitionLengths, sorter.getChecksums());
      } else {
        partitionLengths = mergeSpillsUsingStandardWriter(spills);
      }
    // [3] 如果有多个溢写文件,如果启用并支持快速合并,并且启用了transferTo机制,还没有加密,        就使用NIO zero-copy来合并到输出文件, 不启用transferTo或不支持快速合并,就使用压缩的BIO FileStream来合并到输出文件
    } else {
      partitionLengths = mergeSpillsUsingStandardWriter(spills);
    }
    return partitionLengths;
  }

多个spills的合并的具体的实现在mergeSpillsWithFileStream 方法中,为了减少篇幅的冗长这里就不再展开了。

溢写的文件进行合并,有如下几个步骤:

  • [1] 关闭排序器,并将排序器中的数据全部溢写到磁盘,返回SpillInfo数组

  • [2] 将多个溢出文件合并在一起,根据溢出次数和 IO 压缩编解码器选择最快的合并策略

     - [2.1] 如果根本没有溢写文件,写一个空文件
    
     - [2.2] 如果只有一个溢写文件,就直接将它写入输出文件中
    
     - [2.3] 如果有多个溢写文件,如果启用并支持快速合并,并且启用了transferTo机制,还没有加密,        就使用NIO zero-copy来合并到输出文件, 不启用transferTo或不支持快速合并,就使用压缩的BIO FileStream来合并到输出文件
    
    

至此,UnsafeShuffleWriter的实现就介绍完了。

下面我们谈下UnsafeShuffleWriter的优势:

  • ShuffleExternalSorter使用UnSafe API操作序列化数据,而不是Java对象,减少了内存占用及因此导致的GC耗时,这个优化需要Serializer支持relocation。 ShuffleExternalSorter存原始数据,ShuffleInMemorySorter使用压缩指针存储元数据,每条记录仅占8 bytes,并且排序时不需要处理原始数据,效率高。
  • 溢写 & 合并这一步操作的是同一Partition的数据,因为使用UnSafe API直接操作序列化数据,合并时不需要反序列化数据。
  • 溢写 & 合并可以使用fastMerge提升效率(调用NIO的transferTo方法),设置spark.shuffle.unsafe.fastMergeEnabled为true,并且如果使用了压缩,需要压缩算法支持SerializedStreams的连接。
  • 排序时并非将数据进行排序,而是将数据的地址指针进行排序

总结,UnsafeShuffleWriter是Tungsten最重要的应用,他的实现原理类似于SortShuffleWriter, 但是基于UnSafe API使用了定义的ShuffleExternalSorter和ShuffleInMemorySorter来存储和维护数据。

其整体流程为,所有的数据在插入前都需要序列化为二进制数组,然后再将其插入到数据结构ShuffleExternalSorter中。在ShuffleExternalSorter定义了ShuffleInMemorySorter主要用于存储数据的partitionId和recordAddress, 另外定义了MemoryBlock页空间数组

在ShuffleExternalSorter的insertRecord时会先,判断ShuffleInMemorySorter和当前内存空间是否足够新数据的插入,不够需要申请,申请失败则需要spill

插入数据时会先写入占用内存空间的长度,再写入数据值,最后将recordAddress和partitionId插入ShuffleInMemorySorter中。在进行spill时会将ShuffleInMemorySorter中的数据进行排序,并按照分区生成FileSegment并统计分区的大小,然后遍历指针数组根据地址将对应的数据进行写出。在进行合并时可以直接使用UnSafe API直接操作序列化数据,返回汇总的文件。

通过UnsafeShuffleWriter只会产生两个文件,一个分区的数据文件,一个索引文件。整个UnsafeShuffleWriter过程只会产生2 * M 个中间文件。

今天就先到这里,通过上面的介绍,我们也留下些面试题:

  1. 为什么UnsafeShuffleWriter无法支持无法支持map端的aggregation?
  2. 为什么UnsafeShuffleWriter分区数的最大值为 (1 << 24) ?
  3. ShuffleExternalSorter实现是基于JVM的吗?以及其在排序上有什么优化?

欢迎关注微信公众号“Tim在路上”

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,948评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,371评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,490评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,521评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,627评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,842评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,997评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,741评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,203评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,534评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,673评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,339评论 4 330
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,955评论 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,770评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,000评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,394评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,562评论 2 349