SparkSQL之双重Group解决数据倾斜

本文介绍了如何使用自定义UDF来给key新增随机数前缀,并使用双重Group来解决数据倾斜。

主要内容:

  • 1.自定义UDF
  • 2.数据流程
  • 3.Spark程序

1.自定义UDF

RandomPrefixUDF.java

/**
 * 给字段添加随机前缀
 * random_prefix()
 *
 * @author Administrator
 */
public class RandomPrefixUDF implements UDF2<String, Integer, String> {

    private static final long serialVersionUID = 1L;

    @Override
    public String call(String val, Integer num) throws Exception {
        Random random = new Random();
        int randNum = random.nextInt(num);
        return randNum + "_" + val;
    }
}

给字段添加随机前缀

RemoveRandomPrefixUDF.java

/**
 * 去除随机前缀
 * @author Administrator
 *
 */
public class RemoveRandomPrefixUDF implements UDF1<String, String> {

    private static final long serialVersionUID = 1L;

    @Override
    public String call(String val) throws Exception {
        String[] valSplited = val.split("_");
        return valSplited[1];
    }
}

去除字段的随机前缀

2.数据流程

不使用随机前缀的流程

A 1
A 1
A 1
A 1
B 1

结果:

A 4
B 1

使用随机前缀的流程

A 1
A 1
A 1
A 1
B 1

--加随机前缀

0_A 1
0_A 1
1_A 1
1_A 1
0_B 1

--第一次GroupBy

0_A 2
1_A 2
0_B 1

--去掉随机前缀

A 2
A 2
B 1

--第二次GroupBy

A 4
B 1

3.Spark程序

/**
  * 通过StructType直接指定Schema,转换为DataFrame
  */
object TestUDF {
  def main(args: Array[String]): Unit = {
    val spark =
      SparkSession.builder()
        .appName("TestUDF")
        .master("local")
        .getOrCreate()

    val sc = spark.sparkContext
    sc.setLogLevel("WARN")

    spark.udf.register("random_prefix", new RandomPrefixUDF(), DataTypes.StringType)
    spark.udf.register("remove_random_prefix", new RemoveRandomPrefixUDF(), DataTypes.StringType)

    val personRDD =
      sc.parallelize(List("A", "A", "A", "A", "B"), 1)
        .map(x => (x, 1))
        .map(x => Row(x._1, x._2.toInt))

    // 创建Schema
    val schema: StructType = StructType(Seq(
      StructField("product", StringType, false),
      StructField("click", IntegerType, false)
    ))

    val personDF = spark.createDataFrame(personRDD, schema)

    //SQL语法操作
    personDF.createOrReplaceTempView("t_product_click")

    // 加随机前缀
    val sql1 =
      s"""
         |select
         |  random_prefix(product, 2) product,
         |  click
         |from
         |  t_product_click
       """.stripMargin

    // 分组求和
    val sql2 =
      s"""
         |select
         |  product,
         |  sum(click) click
         |from
         |  (
         |    select
         |      random_prefix(product, 2) product,
         |      click
         |    from
         |      t_product_click
         |  ) t1
         |group by
         |  product
       """.stripMargin

    // 去掉随机前缀
    val sql3 =
      s"""
         |select
         |  remove_random_prefix(product) product,
         |  click
         |from
         |  (
         |    select
         |      product,
         |      sum(click) click
         |    from
         |      (
         |        select
         |          random_prefix(product, 2) product,
         |          click
         |        from
         |          t_product_click
         |      ) t1
         |    group by
         |      product
         |  ) t2
         |
       """.stripMargin

    // 分组求和
    val sql4 =
      s"""
         |select
         |  product,
         |  sum(click) click
         |from
         |  (
         |    select
         |      remove_random_prefix(product) product,
         |      click
         |    from
         |      (
         |        select
         |          product,
         |          sum(click) click
         |        from
         |          (
         |            select
         |              random_prefix(product, 2) product,
         |              click
         |            from
         |              t_product_click
         |          ) t1
         |        group by
         |          product
         |      ) t2
         |  ) t3
         |group by
         |  product
       """.stripMargin

    spark.sql(sql1).show()
    spark.sql(sql2).show()
    spark.sql(sql3).show()
    spark.sql(sql4).show()
    sc.stop()
  }
}

执行结果:

+-------+-----+
|product|click|
+-------+-----+
|    0_A|    1|
|    1_A|    1|
|    0_A|    1|
|    0_A|    1|
|    1_B|    1|
+-------+-----+

+-------+-----+
|product|click|
+-------+-----+
|    1_A|    3|
|    1_B|    1|
|    0_A|    1|
+-------+-----+

+-------+-----+
|product|click|
+-------+-----+
|      A|    1|
|      B|    1|
|      A|    3|
+-------+-----+

+-------+-----+
|product|click|
+-------+-----+
|      B|    1|
|      A|    4|
+-------+-----+

这里是对所有的Key都加入了随机前缀,其实也可以先对数据样本抽样,提前筛选出会发生数据倾斜的Key来给加随机前缀,当然随机前缀也可以自定义算法。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,293评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,604评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,958评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,729评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,719评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,630评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,000评论 3 397
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,665评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,909评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,646评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,726评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,400评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,986评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,959评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,996评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,481评论 2 342