spark shuffle
Shuffle就是对数据进行重组,由于分布式计算的特性和要求,在实现细节上更加繁琐和复杂
在MapReduce框架,Shuffle是连接Map和Reduce之间的桥梁,Map阶段通过shuffle读取数据并输出到对应的Reduce;而Reduce阶段负责从Map端拉取数据并进行计算。在整个shuffle过程中,往往伴随着大量的磁盘和网络I/O。所以shuffle性能的高低也直接决定了整个程序的性能高低。Spark也会有自己的shuffle实现过程。原文链接:https://blog.csdn.net/zhanglh046/article/details/78360762
总的来说,spark跟MR的shuffle并没有多大区别,都涉及到map(写数据的阶段),跟reduce(读数据阶段)。
spark shuffle 执行流程
本文通过源码分析spark shuffle的执行过程,以及相关参数的调优。
通过分析spark 提交的源码,我们可以知道,最终调用的是org.apache.spark.scheduler.Task
的runTask
方法,而Task有2个子类,ShuffleMapTask
(write(也可能存在先read后write,最后阶段是write)相当于MR中的Map阶段)跟ResultTask
(开始阶段是read,相当于MR中的Reduce阶段)
shuffle write阶段
查看ShuffleMapTask
可以知道有3种Writer,这里我们只讨论最常用的SortShuffleWriter
。
Shuffle write 的'HashMap' 跟'Array'
Shuffle write没有使用常见的collection或者map,而是用一个大数组,第一位存储key,key的下一位存储value,存储的格式类都是K: (getPartition(key), key) V: value
其实现原理很简单,开一个大 Object 数组,蓝色部分存储 Key,黑色部分存储 Value。如下图:
1.PartitionedAppendOnlyMap
PartitionedAppendOnlyMap extends AppendOnlyMap, AppendOnlyMap的官方介绍是 A simple open hash table optimized for the append-only use case, where keys are never removed, but the value for each key may be changed。意思是类似 HashMap,但没有remove(key)方法。
具体操作拿到数据后
- 计算key的hash值的位置pos,2pos,如果2pos位置没有数据,则在2pos的位置放入key,2pos +1 的位置放入value,如果2*pos上有值,计算key是否等于,如果等于,则用传入的函数更新,如wordcount中的reduceByKey(_ + _), 计算出新的value后更新,如果不等于,则通过(pos + delta) & mask 的方法重新计算hash值得位置,delta 从1开始,遇到key存在每次递增1
- 当容量>growThreshold(0.7 * size),就是大于70%,数组会扩容,变为原来得2倍,然后重新计算原数据得每个值,写入到新的数组中。
- 每次插入后,会判断当前大约容量,通过估算得方式计算占用的内存,每32次估算一次,如果大于当前的内存,就会向taskMemoryManager.acquireExecutionMemory申请内存,如果申请成功,则继续写入,如果写入不成功,则spill磁盘,所以,第一个优化点,理论上executor内存越大,在内存可存储的数据越多,spill磁盘的次数越少,速度越快。spill的过程,调用collection.destructiveSortedWritablePartitionedIterator(comparator),首先会将数据往前移动,填满中间空缺的位置,然后将内存中的数据进行排序,用的排序算法是TimSort,最后按照分区且排序的形式写入文件中。
2.PartitionedPairBuffer
相对比较简单,不需要mapCombine,只需要将数据按照kv追加到数组后面,如下图。
spill溢写磁盘与PartitionedAppendOnlyMap一样,不过不需要移动数据,填充空缺的位置,数据本身就是紧密的。
Shuffle write 合并
通过PartitionedAppendOnlyMap或者PartitionedPairBuffer操作完所有的数据后,会生成一个内存collection和0个或者多个分区且排序的文件(如果数据量过大有spill操作),最后通过外排将内存的数据跟spill的文件数据,通过merge sort合并成1个分区且排序的大文件(shuffleId_mapId_0.data),跟一个索引文件(shuffleId_mapId_0.index),类似kafka里的segment跟.index文件。索引主要描述每个分区对应的数据,比如0-100是0号分区,101-200是1号分区的数据,为了给reducer fetch对应分区的数据。
shuffle read阶段
shuffle read阶段,其实就是读取数据的阶段,你可以理解成,client向server发送请求,下载数据。主要是从client读取数据的过程,超时、并发度、异常重试等方面入手,server端则通过调整处理的并发数方面入手。
Shuffle read过程
ResultTask调用runTask最终调用的是ShuffleRdd的compute方法,然后我们可以看到实际上是BlockStoreShuffleReader的read方法。read方法中,通过new ShuffleBlockFetcherIterator(), 注意这里有4个优化参数, new ShuffleBlockFetcherIterator的时候,通过mapOutputTracker获取属于自己的Iterator[(BlockManagerId, Seq[(BlockId, Long)])],然后ShuffleBlockFetcherIterator的initialize方法,是整个read切分读request的逻辑。
new ShuffleBlockFetcherIterator(
context,
blockManager.shuffleClient,
blockManager,
mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition),
serializerManager.wrapStream,
// Note: we use getSizeAsMb when no suffix is provided for backwards compatibility
SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024,
SparkEnv.get.conf.getInt("spark.reducer.maxReqsInFlight", Int.MaxValue),
SparkEnv.get.conf.get(config.REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS),
SparkEnv.get.conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM),
SparkEnv.get.conf.getBoolean("spark.shuffle.detectCorrupt", true))
ShuffleBlockFetcherIterator.initialize
首先将val targetRequestSize = math.max(maxBytesInFlight / 5, 1L),默认是48m/5=9.6m。判断拉取的数据是否大于9.6m或者一个address拉取的blocks数大于maxBlocksInFlightPerAddress(默认是Int.MaxValue),所以只由9.6m控制,如果是,则封装成一个new FetchRequest(address, curBlocks),最后会封装成N个FetchRequest。然后开始遍历拉数据,判断是否isRemoteBlockFetchable,逻辑是
def isRemoteBlockFetchable(fetchReqQueue: Queue[FetchRequest]): Boolean = {
fetchReqQueue.nonEmpty &&
(bytesInFlight == 0 ||
(reqsInFlight + 1 <= maxReqsInFlight &&
bytesInFlight + fetchReqQueue.front.size <= maxBytesInFlight))
}
总之就是正在拉取的数据不能大于spark.reducer.maxSizeInFlight(默认48m)并且请求数不能超过maxReqsInFlight(Int.MaxValue),不然就进入等待的deferredFetchRequests队列。所以,为了提高shuffle read的request并发读的数量,可以提高maxBytesInFlight(默认48m)的大小。并且单个address拉取的blocks数不能超过maxBlocksInFlightPerAddress(默认是Int.MaxValue),所以,降低maxBlocksInFlightPerAddress可以降低同时拉取的blocks数量,防止同时拉取多个blocks导致io过高,导致服务无响应、io超时等异常。
最终执行sendRequest请求拉取数据,调优参数spark.shuffle.io.maxRetries(默认是3)拉取失败重试的次数, spark.shuffle.io.retryWait(默认是5秒)失败后等待5秒后尝试重新拉取数据。
spark shuffle 参数调优
为了进一步优化内存的使用以及提高Shuffle时排序的效率,Spark引入了堆外(Off-heap)内存,使之可以直接在工作节点的系统内存中开辟空间,存储经过序列化的二进制数据。除了没有other空间,堆外内存与堆内内存的划分方式相同,所有运行中的并发任务共享存储内存和执行内存。所以,开启堆外内存对于调优非常重要
-
spark.shuffle.io.preferDirectBufs
:是否优先使用堆外内存 -
spark.memory.offHeap.enabled
: 是否启用堆外内存 -
spark.memory.offHeap.size
: 设置堆外内存大小
shuffle write阶段参数调优
-
spark.executor.memory
:通过分析write的过程中可以知道,单个task可用的内存越大,可申请的内存越大,spill disk的次数越少,速度越快,所以,可以适当提高该参数 -
spark.sql.shuffle.partitions
:提高并行度可以减少单个task处理的数据量,减少spill disk次数,降低oom风险,但是并不是越大越好,提高该参数,会增加task的数量,跟线程的数量一个道理,到达一定阈值,线程数的越多反而会增加系统上下文切换的压力,需要一点点测试,根据不同的任务,确定具体的数据
shuffle read阶段参数调优
-
spark.reducer.maxSizeInFlight
:默认是48m,一个请求拉取一个块的数据为48/5=9.6m,理想情况下会有5个请求同时拉数据,但是可能遇到一个大块,超过48m,就只有一个请求在拉数据,无法并行,所以可用适当提高该参数 -
spark.reducer.maxReqsInFlight
:shuffle read的时候最多有多少个请求同时拉取数据,默认是Integer.MAX_VALUE,一般不优化,不修改 -
spark.reducer.maxBlocksInFlightPerAddress
: 一个拉取的请求,包含多少个server,默认一个请求是9.6m,但是可能每个server拉取的文件非常小,只有几k,那样一个请求就需要请求上千个server拉取数据,容易导致超时等异常,所以,适当降低该参数 -
spark.reducer.maxReqSizeShuffleToMem
:read 过程中内存可以存放最大的数据量,超过将会把拉取的数据放到磁盘 -
spark.shuffle.io.maxRetries
:一个请求拉取失败时重试次数,增大该参数,可能会延迟任务执行时间,但是可以提高任务成功率 -
spark.shuffle.io.retryWait
:一个请求拉取失败时的等待时间,增大该参数,可能会延迟任务执行时间,但是可以提高任务成功率 -
spark.shuffle.io.clientThreads
: 拉取数据client的线程个数, 可适当调高 -
spark.shuffle.file.buffer
: write spill磁盘的时候,缓冲区大小
ExternalShuffleService
Spark 的 Executor 节点不仅负责数据的计算,还涉及到数据的管理。如果发生了 shuffle 操作,Executor 节点不仅需要生成 shuffle 数据,还需要负责处理读取请求。如果 一个 Executor 节点挂掉了,那么它也就无法处理 shuffle 的数据读取请求了,它之前生成的数据都没有意义了。
为了解耦数据计算和数据读取服务,Spark 支持单独的服务来处理读取请求。这个单独的服务叫做 ExternalShuffleService,运行在每台主机上,管理该主机的所有 Executor 节点生成的 shuffle 数据。有读者可能会想到性能问题,因为之前是由多个 Executor 负责处理读取请求,而现在一台主机只有一个 ExternalShuffleService 处理请求,其实性能问题不必担心,因为它主要消耗磁盘和网络,而且采用的是异步读取,所以并不会有性能影响。
解耦之后,如果 Executor 在数据计算时不小心挂掉,也不会影响 shuffle 数据的读取。而且Spark 还可以实现动态分配,动态分配是指空闲的 Executor 可以及时释放掉。
ExternalShuffleService参数调优
ExternalShuffleService本质是一个基于Netty写的Netty服务,所以相关调优就是对Netty参数的调优,主要有以下这些参数,具体调整,需要根据实际情况做出相应的调整,提高服务稳定性。
// 服务启动时处理请求的线程数,默认是服务器的cores * 2
spark.shuffle.io.serverThreads
// ChannelOption.SO_RCVBUF,
spark.shuffle.io.receiveBuffer
// ChannelOption.SO_BACKLOG
spark.shuffle.io.backLog
// ChannelOption.SO_SNDBUF
spark.shuffle.io.sendBuffer
实际应用的效果
T: 使用的内存 1T=1024G
P: 配置spark.sql.shuffle.partitions,1P=1000
C: cpu cores数量
参考链接
https://blog.csdn.net/zhanglh046/article/details/78360762
https://github.com/JerryLead/SparkInternals
https://www.cnblogs.com/itboys/p/9201750.html
https://www.dazhuanlan.com/2019/12/19/5dfb2a10d780d/
https://blog.csdn.net/pre_tender/article/details/101517789