数据倾斜产生背景
数据倾斜表现形势和网上流传不大一样,欢迎批评指正--李小李
- 在开发DMP平台过程中,有一个标签体系时地理位置标签,需要根据IP解析出地理位置,原始数据是经过每天聚合之后的数据,聚合之后的格式为(imei,city,city_freq),在每天的基础之上,需要进行以月为单位的聚合,使用spark进行数据聚合统计,即sql语句为:
select imei,city,sum(city_freq) as city_freq from mid_day_city_freq where dt>=20190301 and dt < 20190328 group by imei,city
- 经过以天为单位的聚合之后,数据量为1.4G,字段为imei,city,city_freq,文件格式为parquet并以gz格式进行压缩;
- 在进行以月为单位的聚合统计时,数据量在约42G,数据条数约为4532293316,spark job共分为两个stage,每个stage的最后4个task运行极慢,意识到遇到了数据倾斜,聚合类的数据倾斜;
聚合类数据倾斜原因分析
- 因为时聚合类统计,在spark sql中使用到了group by 和sum,分组条件为imei和city,所以为imei 在文件中出现的次数进行统计,'select imei ,count(imei) as imei_count from temp;'
- imei 频次分布情况如下:
imei总数为244462334个; imei频次在300次以上有1274个; imei频次在500次以上有526个; imei频次在1000次以上有173个; imei频次在2000次以上有58个; imei频次在3000次以上有32个;
- 在统计imei频次的时候,也是出现了数据倾斜,每个stage最后两个task运行极慢,但是可以运行成功;
- spark job资源分配情况如下
spark-submit --calss cn.ted.city \
--master yarn \
--deploy-mode cluster \
--driver-memory 1G \
--num-executors 50 \
--executor-memory 2g \
--executor-cores 2 \
--queue default \
--conf spark.default.parallelism=600 \
--conf spark.storage.memoryFunction=0.5 \
--conf spark.shuffle.memoryFraction=0.3 \
--conf spark.sql.planner.skewJoin=true \
--conf spark.sql.shuffle.partitions=600 \
/home/liyahui_su/bbc/bbc_dmp/bbc_dmp-1.0-SNAPSHOT.jar
解决方案分析及代码实现
过滤掉异常数据方案
spark job DAG 图:
- 方案选择分析
- 因为时统计每个月中imei对应的city频次,对于imei频次在300次以上,说明每天的city更换为10次,可以理解为sdk在抓取ip的时候出现了异常,导致数据不准确,联系到业务应用,时需要进行DMP标签体系地理位置标签,所以可以将这一部分异常imei进行过滤掉。
- 方案实行:在统计imei出现频次时将imei_count>=300的imei号过滤保存到字典库里面,在进行月度聚合的是时候,首先进行join过滤,在进行聚合统计,详见代码;
- 运行时间对比:
- 未做任何imei号过滤:45min;
- 过滤掉imei_count >=1000的imei,40min;
- 过滤掉imei_count >=500的imei,20min;
-
过滤掉imei_count >=300的imei,6min;其中过滤掉imei_count>=300的spark job运行情况如下:
- 总结
- 在进行imei号过滤时,数据运行时间的会得到进一步的缓解,在imei_count>=300时,数据倾斜的情况可以说得到了根本解决,输入数据量在35G,运行时长在6min;
- 过滤倾斜key方案较简单,过滤倾斜key的数量需要逐步测试,应用较为局限。如果说是需要统计每个城市出现的imei号频次呢,那么就不能再对imei号进行倾斜过滤了。所以需要对倾斜的key进行打散,采用两端聚合的方式进行统计;
此项方案具体代码实现:
package cn.ted.dmp.bank.mining.geo_active
import cn.ted.dmp.util.SparkOutputpathCheck
import org.apache.hadoop.fs.FileSystem
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
/**
* Author: LiYahui
* Date: Created in 2019/3/26 13:44
* Description: TODO 按照月为单位统计地理位置出现的频次
* Version: V1.0
*/
object AggProvinceCityFreqMonth {
def main(args: Array[String]): Unit = {
val Array(provincePath, cityPath, skewImeiPath, provinceOutputPath, cityOutputPath) = args
if (args.length != 5) {
println(
s"""
|${this.getClass.getSimpleName}
| val Array(provincePath, cityPath,skewImeiPath ,provinceOutputPath, cityOutputPath) = args
""".stripMargin)
sys.exit(-1)
}
val spark: SparkSession = SparkSession.builder()
.appName(s"${this.getClass.getSimpleName}")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.sql.parquet.compression.codec", "gzip")
.getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("WARN")
//读取需要过滤到的imei号
import spark.implicits._
val skewImeiDF: DataFrame = sc.textFile(skewImeiPath).toDF("skew_imei").distinct()
val fs: FileSystem = FileSystem.get(sc.hadoopConfiguration)
import org.apache.spark.sql.functions.sum
//按月聚合city
val cityDF: DataFrame = spark.read.parquet(cityPath)
cityDF.join(skewImeiDF, cityDF("imei") === skewImeiDF("skew_imei"), "left")
.filter("skew_imei is null")
.drop("skew_imei")
.groupBy("imei", "city")
.agg(sum("city_freq").alias("city_freq"))
.repartition(10) //进行充分去,减少小文件
.write
.mode(SaveMode.Overwrite)
.parquet(SparkOutputpathCheck.SecondaryVerificationPath(cityOutputPath, fs))
spark.stop()
sc.stop()
}
}
两端聚合的方式应对倾斜部分数据
- 方案分析:首先将数据分为倾斜数据part,和非倾斜数据part。重点解决倾斜数据part,最后将两数据union或者是在hdfs上进行文件夹的合并;不过建议是最好是能进行文件夹级别的数据合并;
- 方案选择:对倾斜的数据进行分组的key进行加上随机前缀,进行第一次聚合,然后再进行去掉随机前缀,再次进行聚合统计,最后落地到HDFS或者是其他存储系统;
ABC方案测试
A.单独计算倾斜部分数据
方案分析:对倾斜的key进行匹配,过滤出倾斜的数据,单独进行group by 聚合统计,观察效率和时长,是否会造成倾斜。数据量在1G左右;
代码开发较为简单,直接采用join连接,filter过滤就可以了,详见如下:
package cn.ted.dmp.bank.mining.geo_active
import cn.ted.dmp.util.SparkOutputpathCheck
import org.apache.hadoop.fs.FileSystem
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
/**
* Author: LiYahui
* Date: Created in 2019/3/26 13:44
* Description: TODO 按照月为单位统计地理位置出现的频次
* Version: V1.0
*/
object AggCityFreqMonthSkew {
def main(args: Array[String]): Unit = {
val Array(provincePath, cityPath, skewImeiPath, cityOutputPath) = args
if (args.length != 4) {
println(
s"""
|${this.getClass.getSimpleName}
| val Array(provincePath, cityPath,skewImeiPath , cityOutputPath) = args
""".stripMargin)
sys.exit(-1)
}
val spark: SparkSession = SparkSession.builder()
.appName(s"${this.getClass.getSimpleName}")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.sql.parquet.compression.codec", "gzip")
.getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("WARN")
//读取需要过滤到的imei号
import spark.implicits._
val skewImeiDF: DataFrame = sc.textFile(skewImeiPath).toDF("skew_imei").distinct()
val fs: FileSystem = FileSystem.get(sc.hadoopConfiguration)
import org.apache.spark.sql.functions.sum
//按月聚合city
val cityDF: DataFrame = spark.read.parquet(cityPath)
//过滤出倾斜数据,直接进行统计
cityDF.join(skewImeiDF, cityDF("imei") === skewImeiDF("skew_imei"), "left")
.filter("skew_imei is not null")
.drop("skew_imei")
.groupBy("imei", "city")
.agg(sum("city_freq").alias("city_freq"))
.repartition(10)
.write
.mode(SaveMode.Overwrite)
.parquet(SparkOutputpathCheck.SecondaryVerificationPath(cityOutputPath, fs))
spark.stop()
sc.stop()
}
}
程序运行DAG图如下:
程序各stage时间如下:
方案总结:在stage1和stage3中仍热存在executor运行时间过长问题(相对于整个stage的运行时间),数据倾斜的问题依然存在;整体运行时长为9min。
聚合之后数据条数为:58848条,imei数量为1110个;
B采用SparkSQL对imei进行添加随机前缀,将imei进行打散的方式
方案分析:采用sparksql的api,使用udf函数,对imei进行添加随机前缀,进行第一次聚合,再去除imei的随机前缀,再次进行聚合统计。添加随机前缀的范围最好和executors的数量匹配,或者是进行实际的运行状况进行stage
代码开发如下:
package cn.ted.dmp.bank.mining.geo_active
import cn.ted.dmp.util.SparkOutputpathCheck
import org.apache.hadoop.fs.FileSystem
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
/**
* Author: LiYahui
* Date: Created in 2019/3/26 13:44
* Description: TODO 按照月为单位统计地理位置出现的频次
* Version: V1.0
*/
object AggCityFreqMonthSkew {
def main(args: Array[String]): Unit = {
val Array(provincePath, cityPath, skewImeiPath, provinceOutputPath, cityOutputPath) = args
if (args.length != 5) {
println(
s"""
|${this.getClass.getSimpleName}
| val Array(provincePath, cityPath,skewImeiPath ,provinceOutputPath, cityOutputPath) = args
""".stripMargin)
sys.exit(-1)
}
val spark: SparkSession = SparkSession.builder()
.appName(s"${this.getClass.getSimpleName}")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.sql.parquet.compression.codec", "gzip")
.getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("WARN")
//读取需要过滤到的imei号
import spark.implicits._
val skewImeiDF: DataFrame = sc.textFile(skewImeiPath).toDF("skew_imei").distinct()
val fs: FileSystem = FileSystem.get(sc.hadoopConfiguration)
import org.apache.spark.sql.functions.sum
//按月聚合city
val cityDF: DataFrame = spark.read.parquet(cityPath)
spark.udf.register("addPre", addPrefix(_: String))
spark.udf.register("removePre", removePrefix(_: String))
//过滤出倾斜的数据集,进行第一次聚合
val firstAggDF: DataFrame = cityDF.join(skewImeiDF, cityDF("imei") === skewImeiDF("skew_imei"), "left")
.filter("skew_imei is not null")
.drop("skew_imei")
.selectExpr("addPre(imei) as imei_prefix", "city", "city_freq")
.groupBy("imei_prefix", "city")
.agg(sum("city_freq").alias("city_freq"))
//进行第二次聚合
firstAggDF.selectExpr("removePre(imei_prefix) as imei", "city", "city_freq")
.groupBy("imei", "city")
.agg(sum("city_freq").alias("city_freq"))
.repartition(10)
.write
.mode(SaveMode.Overwrite)
.parquet(SparkOutputpathCheck.SecondaryVerificationPath(cityOutputPath, fs))
spark.stop()
sc.stop()
}
/**
* 添加30以内的随机数前缀
*
* @param key
* @return
*/
def addPrefix(key: String): String = {
val i: Int = scala.util.Random.nextInt(30)
i + "_" + key
}
/**
* 去掉前缀
*
* @param key
* @return
*/
def removePrefix(key: String): String = {
key.split("_")(1)
}
}
程序运行stage图如下:
程序各stage运行时间如下:
从上述stage图中可以看出,各个stage运行时长均相对于A方案中时长减半,整体运行时长为5min,数据倾斜得到缓解;
方案评估:对于采用过滤出倾斜部分的数据集,需要对原始数据进行两次遍历(第一次计算非倾斜数据,第二次计算倾斜数据)。这种情况类似于“分而治之”的思想,对于解决或者是缓解数据倾斜还是有一定的效果的。
C.采用Spark RDD的对imei进行打散聚合
方案分析:需要先将倾斜的数据集进行过滤保存,因为如果直接对读取的parquet文件由df转成rdd,需要将所有的数据量进行转换,消耗性能。因为df转成rdd需要对数据进行序列化,极大的消耗了性能。所以先将倾斜的数据进行取出另存。
注意:这种方法一定要将倾斜的数据先取出另存为一个临时型文件夹,再使用spark rdd的方式进行处理,这样会减少df转换为rdd的时间和性能消耗;
代码如下:
spark.read.parquet("/user/liyahui_su/bbc/sms_phone_parquet_rt/bbc_mining/geo_freq_city_month_test")
.rdd
.map(line => {
val imei: String = line.getAs[String]("imei")
val city: String = line.getAs[String]("city")
val city_freq: Long = line.getAs[Long]("city_freq")
//生成30以内的随机数,并将数据封装成key,value型数据
val i = scala.util.Random.nextInt(30)
(i + "_" + imei + "_" + city, city_freq)
}).reduceByKey(_ + _) //第一次聚合
.map(line => {
val arr: Array[String] = line._1.split("_")
(arr(1) + "_" + arr(2), line._2)
}).reduceByKey(_ + _) //第二次聚合
.map(line => {
val arr: Array[String] = line._1.split("_")
(arr(0), arr(1), line._2)
})
.toDF("imei", "city", "city_freq")
.repartition(10)
.write.parquet(SparkOutputpathCheck.SecondaryVerificationPath(cityOutputPath, fs))
方案总结
对于聚合类数据倾斜,首先需要统计导致倾斜的key的频次和数量,最好是保存作为一个字典库;采用"分而治之"的思想,对原数据集进行非倾斜数据和倾斜数据进行划分。对于非倾斜数据,直接采用业务逻辑的处理方式;对于倾斜的数据,需要进行过滤,保存到临时文件夹,再对数据集进行B方案或者是C方案的处理。
方案准备:
第一步:需要找出导致业务数据倾斜的key,在找出业务数据倾斜的key的过程中也会发生倾斜的情况,这个时候不要采用sample取样的方式,既然是想解决倾斜,就完全的统计出倾斜的key。
第二步:最简单的就是将倾斜的数据集过滤掉计算,但是这个不能满足实际业务开发的需要;在ABC方案中,进行选择与B方案类似的解决方案,C方案次之,A方案是最后的选择。
总结:遇到数据倾斜时,不要害怕,要弄清楚造成数据倾斜的原因时什么,和发生数据倾斜的spark 原理是什么,按照发生数据倾斜的症状对症下药,不断的调试和尝试、总结,就能找到一条合适的数据倾斜解决之路,对自己的技术提升也会有一定的帮助。
结束语:
咖啡如果不苦,那么估计就没有人喜欢它了