(四)SPARK性能优化----避免使用shuffle类算子

如果有可能的话,要尽量少使用shuffle类算子。因为Spark作业运行过程中,最消耗性能的地方就是shuffle过程。shuffle过程,简单来说就是将分布在集群中多个节点上的同一个key,拉取到同一个节点上,进行聚合或join等操作。比如reduceByKey、join等算子都会触发shuffle操作。

shuffle过程中,各个节点上相同的key都会先写入本地磁盘文件中,然后其他节点需要通过网络传输拉取各个节点上的磁盘文件中相同的key。而且相同的key都拉取到同一节点进行聚合操作时,还有可能会因为一个节点上处理的key过多,导致内存不够存放,进而溢写到磁盘文件中。因此,在shuffle过程中,可能会发生大量的磁盘文件读写的IO操作,以及数据的网络传输操作。磁盘IO以及网络数据传输也是shuffle性能较差的主要原因。

因此在我们开发过程中,能避免则尽可能避免使用reduceByKey、join、distinct、reparation等会进行shuffle的算子,尽量使用map类的非shuffle算子。这样的话,没有shuffle操作或者仅有较少shuffle操作的Spark作业,可以大大减少性能开销。

Broadcast和map进行join代码示例:


//传统的join操作会导致shuffle操作。
//因为两个RDD中,相同的key都需要通过网络拉取到一个节点上,由一个task进行join操作。
valrdd3=rdd1.join(rdd2)

//Broadcast+map的join操作,不会导致shuffle操作。
//使用Broadcast将一个数据量较小的RDD作为广播变量。
val rdd2Data=rdd2.collect()
val rdd2DataBroadcast=sc.broadcast(rdd2Data)

// 在rdd1.map算子中,可以从rdd2DataBroadcast中,获取rdd2的所有数据。
// 然后进行遍历,如果发现rdd2中某条数据的key与rdd1的当前数据的key是相同的,那么就判定可以进行join。
// 此时就可以根据自己需要的方式,将rdd1当前数据与rdd2中可以连接的数据,拼接在一起(String或Tuple)。
val rdd3=rdd1.map(rdd2DataBroadcast...)
// 注意,以上操作,建议仅仅在rdd2的数据量比较少(比如几百M,或者一两G)的情况下使用。
// 因为每个Executor的内存中,都会驻留一份rdd2的全量数据。


©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容