Spark
https://blog.csdn.net/u011239443/article/list/6
https://tech.meituan.com/spark_tuning_basic.html
https://tech.meituan.com/spark_tuning_pro.html
聚合算子
reduceBy
groupBy
思路
这个方案的核心实现思路就是进行两阶段聚合。第一次是局部聚合,先给每个key都打上一个随机数,比如10以内的随机数,此时原先一样的key就变成不一样的了,比如(hello, 1) (hello, 1) (hello, 1) (hello, 1),就会变成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接着对打上随机数后的数据,执行reduceByKey等聚合操作,进行局部聚合,那么局部聚合结果,就会变成了(1_hello, 2) (2_hello, 2)。然后将各个key的前缀给去掉,就会变成(hello,2)(hello,2),再次进行全局聚合操作,就可以得到最终结果了,比如(hello, 4)。
原理
将原本相同的key通过附加随机前缀的方式,变成多个不同的key,就可以让原本被一个task处理的数据分散到多个task上去做局部聚合,进而解决单个task处理数据量过多的问题。接着去除掉随机前缀,再次进行全局聚合,就可以得到最终的结果。具体原理见下图。
流程
连接算子
join
场景一
参与连接的某一RDD足够小(可以广播该RDD)
-
思路
不使用join算子进行连接操作,而使用Broadcast变量与map类算子实现join操作,进而完全规避掉shuffle类的操作,彻底避免数据倾斜的发生和出现。将较小RDD中的数据直接通过collect算子拉取到Driver端的内存中来,然后对其创建一个Broadcast变量;接着对另外一个RDD执行map类算子,在算子函数内,从Broadcast变量中获取较小RDD的全量数据,与当前RDD的每一条数据按照连接key进行比对,如果连接key相同的话,那么就将两个RDD的数据用你需要的方式连接起来。
原理
流程
场景二
参与join的RDD都较大,不能广播,且RDD1倾斜,RDD2均匀,且可RDD2对应RDD1倾斜部分数据不大(可广播)
-
思路
我们的RDD_B并不能足够小到都能装进内存,但是有些RDD_A中的key会重复很多次,这时候你就可以想着只广播RDD_B中在RDD_A中出现最频繁的那些值。当一种key值在RDD_A中多到一个partition都装不下时,这种方法会非常有用。在这种情况下,你可以对RDD_A使用countByKeyApprox来近似得到哪些key需要广播。然后,你将从RDD_B中filter出来需要广播的RDD_B_0和不要广播的RDD_B_1,将RDD_B_0 collect成本地的HashMap。使用sc.broadcast广播该HashMap,使得每个节点都有一个备份,与RDD_A手动的执行join,得到结果RDD_C_1。再根据HashMap将RDD_A中多次重复的key值去掉,生成RDD_A_1。对RDD_A_1和RDD_B_1进行标准的join,得到结果RDD_C_0,并unoin上RDD_C_1,得到最终的结果。
原理
场景三
参与join的RDD都较大,不能广播,且RDD1倾斜,RDD2均匀,且可RDD2对应RDD1倾斜部分数据大(不可广播)
-
思路
对包含少数几个数据量过大的key的那个RDD,通过sample算子采样出一份样本来,然后统计一下每个key的数量,计算出来数据量最大的是哪几个key。
然后将这几个key对应的数据从原来的RDD中拆分出来,形成一个单独的RDD,并给每个key都打上n以内的随机数作为前缀,而不会导致倾斜的大部分key形成另外一个RDD。
接着将需要join的另一个RDD,也过滤出来那几个倾斜key对应的数据并形成一个单独的RDD,将每条数据膨胀成n条数据,这n条数据都按顺序附加一个0~n的前缀,不会导致倾斜的大部分key也形成另外一个RDD。
再将附加了随机前缀的独立RDD与另一个膨胀n倍的独立RDD进行join,此时就可以将原先相同的key打散成n份,分散到多个task中去进行join了。
而另外两个普通的RDD就照常join即可。
最后将两次join的结果使用union算子合并起来即可,就是最终的join结果。
-
原理
对于join导致的数据倾斜,如果只是某几个key导致了倾斜,可以将少数几个key分拆成独立RDD,并附加随机前缀打散成n份去进行join,此时这几个key对应的数据就不会集中在少数几个task上,而是分散到多个task进行join了。
流程