spark+hive开窗函数练习:求用户每次会话的行为轨迹

数据:

1001 2020-09-10 10:21:21 home.html

1001 2020-09-10 10:28:10 good_list.html

1001 2020-09-10 10:35:05 good_detail.html

1001 2020-09-10 10:42:55 cart.html

1001 2020-09-10 11:35:21 home.html

1001 2020-09-10 11:36:10 cart.html

1001 2020-09-10 11:38:12 trade.html

1001 2020-09-10 11:38:55 payment.html

1002 2020-09-10 09:40:00 home.html

1002 2020-09-10 09:41:00 mine.html

1002 2020-09-10 09:42:00 favor.html

1003 2020-09-10 13:10:00 home.html

1003 2020-09-10 13:15:00 search.html

需求: 

求得用户每次会话的行为轨迹(同一用户,上一条与下一条在半小时内为一次会话)

要求结果如下:

A 1001 2020-09-10 10:21:21 home.html 1

A 1001 2020-09-10 10:28:10 good_list.html 2

A 1001 2020-09-10 10:35:05 good_detail.html 3

A 1001 2020-09-10 10:42:55 cart.html 4

B 1001 2020-09-10 11:35:21 home.html 1

B 1001 2020-09-10 11:36:10 cart.html 2

B 1001 2020-09-10 11:38:12 trade.html 3

B 1001 2020-09-10 11:38:55 payment.html 4

C 1002 2020-09-10 09:40:00 home.html 1

C 1002 2020-09-10 09:41:00 mine.html 2

C 1002 2020-09-10 09:42:00 favor.html 3

D 1003 2020-09-10 13:10:00 home.html 1

D 1003 2020-09-10 13:15:00 search.html 2

一、sql语句:

1、建表:

create table page_session(

        user_id string,

        page_time string,

        page string)

row format delimited

fields terminated by '\t';

2、按时间排序-升序,按user_id分组,使用开窗函数lag()获得上一条的时间

select

        user_id,

        page_time,

        page,

lag(page_time) over(partition by user_id order by page_time asc) before_time

from page_session;

结果如下:

user_id page_time page before_time

1001 2020-09-10 10:21:21 home.html      NULL

1001 2020-09-10 10:28:10 good_list.html  2020-09-10 10:21:21

1001 2020-09-10 10:35:05 good_detail.html 2020-09-10 10:28:10

1001 2020-09-10 10:42:55 cart.html      2020-09-10 10:35:05

1001 2020-09-10 11:35:21 home.html      2020-09-10 10:42:55

1001 2020-09-10 11:36:10 cart.html      2020-09-10 11:35:21

1001 2020-09-10 11:38:12 trade.html      2020-09-10 11:36:10

1001 2020-09-10 11:38:55 payment.html    2020-09-10 11:38:12

1002 2020-09-10 09:40:00 home.html      NULL

1002 2020-09-10 09:41:00 mine.html      2020-09-10 09:40:00

1002 2020-09-10 09:42:00 favor.html      2020-09-10 09:41:00

1003 2020-09-10 13:10:00 home.html      NULL

1003 2020-09-10 13:15:00 search.html    2020-09-10 13:10:00

3、给每个会话添加会话id

两个条件判断是否为新会话:

1、before_time是否为null 

2、判断上一次时间与下一次时间是否大于半小时(时间需要转成时间戳),如果大于30分说明是新的会话

如果是新会话,就使用concat()拼接--将user_id和时间戳拼接起来得到唯一的session_id

select

        user_id,

        page_time,

        page,

        before_time,

if( before_time is null or unix_timestamp(page_time) - unix_timestamp(before_time) > 30*60,

concat(user_id,'-',unix_timestamp(page_time)),null ) session_id

from(

        select

                user_id,

                page_time,

                page,

        lag(page_time) over(partition by user_id order by page_time asc) before_time

        from page_session) tmp1;

结果如下:

user_id page_time page before_time session_id

1001 2020-09-10 10:21:21 home.html      NULL    1001-1599733281

1001 2020-09-10 10:28:10 good_list.html  2020-09-10 10:21:21 NULL

1001 2020-09-10 10:35:05 good_detail.html 2020-09-10 10:28:10 NULL

1001 2020-09-10 10:42:55 cart.html      2020-09-10 10:35:05 NULL

1001 2020-09-10 11:35:21 home.html      2020-09-10 10:42:55 1001-1599737721

1001 2020-09-10 11:36:10 cart.html      2020-09-10 11:35:21 NULL

1001 2020-09-10 11:38:12 trade.html      2020-09-10 11:36:10 NULL

1001 2020-09-10 11:38:55 payment.html    2020-09-10 11:38:12 NULL

1002 2020-09-10 09:40:00 home.html      NULL    1002-1599730800

1002 2020-09-10 09:41:00 mine.html      2020-09-10 09:40:00 NULL

1002 2020-09-10 09:42:00 favor.html      2020-09-10 09:41:00 NULL

1003 2020-09-10 13:10:00 home.html      NULL    1003-1599743400

1003 2020-09-10 13:15:00 search.html    2020-09-10 13:10:00 NULL

4、给每个会话的其他数据都添加上对应的session_id

last_value(xx,true) --返回一组值中的最后一个值(该组通常是有序集合)。

将第二个参数设置为true是忽略null值,如果这组值中的最后一个值是null值,返回该集合中的最后一个非空值

默认为false,如果最后一个值为null,函数将返回 null。

如果所有值均为空值,则返回 null。

select

        user_id,

        page_time,

        page,

last_value(session_id,true) over(partition by user_id order by page_time asc) session_id

from(

        select

                user_id,

                page_time,

                page,

                before_time,

        if( before_time is null or unix_timestamp(page_time) - unix_timestamp(before_time) > 30*60,

        concat(user_id,'-',unix_timestamp(page_time)),null ) session_id

        from(

                select

                        user_id,

                        page_time,

                        page,

                lag(page_time) over(partition by user_id order by page_time asc) before_time

                from page_session) tmp1) tmp2;

结果如下:

user_id page_time page session_id

1001 2020-09-10 10:21:21    home.html      1001-1599733281

1001 2020-09-10 10:28:10    good_list.html  1001-1599733281

1001 2020-09-10 10:35:05    good_detail.html 1001-1599733281

1001 2020-09-10 10:42:55    cart.html      1001-1599733281

1001 2020-09-10 11:35:21    home.html      1001-1599737721

1001 2020-09-10 11:36:10    cart.html      1001-1599737721

1001 2020-09-10 11:38:12    trade.html      1001-1599737721

1001 2020-09-10 11:38:55    payment.html    1001-1599737721

1002 2020-09-10 09:40:00    home.html      1002-1599730800

1002 2020-09-10 09:41:00    mine.html      1002-1599730800

1002 2020-09-10 09:42:00    favor.html      1002-1599730800

1003 2020-09-10 13:10:00    home.html      1003-1599743400

1003 2020-09-10 13:15:00    search.html    1003-1599743400

5、求行为轨迹--

ROW_NUMBER()排序:会根据顺序计算

select

        user_id,

        page_time,

        page,

        session_id,

row_number()over(partition by session_id order by page_time asc) step

from(

                select

                user_id,

                page_time,

                page,

        last_value(session_id,true) over(partition by user_id order by page_time asc) session_id

        from(

                select

                        user_id,

                        page_time,

                        page,

                        before_time,

                if( before_time is null or unix_timestamp(page_time) - unix_timestamp(before_time) > 30*60,

                concat(user_id,'-',unix_timestamp(page_time)),null ) session_id

                from(

                         select

                                user_id,

                                page_time,

                                page,

                        lag(page_time) over(partition by user_id order by page_time asc) before_time

                        from page_session) tmp1) tmp2) tmp3;

结果如下:

user_id page_time page session_id step

1001 2020-09-10 10:21:21 home.html      1001-1599733281 1

1001 2020-09-10 10:28:10 good_list.html  1001-1599733281 2

1001 2020-09-10 10:35:05 good_detail.html 1001-1599733281 3

1001 2020-09-10 10:42:55 cart.html      1001-1599733281 4

1001 2020-09-10 11:35:21 home.html      1001-1599737721 1

1001 2020-09-10 11:36:10 cart.html      1001-1599737721 2

1001 2020-09-10 11:38:12 trade.html      1001-1599737721 3

1001 2020-09-10 11:38:55 payment.html    1001-1599737721 4

1002 2020-09-10 09:40:00 home.html      1002-1599730800 1

1002 2020-09-10 09:41:00 mine.html      1002-1599730800 2

1002 2020-09-10 09:42:00 favor.html      1002-1599730800 3

1003 2020-09-10 13:10:00 home.html      1003-1599743400 1

1003 2020-09-10 13:15:00 search.html    1003-1599743400 2

二、代码实现:

import java.text.SimpleDateFormat

import java.util.UUID

import org.apache.spark.rdd.RDD

import org.apache.spark.sql.Dataset

object SessionTest1 {

  def main(args: Array[String]): Unit = {

    //需求:求得每个用户每次会话的行为轨迹

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[4]").appName("test").getOrCreate()

    import spark.implicits._

    //1、读取数据(行类型转为元组)

    val ds: Dataset[(String, String, String)] = spark.read.option("sep","\t").csv("datas/session.txt")

      .toDF("user_id","page_time","page").as[(String,String,String)]

    //转为rdd操作

    val rdd: RDD[(String, String, String)] = ds.rdd

    //此时的结果

    //RDD(

    //  (1001,2020-09-10 10:21:21,home.html),

    //  (1001,2020-09-10 10:28:10,good_list.html),

    //  (1001,2020-09-10 10:35:05,good_detail.html),

    //  ...

    //  (1003,2020-09-10 13:15:00,search.html)

    // )

    //2、转换数据类型--转为样例类

    val rdd2: RDD[UserAnalysis] = rdd.map{

      case(userid,timestr,page)=>

        //格式化时间

        val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

        //获得时间戳-毫秒

        val time = formatter.parse(timestr).getTime

        UserAnalysis(userid,time,timestr,page)

    }

    //此时的结果:

    //RDD(

    //  UserAnalysis(1001,1599704481000,2020-09-10 10:21:21,home.html,cdd00058-48f6-4cd8-980d-4f6966d18324,1)

    //  UserAnalysis(1001,1599704890000,2020-09-10 10:28:10,good_list.html,342b9112-af55-4f03-802e-e9ee4f178818,1)

    //  UserAnalysis(1001,1599705305000,2020-09-10 10:35:05,good_detail.html,33bf0bce-7620-478d-bfea-0f48c2321ebe,1)

    //  ...

    //  UserAnalysis(1003,1599714900000,2020-09-10 13:15:00,search.html,825d1cd1-a506-48da-92c7-253602395f3e,1))

    //3、按照用户id分组

    val rdd3: RDD[(String, Iterable[UserAnalysis])] = rdd2.groupBy(x => x.userid)

    //此时的结果:

    //RDD(

    //  1001->Iterable(

    //          UserAnalysis(1001,1599704481000,2020-09-10 10:21:21,home.html,cdd00058-48f6-4cd8-980d-4f6966d18324,1)

    //          UserAnalysis(1001,1599704890000,2020-09-10 10:28:10,good_list.html,342b9112-af55-4f03-802e-e9ee4f178818,1)

    //          UserAnalysis(1001,1599705305000,2020-09-10 10:35:05,good_detail.html,33bf0bce-7620-478d-bfea-0f48c2321ebe,1)

    //          ...)

    //  ...

    //  1003->Iterable(

    //          ...

    //          UserAnalysis(1003,1599714900000,2020-09-10 13:15:00,search.html,825d1cd1-a506-48da-92c7-253602395f3e,1)))

    val rdd4: RDD[UserAnalysis] = rdd3.flatMap(x=>{

      //4、对每个用户所有数据排序

      val sortedList: List[UserAnalysis] = x._2.toList.sortBy(y=>y.time)

      //结果为:

      //List(

      //    UserAnalysis(1001,1599704481000,2020-09-10 10:21:21,home.html,cdd00058-48f6-4cd8-980d-4f6966d18324,1)

      //    UserAnalysis(1001,1599704890000,2020-09-10 10:28:10,good_list.html,342b9112-af55-4f03-802e-e9ee4f178818,1)

      //    UserAnalysis(1001,1599705305000,2020-09-10 10:35:05,good_detail.html,33bf0bce-7620-478d-bfea-0f48c2321ebe,1)

      //    ...)

      //5、针对每个用户所有数据滑窗--每2个为一个窗口

      val slidingList: Iterator[List[UserAnalysis]] = sortedList.sliding(2)

      //结果为:

      //Iterator(

      //    List( UserAnalysis(1001,1599704481000,2020-09-10 10:21:21,home.html,cdd00058-48f6-4cd8-980d-4f6966d18324,1),

      //          UserAnalysis(1001,1599704890000,2020-09-10 10:28:10,good_list.html,342b9112-af55-4f03-802e-e9ee4f178818,1))

      //    List( UserAnalysis(1001,1599704890000,2020-09-10 10:28:10,good_list.html,05899882-50bb-4c21-8cbf-32cae82c27f1,1),

      //          UserAnalysis(1001,1599705305000,2020-09-10 10:35:05,good_detail.html,4dd73745-b572-4304-a9ff-3b64d7b95b67,1) )

      //    ....

      // )

      //6、判断每个窗口中两个数据是否属于同一个session,如果属于同一个会话,修改session,step

      slidingList.foreach(list=>{

        //取出窗口中的第一个数据

        val first = list.head

        //取出窗口中的第二个数据

        val next = list.last

        //判断第一个数据和下一个数据的时间是否在30分钟内

        if (next.time-first.time <= 30*60*1000){

          //属于同一个会话,那么下一条数据的session和上一条一样

          next.session=first.session

          //属于同一个会话,那么下一条数据的轨迹step+1

          next.step=first.step+1

        }

      })

      //返回样例类对象(样例类对象对应的session和step已经修改了)

      x._2

    })

    //7、结果展示

    rdd4.foreach(println)

    //结果:

    //UserAnalysis(1001,1599704481000,2020-09-10 10:21:21,home.html,d341fd8b-333d-43ae-8ea2-795a5b544b6a,1)

    //UserAnalysis(1001,1599704890000,2020-09-10 10:28:10,good_list.html,d341fd8b-333d-43ae-8ea2-795a5b544b6a,2)

    //UserAnalysis(1001,1599705305000,2020-09-10 10:35:05,good_detail.html,d341fd8b-333d-43ae-8ea2-795a5b544b6a,3)

    //UserAnalysis(1001,1599705775000,2020-09-10 10:42:55,cart.html,d341fd8b-333d-43ae-8ea2-795a5b544b6a,4)

    //UserAnalysis(1001,1599708921000,2020-09-10 11:35:21,home.html,7de3b33f-a476-407d-9dd6-a763130130d2,1)

    //UserAnalysis(1001,1599708970000,2020-09-10 11:36:10,cart.html,7de3b33f-a476-407d-9dd6-a763130130d2,2)

    //UserAnalysis(1001,1599709092000,2020-09-10 11:38:12,trade.html,7de3b33f-a476-407d-9dd6-a763130130d2,3)

    //UserAnalysis(1001,1599709135000,2020-09-10 11:38:55,payment.html,7de3b33f-a476-407d-9dd6-a763130130d2,4)

    //UserAnalysis(1002,1599702000000,2020-09-10 09:40:00,home.html,e84903df-73d5-4fee-83e5-a11c186a8e27,1)

    //UserAnalysis(1002,1599702060000,2020-09-10 09:41:00,mine.html,e84903df-73d5-4fee-83e5-a11c186a8e27,2)

    //UserAnalysis(1002,1599702120000,2020-09-10 09:42:00,favor.html,e84903df-73d5-4fee-83e5-a11c186a8e27,3)

    //UserAnalysis(1003,1599714600000,2020-09-10 13:10:00,home.html,292ff465-60e8-483f-908a-3d03eebca0f9,1)

    //UserAnalysis(1003,1599714900000,2020-09-10 13:15:00,search.html,292ff465-60e8-483f-908a-3d03eebca0f9,2)

  }

}

//UUID(Universally Unique IDentifier)全局唯一标识符,是指在一台机器上生成的数字,它保证对在同一时空中的所有机器都是唯一的

//UUID.randomUUID().toString()是javaJDK(1.5以上的版本)提供的一个自动生成主键的方法,它生成的是以为32位的数字和字母组合的字符,中间还参杂着4个 - 符号。

case class UserAnalysis(userid:String,time:Long,timestr:String,page:String,var session:String = UUID.randomUUID().toString,var step:Int=1)

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,445评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,889评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,047评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,760评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,745评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,638评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,011评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,669评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,923评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,655评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,740评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,406评论 4 320
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,995评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,961评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,023评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,483评论 2 342

推荐阅读更多精彩内容