Yesterday I read the latest post on 文哥的学习日记, which shared some Hive knowledge. Since I have recently been reviewing Hive myself, I studied the post and tried out its content in practice.
The post covers four Hive SQL interview questions, which essentially revolve around two functions: posexplode and lag/lead.
To be honest, I did not know either of these functions before, let alone use them at work; the closest I had come was explode. But after seeing their use cases, I think they are well worth learning.
1. posexplode
Before getting into posexplode, let's first look at how Lateral View works:
Lateral View is used together with user-defined table-generating functions (UDTFs) such as explode(); split() is typically used to produce the array that the UDTF expands.
(A UDTF generates zero or more output rows for each input row.)
Lateral View applies the UDTF to every row of the base table, then joins the output rows back to that input row, forming a virtual table with the supplied table alias.
Basic syntax:
LATERAL VIEW udtf(expression) tableAlias AS columnAlias
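For intuition, here is a minimal sketch of that syntax in the same Spark/HiveContext style used below. The table logs and its columns id and tags are hypothetical, purely for illustration; each comma-separated element of tags becomes its own output row, joined back to the original id.
// hypothetical table logs(id STRING, tags STRING), e.g. tags = "t1,t2,t3"
val demoSql =
s"""
|SELECT
| id,
| tag
|FROM logs
|lateral view explode(split(tags,',')) t as tag
""".stripMargin
hiveContext.sql(demoSql).show(false)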
Sample data preparation:
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types.{StringType, StructField}
import org.apache.spark.sql.{Row, SparkSession, types}

val spark = SparkSession.builder()
  .master("local[2]")
  .appName("hive context")
  .getOrCreate()
val hiveContext = new HiveContext(spark.sparkContext)
// specify the schema
val schema = types.StructType(Seq(
  StructField("id", StringType, true),
  StructField("time", StringType, true)
))
// create the sample data
val dataRdd = spark.sparkContext.parallelize(Array("a,b,c,d;2:00,3:00,4:00,5:00", "f,b,c,d;1:10,2:20,3:30,4:40")).map(line => line.split(";"))
val rowRdd = dataRdd.map(p => Row(p(0).trim, p(1).trim))
// create the DataFrame
val data = hiveContext.createDataFrame(rowRdd, schema)
data.registerTempTable("tempTable")
data.show(false)
The result:
+-------+-------------------+
|id |time |
+-------+-------------------+
|a,b,c,d|2:00,3:00,4:00,5:00|
|f,b,c,d|1:10,2:20,3:30,4:40|
+-------+-------------------+
The final output we want pairs each element of id with the element of time at the same position (see the result in section 1.3).
1.1 A demo of explode
//1. explode without Lateral View
val sql1 =
s"""
|SELECT
| id,
| time,
| explode(split(time,',')) as single_time
|FROM tempTable
""".stripMargin
hiveContext.sql(sql1).show(false)
//2. explode with lateral view, same result as 1
val sql2 =
s"""
|SELECT
| id,
| time,
| single_time
|FROM tempTable
|lateral view explode(split(time,',')) t as single_time
""".stripMargin
hiveContext.sql(sql2).show(false)
The result:
+-------+-------------------+-----------+
|id |time |single_time|
+-------+-------------------+-----------+
|a,b,c,d|2:00,3:00,4:00,5:00|2:00 |
|a,b,c,d|2:00,3:00,4:00,5:00|3:00 |
|a,b,c,d|2:00,3:00,4:00,5:00|4:00 |
|a,b,c,d|2:00,3:00,4:00,5:00|5:00 |
|f,b,c,d|1:10,2:20,3:30,4:40|1:10 |
|f,b,c,d|1:10,2:20,3:30,4:40|2:20 |
|f,b,c,d|1:10,2:20,3:30,4:40|3:30 |
|f,b,c,d|1:10,2:20,3:30,4:40|4:40 |
+-------+-------------------+-----------+
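To also explode the id column, both columns can be run through explode, each in its own lateral view; the two lateral views cross-multiply, so every input row yields 4 × 4 = 16 output rows. A sketch of that query (the alias names t1/t2 and the variable name sql3 are my own):
//3. explode both id and time: the two lateral views form a Cartesian product per input row
val sql3 =
s"""
|SELECT
| id,
| time,
| single_id,
| single_time
|FROM tempTable
|lateral view explode(split(id,',')) t1 as single_id
|lateral view explode(split(time,',')) t2 as single_time
""".stripMargin
hiveContext.sql(sql3).show(false)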
After also exploding the id column (as in the sketch above), the result is:
+-------+-------------------+---------+-----------+
|id |time |single_id|single_time|
+-------+-------------------+---------+-----------+
|a,b,c,d|2:00,3:00,4:00,5:00|a |2:00 |
|a,b,c,d|2:00,3:00,4:00,5:00|a |3:00 |
|a,b,c,d|2:00,3:00,4:00,5:00|a |4:00 |
|a,b,c,d|2:00,3:00,4:00,5:00|a |5:00 |
|a,b,c,d|2:00,3:00,4:00,5:00|b |2:00 |
|a,b,c,d|2:00,3:00,4:00,5:00|b |3:00 |
|a,b,c,d|2:00,3:00,4:00,5:00|b |4:00 |
|a,b,c,d|2:00,3:00,4:00,5:00|b |5:00 |
|a,b,c,d|2:00,3:00,4:00,5:00|c |2:00 |
|a,b,c,d|2:00,3:00,4:00,5:00|c |3:00 |
|a,b,c,d|2:00,3:00,4:00,5:00|c |4:00 |
|a,b,c,d|2:00,3:00,4:00,5:00|c |5:00 |
|a,b,c,d|2:00,3:00,4:00,5:00|d |2:00 |
|a,b,c,d|2:00,3:00,4:00,5:00|d |3:00 |
|a,b,c,d|2:00,3:00,4:00,5:00|d |4:00 |
|a,b,c,d|2:00,3:00,4:00,5:00|d |5:00 |
|f,b,c,d|1:10,2:20,3:30,4:40|f |1:10 |
|f,b,c,d|1:10,2:20,3:30,4:40|f |2:20 |
|f,b,c,d|1:10,2:20,3:30,4:40|f |3:30 |
|f,b,c,d|1:10,2:20,3:30,4:40|f |4:40 |
+-------+-------------------+---------+-----------+
1.2 The posexplode function
Compared with explode, posexplode not only turns one column into multiple rows, it also outputs the index (position) of each element.
Example:
val sql4 =
s"""
|SELECT
| id,
| time,
| single_id,
| single_id_index
|FROM tempTable
|lateral view posexplode(split(id,',')) t as single_id_index,single_id
""".stripMargin
hiveContext.sql(sql4).show(false)
Notice the extra column single_id_index.
The result:
+-------+-------------------+---------+---------------+
|id |time |single_id|single_id_index|
+-------+-------------------+---------+---------------+
|a,b,c,d|2:00,3:00,4:00,5:00|a |0 |
|a,b,c,d|2:00,3:00,4:00,5:00|b |1 |
|a,b,c,d|2:00,3:00,4:00,5:00|c |2 |
|a,b,c,d|2:00,3:00,4:00,5:00|d |3 |
|f,b,c,d|1:10,2:20,3:30,4:40|f |0 |
|f,b,c,d|1:10,2:20,3:30,4:40|b |1 |
|f,b,c,d|1:10,2:20,3:30,4:40|c |2 |
|f,b,c,d|1:10,2:20,3:30,4:40|d |3 |
+-------+-------------------+---------+---------------+
1.3 Building on this, we can produce the final result by keeping only the rows where the index of id equals the index of time
val sql5 =
s"""
|SELECT
| single_id,
| single_time
|FROM tempTable
| lateral view posexplode(split(id,',')) t1 as single_id_index,single_id
| lateral view posexplode(split(time,',')) t2 as single_time_index,single_time
|WHERE
| single_id_index=single_time_index
""".stripMargin
hiveContext.sql(sql5).show(false)
The result:
+---------+-----------+
|single_id|single_time|
+---------+-----------+
|a |2:00 |
|b |3:00 |
|c |4:00 |
|d |5:00 |
|f |1:10 |
|b |2:20 |
|c |3:30 |
|d |4:40 |
+---------+-----------+
2. A second application of posexplode
Application scenario: for each record, output the (1-based) positions at which the character '1' appears.
For the record 00001, the output is 5.
For the record 0101, the output is 2,4.
The complete code:
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types.{IntegerType, StringType, StructField}
import org.apache.spark.sql.{Row, SparkSession, types}

object PosexplodeDemo2 {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.master("local[2]")
.appName("hive context")
.getOrCreate()
val hiveContext = new HiveContext(spark.sparkContext)
//specify the schema
val schema = types.StructType(Seq(
StructField("id", IntegerType, true),
StructField("value", StringType, true)
))
//create the sample data
val dataRdd = spark.sparkContext.parallelize(Array("1,1011", "2,0101","3,1111","4,00001")).map(line => line.split(","))
val rowRdd = dataRdd.map(p => Row(p(0).toInt, p(1).trim))
//create the DataFrame
val data = hiveContext.createDataFrame(rowRdd, schema)
data.registerTempTable("tempTable")
data.show(false)
/**
* +---+-----+------------+------------------+
* |id |value|single_value|single_value_index|
* +---+-----+------------+------------------+
* |1 |1011 |1 |0 |
* |1 |1011 |1 |2 |
* |1 |1011 |1 |3 |
* |2 |0101 |1 |1 |
* |2 |0101 |1 |3 |
* |3 |1111 |1 |0 |
* |3 |1111 |1 |1 |
* |3 |1111 |1 |2 |
* |3 |1111 |1 |3 |
* |4 |00001|1 |4 |
* +---+-----+------------+------------------+
*/
val sql=
s"""
|SELECT
| id,
| value,
| single_value,
| single_value_index
|FROM tempTable
| lateral view posexplode(split(value,'')) t as single_value_index,single_value
|WHERE single_value='1'
""".stripMargin
hiveContext.sql(sql).show(false)
/**
* +---+-----+-------+
* |id |value|indices|
* +---+-----+-------+
* |4 |00001|5 |
* |2 |0101 |2,4 |
* |1 |1011 |1,3,4 |
* |3 |1111 |1,2,3,4|
* +---+-----+-------+
*/
val sql1=
s"""
|SELECT
| id,
| value,
| concat_ws(',',collect_list(single_value_index)) as indices
|FROM
|(
| SELECT
| id,
| value,
| single_value,
| cast(single_value_index+1 as string) as single_value_index
| FROM tempTable
| lateral view posexplode(split(value,'')) t as single_value_index,single_value
| WHERE single_value='1'
|) tmp
|GROUP BY id,value
""".stripMargin
hiveContext.sql(sql1).show(false)
}
}
3. The lag and lead functions
lag and lead, on top of partitioning and ordering, let you read a column value from the row a given number of positions before or after the current row within its group.
Basic syntax:
lag(column, N) over(partition by group_column order by sort_column asc|desc)
lead(column, N) over(partition by group_column order by sort_column asc|desc)
lag takes a column name and an offset N: it returns that column's value from the row N positions earlier in the partition, after sorting.
For example, with column ts and N=1, it returns the ts value of the previous row in the partition.
lead takes a column name and an offset N: it returns that column's value from the row N positions later in the partition, after sorting.
With column ts and N=1, it returns the ts value of the next row in the partition.
If there is no such preceding or following row, the value is null (both functions also accept an optional third argument as the default for that case).
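The examples below only use lag, so here is a minimal sketch of lead, assuming the same (id, time, score) table as in the sample data that follows, registered as tempTable:
val leadSql =
s"""
|SELECT
| id,
| time,
| score,
| lead(score,1) over(partition by id order by time asc) as next_score
|FROM tempTable
""".stripMargin
hiveContext.sql(leadSql).show(false)
// based on the sample data below, for id=2014: A -> next_score=2, B -> next_score=1, C -> next_score=null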
Application scenario: within each id, order the records by time and, for every quarter, compute the average of its score and the previous quarter's score (for the first quarter there is no previous record, so the average is just its own score).
+----+----+-----+
|id  |time|score|
+----+----+-----+
|2014|A   |3    |
|2014|C   |1    |
|2014|B   |2    |
|2015|A   |4    |
|2015|C   |3    |
+----+----+-----+
The complete code:
/**
* +----+----+-----+---------+
* |id |time|score|pre_score|
* +----+----+-----+---------+
* |2014|A |3 |null |
* |2014|B |2 |3 |
* |2014|C |1 |2 |
* |2015|A |4 |null |
* |2015|C |3 |4 |
* +----+----+-----+---------+
*/
val sql=
s"""
|SELECT
| id,
| time,
| score,
| lag(score,1) over(partition by id order by time asc) as pre_score
|FROM tempTable
""".stripMargin
hiveContext.sql(sql).show(false)
/**
* +----+----+---------+
* |id |time|avg_score|
* +----+----+---------+
* |2014|A |3.0 |
* |2014|B |2.5 |
* |2014|C |1.5 |
* |2015|A |4.0 |
* |2015|C |3.5 |
* +----+----+---------+
*/
val sql1=
s"""
|SELECT
| id,
| time,
| CASE WHEN pre_score IS NULL THEN score
| ELSE (score+pre_score)/2
| END AS avg_score
|FROM
|(
| SELECT
| id,
| time,
| score,
| lag(score,1) over(partition by id order by time asc) as pre_score
| FROM tempTable
|) tmp
""".stripMargin
hiveContext.sql(sql1).show(false)
4. Combining posexplode and lag (implementing block-wise ranking)
The complete code:
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types.{IntegerType, StructField}
import org.apache.spark.sql.{Row, SparkSession, types}

object Posexplode_LagDemo {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.master("local[2]")
.appName("hive context")
.getOrCreate()
val hiveContext = new HiveContext(spark.sparkContext)
//specify the schema
val schema = types.StructType(Seq(
StructField("id", IntegerType, true),
StructField("value", IntegerType, true)
))
//create the sample data
val dataRdd = spark.sparkContext.parallelize(Array("2014,1", "2015,1","2017,0","2018,0","2019,1","2020,1","2021,1","2022,0","2023,0")).map(line => line.split(","))
val rowRdd = dataRdd.map(p => Row(p(0).toInt, p(1).toInt))
//create the DataFrame
val data = hiveContext.createDataFrame(rowRdd, schema)
data.registerTempTable("tempTable")
/**
* +----+-----+
* |id |value|
* +----+-----+
* |2014|1 |
* |2015|1 |
* |2017|0 |
* |2018|0 |
* |2019|1 |
* |2020|1 |
* |2021|1 |
* |2022|0 |
* |2023|0 |
* +----+-----+
*/
data.show(false)
/**
* +----+-----+---------+
* |id |value|pre_value|
* +----+-----+---------+
* |2014|1 |null |
* |2015|1 |1 |
* |2017|0 |1 |
* |2018|0 |0 |
* |2019|1 |0 |
* |2020|1 |1 |
* |2021|1 |1 |
* |2022|0 |1 |
* |2023|0 |0 |
* +----+-----+---------+
* ------>
* +------------+-----+
* |min_block_id|value|
* +------------+-----+
* |2014 |1 |
* |2017 |0 |
* |2019 |1 |
* |2022 |0 |
* +------------+-----+
* ------>
* join the base table with the table above
* +----+-----+------------+----+
* |id |value|min_block_id|rank|
* +----+-----+------------+----+
* |2014|1 |2014 |1 |
* |2015|1 |2014 |1 |
* |2017|0 |2017 |1 |
* |2018|0 |2017 |1 |
* |2019|1 |2019 |1 |
* |2019|1 |2014 |2 |
* |2020|1 |2019 |1 |
* |2020|1 |2014 |2 |
* |2021|1 |2019 |1 |
* |2021|1 |2014 |2 |
* |2022|0 |2022 |1 |
* |2022|0 |2017 |2 |
* |2023|0 |2022 |1 |
* |2023|0 |2017 |2 |
* +----+-----+------------+----+
*
* ------>
* keep only the rows with rank=1
* +----+-----+------------+----+
* |id |value|min_block_id|rank|
* +----+-----+------------+----+
* |2014|1 |2014 |1 |
* |2015|1 |2014 |1 |
* |2017|0 |2017 |1 |
* |2018|0 |2017 |1 |
* |2019|1 |2019 |1 |
* |2020|1 |2019 |1 |
* |2021|1 |2019 |1 |
* |2022|0 |2022 |1 |
* |2023|0 |2022 |1 |
* +----+-----+------------+----+
*
* ------>
* final result
* +----+-----+--------+
* |id |value|new_rank|
* +----+-----+--------+
* |2014|1 |1 |
* |2015|1 |2 |
* |2017|0 |1 |
* |2018|0 |2 |
* |2019|1 |1 |
* |2020|1 |2 |
* |2021|1 |3 |
* |2022|0 |1 |
* |2023|0 |2 |
* +----+-----+--------+
*
*/
val sql=
s"""
|SELECT id,
| value,
| row_number() over(partition by min_block_id order by id asc) as new_rank
|FROM
|(
| SELECT
| t.id,
| t.value,
| m.min_block_id,
| row_number() over(partition by t.id order by min_block_id desc) as rank
| FROM tempTable t
| INNER JOIN
| (
| SELECT id as min_block_id,
| value
| FROM
| (
| SELECT
| id,
| value,
| lag(value,1) over(partition by 1 order by id asc) as pre_value
| FROM tempTable
| ) s
| WHERE pre_value is null
| or value!=pre_value
| ) m
| ON t.value=m.value
| AND t.id>=m.min_block_id
|) r
|WHERE rank=1
|ORDER BY id
""".stripMargin
hiveContext.sql(sql).show(false)
}
}