A Walkthrough of the Spark Tungsten-sort Shuffle Write Path



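Project Tungsten is a Spark optimization initiative that Databricks proposed in 2015. Spark is written mainly in Scala and runs on the JVM, so it inevitably pays some extra performance cost (overhead). Tungsten aims to improve Spark's CPU and memory efficiency in three main areas:

  • Explicit memory management and binary processing: the Spark application manages (serialized) objects and memory itself, eliminating the overhead of the JVM object model and garbage collection;
  • Cache-aware computation: algorithms and data structures that are efficient and make full use of the memory hierarchy;
  • Code generation: exploiting the features of modern compilers and CPUs to run faster.

For details, see the official Databricks write-up and the Jira issue that tracks Tungsten.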

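Today Tungsten is used most widely in Spark SQL. Outside Spark SQL, tungsten-sort shuffle is one of its more important applications. As the goals above suggest, tungsten-sort shuffle differs substantially from the two shuffle write paths covered earlier (sort shuffle and bypass sort shuffle) and is harder to follow. The rest of this article explores its shuffle write path in detail, with comments that are as thorough as I can make them.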

Shuffle write entry point

#1 - o.a.s.shuffle.sort.UnsafeShuffleWriter.write() method


  public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOException {
    // Keep track of success so we know if we encountered an exception
    // We do this rather than a standard try/catch/re-throw to handle
    // generic throwables.
    boolean success = false;
    try {
      while (records.hasNext()) {
        //【#2 - Insert the shuffle data into the ShuffleExternalSorter for processing】
        insertRecordIntoSorter(records.next());
      }
      //【#9 - Merge the spills and write the output file】
      closeAndWriteOutput();
      success = true;
    } finally {
      if (sorter != null) {
        try {
          sorter.cleanupResources();
        } catch (Exception e) {
          // Only throw this error if we won't be masking another
          // error.
          if (success) {
            throw e;
          } else {
            logger.error("In addition to a failure during writing, we failed during " +
                         "cleanup.", e);
          }
        }
      }
    }
  }



#2 - o.a.s.shuffle.sort.UnsafeShuffleWriter.open() and insertRecordIntoSorter() methods

  private void open() {
    assert (sorter == null);
    sorter = new ShuffleExternalSorter(
      memoryManager,
      blockManager,
      taskContext,
      initialSortBufferSize,
      partitioner.numPartitions(),
      sparkConf,
      writeMetrics);
    //【The constant DEFAULT_INITIAL_SER_BUFFER_SIZE is 1024 * 1024, so the buffer starts at 1 MB】
    serBuffer = new MyByteArrayOutputStream(DEFAULT_INITIAL_SER_BUFFER_SIZE);
    serOutputStream = serializer.serializeStream(serBuffer);
  }

  void insertRecordIntoSorter(Product2<K, V> record) throws IOException {
    assert(sorter != null);
    final K key = record._1();
    final int partitionId = partitioner.getPartition(key);
    serBuffer.reset();
    serOutputStream.writeKey(key, OBJECT_CLASS_TAG);
    serOutputStream.writeValue(record._2(), OBJECT_CLASS_TAG);
    serOutputStream.flush();

    final int serializedRecordSize = serBuffer.size();
    assert (serializedRecordSize > 0);

    //【#3 - Hand the serialized bytes to the ShuffleExternalSorter】
    sorter.insertRecord(
      serBuffer.getBuf(), Platform.BYTE_ARRAY_OFFSET, serializedRecordSize, partitionId);
  }

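At this point the shuffle data has been serialized, and until the shuffle write phase ends every step operates on these serialized bytes. This is why one of the conditions for using tungsten-sort is that the shuffle involves no aggregation at all.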
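The buffer-reuse pattern in insertRecordIntoSorter() can be sketched in isolation. This is a minimal sketch, not Spark's code: it assumes plain java.io streams in place of Spark's serializer, and the names ReusableSerBuffer, ExposedBuffer, serialize and rawBytes are hypothetical. It shows one buffer being reset and refilled for each record, so the backing array is allocated only once.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical illustration of reusing one serialization buffer for every record.
final class ReusableSerBuffer {
  // Exposes the backing array so a caller can copy from it without another allocation.
  static final class ExposedBuffer extends ByteArrayOutputStream {
    ExposedBuffer(int size) { super(size); }
    byte[] getBuf() { return buf; }
  }

  private final ExposedBuffer buffer = new ExposedBuffer(1024 * 1024); // 1 MB initial size
  private final DataOutputStream out = new DataOutputStream(buffer);

  // Serializes one key/value pair into the shared buffer and returns its size in bytes.
  int serialize(String key, long value) {
    try {
      buffer.reset();      // forget the previous record but keep the allocated array
      out.writeUTF(key);   // 2-byte length prefix followed by the encoded characters
      out.writeLong(value);
      out.flush();
      return buffer.size();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // The raw backing array; only the first serialize() result bytes are valid.
  byte[] rawBytes() { return buffer.getBuf(); }
}
```

Because only the first size() bytes of the array are meaningful, the caller has to pass the record length along with the array, which is what the sorter.insertRecord() call above does.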




#3 - o.a.s.shuffle.sort.ShuffleExternalSorter.insertRecord() method

  /**
   * Write a record to the shuffle sorter.
   */
  public void insertRecord(Object recordBase, long recordOffset, int length, int partitionId)
    throws IOException {

    // for tests
    //【this.inMemSorter = new ShuffleInMemorySorter(this, initialSize, conf.getBoolean("spark.shuffle.sort.useRadixSort", true));】
    assert(inMemSorter != null);
    if (inMemSorter.numRecords() >= numElementsForSpillThreshold) {
      logger.info("Spilling data because number of spilledRecords crossed the threshold " +
        numElementsForSpillThreshold);
      //【#5 - Spill straight to disk】
      spill();
    }

    //【#4 - Check whether the sort pointer array must grow, or whether to spill】
    growPointerArrayIfNecessary();
    // Need 4 bytes to store the record length.
    final int required = length + 4;
    acquireNewPageIfNecessary(required);

    assert(currentPage != null);
    final Object base = currentPage.getBaseObject();
    //【#6 - Encode this record's logical memory address (page number + offset) into a long】
    final long recordAddress = taskMemoryManager.encodePageNumberAndOffset(currentPage, pageCursor);
    //【Write the length first and advance the cursor. Almost every method in Platform calls the unsafe API directly】
    Platform.putInt(base, pageCursor, length);
    pageCursor += 4;
    Platform.copyMemory(recordBase, recordOffset, base, pageCursor, length);
    pageCursor += length;
    //【#7 - Pass the partition ID and the encoded logical address (the "pointer") to ShuffleInMemorySorter for sorting】
    inMemSorter.insertRecord(recordAddress, partitionId);
  }

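This code is dense and hard to read, but one thing is clear: the tungsten-sort mechanism also buffers data and spills it to disk, much like sort shuffle. The difference is that it no longer relies on higher-level data structures such as PartitionedPairBuffer. The sorter manages the buffer itself and manipulates memory directly. Also, the component that actually performs the sort is ShuffleInMemorySorter, which only appears at the very end.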
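The record layout that insertRecord() writes into a page is a 4-byte length followed by the serialized bytes. Here is a minimal sketch of that layout, assuming a heap ByteBuffer stands in for a Tungsten memory page. The class LengthPrefixedPage and its methods are hypothetical names for illustration, and the real code uses Platform and unsafe memory access instead.

```java
import java.nio.ByteBuffer;

// Hypothetical stand-in for one memory page that stores length-prefixed records.
final class LengthPrefixedPage {
  private final ByteBuffer page;

  LengthPrefixedPage(int pageSize) {
    this.page = ByteBuffer.allocate(pageSize);
  }

  // Appends [4-byte length][payload] and returns the offset at which the record starts,
  // or -1 when the page is full (the point where a sorter would get a new page or spill).
  int append(byte[] payload) {
    int required = payload.length + 4; // 4 extra bytes for the length prefix
    if (page.remaining() < required) {
      return -1;
    }
    int offset = page.position();
    page.putInt(payload.length);
    page.put(payload);
    return offset;
  }

  // Reads back the payload of the record that starts at the given offset.
  byte[] read(int offset) {
    int length = page.getInt(offset);
    byte[] out = new byte[length];
    ByteBuffer view = page.duplicate();
    view.position(offset + 4); // skip over the record length
    view.get(out, 0, length);
    return out;
  }
}
```

The offset returned by append() plays the role of pageCursor at insertion time: it is the only thing that has to be remembered in order to find the record again.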



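If you are unfamiliar with, or have forgotten, concepts such as logical addresses, physical addresses, page tables and the translation lookaside buffer (TLB), consult an operating systems textbook such as Operating System Concepts by Abraham Silberschatz. It was the textbook I used at university, and it is very good.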


#4 - o.a.s.shuffle.sort.ShuffleExternalSorter.growPointerArrayIfNecessary() method

  /**
   * Checks whether there is enough space to insert an additional record in to the sort pointer
   * array and grows the array if additional space is required. If the required space cannot be
   * obtained, then the in-memory data will be spilled to disk.
   */
  private void growPointerArrayIfNecessary() throws IOException {
    assert(inMemSorter != null);
    if (!inMemSorter.hasSpaceForAnotherRecord()) {
      long used = inMemSorter.getMemoryUsage();
      LongArray array;
      try {
        // could trigger spilling
        array = allocateArray(used / 8 * 2);
      } catch (TooLargePageException e) {
        // The pointer array is too big to fix in a single page, spill.
        //【#5 - If the allocation exceeds the single-page limit, spill directly】
        spill();
        return;
      } catch (SparkOutOfMemoryError e) {
        // should have trigger spilling
        if (!inMemSorter.hasSpaceForAnotherRecord()) {
          logger.error("Unable to grow the pointer array");
          throw e;
        }
        return;
      }
      // check if spilling is triggered or not
      if (inMemSorter.hasSpaceForAnotherRecord()) {
        freeArray(array);
      } else {
        inMemSorter.expandPointerArray(array);
      }
    }
  }

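As in the sort shuffle mechanism, the sorter first tries to grow its memory before spilling, but here the allocation is bounded by the page size. So how large can a page be? The PackedRecordPointer class defines these constants: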

static final int MAXIMUM_PAGE_SIZE_BYTES = 1 << 27;

static final int MAXIMUM_PARTITION_ID = (1 << 24) - 1;
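To make these two limits concrete, here is a small sketch that derives them from the bit widths of a packed pointer: 24 bits for the partition ID, 13 bits for the page number and 27 bits for the offset within a page. The class name and the three *_BITS constant names are my own, introduced only for illustration.

```java
// Hypothetical helper that spells out the bit budget of a 64-bit packed record pointer.
final class PackedPointerLimits {
  static final int PARTITION_ID_BITS = 24;   // upper bits: shuffle partition ID
  static final int PAGE_NUMBER_BITS = 13;    // middle bits: index into the page table
  static final int OFFSET_IN_PAGE_BITS = 27; // lower bits: byte offset inside the page

  static final int MAXIMUM_PAGE_SIZE_BYTES = 1 << OFFSET_IN_PAGE_BITS;  // 128 MB
  static final int MAXIMUM_PARTITION_ID = (1 << PARTITION_ID_BITS) - 1; // 16,777,215

  // All three fields together must fill exactly one 64-bit long.
  static int totalBits() {
    return PARTITION_ID_BITS + PAGE_NUMBER_BITS + OFFSET_IN_PAGE_BITS;
  }

  // 2^13 pages of 2^27 bytes each, i.e. 2^40 bytes (1 TB) addressable per task.
  static long maxAddressableBytes() {
    return (1L << PAGE_NUMBER_BITS) * MAXIMUM_PAGE_SIZE_BYTES;
  }
}
```

In other words, a page holds at most 128 MB, and a shuffle with more than 2^24 partitions cannot be represented in this format.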




#5 - o.a.s.shuffle.sort.ShuffleExternalSorter.spill() and writeSortedFile() methods

  /**
   * Sort and spill the current records in response to memory pressure.
   */
  public long spill(long size, MemoryConsumer trigger) throws IOException {
    if (trigger != this || inMemSorter == null || inMemSorter.numRecords() == 0) {
      return 0L;
    }

    logger.info("Thread {} spilling sort data of {} to disk ({} {} so far)",
      Thread.currentThread().getId(),
      Utils.bytesToString(getMemoryUsage()),
      spills.size(),
      spills.size() > 1 ? " times" : " time");

    writeSortedFile(false);
    final long spillSize = freeMemory();
    inMemSorter.reset();
    // Reset the in-memory sorter's pointer array only after freeing up the memory pages holding the
    // records. Otherwise, if the task is over allocated memory, then without freeing the memory
    // pages, we might not be able to get memory for the pointer array.
    taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);
    return spillSize;
  }

  /**
   * Sorts the in-memory records and writes the sorted records to an on-disk file.
   * This method does not free the sort data structures.
   *
   * @param isLastFile if true, this indicates that we're writing the final output file and that the
   *                   bytes written should be counted towards shuffle spill metrics rather than
   *                   shuffle write metrics.
   */
  private void writeSortedFile(boolean isLastFile) {
    final ShuffleWriteMetrics writeMetricsToUse;

    if (isLastFile) {
      // We're writing the final non-spill file, so we _do_ want to count this as shuffle bytes.
      writeMetricsToUse = writeMetrics;
    } else {
      // We're spilling, so bytes written should be counted towards spill rather than write.
      // Create a dummy WriteMetrics object to absorb these metrics, since we don't want to count
      // them towards shuffle bytes written.
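      writeMetricsToUse = new ShuffleWriteMetrics();
    }

    // This call performs the actual sort.
    //【#8 - Sort with inMemSorter and return an "iterator" over the result (not a java.util iterator but a separate implementation that behaves like a pointer)】
    final ShuffleInMemorySorter.ShuffleSorterIterator sortedRecords =
      inMemSorter.getSortedIterator();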

    // Small writes to DiskBlockObjectWriter will be fairly inefficient. Since there doesn't seem to
    // be an API to directly transfer bytes from managed memory to the disk writer, we buffer
    // data through a byte array. This array does not need to be large enough to hold a single
    // record;
    final byte[] writeBuffer = new byte[diskWriteBufferSize];

    // Because this output will be read during shuffle, its compression codec must be controlled by
    // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
    // createTempShuffleBlock here; see SPARK-3426 for more details.
    final Tuple2<TempShuffleBlockId, File> spilledFileInfo =
      blockManager.diskBlockManager().createTempShuffleBlock();
    final File file = spilledFileInfo._2();
    final TempShuffleBlockId blockId = spilledFileInfo._1();
    final SpillInfo spillInfo = new SpillInfo(numPartitions, file, blockId);

    // Unfortunately, we need a serializer instance in order to construct a DiskBlockObjectWriter.
    // Our write path doesn't actually use this serializer (since we end up calling the `write()`
    // OutputStream methods), but DiskBlockObjectWriter still calls some methods on it. To work
    // around this, we pass a dummy no-op serializer.
    final SerializerInstance ser = DummySerializerInstance.INSTANCE;

    final DiskBlockObjectWriter writer =
      blockManager.getDiskWriter(blockId, file, ser, fileBufferSizeBytes, writeMetricsToUse);

    int currentPartition = -1;
    while (sortedRecords.hasNext()) {
      sortedRecords.loadNext();
      final int partition = sortedRecords.packedRecordPointer.getPartitionId();
      assert (partition >= currentPartition);
      if (partition != currentPartition) {
        // Switch to the new partition
        if (currentPartition != -1) {
          final FileSegment fileSegment = writer.commitAndGet();
          spillInfo.partitionLengths[currentPartition] = fileSegment.length();
        }
        currentPartition = partition;
      }

      final long recordPointer = sortedRecords.packedRecordPointer.getRecordPointer();
      final Object recordPage = taskMemoryManager.getPage(recordPointer);
      final long recordOffsetInPage = taskMemoryManager.getOffsetInPage(recordPointer);
      int dataRemaining = Platform.getInt(recordPage, recordOffsetInPage);
      long recordReadPosition = recordOffsetInPage + 4; // skip over record length
      while (dataRemaining > 0) {
        final int toTransfer = Math.min(diskWriteBufferSize, dataRemaining);
        Platform.copyMemory(
          recordPage, recordReadPosition, writeBuffer, Platform.BYTE_ARRAY_OFFSET, toTransfer);
        writer.write(writeBuffer, 0, toTransfer);
        recordReadPosition += toTransfer;
        dataRemaining -= toTransfer;
      }
      writer.recordWritten();
    }

    final FileSegment committedSegment = writer.commitAndGet();
    writer.close();
    // If `writeSortedFile()` was called from `closeAndGetSpills()` and no records were inserted,
    // then the file might be empty. Note that it might be better to avoid calling
    // writeSortedFile() in that case.
    if (currentPartition != -1) {
      spillInfo.partitionLengths[currentPartition] = committedSegment.length();
      spills.add(spillInfo);
    }

    if (!isLastFile) {  // i.e. this is a spill file
      writeMetrics.incRecordsWritten(writeMetricsToUse.recordsWritten());
      taskContext.taskMetrics().incDiskBytesSpilled(writeMetricsToUse.bytesWritten());
    }
  }

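As you can see, the data is still sorted before it is spilled, and the output is always ordered by partition ID. Each spill produces a single temporary file that contains one segment per partition: the writer commits once whenever the partition ID changes, and the segment length is recorded in spillInfo.partitionLengths. This differs from the basic sort shuffle, where the write batches are controlled by a parameter.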
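Since writeSortedFile() records one length per partition, the position of any partition's segment inside a spill file follows from a running sum over partitionLengths. The sketch below shows that calculation. SegmentOffsets and fromLengths are hypothetical names, and this is an illustration of the idea rather than the code Spark uses to build its index.

```java
// Hypothetical helper: turns per-partition segment lengths into byte offsets within one file.
final class SegmentOffsets {
  // Returns numPartitions + 1 offsets; partition p occupies [offsets[p], offsets[p + 1]).
  static long[] fromLengths(long[] partitionLengths) {
    long[] offsets = new long[partitionLengths.length + 1];
    for (int p = 0; p < partitionLengths.length; p++) {
      offsets[p + 1] = offsets[p] + partitionLengths[p];
    }
    return offsets;
  }
}
```

A partition that received no records simply has a zero-length segment, so its start and end offsets coincide.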




#6 - o.a.s.memory.TaskMemoryManager.encodePageNumberAndOffset() method

  /** The number of bits used to address the page table. */
  private static final int PAGE_NUMBER_BITS = 13;

  /** The number of bits used to encode offsets in data pages. */
  static final int OFFSET_BITS = 64 - PAGE_NUMBER_BITS;  // 51

  /**
   * Given a memory page and offset within that page, encode this address into a 64-bit long.
   * This address will remain valid as long as the corresponding page has not been freed.
   *
   * @param page a data page allocated by {@link TaskMemoryManager#allocatePage}/
   * @param offsetInPage an offset in this page which incorporates the base offset. In other words,
   *                     this should be the value that you would pass as the base offset into an
   *                     UNSAFE call (e.g. page.baseOffset() + something).
   * @return an encoded page address.
   */
  public long encodePageNumberAndOffset(MemoryBlock page, long offsetInPage) {
    if (tungstenMemoryMode == MemoryMode.OFF_HEAP) {
      // In off-heap mode, an offset is an absolute address that may require a full 64 bits to
      // encode. Due to our page size limitation, though, we can convert this into an offset that's
      // relative to the page's base offset; this relative offset will fit in 51 bits.
      offsetInPage -= page.getBaseOffset();
    }
    return encodePageNumberAndOffset(page.pageNumber, offsetInPage);
  }

  public static long encodePageNumberAndOffset(int pageNumber, long offsetInPage) {
    assert (pageNumber >= 0) : "encodePageNumberAndOffset called with invalid page";
    return (((long) pageNumber) << OFFSET_BITS) | (offsetInPage & MASK_LONG_LOWER_51_BITS);
  }
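The encoding is easier to trust once you see it round-trip. Below is a self-contained sketch of the 13-bit page number and 51-bit offset scheme with matching decode methods. PageAddress, decodePageNumber and decodeOffset are names chosen for this example, and the mask is computed here instead of being copied from Spark.

```java
// Hypothetical standalone version of the 13-bit page number / 51-bit offset address format.
final class PageAddress {
  static final int PAGE_NUMBER_BITS = 13;
  static final int OFFSET_BITS = 64 - PAGE_NUMBER_BITS; // 51
  static final long MASK_LONG_LOWER_51_BITS = (1L << OFFSET_BITS) - 1;

  // Page number goes in the upper 13 bits, the in-page offset in the lower 51 bits.
  static long encode(int pageNumber, long offsetInPage) {
    return (((long) pageNumber) << OFFSET_BITS) | (offsetInPage & MASK_LONG_LOWER_51_BITS);
  }

  // The unsigned shift keeps the result correct even when the top bit is set.
  static int decodePageNumber(long address) {
    return (int) (address >>> OFFSET_BITS);
  }

  static long decodeOffset(long address) {
    return address & MASK_LONG_LOWER_51_BITS;
  }
}
```

With 13 bits for the page number, a task can hold at most 2^13 = 8192 pages in its page table.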


#7 - o.a.s.shuffle.sort.ShuffleInMemorySorter.insertRecord() and PackedRecordPointer.packPointer() methods

  /**
   * Inserts a record to be sorted.
   *
   * @param recordPointer a pointer to the record, encoded by the task memory manager. Due to
   *                      certain pointer compression techniques used by the sorter, the sort can
   *                      only operate on pointers that point to locations in the first
   *                      {@link PackedRecordPointer#MAXIMUM_PAGE_SIZE_BYTES} bytes of a data page.
   * @param partitionId the partition id, which must be less than or equal to
   *                    {@link PackedRecordPointer#MAXIMUM_PARTITION_ID}.
   */
  public void insertRecord(long recordPointer, int partitionId) {
    if (!hasSpaceForAnotherRecord()) {
      throw new IllegalStateException("There is no space for new record");
    }
    array.set(pos, PackedRecordPointer.packPointer(recordPointer, partitionId));
    pos++;
  }

  /**
   * Pack a record address and partition id into a single word.
   *
   * @param recordPointer a record pointer encoded by TaskMemoryManager.
   * @param partitionId a shuffle partition id (maximum value of 2^24).
   * @return a packed pointer that can be decoded using the {@link PackedRecordPointer} class.
   */
  public static long packPointer(long recordPointer, int partitionId) {
    assert (partitionId <= MAXIMUM_PARTITION_ID);
    // Note that without word alignment we can address 2^27 bytes = 128 megabytes per page.
    // Also note that this relies on some internals of how TaskMemoryManager encodes its addresses.
    final long pageNumber = (recordPointer & MASK_LONG_UPPER_13_BITS) >>> 24;
    final long compressedAddress = pageNumber | (recordPointer & MASK_LONG_LOWER_27_BITS);
    return (((long) partitionId) << 40) | compressedAddress;
  }
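The shifts in packPointer() are easier to follow alongside the inverse operation. The following sketch packs a 13/51-bit record pointer together with a partition ID and then recovers both. PackedPointerDemo and its method names are hypothetical, and the masks are derived from the bit widths instead of being copied from Spark.

```java
// Hypothetical round trip for the [24-bit partition][13-bit page][27-bit offset] layout.
final class PackedPointerDemo {
  static final long MASK_LONG_LOWER_40_BITS = (1L << 40) - 1;
  static final long MASK_LONG_UPPER_24_BITS = ~MASK_LONG_LOWER_40_BITS;
  static final long MASK_LONG_LOWER_27_BITS = (1L << 27) - 1;
  static final long MASK_LONG_LOWER_51_BITS = (1L << 51) - 1;
  static final long MASK_LONG_UPPER_13_BITS = ~MASK_LONG_LOWER_51_BITS;

  // Moves the page number from bits 51..63 down to bits 27..39, keeps the low 27 offset
  // bits, and places the partition ID in the freed upper 24 bits.
  static long pack(long recordPointer, int partitionId) {
    final long pageNumber = (recordPointer & MASK_LONG_UPPER_13_BITS) >>> 24;
    final long compressedAddress = pageNumber | (recordPointer & MASK_LONG_LOWER_27_BITS);
    return (((long) partitionId) << 40) | compressedAddress;
  }

  static int partitionId(long packed) {
    return (int) ((packed & MASK_LONG_UPPER_24_BITS) >>> 40);
  }

  // Shifts the page number back up to bits 51..63; the partition bits fall off the top.
  static long recordPointer(long packed) {
    final long pageNumber = (packed << 24) & MASK_LONG_UPPER_13_BITS;
    final long offsetInPage = packed & MASK_LONG_LOWER_27_BITS;
    return pageNumber | offsetInPage;
  }
}
```

The round trip only works when the offset fits in 27 bits, which is why pages used by this sorter are capped at 128 MB.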




#8 - o.a.s.shuffle.sort.ShuffleInMemorySorter.getSortedIterator() method

  /**
   * Return an iterator over record pointers in sorted order.
   */
  public ShuffleSorterIterator getSortedIterator() {
    int offset = 0;
    if (useRadixSort) {
      offset = RadixSort.sort(
        array, pos,
        PackedRecordPointer.PARTITION_ID_START_BYTE_INDEX,
        PackedRecordPointer.PARTITION_ID_END_BYTE_INDEX, false, false);
    } else {
      //【If useRadixSort is false, fall back to the same TimSort algorithm that sort shuffle uses, implemented in the Sorter class】
      MemoryBlock unused = new MemoryBlock(
        array.getBaseObject(),
        array.getBaseOffset() + pos * 8L,
        (array.size() - pos) * 8L);
      LongArray buffer = new LongArray(unused);
      Sorter<PackedRecordPointer, LongArray> sorter =
        new Sorter<>(new ShuffleSortDataFormat(buffer));

      sorter.sort(array, 0, pos, SORT_COMPARATOR);
    }
    return new ShuffleSorterIterator(pos, array, offset);
  }
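Whichever algorithm is chosen, the sort key is only the partition ID stored in the upper 24 bits of each packed pointer, and the records themselves never move. The sketch below illustrates that idea with the standard library instead of Spark's RadixSort or Sorter classes. PartitionOrderDemo and its methods are hypothetical, and the stable ordering within a partition is a property of this sketch.

```java
import java.util.Arrays;
import java.util.Comparator;

// Hypothetical illustration: ordering packed pointers by partition ID alone.
final class PartitionOrderDemo {
  // The partition ID lives in the upper 24 bits of the packed pointer.
  static int partitionId(long packed) {
    return (int) (packed >>> 40);
  }

  // Sorts a copy of the pointer array by partition ID; ties keep their insertion order.
  static long[] sortByPartition(long[] packed) {
    return Arrays.stream(packed)
        .boxed()
        .sorted(Comparator.comparingInt((Long p) -> partitionId(p)))
        .mapToLong(Long::longValue)
        .toArray();
  }
}
```

Sorting 8-byte pointers instead of the serialized records keeps the working set small, which fits the cache-aware goal described at the start of the article.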




#9 - o.a.s.shuffle.sort.UnsafeShuffleWriter.closeAndWriteOutput() method

  void closeAndWriteOutput() throws IOException {
    assert(sorter != null);
    updatePeakMemoryUsed();
    serBuffer = null;
    serOutputStream = null;
    final SpillInfo[] spills = sorter.closeAndGetSpills();
    sorter = null;
    final long[] partitionLengths;
    final File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
    final File tmp = Utils.tempFileWith(output);
    try {
      try {
        //【#10 - Merge the spill files into the temporary output file】
        partitionLengths = mergeSpills(spills, tmp);
      } finally {
        for (SpillInfo spill : spills) {
          if (spill.file.exists() && ! spill.file.delete()) {
            logger.error("Error while deleting spill file {}", spill.file.getPath());
          }
        }
      }
      shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
    } finally {
      if (tmp.exists() && !tmp.delete()) {
        logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
      }
    }
    mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
  }

#10 - o.a.s.shuffle.sort.UnsafeShuffleWriter.mergeSpills() method

  /**
   * Merge zero or more spill files together, choosing the fastest merging strategy based on the
   * number of spills and the IO compression codec.
   *
   * @return the partition lengths in the merged file.
   */
  private long[] mergeSpills(SpillInfo[] spills, File outputFile) throws IOException {
    final boolean compressionEnabled = sparkConf.getBoolean("spark.shuffle.compress", true);
    final CompressionCodec compressionCodec = CompressionCodec$.MODULE$.createCodec(sparkConf);
    final boolean fastMergeEnabled =
      sparkConf.getBoolean("spark.shuffle.unsafe.fastMergeEnabled", true);
    final boolean fastMergeIsSupported = !compressionEnabled ||
      CompressionCodec$.MODULE$.supportsConcatenationOfSerializedStreams(compressionCodec);
    final boolean encryptionEnabled = blockManager.serializerManager().encryptionEnabled();
    try {
      if (spills.length == 0) {
        new FileOutputStream(outputFile).close(); // Create an empty file
        return new long[partitioner.numPartitions()];
      } else if (spills.length == 1) {
        // Here, we don't need to perform any metrics updates because the bytes written to this
        // output file would have already been counted as shuffle bytes written.
        Files.move(spills[0].file, outputFile);
        return spills[0].partitionLengths;
      } else {
        final long[] partitionLengths;
        if (fastMergeEnabled && fastMergeIsSupported) {
          // Compression is disabled or we are using an IO compression codec that supports
          // decompression of concatenated compressed streams, so we can perform a fast spill merge
          // that doesn't need to interpret the spilled bytes.
          //【If transferTo is enabled and encryption is off, merge into the output file with NIO zero-copy】
          if (transferToEnabled && !encryptionEnabled) {
            logger.debug("Using transferTo-based fast merge");
            partitionLengths = mergeSpillsWithTransferTo(spills, outputFile);
          } else {
            //【Without transferTo, merge into the output file with blocking-IO (BIO) file streams and no compression codec】
            logger.debug("Using fileStream-based fast merge");
            partitionLengths = mergeSpillsWithFileStream(spills, outputFile, null);
          }
        } else {
          //【If fast merge is disabled or unsupported, merge with BIO file streams and the compression codec. This is the "slow" path】
          logger.debug("Using slow merge");
          partitionLengths = mergeSpillsWithFileStream(spills, outputFile, compressionCodec);
        }
        // When closing an UnsafeShuffleExternalSorter that has already spilled once but also has
        // in-memory records, we write out the in-memory records to a file but do not count that
        // final write as bytes spilled (instead, it's accounted as shuffle write). The merge needs
        // to be counted as shuffle write, but this will lead to double-counting of the final
        // SpillInfo's bytes.
        writeMetrics.decBytesWritten(spills[spills.length - 1].file.length());
        return partitionLengths;
      }
    } catch (IOException e) {
      if (outputFile.exists() && !outputFile.delete()) {
        logger.error("Unable to delete output file {}", outputFile.getPath());
      }
      throw e;
    }
  }

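As you can see, tungsten-sort shuffle merges files in much the same way as bypass sort shuffle, with both an NIO path and a blocking-IO (BIO) path, but the conditions for choosing between them are stricter. That completes the tungsten-sort shuffle write flow.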
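The branching in mergeSpills() for the case of more than one spill file can be condensed into a small pure function. This is a sketch for reasoning about the conditions, not part of Spark. MergeStrategy, Kind and choose are hypothetical names, and the five flags mirror the booleans that mergeSpills() reads from configuration and from the block manager.

```java
// Hypothetical summary of how the merge path is selected when there are several spill files.
final class MergeStrategy {
  enum Kind { TRANSFER_TO_FAST_MERGE, FILE_STREAM_FAST_MERGE, SLOW_MERGE }

  static Kind choose(boolean compressionEnabled,
                     boolean codecSupportsConcatenation,
                     boolean fastMergeEnabled,
                     boolean transferToEnabled,
                     boolean encryptionEnabled) {
    // Raw concatenation is safe when nothing is compressed, or when the codec can
    // decompress concatenated streams.
    final boolean fastMergeIsSupported = !compressionEnabled || codecSupportsConcatenation;
    if (fastMergeEnabled && fastMergeIsSupported) {
      // Zero-copy transfer is used only when it is enabled and the data is not encrypted.
      return (transferToEnabled && !encryptionEnabled)
          ? Kind.TRANSFER_TO_FAST_MERGE
          : Kind.FILE_STREAM_FAST_MERGE;
    }
    return Kind.SLOW_MERGE;
  }
}
```

For example, enabling encryption rules out the transferTo path even when every other condition for a fast merge holds.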



Simplified diagram of the tungsten-sort shuffle write flow


  • Details of Tungsten's memory management mechanism
  • How NIO is used in shuffle processing
