常见的数据倾斜是怎么造成的?
Shuffle的时候,将各个节点上相同的key拉取到某个节点的一个task进行处理,比如按照key进行聚合或join等操作,如果某个key对应的数据量特别大的话,就会发生数据倾斜现象。数据倾斜就成为了整个task运行时间的短板。
触发shuffle的常见算子:distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等。
要解决数据倾斜的问题,首先要定位数据倾斜发生在什么地方。
首先是哪个stage,直接在Web UI上看就可以,一般出现倾斜都是耗时特别长的Stage,然后查看运行耗时的task,一般是其中的某几个Task一直拖着,其他的Task早已经完成了,根据这个task,根据stage划分原理,推算出数据倾斜发生在哪个shuffle类算子上。
如何查看发生倾斜的RDD呢?
如果是Spark RDD执行shuffle算子导致的数据倾斜,那么可以在Spark作业中加入查看key分布的代码,比如RDD.countByKey()。然后对统计出来各个key出现的次数,collect、take到客户端打印一下,就可以看到key的分布情况。
以下方法可以大概看出哪个key出现了倾斜:
JavaPairRDD<String, String> hssData = getHssData(fs, sc, hssPath);
JavaPairRDD<String, String> sample = hssData.sample(false, 0.1);
Map<String, Object> countByKey = sample.countByKey();
出现倾斜的key有两种情况:
1、某个可以出现倾斜
2、多个key出现倾斜
某个Key出现倾斜解决办法:
通过上述方法可以知道是哪个Key出现了倾斜,所以可以先通过filter方法过滤掉倾斜的Key,把倾斜的Key和没有倾斜的Key分开处理,由于Spark运行机制,所以单独处理倾斜Key的时候就不会再出现倾斜现象。
上述方法只能处理特定的数据倾斜,对于实际的生产环境可能并不怎么适用,这事是解决倾斜的其中一个方法。
多个Key出现倾斜的解决办法:
原理:在倾斜Shuffle之前给每一个Key都加上一个随机前缀,然后再给加了前缀的Key进行一个Shuffle操作,在Shuffle操作后再把Key的前缀去掉。在这个过程中由于前缀的加入,会把倾斜的Key随机的分配到不同的Task。然后去掉前缀从而解决数据倾斜的问题。
private static JavaPairRDD<String, agg> repar(
JavaPairRDD<String, agg> 。Cdr) {
JavaPairRDD<String, agg> mapToPair;
try {
mapToPair = 。Cdr
.mapToPair(new PairFunction<Tuple2<String, agg>, String, agg>() {
@Override
public Tuple2<String, agg> call(Tuple2<String, agg> t)
throws Exception {
//产生随机前缀,随机数大小看情况决定
long i = (long) (Math.random() * 150);
//添加随机数前缀
return new Tuple2<String, agg>(i + "_" + t._1, t._2);
}
}).sortByKey()//进行一个Shuffle操作打乱Key
//去掉随机数前缀
.mapToPair(new PairFunction<Tuple2<String, agg>, String, agg>() {
@Override
public Tuple2<String, agg> call(Tuple2<String, agg> t)
throws Exception {
String str = t.1.split("")[0];
return new Tuple2<String, agg>(str, t._2);
}
});
} catch (Exception e) {
return null;
}
return mapToPair;
}
以上是解决RDD数据倾斜简单方法。