[Hive] 两个‘不常用’的函数posexplode和lag

昨天看到了大神 文哥的学习日记 的最新一章博客,分享的是hive相关的知识。正好最近自己也在复盘hive,所以特地学习并实践了一下博客的内容。
文哥分享的是关于hive sql的四道面试题,其实主要是围绕hive sql的两个函数:posexplodelag/lead
说实话,我之前并不知道这两个函数,更别说在实际工作中应用了,相关的也就是用过explode函数;不过在看了这两个函数的使用场景后,发现还是很值得一学的。

1.posexplode

在说明posexplode之前,先了解下 Lateral View的用法:
Lateral View与用户自定义生成函数即UDTF(如explode()或者split()等)结合使用。
(UDTF:为每一个输入行生成0个或者多个输出行)
Lateral View将UDTF应用于基础表的每一行,然后将输出行连接到输入行,以形成具有所提供的表别名的虚拟表。
基本用法:
lateral view:LATERAL VIEW udtf(expression) tableAlias AS columnAlias

案例数据准备:

 val spark = SparkSession.builder()
      .master("local[2]")
      .appName("hive context")
      .getOrCreate()

    val hiveContext = new HiveContext(spark.sparkContext)

    //指定schema
    val schema = types.StructType(Seq(
      StructField("id", StringType, true),
      StructField("time", StringType, true)
    ))
    //创建案例数据
    val dataRdd = spark.sparkContext.parallelize(Array("a,b,c,d;2:00,3:00,4:00,5:00", "f,b,c,d;1:10,2:20,3:30,4:40")).map(line => line.split(";"))
    val rowRdd = dataRdd.map(p => Row(p(0).trim, p(1).trim))

    //创建DataFrame
    val data = hiveContext.createDataFrame(rowRdd, schema)
    data.registerTempTable("tempTable")
    data.show(false)

结果显示:
+-------+-------------------+
|id |time |
+-------+-------------------+
|a,b,c,d|2:00,3:00,4:00,5:00|
|f,b,c,d|1:10,2:20,3:30,4:40|
+-------+-------------------+

最后要展示的数据样式为:


image.png

1.1 演示下explode用法

   //不使用 Lateral View
   val sql1 =
    s"""
       |SELECT
       |  id,
       |  time,
       |  explode(split(time,',')) as single_time
       |FROM tempTable
       """.stripMargin

    hiveContext.sql(sql1).show(false)
  
  //2.使用explode和lateral view,效果与1一样
    val sql2 =
      s"""
         |SELECT
         |  id,
         |  time,
         |  single_time
         |FROM tempTable
         |lateral view explode(split(time,',')) as single_time
       """.stripMargin

    hiveContext.sql(sql2).show(false)

结果显示:
+-------+-------------------+-----------+
|id |time |single_time|
+-------+-------------------+-----------+
|a,b,c,d|2:00,3:00,4:00,5:00|2:00 |
|a,b,c,d|2:00,3:00,4:00,5:00|3:00 |
|a,b,c,d|2:00,3:00,4:00,5:00|4:00 |
|a,b,c,d|2:00,3:00,4:00,5:00|5:00 |
|f,b,c,d|1:10,2:20,3:30,4:40|1:10 |
|f,b,c,d|1:10,2:20,3:30,4:40|2:20 |
|f,b,c,d|1:10,2:20,3:30,4:40|3:30 |
|f,b,c,d|1:10,2:20,3:30,4:40|4:40 |
+-------+-------------------+-----------+

将id列也explode之后,结果显示为:

+-------+-------------------+---------+-----------+
|id |time |single_id|single_time|
+-------+-------------------+---------+-----------+
|a,b,c,d|2:00,3:00,4:00,5:00|a |2:00 |
|a,b,c,d|2:00,3:00,4:00,5:00|a |3:00 |
|a,b,c,d|2:00,3:00,4:00,5:00|a |4:00 |
|a,b,c,d|2:00,3:00,4:00,5:00|a |5:00 |
|a,b,c,d|2:00,3:00,4:00,5:00|b |2:00 |
|a,b,c,d|2:00,3:00,4:00,5:00|b |3:00 |
|a,b,c,d|2:00,3:00,4:00,5:00|b |4:00 |
|a,b,c,d|2:00,3:00,4:00,5:00|b |5:00 |
|a,b,c,d|2:00,3:00,4:00,5:00|c |2:00 |
|a,b,c,d|2:00,3:00,4:00,5:00|c |3:00 |
|a,b,c,d|2:00,3:00,4:00,5:00|c |4:00 |
|a,b,c,d|2:00,3:00,4:00,5:00|c |5:00 |
|a,b,c,d|2:00,3:00,4:00,5:00|d |2:00 |
|a,b,c,d|2:00,3:00,4:00,5:00|d |3:00 |
|a,b,c,d|2:00,3:00,4:00,5:00|d |4:00 |
|a,b,c,d|2:00,3:00,4:00,5:00|d |5:00 |
|f,b,c,d|1:10,2:20,3:30,4:40|f |1:10 |
|f,b,c,d|1:10,2:20,3:30,4:40|f |2:20 |
|f,b,c,d|1:10,2:20,3:30,4:40|f |3:30 |
|f,b,c,d|1:10,2:20,3:30,4:40|f |4:40 |
+-------+-------------------+---------+-----------+

1.2 使用posexplode函数
posexplode相比在explode之上,将一列数据转为多行之后,还会输出数据的下标。
示例:

    val sql4 =
      s"""
         |SELECT
         |  id,
         |  time,
         |  single_id,
         |  single_id_index
         |FROM tempTable
         |lateral view posexplode(split(id,',')) t as single_id_index,single_id
     """.stripMargin

    hiveContext.sql(sql4).show(false)

会发现多了1列 single_id_index
结果显示:
+-------+-------------------+---------+---------------+
|id |time |single_id|single_id_index|
+-------+-------------------+---------+---------------+
|a,b,c,d|2:00,3:00,4:00,5:00|a |0 |
|a,b,c,d|2:00,3:00,4:00,5:00|b |1 |
|a,b,c,d|2:00,3:00,4:00,5:00|c |2 |
|a,b,c,d|2:00,3:00,4:00,5:00|d |3 |
|f,b,c,d|1:10,2:20,3:30,4:40|f |0 |
|f,b,c,d|1:10,2:20,3:30,4:40|b |1 |
|f,b,c,d|1:10,2:20,3:30,4:40|c |2 |
|f,b,c,d|1:10,2:20,3:30,4:40|d |3 |
+-------+-------------------+---------+---------------+

1.3 在此基础上,可以实现最终效果,只要选取id的下标与time的下标一致的记录

   val sql5 =
      s"""
         |SELECT
         |  single_id,
         |  single_time
         |FROM tempTable
         | lateral view posexplode(split(id,',')) as single_id_index,single_id
         | lateral view posexplode(split(time,',')) as single_time_index,single_time
         |WHERE
         |  single_id_index=single_time_index
       """.stripMargin

    hiveContext.sql(sql5).show(false)

结果显示:
+---------+-----------+
|single_id|single_time|
+---------+-----------+
|a |2:00 |
|b |3:00 |
|c |4:00 |
|d |5:00 |
|f |1:10 |
|b |2:20 |
|c |3:30 |
|d |4:40 |
+---------+-----------+

2.posexplode的应用2

应用场景:
对于记录1:00001,输出1对应的下标5
对于记录2:0101,输出1对应的下标2,4
完整代码显示:

object PosexplodeDemo2 {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("hive context")
      .getOrCreate()

    val hiveContext = new HiveContext(spark.sparkContext)

    //指定schema
    val schema = types.StructType(Seq(
      StructField("id", IntegerType, true),
      StructField("value", StringType, true)
    ))
    //创建案例数据
    val dataRdd = spark.sparkContext.parallelize(Array("1,1011", "2,0101","3,1111","4,00001")).map(line => line.split(","))
    val rowRdd = dataRdd.map(p => Row(p(0).toInt, p(1).trim))

    //创建DataFrame
    val data = hiveContext.createDataFrame(rowRdd, schema)
    data.registerTempTable("tempTable")
    data.show(false)

    /**
      * +---+-----+------------+------------------+
      * |id |value|single_value|single_value_index|
      * +---+-----+------------+------------------+
      * |1  |1011 |1           |0                 |
      * |1  |1011 |1           |2                 |
      * |1  |1011 |1           |3                 |
      * |2  |0101 |1           |1                 |
      * |2  |0101 |1           |3                 |
      * |3  |1111 |1           |0                 |
      * |3  |1111 |1           |1                 |
      * |3  |1111 |1           |2                 |
      * |3  |1111 |1           |3                 |
      * |4  |00001|1           |4                 |
      * +---+-----+------------+------------------+
      */
    val sql=
      s"""
         |SELECT
         |  id,
         |  value,
         |  single_value,
         |  single_value_index
         |FROM tempTable
         | lateral view posexplode(split(value,'')) as single_value_index,single_value
         |WHERE single_value='1'
       """.stripMargin
    hiveContext.sql(sql).show(false)


    /**
      * +---+-----+-------+
      * |id |value|indices|
      * +---+-----+-------+
      * |4  |00001|5      |
      * |2  |0101 |2,4    |
      * |1  |1011 |1,3,4  |
      * |3  |1111 |1,2,3,4|
      * +---+-----+-------+
      */
    val sql1=
      s"""
         |SELECT
         |  id,
         |  value,
         |  concat_ws(',',collect_list(single_value_index)) as indices
         |FROM
         |(
         |  SELECT
         |    id,
         |    value,
         |    single_value,
         |    cast(single_value_index+1 as string) as single_value_index
         |  FROM tempTable
         |   lateral view posexplode(split(value,'')) as single_value_index,single_value
         |  WHERE single_value='1'
         |)
         |GROUP BY id,value
       """.stripMargin

    hiveContext.sql(sql1).show(false)

  }
}

3.lag和lead函数

lag和lead是在实现分组排序的基础上,能够获取到排序在当前记录前几位或后几位的记录的某个字段值。
基础语法:
lag(字段名,N) over(partition by 分组字段 order by 排序字段 排序方式)
lead(字段名,N) over(partition by 分组字段 order by 排序字段 排序方式)

lag括号里的参数:字段名和数量N 含义是获取分组排序后比该条记录序号小N的对应记录的指定字段的值
如果字段名为ts,N为1,就是取分组排序之后上一条记录的ts值

lead括号里的参数:字段名和数量N 含义是获取分组排序后比该条记录序号大N的对应记录的指定字段的值
如果字段名为ts,N为1,就是取分组排序之后下一条记录的ts值

如果没有前一行或者后一行,对应的字段值为null

应用场景:统计截至目前季度的平均值(按照time排序,并计算平均值)
* +----+----+-----+
* |id |time|score|
* +----+----+-----+
* |2014|A |3 |
* |2014|C |1 |
* |2014|B |2 |
* |2015|A |4 |
* |2015|C |3 |
* +----+----+-----+

完整代码显示:

 /**
      * +----+----+-----+---------+
      * |id  |time|score|pre_score|
      * +----+----+-----+---------+
      * |2014|A   |3    |null     |
      * |2014|B   |2    |3        |
      * |2014|C   |1    |2        |
      * |2015|A   |4    |null     |
      * |2015|C   |3    |4        |
      * +----+----+-----+---------+
      */
    val sql=
      s"""
         |SELECT
         |  id,
         |  time,
         |  score,
         |  lag(score,1) over(partition by id order by time asc) as pre_score
         |FROM tempTable
       """.stripMargin
    hiveContext.sql(sql).show(false)


    /**
      * +----+----+---------+
      * |id  |time|avg_score|
      * +----+----+---------+
      * |2014|A   |3.0      |
      * |2014|B   |2.5      |
      * |2014|C   |1.5      |
      * |2015|A   |4.0      |
      * |2015|C   |3.5      |
      * +----+----+---------+
      */
    val sql1=
      s"""
         |SELECT
         |  id,
         |  time,
         |  CASE WHEN pre_score IS NULL THEN score
         |  ELSE (score+pre_score)/2
         |  END AS avg_score
         |FROM
         |(
         |   SELECT
         |     id,
         |     time,
         |     score,
         |     lag(score,1) over(partition by id order by time asc) as pre_score
         |   FROM tempTable
         |) tmp
       """.stripMargin

    hiveContext.sql(sql1).show(false)

4.posexplode和lag函数的结合(实现分块排序)

image.png

完整代码显示:

object Posexplode_LagDemo {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("hive context")
      .getOrCreate()

    val hiveContext = new HiveContext(spark.sparkContext)

    //指定schema
    val schema = types.StructType(Seq(
      StructField("id", IntegerType, true),
      StructField("value", IntegerType, true)
    ))
    //创建案例数据
    val dataRdd = spark.sparkContext.parallelize(Array("2014,1", "2015,1","2017,0","2018,0","2019,1","2020,1","2021,1","2022,0","2023,0")).map(line => line.split(","))
    val rowRdd = dataRdd.map(p => Row(p(0).toInt, p(1).toInt))

    //创建DataFrame
    val data = hiveContext.createDataFrame(rowRdd, schema)
    data.registerTempTable("tempTable")

    /**
      * +----+-----+
      * |id  |value|
      * +----+-----+
      * |2014|1    |
      * |2015|1    |
      * |2017|0    |
      * |2018|0    |
      * |2019|1    |
      * |2020|1    |
      * |2021|1    |
      * |2022|0    |
      * |2023|0    |
      * +----+-----+
      */
    data.show(false)


    /**
      * +----+-----+---------+
      * |id  |value|pre_value|
      * +----+-----+---------+
      * |2014|1    |null     |
      * |2015|1    |1        |
      * |2017|0    |1        |
      * |2018|0    |0        |
      * |2019|1    |0        |
      * |2020|1    |1        |
      * |2021|1    |1        |
      * |2022|0    |1        |
      * |2023|0    |0        |
      * +----+-----+---------+
      * ------>
      * +------------+-----+
      * |min_block_id|value|
      * +------------+-----+
      * |2014        |1    |
      * |2017        |0    |
      * |2019        |1    |
      * |2022        |0    |
      * +------------+-----+
      * ------>
      * 基础表与上面的表join
      * +----+-----+------------+----+
      * |id  |value|min_block_id|rank|
      * +----+-----+------------+----+
      * |2014|1    |2014        |1   |
      * |2015|1    |2014        |1   |
      * |2017|0    |2017        |1   |
      * |2018|0    |2017        |1   |
      * |2019|1    |2019        |1   |
      * |2019|1    |2014        |2   |
      * |2020|1    |2019        |1   |
      * |2020|1    |2014        |2   |
      * |2021|1    |2019        |1   |
      * |2021|1    |2014        |2   |
      * |2022|0    |2022        |1   |
      * |2022|0    |2017        |2   |
      * |2023|0    |2022        |1   |
      * |2023|0    |2017        |2   |
      * +----+-----+------------+----+
      *
      * ------>
      * 限制rank=1
      * +----+-----+------------+----+
      * |id  |value|min_block_id|rank|
      * +----+-----+------------+----+
      * |2014|1    |2014        |1   |
      * |2015|1    |2014        |1   |
      * |2017|0    |2017        |1   |
      * |2018|0    |2017        |1   |
      * |2019|1    |2019        |1   |
      * |2020|1    |2019        |1   |
      * |2021|1    |2019        |1   |
      * |2022|0    |2022        |1   |
      * |2023|0    |2022        |1   |
      * +----+-----+------------+----+
      *
      * ------>
      * 最终结果
      * +----+-----+--------+
      * |id  |value|new_rank|
      * +----+-----+--------+
      * |2014|1    |1       |
      * |2015|1    |2       |
      * |2017|0    |1       |
      * |2018|0    |2       |
      * |2019|1    |1       |
      * |2020|1    |2       |
      * |2021|1    |3       |
      * |2022|0    |1       |
      * |2023|0    |2       |
      * +----+-----+--------+
      *
      */
    val sql=
      s"""
         |SELECT id,
         |  value,
         |  row_number() over(partition by min_block_id order by id asc) as new_rank
         |FROM
         |(
         |  SELECT
         |    t.id,
         |    t.value,
         |    m.min_block_id,
         |    row_number() over(partition by t.id order by min_block_id desc) as rank
         |  FROM tempTable t
         |  INNER JOIN
         |  (
         |     SELECT id as min_block_id,
         |       value
         |     FROM
         |      (
         |       SELECT
         |         id,
         |         value,
         |         lag(value,1) over(partition by 1 order by id asc) as pre_value
         |       FROM tempTable
         |       )
         |     WHERE pre_value is null
         |      or value!=pre_value
         |  ) m
         |  ON t.value=m.value
         |  AND t.id>=m.min_block_id
         |)
         |WHERE rank=1
         |ORDER BY id
       """.stripMargin

    hiveContext.sql(sql).show(false)
  }
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,837评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,551评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,417评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,448评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,524评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,554评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,569评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,316评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,766评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,077评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,240评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,912评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,560评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,176评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,425评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,114评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,114评论 2 352

推荐阅读更多精彩内容