This article shows how to use custom UDFs to add a random prefix to keys and then apply a double GroupBy to mitigate data skew.
Main contents:
- 1. Custom UDFs
- 2. Data Flow
- 3. Spark Program
1. Custom UDFs
RandomPrefixUDF.java
import java.util.Random;

import org.apache.spark.sql.api.java.UDF2;

/**
 * Adds a random prefix to a field value:
 * random_prefix('A', 2) -> '0_A' or '1_A'
 *
 * @author Administrator
 */
public class RandomPrefixUDF implements UDF2<String, Integer, String> {

    private static final long serialVersionUID = 1L;

    @Override
    public String call(String val, Integer num) throws Exception {
        // Draw a prefix in [0, num) and prepend it to the value.
        Random random = new Random();
        int randNum = random.nextInt(num);
        return randNum + "_" + val;
    }
}
Adds a random prefix to a field.
RemoveRandomPrefixUDF.java
import org.apache.spark.sql.api.java.UDF1;

/**
 * Removes the random prefix:
 * remove_random_prefix('0_A') -> 'A'
 *
 * @author Administrator
 */
public class RemoveRandomPrefixUDF implements UDF1<String, String> {

    private static final long serialVersionUID = 1L;

    @Override
    public String call(String val) throws Exception {
        // Split on the first '_' only, so values that themselves contain '_' survive intact.
        String[] valSplited = val.split("_", 2);
        return valSplited[1];
    }
}
Removes the random prefix from a field.
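As a quick sanity check, the two UDFs invert each other. A minimal sketch calling them directly, outside of Spark SQL:

// Round trip: prefix a key, then strip the prefix; the original key comes back.
val prefixed = new RandomPrefixUDF().call("A", 2) // e.g. "0_A" or "1_A"
val restored = new RemoveRandomPrefixUDF().call(prefixed)
assert(restored == "A")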
2. Data Flow
Flow without random prefixes (every record with key A is shuffled to the same task, which is the skew):
A 1
A 1
A 1
A 1
B 1
Result:
A 4
B 1
Flow with random prefixes:
A 1
A 1
A 1
A 1
B 1
-- Add random prefixes
0_A 1
0_A 1
1_A 1
1_A 1
0_B 1
-- First GroupBy
0_A 2
1_A 2
0_B 1
-- Remove the random prefixes
A 2
A 2
B 1
-- Second GroupBy
A 4
B 1
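The same four steps can also be expressed directly against the RDD API. A minimal sketch, assuming an existing SparkContext sc (the prefix count of 2 matches the example above):

import scala.util.Random

val clicks = sc.parallelize(List("A", "A", "A", "A", "B")).map(x => (x, 1))

val totals = clicks
  .map { case (k, v) => (Random.nextInt(2) + "_" + k, v) } // add random prefixes
  .reduceByKey(_ + _)                                      // first aggregation, on prefixed keys
  .map { case (k, v) => (k.split("_", 2)(1), v) }          // remove the random prefixes
  .reduceByKey(_ + _)                                      // second aggregation, on the real keys

totals.collect().foreach(println) // (A,4) and (B,1), in some order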
3. Spark Program
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{DataTypes, IntegerType, StringType, StructField, StructType}

/**
 * Specifies the schema via StructType, converts an RDD to a DataFrame,
 * and demonstrates the random-prefix / double-GroupBy trick in SQL.
 */
object TestUDF {

  def main(args: Array[String]): Unit = {
    val spark =
      SparkSession.builder()
        .appName("TestUDF")
        .master("local")
        .getOrCreate()
    val sc = spark.sparkContext
    sc.setLogLevel("WARN")

    // Register the two Java UDFs for use in Spark SQL.
    spark.udf.register("random_prefix", new RandomPrefixUDF(), DataTypes.StringType)
    spark.udf.register("remove_random_prefix", new RemoveRandomPrefixUDF(), DataTypes.StringType)

    val personRDD =
      sc.parallelize(List("A", "A", "A", "A", "B"), 1)
        .map(x => (x, 1))
        .map(x => Row(x._1, x._2.toInt))

    // Create the schema
    val schema: StructType = StructType(Seq(
      StructField("product", StringType, false),
      StructField("click", IntegerType, false)
    ))
    val personDF = spark.createDataFrame(personRDD, schema)

    // Query via SQL
    personDF.createOrReplaceTempView("t_product_click")

    // Step 1: add random prefixes
    val sql1 =
      s"""
         |select
         |  random_prefix(product, 2) product,
         |  click
         |from
         |  t_product_click
      """.stripMargin

    // Step 2: group and sum by the prefixed key
    val sql2 =
      s"""
         |select
         |  product,
         |  sum(click) click
         |from
         |  (
         |    select
         |      random_prefix(product, 2) product,
         |      click
         |    from
         |      t_product_click
         |  ) t1
         |group by
         |  product
      """.stripMargin

    // Step 3: remove the random prefixes
    val sql3 =
      s"""
         |select
         |  remove_random_prefix(product) product,
         |  click
         |from
         |  (
         |    select
         |      product,
         |      sum(click) click
         |    from
         |      (
         |        select
         |          random_prefix(product, 2) product,
         |          click
         |        from
         |          t_product_click
         |      ) t1
         |    group by
         |      product
         |  ) t2
      """.stripMargin

    // Step 4: group and sum again by the real key
    val sql4 =
      s"""
         |select
         |  product,
         |  sum(click) click
         |from
         |  (
         |    select
         |      remove_random_prefix(product) product,
         |      click
         |    from
         |      (
         |        select
         |          product,
         |          sum(click) click
         |        from
         |          (
         |            select
         |              random_prefix(product, 2) product,
         |              click
         |            from
         |              t_product_click
         |          ) t1
         |        group by
         |          product
         |      ) t2
         |  ) t3
         |group by
         |  product
      """.stripMargin

    spark.sql(sql1).show()
    spark.sql(sql2).show()
    spark.sql(sql3).show()
    spark.sql(sql4).show()

    sc.stop()
  }
}
Execution results (note that random_prefix draws new prefixes every time a query runs, so the prefix assignments differ across the four outputs):
+-------+-----+
|product|click|
+-------+-----+
| 0_A| 1|
| 1_A| 1|
| 0_A| 1|
| 0_A| 1|
| 1_B| 1|
+-------+-----+
+-------+-----+
|product|click|
+-------+-----+
| 1_A| 3|
| 1_B| 1|
| 0_A| 1|
+-------+-----+
+-------+-----+
|product|click|
+-------+-----+
| A| 1|
| B| 1|
| A| 3|
+-------+-----+
+-------+-----+
|product|click|
+-------+-----+
| B| 1|
| A| 4|
+-------+-----+
Here a random prefix was added to every key. In practice, you can first sample the data to identify in advance the keys that actually cause skew and only prefix those; the prefix-generation scheme can also be a custom algorithm.
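A hedged sketch of that sampling idea, assuming an input clicks: RDD[(String, Int)]; the 10% fraction and the top-3 cutoff are illustrative choices, not fixed rules:

// Count keys in a small sample and treat the heaviest ones as skewed.
val skewedKeys = clicks
  .sample(withReplacement = false, fraction = 0.1)
  .map { case (k, _) => (k, 1L) }
  .reduceByKey(_ + _)
  .sortBy(_._2, ascending = false)
  .take(3)    // illustrative cutoff: keep the three heaviest keys
  .map(_._1)
  .toSet

Records whose key is in skewedKeys then go through the prefix-and-double-aggregate path, while all other keys can be aggregated directly.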