Spark--聚合性数据倾斜解决方案实例说明

数据倾斜产生背景

数据倾斜表现形势和网上流传不大一样,欢迎批评指正--李小李

  • 在开发DMP平台过程中,有一个标签体系时地理位置标签,需要根据IP解析出地理位置,原始数据是经过每天聚合之后的数据,聚合之后的格式为(imei,city,city_freq),在每天的基础之上,需要进行以月为单位的聚合,使用spark进行数据聚合统计,即sql语句为:
select imei,city,sum(city_freq) as city_freq from mid_day_city_freq where dt>=20190301 and dt < 20190328 group by imei,city
  • 经过以天为单位的聚合之后,数据量为1.4G,字段为imei,city,city_freq,文件格式为parquet并以gz格式进行压缩;
  • 在进行以月为单位的聚合统计时,数据量在约42G,数据条数约为4532293316,spark job共分为两个stage,每个stage的最后4个task运行极慢,意识到遇到了数据倾斜,聚合类的数据倾斜;

聚合类数据倾斜原因分析

  • 因为时聚合类统计,在spark sql中使用到了group by 和sum,分组条件为imei和city,所以为imei 在文件中出现的次数进行统计,'select imei ,count(imei) as imei_count from temp;'
  • imei 频次分布情况如下:
    imei总数为244462334个;
    imei频次在300次以上有1274个;
    imei频次在500次以上有526个;
    imei频次在1000次以上有173个;
    imei频次在2000次以上有58个;
    imei频次在3000次以上有32个;
    
  • 在统计imei频次的时候,也是出现了数据倾斜,每个stage最后两个task运行极慢,但是可以运行成功;
  • spark job资源分配情况如下
spark-submit   --calss cn.ted.city   \
--master yarn \
--deploy-mode cluster \
--driver-memory 1G \
--num-executors 50 \
--executor-memory 2g \
--executor-cores 2 \
--queue default  \
--conf spark.default.parallelism=600 \
--conf spark.storage.memoryFunction=0.5 \
--conf spark.shuffle.memoryFraction=0.3 \
--conf spark.sql.planner.skewJoin=true  \
--conf spark.sql.shuffle.partitions=600 \
/home/liyahui_su/bbc/bbc_dmp/bbc_dmp-1.0-SNAPSHOT.jar 

解决方案分析及代码实现

过滤掉异常数据方案

spark job DAG 图:


DAG运行图.PNG
  • 方案选择分析
  • 因为时统计每个月中imei对应的city频次,对于imei频次在300次以上,说明每天的city更换为10次,可以理解为sdk在抓取ip的时候出现了异常,导致数据不准确,联系到业务应用,时需要进行DMP标签体系地理位置标签,所以可以将这一部分异常imei进行过滤掉。
  • 方案实行:在统计imei出现频次时将imei_count>=300的imei号过滤保存到字典库里面,在进行月度聚合的是时候,首先进行join过滤,在进行聚合统计,详见代码;
  • 运行时间对比:
    • 未做任何imei号过滤:45min;
    • 过滤掉imei_count >=1000的imei,40min;
    • 过滤掉imei_count >=500的imei,20min;
    • 过滤掉imei_count >=300的imei,6min;其中过滤掉imei_count>=300的spark job运行情况如下:


  • 总结
    • 在进行imei号过滤时,数据运行时间的会得到进一步的缓解,在imei_count>=300时,数据倾斜的情况可以说得到了根本解决,输入数据量在35G,运行时长在6min;
    • 过滤倾斜key方案较简单,过滤倾斜key的数量需要逐步测试,应用较为局限。如果说是需要统计每个城市出现的imei号频次呢,那么就不能再对imei号进行倾斜过滤了。所以需要对倾斜的key进行打散,采用两端聚合的方式进行统计;
      此项方案具体代码实现:
package cn.ted.dmp.bank.mining.geo_active

import cn.ted.dmp.util.SparkOutputpathCheck
import org.apache.hadoop.fs.FileSystem
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

/**
  * Author:  LiYahui
  * Date:  Created in  2019/3/26 13:44
  * Description: TODO 按照月为单位统计地理位置出现的频次
  * Version: V1.0         
  */
object AggProvinceCityFreqMonth {
  def main(args: Array[String]): Unit = {
    val Array(provincePath, cityPath, skewImeiPath, provinceOutputPath, cityOutputPath) = args
    if (args.length != 5) {
      println(
        s"""
           |${this.getClass.getSimpleName}
           |     val Array(provincePath, cityPath,skewImeiPath ,provinceOutputPath, cityOutputPath) = args
        """.stripMargin)
      sys.exit(-1)
    }
    val spark: SparkSession = SparkSession.builder()
        .appName(s"${this.getClass.getSimpleName}")
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .config("spark.sql.parquet.compression.codec", "gzip")
        .getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    //读取需要过滤到的imei号
    import spark.implicits._
    val skewImeiDF: DataFrame = sc.textFile(skewImeiPath).toDF("skew_imei").distinct()

    val fs: FileSystem = FileSystem.get(sc.hadoopConfiguration)
      import org.apache.spark.sql.functions.sum
      //按月聚合city
    val cityDF: DataFrame = spark.read.parquet(cityPath)
    cityDF.join(skewImeiDF, cityDF("imei") === skewImeiDF("skew_imei"), "left")
        .filter("skew_imei is null")
        .drop("skew_imei")
        .groupBy("imei", "city")
        .agg(sum("city_freq").alias("city_freq"))
        .repartition(10)   //进行充分去,减少小文件
        .write
        .mode(SaveMode.Overwrite)
        .parquet(SparkOutputpathCheck.SecondaryVerificationPath(cityOutputPath, fs))

    spark.stop()
    sc.stop()
  }
}

两端聚合的方式应对倾斜部分数据

  • 方案分析:首先将数据分为倾斜数据part,和非倾斜数据part。重点解决倾斜数据part,最后将两数据union或者是在hdfs上进行文件夹的合并;不过建议是最好是能进行文件夹级别的数据合并;
  • 方案选择:对倾斜的数据进行分组的key进行加上随机前缀,进行第一次聚合,然后再进行去掉随机前缀,再次进行聚合统计,最后落地到HDFS或者是其他存储系统;
ABC方案测试
A.单独计算倾斜部分数据

方案分析:对倾斜的key进行匹配,过滤出倾斜的数据,单独进行group by 聚合统计,观察效率和时长,是否会造成倾斜。数据量在1G左右;
代码开发较为简单,直接采用join连接,filter过滤就可以了,详见如下:

package cn.ted.dmp.bank.mining.geo_active

import cn.ted.dmp.util.SparkOutputpathCheck
import org.apache.hadoop.fs.FileSystem
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

/**
  * Author:  LiYahui
  * Date:  Created in  2019/3/26 13:44
  * Description: TODO 按照月为单位统计地理位置出现的频次
  * Version: V1.0         
  */
object AggCityFreqMonthSkew {
  def main(args: Array[String]): Unit = {
    val Array(provincePath, cityPath, skewImeiPath,  cityOutputPath) = args
    if (args.length != 4) {
      println(
        s"""
           |${this.getClass.getSimpleName}
           |     val Array(provincePath, cityPath,skewImeiPath , cityOutputPath) = args
        """.stripMargin)
      sys.exit(-1)
    }
    val spark: SparkSession = SparkSession.builder()
        .appName(s"${this.getClass.getSimpleName}")
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .config("spark.sql.parquet.compression.codec", "gzip")
        .getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    //读取需要过滤到的imei号
    import spark.implicits._
    val skewImeiDF: DataFrame = sc.textFile(skewImeiPath).toDF("skew_imei").distinct()

    val fs: FileSystem = FileSystem.get(sc.hadoopConfiguration)
    import org.apache.spark.sql.functions.sum
    //按月聚合city
    val cityDF: DataFrame = spark.read.parquet(cityPath)
    //过滤出倾斜数据,直接进行统计
    cityDF.join(skewImeiDF, cityDF("imei") === skewImeiDF("skew_imei"), "left")
        .filter("skew_imei is not null")
        .drop("skew_imei")
        .groupBy("imei", "city")
        .agg(sum("city_freq").alias("city_freq"))
        .repartition(10)
        .write
        .mode(SaveMode.Overwrite)
        .parquet(SparkOutputpathCheck.SecondaryVerificationPath(cityOutputPath, fs))


    spark.stop()
    sc.stop()
  }
}

程序运行DAG图如下:


程序各stage时间如下:


city_origin_sparksql.PNG

方案总结:在stage1和stage3中仍热存在executor运行时间过长问题(相对于整个stage的运行时间),数据倾斜的问题依然存在;整体运行时长为9min。
聚合之后数据条数为:58848条,imei数量为1110个;

B采用SparkSQL对imei进行添加随机前缀,将imei进行打散的方式

方案分析:采用sparksql的api,使用udf函数,对imei进行添加随机前缀,进行第一次聚合,再去除imei的随机前缀,再次进行聚合统计。添加随机前缀的范围最好和executors的数量匹配,或者是进行实际的运行状况进行stage
代码开发如下:

package cn.ted.dmp.bank.mining.geo_active

import cn.ted.dmp.util.SparkOutputpathCheck
import org.apache.hadoop.fs.FileSystem
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

/**
  * Author:  LiYahui
  * Date:  Created in  2019/3/26 13:44
  * Description: TODO 按照月为单位统计地理位置出现的频次
  * Version: V1.0         
  */
object AggCityFreqMonthSkew {
  def main(args: Array[String]): Unit = {
    val Array(provincePath, cityPath, skewImeiPath, provinceOutputPath, cityOutputPath) = args
    if (args.length != 5) {
      println(
        s"""
           |${this.getClass.getSimpleName}
           |     val Array(provincePath, cityPath,skewImeiPath ,provinceOutputPath, cityOutputPath) = args
        """.stripMargin)
      sys.exit(-1)
    }
    val spark: SparkSession = SparkSession.builder()
        .appName(s"${this.getClass.getSimpleName}")
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .config("spark.sql.parquet.compression.codec", "gzip")
        .getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    //读取需要过滤到的imei号
    import spark.implicits._
    val skewImeiDF: DataFrame = sc.textFile(skewImeiPath).toDF("skew_imei").distinct()

    val fs: FileSystem = FileSystem.get(sc.hadoopConfiguration)
    import org.apache.spark.sql.functions.sum
    //按月聚合city
    val cityDF: DataFrame = spark.read.parquet(cityPath)
    spark.udf.register("addPre", addPrefix(_: String))
    spark.udf.register("removePre", removePrefix(_: String))

    //过滤出倾斜的数据集,进行第一次聚合
    val firstAggDF: DataFrame = cityDF.join(skewImeiDF, cityDF("imei") === skewImeiDF("skew_imei"), "left")
        .filter("skew_imei is not null")
        .drop("skew_imei")
        .selectExpr("addPre(imei) as imei_prefix", "city", "city_freq")
        .groupBy("imei_prefix", "city")
        .agg(sum("city_freq").alias("city_freq"))
    //进行第二次聚合
    firstAggDF.selectExpr("removePre(imei_prefix) as imei", "city", "city_freq")
        .groupBy("imei", "city")
        .agg(sum("city_freq").alias("city_freq"))
        .repartition(10)
        .write
        .mode(SaveMode.Overwrite)
        .parquet(SparkOutputpathCheck.SecondaryVerificationPath(cityOutputPath, fs))
     spark.stop()
    sc.stop()
  }

  /**
    * 添加30以内的随机数前缀
    *
    * @param key
    * @return
    */
  def addPrefix(key: String): String = {
    val i: Int = scala.util.Random.nextInt(30)
    i + "_" + key
  }

  /**
    * 去掉前缀
    *
    * @param key
    * @return
    */
  def removePrefix(key: String): String = {
    key.split("_")(1)
  }
}

程序运行stage图如下:



程序各stage运行时间如下:


city_skew_sparksql.PNG

从上述stage图中可以看出,各个stage运行时长均相对于A方案中时长减半,整体运行时长为5min,数据倾斜得到缓解;
方案评估:对于采用过滤出倾斜部分的数据集,需要对原始数据进行两次遍历(第一次计算非倾斜数据,第二次计算倾斜数据)。这种情况类似于“分而治之”的思想,对于解决或者是缓解数据倾斜还是有一定的效果的。
C.采用Spark RDD的对imei进行打散聚合

方案分析:需要先将倾斜的数据集进行过滤保存,因为如果直接对读取的parquet文件由df转成rdd,需要将所有的数据量进行转换,消耗性能。因为df转成rdd需要对数据进行序列化,极大的消耗了性能。所以先将倾斜的数据进行取出另存。
注意:这种方法一定要将倾斜的数据先取出另存为一个临时型文件夹,再使用spark rdd的方式进行处理,这样会减少df转换为rdd的时间和性能消耗;
代码如下:

spark.read.parquet("/user/liyahui_su/bbc/sms_phone_parquet_rt/bbc_mining/geo_freq_city_month_test")
        .rdd
        .map(line => {
          val imei: String = line.getAs[String]("imei")
          val city: String = line.getAs[String]("city")
          val city_freq: Long = line.getAs[Long]("city_freq")
          //生成30以内的随机数,并将数据封装成key,value型数据
          val i = scala.util.Random.nextInt(30)
          (i + "_" + imei + "_" + city, city_freq)
        }).reduceByKey(_ + _) //第一次聚合
        .map(line => {
      val arr: Array[String] = line._1.split("_")

      (arr(1) + "_" + arr(2), line._2)
    }).reduceByKey(_ + _) //第二次聚合
        .map(line => {
      val arr: Array[String] = line._1.split("_")
      (arr(0), arr(1), line._2)
    })
        .toDF("imei", "city", "city_freq")
        .repartition(10)
    .write.parquet(SparkOutputpathCheck.SecondaryVerificationPath(cityOutputPath, fs))

方案总结

对于聚合类数据倾斜,首先需要统计导致倾斜的key的频次和数量,最好是保存作为一个字典库;采用"分而治之"的思想,对原数据集进行非倾斜数据和倾斜数据进行划分。对于非倾斜数据,直接采用业务逻辑的处理方式;对于倾斜的数据,需要进行过滤,保存到临时文件夹,再对数据集进行B方案或者是C方案的处理。
方案准备:
第一步:需要找出导致业务数据倾斜的key,在找出业务数据倾斜的key的过程中也会发生倾斜的情况,这个时候不要采用sample取样的方式,既然是想解决倾斜,就完全的统计出倾斜的key。
第二步:最简单的就是将倾斜的数据集过滤掉计算,但是这个不能满足实际业务开发的需要;在ABC方案中,进行选择与B方案类似的解决方案,C方案次之,A方案是最后的选择。
总结:遇到数据倾斜时,不要害怕,要弄清楚造成数据倾斜的原因时什么,和发生数据倾斜的spark 原理是什么,按照发生数据倾斜的症状对症下药,不断的调试和尝试、总结,就能找到一条合适的数据倾斜解决之路,对自己的技术提升也会有一定的帮助。
结束语:
咖啡如果不苦,那么估计就没有人喜欢它了

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,406评论 6 503
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,732评论 3 393
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,711评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,380评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,432评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,301评论 1 301
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,145评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,008评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,443评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,649评论 3 334
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,795评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,501评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,119评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,731评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,865评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,899评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,724评论 2 354

推荐阅读更多精彩内容