spark源码阅读之shuffleManager

1、shufflemanager的实现类:sortshufflemanager

Spark 0.8及以前 Hash Based Shuffle

在Shuffle Write过程按照Hash的方式重组Partition的数据,不进行排序。每个map端的任务为每个reduce端的Task生成一个文件,通常会产生大量的文件(即对应为M*R个中间文件,其中M表示map端的Task个数,R表示reduce端的Task个数),伴随大量的随机磁盘IO操作与大量的内存开销。

Shuffle Read过程如果有combiner操作,那么它会把拉到的数据保存在一个Spark封装的哈希表(AppendOnlyMap)中进行合并。在代码结构上:

  • org.apache.spark.storage.ShuffleBlockManager负责Shuffle Write
  • org.apache.spark.BlockStoreShuffleFetcher负责Shuffle Read
  • org.apache.spark.Aggregator负责combine,依赖于AppendOnlyMap

Spark 0.8.1 为Hash Based Shuffle引入File Consolidation机制

通过文件合并,中间文件的生成方式修改为每个执行单位(一个Executor中的执行单位等于Core的个数除以每个Task所需的Core数)为每个reduce端的任务生成一个文件。最终可以将文件个数从MR修改为EC/T*R,其中,E表示Executor的个数,C表示每个Executor中可用Core的个数,T表示Task所分配的Core的个数。是否采用Consolidate机制,需要配置spark.shuffle.consolidateFiles参数

Spark 0.9 引入ExternalAppendOnlyMap

在combine的时候,可以将数据spill到磁盘,然后通过堆排序merge

Spark 1.1 引入Sort Based Shuffle,但默认仍为Hash Based Shuffle

在Sort Based Shuffle的Shuffle Write阶段,map端的任务会按照Partition id以及key对记录进行排序。同时将全部结果写到一个数据文件中,同时生成一个索引文件,reduce端的Task可以通过该索引文件获取相关的数据。

在代码结构上:

从以前的ShuffleBlockManager中分离出ShuffleManager来专门管理Shuffle Writer和Shuffle Reader。两种Shuffle方式分别对应

org.apache.spark.shuffle.hash.HashShuffleManager和

org.apache.spark.shuffle.sort.SortShuffleManager,

可通过spark.shuffle.manager参数配置。两种Shuffle方式有各自的ShuffleWriter:org.apache.spark.shuffle.hash.HashShuffle和org.apache.spark.shuffle.sort.SortShuffleWriter;但共用一个ShuffleReader,即org.apache.spark.shuffle.hash.HashShuffleReader。

org.apache.spark.util.collection.ExternalSorter实现排序功能。可通过对spark.shuffle.spill参数配置,决定是否可以在排序时将临时数据Spill到磁盘。

Spark 1.2 默认的Shuffle方式改为Sort Based Shuffle

Spark 1.4 引入Tungsten-Sort Based Shuffle

将数据记录用序列化的二进制方式存储,把排序转化成指针数组的排序,引入堆外内存空间和新的内存管理模型,这些技术决定了使用Tungsten-Sort要符合一些严格的限制,比如Shuffle dependency不能带有aggregation、输出不能排序等。由于堆外内存的管理基于JDK Sun Unsafe API,故Tungsten-Sort Based Shuffle也被称为Unsafe Shuffle。

在代码层面:

  • 新增org.apache.spark.shuffle.unsafe.UnsafeShuffleManager
  • 新增org.apache.spark.shuffle.unsafe.UnsafeShuffleWriter(用java实现)
  • ShuffleReader复用HashShuffleReader

Spark 1.6 Tungsten-sort并入Sort Based Shuffle

由SortShuffleManager自动判断选择最佳Shuffle方式,如果检测到满足Tungsten-sort条件会自动采用Tungsten-sort Based Shuffle,否则采用Sort Based Shuffle。

在代码方面:

  • UnsafeShuffleManager合并到SortShuffleManager
  • HashShuffleReader 重命名为BlockStoreShuffleReader,Sort Based Shuffle和Hash Based Shuffle仍共用ShuffleReader。

Spark 2.0 Hash Based Shuffle退出历史舞台,从此Spark只有Sort Based Shuffle,ShuffleManager的实现类就只有SortShufflemanager

2、sortshufflemanager.registerShuffle

3、sortshufflemanager.getReader

4、sortshufflemanager.getWriter

[html] view plaincopy

<embed id="ZeroClipboardMovie_1" src="https://csdnimg.cn/public/highlighter/ZeroClipboard.swf" loop="false" menu="false" quality="best" bgcolor="#ffffff" width="16" height="16" name="ZeroClipboardMovie_1" align="middle" allowscriptaccess="always" allowfullscreen="false" type="application/x-shockwave-flash" pluginspage="http://www.macromedia.com/go/getflashplayer" flashvars="id=1&width=16&height=16" wmode="transparent" style="box-sizing: border-box; outline: 0px; white-space: normal; word-break: break-all;">

  1. case unsafeShuffleHandle: SerializedShuffleHandle] =>
  2. new UnsafeShuffleWriter(.......)
  3. case bypassMergeSortHandle: BypassMergeSortShuffleHandle=>
  4. new BypassMergeSortShuffleWriter(......)
  5. case other: BaseShuffleHandle =>
  6. new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)

5、BypassMergeSortShuffleWriter类似于hash shuffle,但是将output file合并成一个文件

1)、BypassMergeSortShuffleWriter.write

传参:partition的itearator

【如果record为空】

[html] view plaincopy

<embed id="ZeroClipboardMovie_2" src="https://csdnimg.cn/public/highlighter/ZeroClipboard.swf" loop="false" menu="false" quality="best" bgcolor="#ffffff" width="16" height="16" name="ZeroClipboardMovie_2" align="middle" allowscriptaccess="always" allowfullscreen="false" type="application/x-shockwave-flash" pluginspage="http://www.macromedia.com/go/getflashplayer" flashvars="id=2&width=16&height=16" wmode="transparent" style="box-sizing: border-box; outline: 0px; white-space: normal; word-break: break-all;">

  1. if (!records.hasNext()) {
  2. partitionLengths = new long[numPartitions];
  3. shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, null);
  4. mapStatus = MapStatus.MODULE.apply(blockManager.shuffleServerId(), partitionLengths);
  5. return;
  6. }

【获取partition写入磁盘文件的writer】

[html] view plaincopy

<embed id="ZeroClipboardMovie_3" src="https://csdnimg.cn/public/highlighter/ZeroClipboard.swf" loop="false" menu="false" quality="best" bgcolor="#ffffff" width="16" height="16" name="ZeroClipboardMovie_3" align="middle" allowscriptaccess="always" allowfullscreen="false" type="application/x-shockwave-flash" pluginspage="http://www.macromedia.com/go/getflashplayer" flashvars="id=3&width=16&height=16" wmode="transparent" style="box-sizing: border-box; outline: 0px; white-space: normal; word-break: break-all;">

  1. partitionWriters = new DiskBlockObjectWriter[numPartitions];
  2. for (int i = 0; i < numPartitions; i++) {
  3. final Tuple2<TempShuffleBlockId, File> tempShuffleBlockIdPlusFile =
  4. blockManager.diskBlockManager().createTempShuffleBlock();
  5. final File file = tempShuffleBlockIdPlusFile._2();
  6. final BlockId blockId = tempShuffleBlockIdPlusFile._1();
  7. partitionWriters[i] =
  8. blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics);
  9. }

【写文件】

[html] view plaincopy

<embed id="ZeroClipboardMovie_4" src="https://csdnimg.cn/public/highlighter/ZeroClipboard.swf" loop="false" menu="false" quality="best" bgcolor="#ffffff" width="16" height="16" name="ZeroClipboardMovie_4" align="middle" allowscriptaccess="always" allowfullscreen="false" type="application/x-shockwave-flash" pluginspage="http://www.macromedia.com/go/getflashplayer" flashvars="id=4&width=16&height=16" wmode="transparent" style="box-sizing: border-box; outline: 0px; white-space: normal; word-break: break-all;">

  1. while (records.hasNext()) {
  2. final Product2<K, V> record = records.next();
  3. final K key = record._1();
  4. partitionWriters[partitioner.getPartition(key)].write(key, record._2());
  5. }

【获取每个ShuffleBlock,ShuffleBlock被称为FileSegment】

[html] view plaincopy

<embed id="ZeroClipboardMovie_5" src="https://csdnimg.cn/public/highlighter/ZeroClipboard.swf" loop="false" menu="false" quality="best" bgcolor="#ffffff" width="16" height="16" name="ZeroClipboardMovie_5" align="middle" allowscriptaccess="always" allowfullscreen="false" type="application/x-shockwave-flash" pluginspage="http://www.macromedia.com/go/getflashplayer" flashvars="id=5&width=16&height=16" wmode="transparent" style="box-sizing: border-box; outline: 0px; white-space: normal; word-break: break-all;">

  1. partitionWriterSegments = new FileSegment[numPartitions];
  2. for (int i = 0; i < numPartitions; i++) {
  3. final DiskBlockObjectWriter writer = partitionWriters[i];
  4. partitionWriterSegments[i] = writer.commitAndGet();
  5. writer.close();
  6. }

【合并文件以及写index文件】

[html] view plaincopy

<embed id="ZeroClipboardMovie_6" src="https://csdnimg.cn/public/highlighter/ZeroClipboard.swf" loop="false" menu="false" quality="best" bgcolor="#ffffff" width="16" height="16" name="ZeroClipboardMovie_6" align="middle" allowscriptaccess="always" allowfullscreen="false" type="application/x-shockwave-flash" pluginspage="http://www.macromedia.com/go/getflashplayer" flashvars="id=6&width=16&height=16" wmode="transparent" style="box-sizing: border-box; outline: 0px; white-space: normal; word-break: break-all;">

  1. File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
  2. File tmp = Utils.tempFileWith(output);
  3. try {
  4. partitionLengths = writePartitionedFile(tmp);
  5. shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
  6. } finally {
  7. if (tmp.exists() && !tmp.delete()) {
  8. logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
  9. }
  10. }

2)、BypassMergeSortShuffleWriter.writePartitionedFile

传参:合并文件 File outputFile

[html] view plaincopy

<embed id="ZeroClipboardMovie_7" src="https://csdnimg.cn/public/highlighter/ZeroClipboard.swf" loop="false" menu="false" quality="best" bgcolor="#ffffff" width="16" height="16" name="ZeroClipboardMovie_7" align="middle" allowscriptaccess="always" allowfullscreen="false" type="application/x-shockwave-flash" pluginspage="http://www.macromedia.com/go/getflashplayer" flashvars="id=7&width=16&height=16" wmode="transparent" style="box-sizing: border-box; outline: 0px; white-space: normal; word-break: break-all;">

  1. final FileOutputStream out = new FileOutputStream(outputFile, true);
  2. for (int i = 0; i < numPartitions; i++) {
  3. final File file = partitionWriterSegments[i].file();
  4. if (file.exists()) {
  5. final FileInputStream in = new FileInputStream(file);
  6. boolean copyThrewException = true;
  7. try {
  8. lengths[i] = Utils.copyStream(in, out, false, transferToEnabled);
  9. copyThrewException = false;
  10. } finally {
  11. Closeables.close(in, copyThrewException);
  12. }
  13. if (!file.delete()) {
  14. logger.error("Unable to delete file for partition {}", i);
  15. }
  16. }
  17. }
  18. threwException = false;

返回文件的偏移量

6、SortShuffleWriter

1)、SortShuffleWriter.writer

【对rdd进行排序】

[html] view plaincopy

<embed id="ZeroClipboardMovie_8" src="https://csdnimg.cn/public/highlighter/ZeroClipboard.swf" loop="false" menu="false" quality="best" bgcolor="#ffffff" width="16" height="16" name="ZeroClipboardMovie_8" align="middle" allowscriptaccess="always" allowfullscreen="false" type="application/x-shockwave-flash" pluginspage="http://www.macromedia.com/go/getflashplayer" flashvars="id=8&width=16&height=16" wmode="transparent" style="box-sizing: border-box; outline: 0px; white-space: normal; word-break: break-all;">

  1. 如果map端进行combine则,反之则不关心key在每个partition中是否被排序,既不传递aggregator也不传递ordering
  2. sorter = if (dep.mapSideCombine) {
  3. require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
  4. //需要combine时,传递partitioner以及ordering
  5. new ExternalSorter[K, V, C](
  6. context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
  7. } else {
  8. new ExternalSorter[K, V, V](
  9. context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
  10. }
  11. //将数据存储在buffer或者map中,这是最关键的地方,根据需求(包括partition内的key排序,partitionID排序等等)排序,内存不够时数据会spill后写入spillfile
  12. sorter.insertAll(records)

【写文件】

[html] view plaincopy

<embed id="ZeroClipboardMovie_9" src="https://csdnimg.cn/public/highlighter/ZeroClipboard.swf" loop="false" menu="false" quality="best" bgcolor="#ffffff" width="16" height="16" name="ZeroClipboardMovie_9" align="middle" allowscriptaccess="always" allowfullscreen="false" type="application/x-shockwave-flash" pluginspage="http://www.macromedia.com/go/getflashplayer" flashvars="id=9&width=16&height=16" wmode="transparent" style="box-sizing: border-box; outline: 0px; white-space: normal; word-break: break-all;">

  1. //blockManager.diskBlockManager.getFile(ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID))
  2. val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
  3. //通过工具类创建临时文件
  4. val tmp = Utils.tempFileWith(output)
  5. try {
  6. val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
  7. //将buffer或者map中的数据写入文件,各个partition
  8. val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
  9. //写index文件
  10. shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
  11. mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
  12. } finally {
  13. if (tmp.exists() && !tmp.delete()) {
  14. logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")
  15. }
  16. }
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,634评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,951评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,427评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,770评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,835评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,799评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,768评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,544评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,979评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,271评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,427评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,121评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,756评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,375评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,579评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,410评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,315评论 2 352

推荐阅读更多精彩内容