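1. The ShuffleManager implementation: SortShuffleManager
Spark 0.8 and earlier: Hash Based Shuffle
During Shuffle Write, each partition's data is regrouped by hash without sorting. Every map-side task writes a separate file for every reduce-side task. This usually produces a huge number of files (M*R intermediate files, where M is the number of map-side tasks and R is the number of reduce-side tasks) and causes heavy random disk I/O and large memory overhead.
During Shuffle Read, if there is a combiner, the fetched data is merged in a hash table implemented by Spark (AppendOnlyMap). In terms of code structure:
- org.apache.spark.storage.ShuffleBlockManager handles Shuffle Write
- org.apache.spark.BlockStoreShuffleFetcher handles Shuffle Read
- org.apache.spark.Aggregator handles combining and relies on AppendOnlyMap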
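Spark 0.8.1: File Consolidation is introduced for Hash Based Shuffle
With file consolidation, intermediate files are created per execution slot rather than per map task. Each execution slot (the number of slots in an Executor equals its number of Cores divided by the number of Cores each Task needs) writes one file for each reduce-side task, and successive map tasks running in that slot reuse those files. This reduces the file count from M*R to (E*C/T)*R, where E is the number of Executors, C is the number of Cores available in each Executor, and T is the number of Cores allocated to each Task. Whether consolidation is used is controlled by the spark.shuffle.consolidateFiles parameter.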
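A quick worked example makes the two formulas concrete. The sketch below is a minimal Java illustration, not Spark code: the class name ShuffleFileCount is hypothetical, and the cluster sizes (1000 map tasks, 1000 reduce tasks, 10 Executors with 8 Cores each, 1 Core per Task) are made-up inputs.

```java
// Back-of-the-envelope file counts for Hash Based Shuffle, with and without
// File Consolidation. All inputs are illustrative, not measurements.
final class ShuffleFileCount {

    // Without consolidation: every map task writes one file per reduce task (M*R).
    static long hashShuffleFiles(long mapTasks, long reduceTasks) {
        return mapTasks * reduceTasks;
    }

    // With consolidation: each execution slot (coresPerExecutor / coresPerTask
    // slots per executor) owns one file per reduce task ((E*C/T)*R).
    static long consolidatedFiles(long executors, long coresPerExecutor,
                                  long coresPerTask, long reduceTasks) {
        long slotsPerExecutor = coresPerExecutor / coresPerTask;
        return executors * slotsPerExecutor * reduceTasks;
    }

    public static void main(String[] args) {
        System.out.println(hashShuffleFiles(1000, 1000));      // 1000000
        System.out.println(consolidatedFiles(10, 8, 1, 1000)); // 80000
    }
}
```

Under these assumed sizes, consolidation cuts the file count by a factor of M / (E*C/T) = 1000 / 80 = 12.5.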
Spark 0.9: ExternalAppendOnlyMap is introduced
During combining, data can be spilled to disk and later merged back with a heap-based (priority queue) merge.
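A heap-based merge of spilled runs can be sketched in Java as follows. This is a simplified illustration under stated assumptions, not Spark's code: each sorted List stands in for one spill file, integers stand in for records, and the class name SpillMerge is hypothetical. Spark's ExternalAppendOnlyMap additionally orders spilled data by key hash and combines values that share a key, which the sketch leaves out.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Merge several individually sorted runs (stand-ins for spill files) into one
// sorted sequence with a min-heap: O(n log k) for k runs.
final class SpillMerge {

    // One heap entry: the current head of a run plus the iterator it came from.
    private static final class Head {
        final int value;
        final Iterator<Integer> rest;
        Head(int value, Iterator<Integer> rest) { this.value = value; this.rest = rest; }
    }

    static List<Integer> merge(List<List<Integer>> sortedRuns) {
        PriorityQueue<Head> heap =
            new PriorityQueue<>((a, b) -> Integer.compare(a.value, b.value));
        for (List<Integer> run : sortedRuns) {
            Iterator<Integer> it = run.iterator();
            if (it.hasNext()) heap.add(new Head(it.next(), it));
        }
        List<Integer> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            Head smallest = heap.poll();      // smallest head across all runs
            out.add(smallest.value);
            if (smallest.rest.hasNext()) {    // refill from the same run
                heap.add(new Head(smallest.rest.next(), smallest.rest));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<Integer>> runs = List.of(List.of(1, 4, 9), List.of(2, 3, 10), List.of(5));
        System.out.println(merge(runs)); // [1, 2, 3, 4, 5, 9, 10]
    }
}
```

Only one head per run is held in memory at a time, which is what lets the merge read spill files as streams.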
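Spark 1.1: Sort Based Shuffle is introduced, but Hash Based Shuffle remains the default
In the Shuffle Write phase of Sort Based Shuffle, a map-side task sorts its records by partition ID (and by key where required), writes all of its output into a single data file, and also generates an index file. Reduce-side tasks use the index file to locate the data they need.
In terms of code structure:
ShuffleManager is split out of the former ShuffleBlockManager to manage the Shuffle Writer and Shuffle Reader. The two shuffle modes correspond to
org.apache.spark.shuffle.hash.HashShuffleManager and
org.apache.spark.shuffle.sort.SortShuffleManager,
selectable through the spark.shuffle.manager parameter. Each mode has its own ShuffleWriter, org.apache.spark.shuffle.hash.HashShuffleWriter and org.apache.spark.shuffle.sort.SortShuffleWriter, but both share one ShuffleReader, org.apache.spark.shuffle.hash.HashShuffleReader.
org.apache.spark.util.collection.ExternalSorter implements the sorting. The spark.shuffle.spill parameter decides whether temporary data may be spilled to disk during sorting.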
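Spark 1.2: the default shuffle mode changes to Sort Based Shuffle
Spark 1.4: Tungsten-Sort Based Shuffle is introduced
Records are stored in serialized binary form, sorting records becomes sorting an array of pointers, and off-heap memory with a new memory management model is introduced. These techniques impose strict conditions on when Tungsten-Sort can be used; for example, the shuffle dependency must not include aggregation, and the output must not require ordering. Because off-heap memory is managed through the JDK's sun.misc.Unsafe API, Tungsten-Sort Based Shuffle is also called Unsafe Shuffle.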
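The idea of sorting an array of pointers instead of the records themselves can be sketched in Java. Assumptions: each record is represented by one long whose top 24 bits hold the partition ID and whose low 40 bits encode where the serialized bytes live, modeled loosely on the layout Spark describes for PackedRecordPointer (24-bit partition ID, 13-bit page number, 27-bit in-page offset). The class and method names are hypothetical, and the addresses in the example are made up. The 24-bit field is also why this shuffle path is limited to 2^24 output partitions.

```java
import java.util.Arrays;

// Sorting "pointers" instead of records: the serialized bytes never move,
// only the 8-byte packed longs are reordered.
final class PackedPointerSort {
    static final int ADDRESS_BITS = 40;
    static final long ADDRESS_MASK = (1L << ADDRESS_BITS) - 1;
    static final int MAX_PARTITION_ID = (1 << 24) - 1;

    // Top 24 bits: partition ID; low 40 bits: encoded record address.
    static long pack(int partitionId, long address) {
        if (partitionId < 0 || partitionId > MAX_PARTITION_ID) {
            throw new IllegalArgumentException("partition ID needs more than 24 bits");
        }
        return ((long) partitionId << ADDRESS_BITS) | (address & ADDRESS_MASK);
    }

    // Unsigned shift, so IDs >= 2^23 (which set the sign bit) decode correctly.
    static int partitionId(long packed) {
        return (int) (packed >>> ADDRESS_BITS);
    }

    static long address(long packed) {
        return packed & ADDRESS_MASK;
    }

    // Order pointers by partition ID only (stable, so arrival order is kept within a partition).
    static Long[] sortByPartition(long[] pointers) {
        Long[] boxed = Arrays.stream(pointers).boxed().toArray(Long[]::new);
        Arrays.sort(boxed, (a, b) -> Integer.compare(partitionId(a), partitionId(b)));
        return boxed;
    }

    public static void main(String[] args) {
        long[] ptrs = {pack(2, 0), pack(0, 64), pack(1, 128), pack(0, 192)};
        for (long p : sortByPartition(ptrs)) {
            System.out.println(partitionId(p) + " @ " + address(p));
        }
        // 0 @ 64, 0 @ 192, 1 @ 128, 2 @ 0
    }
}
```

Comparing the extracted partition ID, rather than the raw signed long, keeps the order correct even when the top bit is set.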
At the code level:
- Added org.apache.spark.shuffle.unsafe.UnsafeShuffleManager
- Added org.apache.spark.shuffle.unsafe.UnsafeShuffleWriter (implemented in Java)
- The ShuffleReader reuses HashShuffleReader
Spark 1.6: Tungsten-sort is merged into Sort Based Shuffle
SortShuffleManager automatically chooses the best shuffle mode: if the Tungsten-sort conditions are met, it uses Tungsten-sort Based Shuffle; otherwise it uses Sort Based Shuffle.
At the code level:
- UnsafeShuffleManager is merged into SortShuffleManager
- HashShuffleReader is renamed BlockStoreShuffleReader; Sort Based Shuffle and Hash Based Shuffle still share this ShuffleReader.
Spark 2.0: Hash Based Shuffle is retired. From then on Spark has only Sort Based Shuffle, and SortShuffleManager is the only ShuffleManager implementation.
2. SortShuffleManager.registerShuffle
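registerShuffle decides which ShuffleHandle a shuffle gets, and getWriter later dispatches on that handle. The Java sketch below is a simplified mirror of the decision as it appears in Spark 2.x SortShuffleManager (check shouldBypassMergeSort first, then canUseSerializedShuffle, else fall back to the base handle). ShuffleDepInfo and ShuffleHandleChooser are hypothetical names; ShuffleDepInfo carries only the facts the decision reads from a ShuffleDependency. The threshold argument plays the role of spark.shuffle.sort.bypassMergeThreshold, whose default is 200.

```java
// Simplified mirror of the handle selection in SortShuffleManager.registerShuffle (Spark 2.x).
final class ShuffleHandleChooser {

    enum Handle { BYPASS_MERGE_SORT, SERIALIZED, BASE }

    // Serialized (tungsten-sort) shuffle stores partition IDs in 24 bits.
    static final int MAX_SERIALIZED_PARTITIONS = 1 << 24;

    // Hypothetical stand-in for ShuffleDependency: only the fields the decision reads.
    static final class ShuffleDepInfo {
        final boolean mapSideCombine;
        final boolean hasAggregator;
        final boolean serializerSupportsRelocation;
        final int numPartitions;

        ShuffleDepInfo(boolean mapSideCombine, boolean hasAggregator,
                       boolean serializerSupportsRelocation, int numPartitions) {
            this.mapSideCombine = mapSideCombine;
            this.hasAggregator = hasAggregator;
            this.serializerSupportsRelocation = serializerSupportsRelocation;
            this.numPartitions = numPartitions;
        }
    }

    static Handle choose(ShuffleDepInfo dep, int bypassMergeThreshold) {
        // 1) No map-side combine and few partitions: write one file per partition, then concatenate.
        if (!dep.mapSideCombine && dep.numPartitions <= bypassMergeThreshold) {
            return Handle.BYPASS_MERGE_SORT;   // -> BypassMergeSortShuffleWriter
        }
        // 2) Relocatable serializer, no aggregator, partition IDs fit in 24 bits.
        if (dep.serializerSupportsRelocation && !dep.hasAggregator
                && dep.numPartitions <= MAX_SERIALIZED_PARTITIONS) {
            return Handle.SERIALIZED;          // -> UnsafeShuffleWriter
        }
        // 3) Everything else.
        return Handle.BASE;                    // -> SortShuffleWriter
    }
}
```

Checking the bypass case first means small shuffles without map-side combine skip sorting entirely.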
3. SortShuffleManager.getReader
4. SortShuffleManager.getWriter
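```scala
// getWriter picks the writer from the handle chosen in registerShuffle
handle match {
  case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
    new UnsafeShuffleWriter(...)
  case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
    new BypassMergeSortShuffleWriter(...)
  case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
    new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)
}
```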
5. BypassMergeSortShuffleWriter: works like hash shuffle, but merges the per-partition output files into a single file
1) BypassMergeSortShuffleWriter.write
Parameter: the iterator over the partition's records
[If there are no records]
```java
if (!records.hasNext()) {
  // Empty input: every partition has length 0, but an index file is still written
  partitionLengths = new long[numPartitions];
  shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, null);
  mapStatus = MapStatus.apply(blockManager.shuffleServerId(), partitionLengths);
  return;
}
```
[Create a disk writer for each partition's temporary file]
```java
// One DiskBlockObjectWriter per reduce partition, each backed by its own temp file
partitionWriters = new DiskBlockObjectWriter[numPartitions];
for (int i = 0; i < numPartitions; i++) {
  final Tuple2<TempShuffleBlockId, File> tempShuffleBlockIdPlusFile =
      blockManager.diskBlockManager().createTempShuffleBlock();
  final File file = tempShuffleBlockIdPlusFile._2();
  final BlockId blockId = tempShuffleBlockIdPlusFile._1();
  partitionWriters[i] =
      blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics);
}
```
[Write the records]
```java
while (records.hasNext()) {
  final Product2<K, V> record = records.next();
  final K key = record._1();
  // The partitioner picks the target file; records are written in arrival order, unsorted
  partitionWriters[partitioner.getPartition(key)].write(key, record._2());
}
```
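The routing in the write loop can be illustrated with a self-contained Java sketch. It assumes a HashPartitioner-style partition function (non-negative hashCode modulo numPartitions, with null keys going to partition 0, as Spark's HashPartitioner does) and uses in-memory lists as hypothetical stand-ins for the per-partition DiskBlockObjectWriters. PartitionRouting is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class PartitionRouting {

    // Same idea as Spark's Utils.nonNegativeMod: Java's % is negative for negative hash codes.
    static int nonNegativeMod(int x, int mod) {
        int raw = x % mod;
        return raw + (raw < 0 ? mod : 0);
    }

    // HashPartitioner-style partition function; null keys go to partition 0.
    static int getPartition(Object key, int numPartitions) {
        return key == null ? 0 : nonNegativeMod(key.hashCode(), numPartitions);
    }

    // Append each value to its partition's bucket in arrival order (no sorting),
    // the way each record goes to partitionWriters[getPartition(key)].
    static <K, V> List<List<V>> route(List<? extends Map.Entry<K, V>> records, int numPartitions) {
        List<List<V>> buckets = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            buckets.add(new ArrayList<>());
        }
        for (Map.Entry<K, V> record : records) {
            buckets.get(getPartition(record.getKey(), numPartitions)).add(record.getValue());
        }
        return buckets;
    }
}
```

Because nothing is sorted, the cost per record is one hash and one append; the price is keeping numPartitions writers (and their buffers) open at once, which is why this path is reserved for small partition counts.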
[Commit each writer to obtain its ShuffleBlock, represented as a FileSegment]
```java
partitionWriterSegments = new FileSegment[numPartitions];
for (int i = 0; i < numPartitions; i++) {
  final DiskBlockObjectWriter writer = partitionWriters[i];
  // Flush and commit the writer; the returned FileSegment describes what it wrote
  partitionWriterSegments[i] = writer.commitAndGet();
  writer.close();
}
```
[Merge the files and write the index file]
```java
File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
// Write to a temp file first; writeIndexFileAndCommit moves it into place
File tmp = Utils.tempFileWith(output);
try {
  partitionLengths = writePartitionedFile(tmp);
  shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
} finally {
  if (tmp.exists() && !tmp.delete()) {
    logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
  }
}
```
2) BypassMergeSortShuffleWriter.writePartitionedFile
Parameter: the merged output file, File outputFile
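```java
private long[] writePartitionedFile(File outputFile) throws IOException {
  // Byte length of each partition's segment in the merged output file
  final long[] lengths = new long[numPartitions];
  if (partitionWriters == null) {
    // We were passed an empty iterator
    return lengths;
  }
  final FileOutputStream out = new FileOutputStream(outputFile, true);
  boolean threwException = true;
  try {
    for (int i = 0; i < numPartitions; i++) {
      final File file = partitionWriterSegments[i].file();
      if (file.exists()) {
        final FileInputStream in = new FileInputStream(file);
        boolean copyThrewException = true;
        try {
          // Append this partition's temp file to the output and record its length
          lengths[i] = Utils.copyStream(in, out, false, transferToEnabled);
          copyThrewException = false;
        } finally {
          Closeables.close(in, copyThrewException);
        }
        if (!file.delete()) {
          logger.error("Unable to delete file for partition {}", i);
        }
      }
    }
    threwException = false;
  } finally {
    Closeables.close(out, threwException);
  }
  partitionWriters = null;
  return lengths;
}
```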
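The method returns the length in bytes of each partition's segment in the merged file; writeIndexFileAndCommit later turns these lengths into the offsets stored in the index file.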
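The concatenation performed by writePartitionedFile can be reproduced with plain java.nio. This is a sketch under stated assumptions: Files.copy stands in for Spark's Utils.copyStream (which can use NIO transferTo), each Path stands in for a partition's temp file, and ConcatPartitions is a hypothetical name. It appends each partition's file in partition order, deletes it, and returns the per-partition lengths.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;

final class ConcatPartitions {

    // Append each partition file to `output` in partition order and return how
    // many bytes each partition contributed (0 for a missing file).
    static long[] concat(List<Path> partitionFiles, Path output) throws IOException {
        long[] lengths = new long[partitionFiles.size()];
        try (OutputStream out = Files.newOutputStream(output,
                StandardOpenOption.CREATE, StandardOpenOption.APPEND)) {
            for (int i = 0; i < partitionFiles.size(); i++) {
                Path file = partitionFiles.get(i);
                if (Files.exists(file)) {
                    lengths[i] = Files.copy(file, out); // bytes copied = partition length
                    Files.delete(file);                 // temp file is no longer needed
                }
            }
        }
        return lengths;
    }
}
```

For example, files holding "abc", nothing, and "hello" produce a data file "abchello" and lengths [3, 0, 5].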
6. SortShuffleWriter
1) SortShuffleWriter.write
[Sort the records of the RDD partition]
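```scala
// With map-side combine, pass the aggregator and key ordering to the sorter. Otherwise we
// don't care whether keys are sorted within each partition, so pass neither an aggregator
// nor an ordering.
sorter = if (dep.mapSideCombine) {
  require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
  // Combine needed: pass the aggregator, the partitioner and the key ordering
  new ExternalSorter[K, V, C](
    context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
} else {
  new ExternalSorter[K, V, V](
    context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
}
// The key step: records go into an in-memory map (when aggregating) or buffer and are sorted
// as required (by partition ID, and by key within a partition when an ordering is given);
// when memory runs short, data is spilled to spill files on disk
sorter.insertAll(records)
```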
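When a key ordering is supplied, the sorter orders its buffered records by partition ID first and by key second. That ordering can be written as a Java Comparator; KeyedRecord and PartitionKeySort are hypothetical names, and in ExternalSorter the partition ID comes from the partitioner while the key comparison comes from dep.keyOrdering.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class PartitionKeySort {

    // Hypothetical (partitionId, key, value) triple standing in for a buffered record.
    static final class KeyedRecord {
        final int partitionId;
        final String key;
        final int value;

        KeyedRecord(int partitionId, String key, int value) {
            this.partitionId = partitionId;
            this.key = key;
            this.value = value;
        }

        @Override
        public String toString() {
            return partitionId + ":" + key + "=" + value;
        }
    }

    // Partition ID first, so each partition ends up contiguous; then key within a partition.
    static final Comparator<KeyedRecord> PARTITION_THEN_KEY =
        Comparator.<KeyedRecord>comparingInt(r -> r.partitionId)
                  .thenComparing(r -> r.key);

    static List<KeyedRecord> sort(List<KeyedRecord> records) {
        List<KeyedRecord> copy = new ArrayList<>(records);
        copy.sort(PARTITION_THEN_KEY);
        return copy;
    }
}
```

Sorting [1:b=1, 0:z=2, 1:a=3, 0:c=4] yields [0:c=4, 0:z=2, 1:a=3, 1:b=1]: all of partition 0 precedes partition 1, which is what allows one data file with contiguous per-partition segments.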
[Write the file]
```scala
// getDataFile resolves to
// blockManager.diskBlockManager.getFile(ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID))
val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
// Create a temp file next to the final data file with the Utils helper
val tmp = Utils.tempFileWith(output)
try {
  val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
  // Write the data held in the buffer or map to the file, partition by partition
  val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
  // Write the index file and commit the data file
  shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
  mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
} finally {
  if (tmp.exists() && !tmp.delete()) {
    logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")
  }
}
```
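The index file written by writeIndexFileAndCommit holds numPartitions + 1 longs: 0 followed by the running sum of partitionLengths, so reduce partition i occupies bytes [offset(i), offset(i+1)) of the data file. Below is a simplified Java sketch of writing such an index through a temp file and then reading one segment back. IndexFileSketch is a hypothetical name; the real method additionally renames the data file and checks whether another task attempt has already committed a valid index.

```java
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

final class IndexFileSketch {

    // Write numPartitions + 1 big-endian longs (0, then running totals) to a temp
    // file, then move it onto the final index path in one step.
    static void writeIndex(long[] partitionLengths, Path indexFile) throws IOException {
        Path tmp = indexFile.resolveSibling(indexFile.getFileName() + ".tmp");
        try (DataOutputStream out = new DataOutputStream(Files.newOutputStream(tmp))) {
            long offset = 0L;
            out.writeLong(offset);                 // partition 0 starts at byte 0
            for (long length : partitionLengths) {
                offset += length;
                out.writeLong(offset);             // end of partition i = start of partition i + 1
            }
        }
        Files.move(tmp, indexFile, StandardCopyOption.ATOMIC_MOVE);
    }

    // Look up [start, end) of one reduce partition: the two longs at byte reduceId * 8.
    static long[] readSegment(Path indexFile, int reduceId) throws IOException {
        ByteBuffer buf = ByteBuffer.wrap(Files.readAllBytes(indexFile)); // big-endian by default
        int pos = reduceId * Long.BYTES;
        return new long[] {buf.getLong(pos), buf.getLong(pos + Long.BYTES)};
    }
}
```

With partition lengths [100, 0, 250] the index contains 0, 100, 100, 350 (32 bytes), so reduce partition 1 is empty and reduce partition 2 reads bytes [100, 350) of the data file.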