spark shuffle
Shuffle就是对数据进行重组,由于分布式计算的特性和要求,在实现细节上更加繁琐和复杂
在MapReduce框架,Shuffle是连接Map和Reduce之间的桥梁,Map阶段通过shuffle读取数据并输出到对应的Reduce;而Reduce阶段负责从Map端拉取数据并进行计算。在整个shuffle过程中,往往伴随着大量的磁盘和网络I/O。所以shuffle性能的高低也直接决定了整个程序的性能高低。Spark也会有自己的shuffle实现过程。原文链接:https://blog.csdn.net/zhanglh046/article/details/78360762
总的来说,spark跟MR的shuffle并没有多大区别,都涉及到map(写数据的阶段),跟reduce(读数据阶段)。
spark shuffle 执行流程
本文通过源码分析spark shuffle的执行过程,以及相关参数的调优。
通过分析spark 提交的源码,我们可以知道,最终调用的是org.apache.spark.scheduler.Task
的runTask
方法,而Task有2个子类,ShuffleMapTask
(write(也可能存在先read后write,最后阶段是write)相当于MR中的Map阶段)跟ResultTask
(开始阶段是read,相当于MR中的Reduce阶段)
shuffle write阶段
首先,shuffle write分为write buffer、spill disk、merge file 三个阶段,分别对应上图的1,2,3
- write buffer: 将数据写到缓存buffer中,如果容量大于0.7,则会申请扩容buffer,如果能够申请到内存,就扩容继续写buffer
- sort and spill: 内存不足的时候,就会把缓存的数据排序后spill 到磁盘中
- merge file: 经过1,2阶段后,会产生多个spill后的文件,以及buffer内存中的数据,将内存以及磁盘的数据,排序后合并成1个分区且排序的大文件(数据文件),跟一个索引文件(描述数据文件),reducer会根据索引文件,拉取数据文件所需要的数据。
shuffle read阶段
shuffle read阶段,其实就是读取数据的阶段,你可以理解成,client向server发送请求,下载数据。主要是从client读取数据的过程,超时、并发度、异常重试等方面入手,server端则通过调整处理的并发数方面入手。
- Shuffle read过程
Reduce解析需要读取的数据,封装成一个个请求,向每个mergeData file读取相应的数据。
ExternalShuffleService
Spark 的 Executor 节点不仅负责数据的计算,还涉及到数据的管理。如果发生了 shuffle 操作,Executor 节点不仅需要生成 shuffle 数据,还需要负责处理读取请求。如果 一个 Executor 节点挂掉了,那么它也就无法处理 shuffle 的数据读取请求了,它之前生成的数据都没有意义了。
为了解耦数据计算和数据读取服务,Spark 支持单独的服务来处理读取请求。这个单独的服务叫做 ExternalShuffleService,运行在每台主机上,管理该主机的所有 Executor 节点生成的 shuffle 数据。有读者可能会想到性能问题,因为之前是由多个 Executor 负责处理读取请求,而现在一台主机只有一个 ExternalShuffleService 处理请求,其实性能问题不必担心,因为它主要消耗磁盘和网络,而且采用的是异步读取,所以并不会有性能影响。
解耦之后,如果 Executor 在数据计算时不小心挂掉,也不会影响 shuffle 数据的读取。而且Spark 还可以实现动态分配,动态分配是指空闲的 Executor 可以及时释放掉。
spark shuffle 参数调优
为了进一步优化内存的使用以及提高Shuffle时排序的效率,Spark引入了堆外(Off-heap)内存,使之可以直接在工作节点的系统内存中开辟空间,存储经过序列化的二进制数据。除了没有other空间,堆外内存与堆内内存的划分方式相同,所有运行中的并发任务共享存储内存和执行内存。所以,开启堆外内存对于调优非常重要
-
spark.shuffle.io.preferDirectBufs
:是否优先使用堆外内存 -
spark.memory.offHeap.enabled
: 是否启用堆外内存 -
spark.memory.offHeap.size
: 设置堆外内存大小
shuffle write阶段参数调优
提高task可用内存,减少spill文件
spark.executor.memory
shuffle read阶段参数调优
提高拉取的并行度
spark.sql.shuffle.partitions
-
spark.shuffle.file.buffer
提高拉取的数据量 spark.reducer.maxSizeInFlight
spark.reducer.maxReqsInFlight
spark.reducer.maxBlocksInFlightPerAddress
spark.reducer.maxReqSizeShuffleToMem
-
spark.shuffle.io.clientThreads
提高重试、超时增加拉取数据的容错性 spark.shuffle.io.maxRetries
spark.shuffle.io.retryWait
ExternalShuffleService参数调优
ExternalShuffleService本质是一个基于Netty写的Netty服务,所以相关调优就是对Netty参数的调优,主要有以下这些参数,具体调整,需要根据实际情况做出相应的调整,提高服务稳定性。
服务启动时处理请求的线程数,默认是服务器的cores * 2
spark.shuffle.io.serverThreads
spark.shuffle.io.receiveBuffer
spark.shuffle.io.backLog
spark.shuffle.io.sendBuffer
实战
核心ETL任务调优
- 首先确认调优目标:比原来的执行时间减少50%以上
- 统计出每个ETL任务的数据量,根据上文的调优方向,write、read和ExternalShuffleService各个阶段的调优方向
- 经过大量的测试验证,最终我们发现,数据量在1亿左右使用1T1P120C的使用配置,对比原来的资源,资源占用增加1倍,耗时减少90%
- 3-5亿数据,使用1T1P120C资源使用配置,资源增加20%~60%,时间减少55%
- 28亿数据,使用3T3P600C资源使用减少40%,速度提升3.8倍
T: 使用的内存 1T=1024G
P: 配置spark.sql.shuffle.partitions,1P=1000
C: cpu cores数量
ETL任务调优效果
参考链接
https://blog.csdn.net/zhanglh046/article/details/78360762
https://github.com/JerryLead/SparkInternals
https://www.cnblogs.com/itboys/p/9201750.html
https://www.dazhuanlan.com/2019/12/19/5dfb2a10d780d/
https://blog.csdn.net/pre_tender/article/details/101517789