对应SerializedShuffleHandle
的情况,此时
- shuffle dependency没有指定aggregation或输出顺序
- 序列化器支持对已经序列化的对象重定位,重新排序序列化流输出中的序列化对象的字节等同于在序列化它们之前重新排序这些元素,这是为了直接对序列化的数据进行排序
- reduce端的分区数目小于等于16777216(排序过程中使用的是记录指针,其中分区ID占24位,则
MAXIMUM_PARTITION_ID = (1 << 24) - 1; //16777215
,又因为ID是从0开始的,所以分区数目不能大于16777216)
在这种模式下,数据记录被传入,先进行序列化,缓存,直接在序列化的二进制数据上排序而不是在java 对象上,这样可以减少内存的消耗和GC的开销,不过它需要串行化器的支持对已经序列化的数据重新排序,例如KryoSerializer
,排序过程由ShuffleExternalSorter
完成,它根据记录的分区进行排序,每条记录使用一个8字节的指针表示,它是记录的指针与patitionId的结合,根据排序后的结果,将指针对应的数据按顺序写出
优点:
- 可以对序列化的二进制数据进行排序,然后直接写出到文件
- 将键值和指针结合到一起进行排序,可以更好的利用缓存,这里使用了缓存感知计算技术
UnsafeShuffleWriter
最顶层的调用,用来写出数据:
@Override
public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOException {
boolean success = false;
try {
while (records.hasNext()) {
//串行化record,然后插入ShuffleExternalSorter
insertRecordIntoSorter(records.next());
}
//关闭写出
closeAndWriteOutput();
success = true;
} finally {
if (sorter != null) {
try {
//释放内存,删除数据溢出时写出的文件
sorter.cleanupResources();
} catch (Exception e) {
....
}
}
}
}
把(K,V)记录串行化的数据传递到ShuffleExternalSorter中
其中序列化输出流的输出缓存为serBuffer
,大小为spark.shuffle.spill.diskWriteBufferSize
,默认1MB
//用来缓存串行化数据,大小默认为1MB
private MyByteArrayOutputStream serBuffer;
private SerializationStream serOutputStream;
serBuffer = new MyByteArrayOutputStream(DEFAULT_INITIAL_SER_BUFFER_SIZE);
serOutputStream = serializer.serializeStream(serBuffer);
void insertRecordIntoSorter(Product2<K, V> record) throws IOException {
assert(sorter != null);
final K key = record._1();
final int partitionId = partitioner.getPartition(key);
//清空缓存
serBuffer.reset();
//写出序列化对象的流
serOutputStream.writeKey(key, OBJECT_CLASS_TAG);
serOutputStream.writeValue(record._2(), OBJECT_CLASS_TAG);
serOutputStream.flush();
final int serializedRecordSize = serBuffer.size();
assert (serializedRecordSize > 0);
// 写出到sorter中
sorter.insertRecord(
serBuffer.getBuf(), Platform.BYTE_ARRAY_OFFSET, serializedRecordSize, partitionId);
}
关闭并写出数据,这里用到IndexShuffleBlockResolver
,用来
创建并维护shuffle逻辑块和物理文件位置之间的映射,同一map任务的shuffle块数据存储在单个合并的文件中,数据文件中数据块的偏移信息存储在索引文件中。文件名称为shuffle_" + shuffleId + "_" + mapId + "_" + reduceId
,其中reduceId为0,“.data”作为数据文件的文件名后缀,“.index”作为索引文件的文件名后缀
void closeAndWriteOutput() throws IOException {
assert(sorter != null);
updatePeakMemoryUsed();
serBuffer = null;
serOutputStream = null;
//关闭sorter,所有的缓冲数据进行排序并写入磁盘,返回所有相关文件的信息
final SpillInfo[] spills = sorter.closeAndGetSpills();
sorter = null;
final long[] partitionLengths;
//通过shuffleId,与map端该分区的partitionId创建一个名称为"shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + ".data"的文件
final File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
//创建一个临时文件,数据先合并到该文件,最后修改名称为output文件的名称
final File tmp = Utils.tempFileWith(output);
try {
try {
//把spill文件合并到一起,根据不同情况,选择合适的合并方式
partitionLengths = mergeSpills(spills, tmp);
} finally {
//删除数据溢出的文件
for (SpillInfo spill : spills) {
if (spill.file.exists() && ! spill.file.delete()) {
logger.error("Error while deleting spill file {}", spill.file.getPath());
}
}
}
//写出"shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + ".index"索引文件,记录的是每个Partition数据的偏移
shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
} finally {
if (tmp.exists() && !tmp.delete()) {
logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
}
}
//ShuffleMapTask返回给Scheduler的状态结果,包括一个BlockMangeer的ID,以及对应每个reducer的的输出数据大小
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
}
合并文件就是把SpillInfo[] spills
中的文件,相同分区的都合并到一起,最终返回一个完整的文件,根据压缩,加密的需求有不同的合并方式,最后要返回各个partition的长度,默认配置情况下会使用lz4,不加密,直接通过NIO的transferTo机制合并
private long[] mergeSpills(SpillInfo[] spills, File outputFile) throws IOException {
//使用压缩,压缩方式由"spark.io.compression.codec"指定,默认是"lz4"
final boolean compressionEnabled = sparkConf.getBoolean("spark.shuffle.compress", true);
final CompressionCodec compressionCodec = CompressionCodec$.MODULE$.createCodec(sparkConf);
final boolean fastMergeEnabled =
sparkConf.getBoolean("spark.shuffle.unsafe.fastMergeEnabled", true);
// 支持快速合并 不使用压缩或者codec支持对级联序列化流进行解压缩
final boolean fastMergeIsSupported = !compressionEnabled ||
CompressionCodec$.MODULE$.supportsConcatenationOfSerializedStreams(compressionCodec);
//加密 "spark.io.encryption.enabled",默认为false
final boolean encryptionEnabled = blockManager.serializerManager().encryptionEnabled();
try {
if (spills.length == 0) {
// 创建一个空文件
new FileOutputStream(outputFile).close();
return new long[partitioner.numPartitions()];
} else if (spills.length == 1) {
// 只有一个文件,中间没有进行数据溢出,直接把文件移动到该路径下
Files.move(spills[0].file, outputFile);
return spills[0].partitionLengths;
} else {
final long[] partitionLengths;
// 快速合并 不使用压缩或者codec支持对级联序列化流进行解压缩
if (fastMergeEnabled && fastMergeIsSupported) {
// 没有加密
if (transferToEnabled && !encryptionEnabled) {
logger.debug("Using transferTo-based fast merge");
partitionLengths = mergeSpillsWithTransferTo(spills, outputFile);
} else {
logger.debug("Using fileStream-based fast merge");
partitionLengths = mergeSpillsWithFileStream(spills, outputFile, null);
}
} else {
//慢合并
logger.debug("Using slow merge");
partitionLengths = mergeSpillsWithFileStream(spills, outputFile, compressionCodec);
}
writeMetrics.decBytesWritten(spills[spills.length - 1].file.length());
writeMetrics.incBytesWritten(outputFile.length());
return partitionLengths;
}
} catch (IOException e) {
...
}
}
快速合并:对应mergeSpillsWithTransferTo
方法,基于Java NIO,通过channel直接传输数据,内部通过两层循环遍历每个文件的每个分区,将分区相同的数据合并到一起,要求数据不能是压缩的,或者支持级联压缩流的解压缩
慢合并:对应mergeSpillsWithFileStream
,使用Java标准的流式IO,它主要用于IO压缩的编解码器不支持级联压缩数据,加密被启动或用户已显示禁止使用transferTo
的情况,其中设计对数据流进行加密,压缩等。通常它的速度要比快速合并慢,但是如果spill中单个分区的数据量很小,此时mergeSpillsWithTransferTo
方法执行许多小的磁盘IO,效率低下,该方法可能更快。因为它使用大缓冲区缓存输入和输出文件,有助于减少磁盘io的数量,使文件合并速度更快,其中
-
inputBufferSizeInBytes
shufffle文件合并时spill输出流的缓存 "spark.shuffle.file.buffer" 默认32KB -
outputBufferSizeInBytes
shuffle合并的文件对应的输出缓存 "spark.shuffle.unsafe.file.output.buffer" 默认 32KB
ShuffleExternalSorter
首先是将数据缓存在内存中,当所有的数据记录都输入进来或者缓存达到阈值,对内存中的记录进行排序,这是通过ShuffleInMemorySorter
完成的
ShuffleExternalSorter
继承了MemoryConsumer
,可以通过TaskMemoryManager
申请释放内存,当内存不足时,支持写出部分数据到磁盘,来减少内存消耗
MemoryManager
中内存分配按页进行分配,分为in-heap/off-heap两种,即堆内/堆外内存,默认不启用堆外内存,堆内模式下,内存地址通过base object的引用加上一个64位的偏移量决定,堆外内存可以避免GC的开销,直接控制内存的申请和释放
ShuffleExternalSorter(
TaskMemoryManager memoryManager,
BlockManager blockManager,
TaskContext taskContext,
int initialSize,
int numPartitions,
SparkConf conf,
ShuffleWriteMetrics writeMetrics) {
//PackedRecordPointer中的页内偏移为27位,决定了内存页面的最大值128MB,同时根据TaskMemoryManager内存页面的最大值,限定每次能申请的页面最大值
super(memoryManager,
(int) Math.min(PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES, memoryManager.pageSizeBytes()),
memoryManager.getTungstenMemoryMode());
this.taskMemoryManager = memoryManager;
this.blockManager = blockManager;
this.taskContext = taskContext;
this.numPartitions = numPartitions;
// shuffle文件输出流的缓存,spark.shuffle.file.buffer,默认32KB
this.fileBufferSizeBytes =
(int) (long) conf.get(package$.MODULE$.SHUFFLE_FILE_BUFFER_SIZE()) * 1024;
//强制sorter进行排序写出的元素数目,Integer.MAX_VALUE
this.numElementsForSpillThreshold =
(int) conf.get(package$.MODULE$.SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD());
this.writeMetrics = writeMetrics;
//对记录进行排序的缓存,存储的是记录指针PackedRecordPointer,“spark.shuffle.sort.initialBufferSize” 默认大小4096B,不够可以继续申请翻倍的空间,或者对记录数据进行写出
this.inMemSorter = new ShuffleInMemorySorter(
this, initialSize, conf.getBoolean("spark.shuffle.sort.useRadixSort", true));
//排序使用的内存量,单位 bytes,存储序列化记录的所有内存页面,加上记录指针排序的内存页面的总大小
this.peakMemoryUsedBytes = getMemoryUsage();
//排序过的数据进行写出的缓存大小,"spark.shuffle.spill.diskWriteBufferSize",默认 1MB
this.diskWriteBufferSize =
(int) (long) conf.get(package$.MODULE$.SHUFFLE_DISK_WRITE_BUFFER_SIZE());
}
存储相关的数据结构
@Nullable private LinkedList<MemoryBlock> allocatedPages; //所有记录压缩数据的页面链表
@Nullable private MemoryBlock currentPage = null; //用来记录record的当前页面
private long pageCursor = -1; //当前页面的偏移指针,用来记录下一条记录
添加记录到shuffle sorter,partitionId指的是reducer端的分区ID
public void insertRecord(Object recordBase, long recordOffset, int length, int partitionId)
throws IOException {
assert(inMemSorter != null);
//确定记录数目没有达到最大值,默认值为Integer.MAX_VALUE
if (inMemSorter.numRecords() >= numElementsForSpillThreshold) {
logger.info("Spilling data because number of spilledRecords crossed the threshold " +
numElementsForSpillThreshold);
spill();
}
//检查inMemSorter中是否有足够的空间来写入下一条记录指针
//如果不够,申请扩大内存一倍
//如果内存页面过大或者没有内存,就写出数据,调用spill函数
growPointerArrayIfNecessary();
// 需要额外四或八个字节存储串行化记录的长度
final int uaoSize = UnsafeAlignedOffset.getUaoSize();
// Need 4 or 8 bytes to store the record length.
final int required = length + uaoSize;
//如果currentPage内存不足,申请分配一个新的页面page
acquireNewPageIfNecessary(required);
assert(currentPage != null);
final Object base = currentPage.getBaseObject();
//记录的地址:高13位为页码pageNumber,后51位为偏移量offsetInPage
final long recordAddress = taskMemoryManager.encodePageNumberAndOffset(currentPage, pageCursor);
//在currentPage中写入记录的长度和序列化后的字节序列
UnsafeAlignedOffset.putSize(base, pageCursor, length);
pageCursor += uaoSize;
Platform.copyMemory(recordBase, recordOffset, base, pageCursor, length);
pageCursor += length;
//将该条记录转换为记录指针,传递到ShuffleInMemorySorter中
inMemSorter.insertRecord(recordAddress, partitionId);
}
当内存不足时,写出当前已经记录的数据,来缓解内存压力
public long spill(long size, MemoryConsumer trigger) throws IOException {
...
//写出文件
writeSortedFile(false);
//释放内存,并做记录
final long spillSize = freeMemory();
inMemSorter.reset();
taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);
return spillSize;
}
实际进行数据写出的函数逻辑:
将内存内的记录进行排序,然后写出到一个文件,一个文件对应一个SpillInfo
,内部记录了每个partition对应的数据长度,最后将该SpillInfo信息添加到链表中
//进行数据写出,每个SpillInfo对应一个写出的文件
private final LinkedList<SpillInfo> spills = new LinkedList<>();
private void writeSortedFile(boolean isLastFile) {
//选择合适的ShuffleWriteMetrics,度量shuffle写出的数据
final ShuffleWriteMetrics writeMetricsToUse;
if (isLastFile) {
writeMetricsToUse = writeMetrics;
} else {
writeMetricsToUse = new ShuffleWriteMetrics();
}
// ShuffleInMemorySorter排序,返回一个迭代器获取排序过的记录指针
final ShuffleInMemorySorter.ShuffleSorterIterator sortedRecords =
inMemSorter.getSortedIterator();
//1MB 缓存,没有必要完整存储一条记录,所以不用过大
final byte[] writeBuffer = new byte[diskWriteBufferSize];
//通过blockManager创建一个名为"temp_shuffle_ + block_id "的文件,存储shuffle中间结果
final Tuple2<TempShuffleBlockId, File> spilledFileInfo =
blockManager.diskBlockManager().createTempShuffleBlock();
final File file = spilledFileInfo._2();
final TempShuffleBlockId blockId = spilledFileInfo._1();
//记录由ShuffleExternalSorter写出的数据块的相关信息,其中有一个numPartitions长度的数组,每个index对应该PartitionID的FileSegment的长度
final SpillInfo spillInfo = new SpillInfo(numPartitions, file, blockId);
final SerializerInstance ser = DummySerializerInstance.INSTANCE;
//用于写出数据到磁盘
final DiskBlockObjectWriter writer =
blockManager.getDiskWriter(blockId, file, ser, fileBufferSizeBytes, writeMetricsToUse);
int currentPartition = -1;
while (sortedRecords.hasNext()) {
//读取每个记录指针
sortedRecords.loadNext();
final int partition = sortedRecords.packedRecordPointer.getPartitionId();
assert (partition >= currentPartition);
if (partition != currentPartition) {
// 每个patition对应文件中的一个Segment,当出现新的分区id的时候,记录上一个分区id对应的Segment的数据长度
if (currentPartition != -1) {
final FileSegment fileSegment = writer.commitAndGet();
spillInfo.partitionLengths[currentPartition] = fileSegment.length();
}
currentPartition = partition;
}
final long recordPointer = sortedRecords.packedRecordPointer.getRecordPointer();
final Object recordPage = taskMemoryManager.getPage(recordPointer);
final long recordOffsetInPage = taskMemoryManager.getOffsetInPage(recordPointer);
//前四个字节,记录的长度
int dataRemaining = Platform.getInt(recordPage, recordOffsetInPage);
long recordReadPosition = recordOffsetInPage + 4; // skip over record length
while (dataRemaining > 0) {
//从recordPage中最多复制diskWriteBufferSize大小(1MB)的数据,写出
final int toTransfer = Math.min(diskWriteBufferSize, dataRemaining);
Platform.copyMemory(
recordPage, recordReadPosition, writeBuffer, Platform.BYTE_ARRAY_OFFSET, toTransfer);
writer.write(writeBuffer, 0, toTransfer);
recordReadPosition += toTransfer;
dataRemaining -= toTransfer;
}
//更新记录一条数据已经写出
writer.recordWritten();
}
final FileSegment committedSegment = writer.commitAndGet();
writer.close();
//记录最后一个分区对应的数据段长度,同时记录该写出的文件到spills中
if (currentPartition != -1) {
spillInfo.partitionLengths[currentPartition] = committedSegment.length();
spills.add(spillInfo);
}
// 度量数据记录
if (!isLastFile) {
writeMetrics.incRecordsWritten(writeMetricsToUse.recordsWritten());
taskContext.taskMetrics().incDiskBytesSpilled(writeMetricsToUse.bytesWritten());
}
}
ShuffleInMemorySorter
对记录在内存上进行排序,其内部使用LongArray
存储每一条数据记录的指针,数据记录全部输入后或者内存达到限定时对数据进行排序。LongArray
持有一块内存,它是通过ShuffleExternalSorter
申请来的一个内存块MemoryBlock
/**
* consumer 就是持有该对象的ShuffleExternalSorter
* initialSize 内存块的大小,用来存储数据记录指针,最后用来排序,对应属性为 `spark.shuffle.sort.initialBufferSize` 初始默认大小4096B
* useRadixSort 是否使用基数排序,通过 `spark.shuffle.sort.useRadixSort` 指定,默认使用,此时内存块至少要留一半的空间,用来排序
*/
ShuffleInMemorySorter(MemoryConsumer consumer, int initialSize, boolean useRadixSort) {
this.consumer = consumer;
assert (initialSize > 0);
this.initialSize = initialSize;
this.useRadixSort = useRadixSort;
this.array = consumer.allocateArray(initialSize);
this.usableCapacity = getUsableCapacity();
}
不同排序方式需要的额外内存不同
private int getUsableCapacity() {
// Radix sort requires same amount of used memory as buffer, Tim sort requires
// half of the used memory as buffer.
return (int) (array.size() / (useRadixSort ? 2 : 1.5));
}
LongArray array
数组内部存储的是记录指针PackedRecordPointer
,大小为8字节,字节含义如下:
[24 bit partition number][13 bit memory page number][27 bit offset in page]
所以在没有字节对齐的情况下,最大的Page大小为2^27 bits = 128 MB,对应总大小 2^13 * 128 MB = 1 TB,所以每个task最大定位1TB大小的内存,将来可能会进行8字节对齐的优化,此时Page最大为1GB,不过目前没有进行字节对齐。同时指针内部只含有PartitionId,所以内部的归并排序只比较了PartitionId的大小,没有比较Key值的大小