Shuffle 操作 (官方编程指南翻译)
原文地址: http://spark.apache.org/docs/latest/programming-guide.html#shuffle-operations
文章仅限交流使用,转载请注明出处。如有错误,欢迎指出!
Henvealf/译
在 Spark 触发一个事件的时确定无疑的发生的操作就叫做 shuffle 。Shuffle 是 Spark 重建数据的机制,来将来自不同分区的数据进行分组。典型的麻烦就是需要在执行器和机器之间进行数据拷贝,是 shuffle 中一个复杂并且大开销的操作。
背景
为了理解 shuffle 过程中发生了什么,我们可以考虑一下 reduceByKey 操作。 reduceByKey 操作会将相同key中的值聚合进一个元祖中而生成一个新的 RDD -- 值的合并操作是依赖于key的。这里的挑战就是在 key 中,并不是所有值都被分在了一个分区里,或者说不是在相同的机器里,但是他们必须要放在一个地方才能计算出结果。
在 Spark 中, 对于指定的操作,数据在一般情况下不是按照分区来分布到必要的地方。在计算过程中,一个单独的任务将会在一个单独的分区上操作,事实上,如果将所有的数据组织在一个单独的 reduceByKey 汇合任务上去执行的话,Spark 需要去执行一个多对多(all-to-all,多个task与多个分区?)的操作.他必须从所有的分区中为每一个 key 寻找他们的 values,然后跨越分区将同key的值带到一起去计算最后的结果--这个过程就叫做 shuffle。
尽管在经历了shuffle数据之后的每个分区中的元素是确定的,但分区内部自己进行排序,所以他们的元素的也能是无顺序的。如果想要在 shuffle 后希望是有序的,使用下面的方式就能做到:
- mapPartitions 使用像 sorted 一样的方法去排序每个分区。
- 当重新分区的同时可以使用 repartitionAndSortWithinPartitions 来对分区进行快速的排序。
- sordBy 可一全局的排序一整个 RDD。
想 repartition 和 coalesce 这种重新分区的操作,groupByKey 和 reduceByKey 这种 ‘ByKey’ 操作 和 cogroup 和 join 这种 join 操作也都会触发 shuffle,
效率影响
Shuffle 是一个开销很大的操作,他会同时用到 磁盘读写,数据序列化以及网络读写。为了为 shuffle操作组织数据, Spark 会产生 -map 与 -reduce 两种任务,是从 MapReduce 中得来的名词,与 Spark 中 map 与 reduce 函数不是同一个东西。
内建的情况下, 从 map 任务生成的结果会一直放在内存中,直到内存放不下。然后,数据会基于分区进行排序,然后写进一个文件中。在 reduce 端,就直接读取与之相关的已经排序好的块。
确定的 shuffle 操作能够消费客观数量的堆内存,在传输记录之后,他使用的是内存中的数据结构来组织这些记录。特别的,reduceByKey 和 aggregateByKey 在 map 端创建这些结构和 ByKey操作在 reduce 端产生他们(??)。如果数据在内存中放不小了,就放入磁盘中并增强垃圾收集。
Shuffle 也会在磁盘上产生大量的中间文件。像 Spark 1.3,直到相应的 RDD 不在被使用和垃圾收集,这些文件会一直保存着。所以如果 lineage 没有被重新运算, shuffle 文件就不需要重建了。如果应用程序维持着对着些 RDD 的引用或者 GC 内有频繁的 kick,垃圾收集可能会经过很长的周期才发生。这意味着一个需要运行很长时间的 Spark job 与需要消耗大量的磁盘空间。中间文件的存储路径在配置 Spark context 的时候使用 spark.local.dir 来指定。
Shuffle行为能够使用各种各样的配置参数来调节。详细请看 Spark Configuration Guide 的 Shuffle Behavior 章节。