Data (tab-separated columns: user_id, page_time, page):
1001 2020-09-10 10:21:21 home.html
1001 2020-09-10 10:28:10 good_list.html
1001 2020-09-10 10:35:05 good_detail.html
1001 2020-09-10 10:42:55 cart.html
1001 2020-09-10 11:35:21 home.html
1001 2020-09-10 11:36:10 cart.html
1001 2020-09-10 11:38:12 trade.html
1001 2020-09-10 11:38:55 payment.html
1002 2020-09-10 09:40:00 home.html
1002 2020-09-10 09:41:00 mine.html
1002 2020-09-10 09:42:00 favor.html
1003 2020-09-10 13:10:00 home.html
1003 2020-09-10 13:15:00 search.html
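Requirement:
For each user, find the behavior trail of every session. For the same user, two consecutive records belong to the same session if the later one is no more than 30 minutes after the earlier one.
Expected result (session label, user_id, page_time, page, step within the session):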
A 1001 2020-09-10 10:21:21 home.html 1
A 1001 2020-09-10 10:28:10 good_list.html 2
A 1001 2020-09-10 10:35:05 good_detail.html 3
A 1001 2020-09-10 10:42:55 cart.html 4
B 1001 2020-09-10 11:35:21 home.html 1
B 1001 2020-09-10 11:36:10 cart.html 2
B 1001 2020-09-10 11:38:12 trade.html 3
B 1001 2020-09-10 11:38:55 payment.html 4
C 1002 2020-09-10 09:40:00 home.html 1
C 1002 2020-09-10 09:41:00 mine.html 2
C 1002 2020-09-10 09:42:00 favor.html 3
D 1003 2020-09-10 13:10:00 home.html 1
D 1003 2020-09-10 13:15:00 search.html 2
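A worked check of the rule on user 1001: the gap from 10:42:55 to 11:35:21 is 52 min 26 s (3146 s). That is over 30 minutes, so session B starts at 11:35:21. Every other gap for this user is under 8 minutes.

The sketch below computes those gaps. It is plain Scala with no Spark, and the object name GapCheck is hypothetical. A gap of exactly 30 minutes counts as the same session, which matches the `> 30*60` test in the SQL.

```scala
import java.time.{Duration, LocalDateTime}
import java.time.format.DateTimeFormatter

object GapCheck {
  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  // Session timeout taken from the requirement: 30 minutes
  val timeout: Duration = Duration.ofMinutes(30)

  // Time between two consecutive page_time values of the same user
  def gap(prev: String, cur: String): Duration =
    Duration.between(LocalDateTime.parse(prev, fmt), LocalDateTime.parse(cur, fmt))

  // Only a gap strictly longer than the timeout starts a new session
  def startsNewSession(prev: String, cur: String): Boolean =
    gap(prev, cur).compareTo(timeout) > 0

  def main(args: Array[String]): Unit = {
    val times1001 = List(
      "2020-09-10 10:21:21", "2020-09-10 10:28:10", "2020-09-10 10:35:05",
      "2020-09-10 10:42:55", "2020-09-10 11:35:21", "2020-09-10 11:36:10",
      "2020-09-10 11:38:12", "2020-09-10 11:38:55")
    // Pair every record with the one after it and report the gap
    times1001.zip(times1001.tail).foreach { case (p, c) =>
      println(s"$p -> $c: ${gap(p, c).getSeconds} s, new session = ${startsNewSession(p, c)}")
    }
  }
}
```

Running `main` should flag only the 10:42:55 -> 11:35:21 pair as a new session.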
I. SQL (HiveQL):
1. Create the table:
create table page_session(
user_id string,
page_time string,
page string)
row format delimited
fields terminated by '\t';
2. Partition by user_id, sort by time in ascending order, and use the window function lag() to get the previous record's time:
select
user_id,
page_time,
page,
lag(page_time) over(partition by user_id order by page_time asc) before_time
from page_session;
Result:
user_id page_time page before_time
1001 2020-09-10 10:21:21 home.html NULL
1001 2020-09-10 10:28:10 good_list.html 2020-09-10 10:21:21
1001 2020-09-10 10:35:05 good_detail.html 2020-09-10 10:28:10
1001 2020-09-10 10:42:55 cart.html 2020-09-10 10:35:05
1001 2020-09-10 11:35:21 home.html 2020-09-10 10:42:55
1001 2020-09-10 11:36:10 cart.html 2020-09-10 11:35:21
1001 2020-09-10 11:38:12 trade.html 2020-09-10 11:36:10
1001 2020-09-10 11:38:55 payment.html 2020-09-10 11:38:12
1002 2020-09-10 09:40:00 home.html NULL
1002 2020-09-10 09:41:00 mine.html 2020-09-10 09:40:00
1002 2020-09-10 09:42:00 favor.html 2020-09-10 09:41:00
1003 2020-09-10 13:10:00 home.html NULL
1003 2020-09-10 13:15:00 search.html 2020-09-10 13:10:00
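To show what `lag(page_time) over(partition by user_id order by page_time asc)` computes, here is a rough plain-Scala equivalent. It is a sketch on in-memory collections, not how Hive executes the query, and the names LagSketch and withBeforeTime are hypothetical. `None` stands in for SQL NULL on each user's first row.

```scala
object LagSketch {
  // One row of page_session: (user_id, page_time, page)
  type Row = (String, String, String)

  // Partition by user_id, order by page_time, and pair each row with the
  // previous row's page_time (None for the first row of each user)
  def withBeforeTime(rows: Seq[Row]): Seq[(Row, Option[String])] =
    rows.groupBy(_._1).toSeq.sortBy(_._1).flatMap { case (_, userRows) =>
      // "yyyy-MM-dd HH:mm:ss" strings sort in chronological order
      val sorted = userRows.sortBy(_._2)
      val before: Seq[Option[String]] = None +: sorted.init.map(r => Some(r._2))
      sorted.zip(before)
    }

  def main(args: Array[String]): Unit = {
    val rows = Seq(
      ("1002", "2020-09-10 09:41:00", "mine.html"),
      ("1002", "2020-09-10 09:40:00", "home.html"),
      ("1002", "2020-09-10 09:42:00", "favor.html"))
    withBeforeTime(rows).foreach(println)
  }
}
```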
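3. Assign a session ID to the first record of each session
A record starts a new session if either of two conditions holds:
1. before_time is null (it is the user's first record);
2. the gap between the previous time and the current time is greater than 30 minutes (both times must first be converted to Unix timestamps, so the threshold is 30*60 seconds).
For a new session, concat() joins user_id and the timestamp into the session_id. The result is unique because, for a given user, two session starts are always more than 30 minutes apart and so never share a timestamp. All other rows get null for now.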
select
user_id,
page_time,
page,
before_time,
if( before_time is null or unix_timestamp(page_time) - unix_timestamp(before_time) > 30*60,
concat(user_id,'-',unix_timestamp(page_time)),null ) session_id
from(
select
user_id,
page_time,
page,
lag(page_time) over(partition by user_id order by page_time asc) before_time
from page_session) tmp1;
Result:
user_id page_time page before_time session_id
1001 2020-09-10 10:21:21 home.html NULL 1001-1599733281
1001 2020-09-10 10:28:10 good_list.html 2020-09-10 10:21:21 NULL
1001 2020-09-10 10:35:05 good_detail.html 2020-09-10 10:28:10 NULL
1001 2020-09-10 10:42:55 cart.html 2020-09-10 10:35:05 NULL
1001 2020-09-10 11:35:21 home.html 2020-09-10 10:42:55 1001-1599737721
1001 2020-09-10 11:36:10 cart.html 2020-09-10 11:35:21 NULL
1001 2020-09-10 11:38:12 trade.html 2020-09-10 11:36:10 NULL
1001 2020-09-10 11:38:55 payment.html 2020-09-10 11:38:12 NULL
1002 2020-09-10 09:40:00 home.html NULL 1002-1599730800
1002 2020-09-10 09:41:00 mine.html 2020-09-10 09:40:00 NULL
1002 2020-09-10 09:42:00 favor.html 2020-09-10 09:41:00 NULL
1003 2020-09-10 13:10:00 home.html NULL 1003-1599743400
1003 2020-09-10 13:15:00 search.html 2020-09-10 13:10:00 NULL
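The `if(...)` expression above can be mirrored row by row in Scala. This is a sketch with hypothetical names (SessionStartSketch, sessionId, unixTimestamp).

It assumes page_time is read as UTC. The epoch values in the result above (for example 1001-1599733281 for 2020-09-10 10:21:21) match a UTC reading. Hive's `unix_timestamp` uses the configured time zone, so other settings give different numbers, but the IDs remain unique per session. An absent before_time (`None`) counts as a new session, just like `before_time is null`.

```scala
import java.time.{LocalDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter

object SessionStartSketch {
  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  // Seconds since the epoch, reading the string as UTC (assumption, see lead-in)
  def unixTimestamp(s: String): Long =
    LocalDateTime.parse(s, fmt).toEpochSecond(ZoneOffset.UTC)

  // Mirrors: if(before_time is null or unix_timestamp(page_time) - unix_timestamp(before_time) > 30*60,
  //             concat(user_id, '-', unix_timestamp(page_time)), null)
  def sessionId(userId: String, pageTime: String, beforeTime: Option[String]): Option[String] = {
    val isNew = beforeTime.forall(b => unixTimestamp(pageTime) - unixTimestamp(b) > 30 * 60)
    if (isNew) Some(s"$userId-${unixTimestamp(pageTime)}") else None
  }

  def main(args: Array[String]): Unit = {
    println(sessionId("1001", "2020-09-10 10:21:21", None))
    println(sessionId("1001", "2020-09-10 10:28:10", Some("2020-09-10 10:21:21")))
    println(sessionId("1001", "2020-09-10 11:35:21", Some("2020-09-10 10:42:55")))
  }
}
```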
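4. Fill in the corresponding session_id for the remaining rows of each session
last_value(x, true) returns the last value in a set of values (usually an ordered set).
Setting the second argument to true ignores nulls: if the last value in the set is null, the function returns the last non-null value instead.
The default is false, in which case a null last value is returned as null.
If all values are null, the function returns null.
With ORDER BY and no explicit window frame, the frame runs from the start of the partition to the current row. Each row therefore picks up the most recent non-null session_id at or before it, which is the ID of the session it belongs to.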
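The null-skipping running behavior described above amounts to a forward fill. Here is a plain-Scala sketch; the names LastValueIgnoreNulls and running are hypothetical, and `None` stands in for SQL NULL.

Each position gets the most recent non-null value up to and including itself, or `None` if there is none yet. Without `true`, `last_value` over the same default frame would simply return the current row's own value, nulls included.

```scala
object LastValueIgnoreNulls {
  // Running last_value(x, true) over "partition start .. current row":
  // carry the last non-null value forward; None until one has been seen
  def running[A](values: Seq[Option[A]]): Seq[Option[A]] =
    values.scanLeft(Option.empty[A]) { (lastNonNull, v) => v.orElse(lastNonNull) }.tail

  def main(args: Array[String]): Unit = {
    // session_id column of user 1001 after step 3 (None = NULL)
    val ids = Seq(Some("1001-1599733281"), None, None, None,
                  Some("1001-1599737721"), None, None, None)
    running(ids).foreach(println)
  }
}
```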
select
user_id,
page_time,
page,
last_value(session_id,true) over(partition by user_id order by page_time asc) session_id
from(
select
user_id,
page_time,
page,
before_time,
if( before_time is null or unix_timestamp(page_time) - unix_timestamp(before_time) > 30*60,
concat(user_id,'-',unix_timestamp(page_time)),null ) session_id
from(
select
user_id,
page_time,
page,
lag(page_time) over(partition by user_id order by page_time asc) before_time
from page_session) tmp1) tmp2;
Result:
user_id page_time page session_id
1001 2020-09-10 10:21:21 home.html 1001-1599733281
1001 2020-09-10 10:28:10 good_list.html 1001-1599733281
1001 2020-09-10 10:35:05 good_detail.html 1001-1599733281
1001 2020-09-10 10:42:55 cart.html 1001-1599733281
1001 2020-09-10 11:35:21 home.html 1001-1599737721
1001 2020-09-10 11:36:10 cart.html 1001-1599737721
1001 2020-09-10 11:38:12 trade.html 1001-1599737721
1001 2020-09-10 11:38:55 payment.html 1001-1599737721
1002 2020-09-10 09:40:00 home.html 1002-1599730800
1002 2020-09-10 09:41:00 mine.html 1002-1599730800
1002 2020-09-10 09:42:00 favor.html 1002-1599730800
1003 2020-09-10 13:10:00 home.html 1003-1599743400
1003 2020-09-10 13:15:00 search.html 1003-1599743400
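5. Compute the behavior trail
row_number() assigns consecutive numbers 1, 2, 3, ... to the rows of each session_id partition in page_time order. This number is the step of each page in the trail.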
select
user_id,
page_time,
page,
session_id,
row_number()over(partition by session_id order by page_time asc) step
from(
select
user_id,
page_time,
page,
last_value(session_id,true) over(partition by user_id order by page_time asc) session_id
from(
select
user_id,
page_time,
page,
before_time,
if( before_time is null or unix_timestamp(page_time) - unix_timestamp(before_time) > 30*60,
concat(user_id,'-',unix_timestamp(page_time)),null ) session_id
from(
select
user_id,
page_time,
page,
lag(page_time) over(partition by user_id order by page_time asc) before_time
from page_session) tmp1) tmp2) tmp3;
Result:
user_id page_time page session_id step
1001 2020-09-10 10:21:21 home.html 1001-1599733281 1
1001 2020-09-10 10:28:10 good_list.html 1001-1599733281 2
1001 2020-09-10 10:35:05 good_detail.html 1001-1599733281 3
1001 2020-09-10 10:42:55 cart.html 1001-1599733281 4
1001 2020-09-10 11:35:21 home.html 1001-1599737721 1
1001 2020-09-10 11:36:10 cart.html 1001-1599737721 2
1001 2020-09-10 11:38:12 trade.html 1001-1599737721 3
1001 2020-09-10 11:38:55 payment.html 1001-1599737721 4
1002 2020-09-10 09:40:00 home.html 1002-1599730800 1
1002 2020-09-10 09:41:00 mine.html 1002-1599730800 2
1002 2020-09-10 09:42:00 favor.html 1002-1599730800 3
1003 2020-09-10 13:10:00 home.html 1003-1599743400 1
1003 2020-09-10 13:15:00 search.html 1003-1599743400 2
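`row_number() over(partition by session_id order by page_time asc)` can be sketched the same way on in-memory rows. The names RowNumberSketch and withStep are hypothetical.

Unlike rank(), row_number() never repeats a number when two rows tie on page_time. In this sketch, tied rows keep their input order because `sortBy` is stable.

```scala
object RowNumberSketch {
  // (session_id, page_time, page)
  type Row = (String, String, String)

  // Within each session, number the rows 1, 2, 3, ... in page_time order
  def withStep(rows: Seq[Row]): Seq[(Row, Int)] =
    rows.groupBy(_._1).toSeq.sortBy(_._1).flatMap { case (_, sessionRows) =>
      sessionRows.sortBy(_._2).zipWithIndex.map { case (r, i) => (r, i + 1) }
    }

  def main(args: Array[String]): Unit = {
    val rows = Seq(
      ("1003-1599743400", "2020-09-10 13:15:00", "search.html"),
      ("1003-1599743400", "2020-09-10 13:10:00", "home.html"))
    withStep(rows).foreach(println)
  }
}
```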
II. Code implementation (Spark, Scala):
import java.text.SimpleDateFormat
import java.util.UUID
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Dataset
object SessionTest1 {
def main(args: Array[String]): Unit = {
//Requirement: find the behavior trail of every session of every user
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().master("local[4]").appName("test").getOrCreate()
import spark.implicits._
//1. Read the tab-separated data and convert each row to a tuple
val ds: Dataset[(String, String, String)] = spark.read.option("sep","\t").csv("datas/session.txt")
.toDF("user_id","page_time","page").as[(String,String,String)]
//Switch to the RDD API
val rdd: RDD[(String, String, String)] = ds.rdd
//Result at this point:
//RDD(
// (1001,2020-09-10 10:21:21,home.html),
// (1001,2020-09-10 10:28:10,good_list.html),
// (1001,2020-09-10 10:35:05,good_detail.html),
// ...
// (1003,2020-09-10 13:15:00,search.html)
// )
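//2. Convert each record to the UserAnalysis case class
val rdd2: RDD[UserAnalysis] = rdd.map{
case(userid,timestr,page)=>
//Parser for the time string
val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
//Epoch timestamp in milliseconds (interpreted in the JVM's default time zone)
val time = formatter.parse(timestr).getTime
UserAnalysis(userid,time,timestr,page)
}
//Result at this point (session defaults to a random UUID and step to 1):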
//RDD(
// UserAnalysis(1001,1599704481000,2020-09-10 10:21:21,home.html,cdd00058-48f6-4cd8-980d-4f6966d18324,1)
// UserAnalysis(1001,1599704890000,2020-09-10 10:28:10,good_list.html,342b9112-af55-4f03-802e-e9ee4f178818,1)
// UserAnalysis(1001,1599705305000,2020-09-10 10:35:05,good_detail.html,33bf0bce-7620-478d-bfea-0f48c2321ebe,1)
// ...
// UserAnalysis(1003,1599714900000,2020-09-10 13:15:00,search.html,825d1cd1-a506-48da-92c7-253602395f3e,1))
//3. Group by user ID
val rdd3: RDD[(String, Iterable[UserAnalysis])] = rdd2.groupBy(x => x.userid)
//Result at this point:
//RDD(
// 1001->Iterable(
// UserAnalysis(1001,1599704481000,2020-09-10 10:21:21,home.html,cdd00058-48f6-4cd8-980d-4f6966d18324,1)
// UserAnalysis(1001,1599704890000,2020-09-10 10:28:10,good_list.html,342b9112-af55-4f03-802e-e9ee4f178818,1)
// UserAnalysis(1001,1599705305000,2020-09-10 10:35:05,good_detail.html,33bf0bce-7620-478d-bfea-0f48c2321ebe,1)
// ...)
// ...
// 1003->Iterable(
// ...
// UserAnalysis(1003,1599714900000,2020-09-10 13:15:00,search.html,825d1cd1-a506-48da-92c7-253602395f3e,1)))
val rdd4: RDD[UserAnalysis] = rdd3.flatMap(x=>{
//4. Sort each user's records by time
val sortedList: List[UserAnalysis] = x._2.toList.sortBy(y=>y.time)
//Result:
//List(
// UserAnalysis(1001,1599704481000,2020-09-10 10:21:21,home.html,cdd00058-48f6-4cd8-980d-4f6966d18324,1)
// UserAnalysis(1001,1599704890000,2020-09-10 10:28:10,good_list.html,342b9112-af55-4f03-802e-e9ee4f178818,1)
// UserAnalysis(1001,1599705305000,2020-09-10 10:35:05,good_detail.html,33bf0bce-7620-478d-bfea-0f48c2321ebe,1)
// ...)
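//5. Slide a window of 2 records over each user's sorted records, advancing one record at a time
val slidingList: Iterator[List[UserAnalysis]] = sortedList.sliding(2)
//Result (session values are random UUIDs; this output was captured in a different run, so the UUIDs differ from those above):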
//Iterator(
// List( UserAnalysis(1001,1599704481000,2020-09-10 10:21:21,home.html,cdd00058-48f6-4cd8-980d-4f6966d18324,1),
// UserAnalysis(1001,1599704890000,2020-09-10 10:28:10,good_list.html,342b9112-af55-4f03-802e-e9ee4f178818,1))
// List( UserAnalysis(1001,1599704890000,2020-09-10 10:28:10,good_list.html,05899882-50bb-4c21-8cbf-32cae82c27f1,1),
// UserAnalysis(1001,1599705305000,2020-09-10 10:35:05,good_detail.html,4dd73745-b572-4304-a9ff-3b64d7b95b67,1) )
// ....
// )
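//6. For each window, check whether the two records belong to the same session;
//   if so, the next record inherits the session and its step is incremented
slidingList.foreach {
//The two records are no more than 30 minutes apart: same session
case List(first, next) if next.time - first.time <= 30 * 60 * 1000 =>
next.session = first.session
next.step = first.step + 1
//Gap over 30 minutes (keep the new UUID and step 1), or the single-record
//window of a user with only one record (must not get step + 1)
case _ =>
}
//Return the sorted records (their session and step fields were updated in place)
sortedList
})
//7. Show the result (collect to the driver first so the output appears in the driver's console)
rdd4.collect().foreach(println)
spark.stop()
//Result: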
//UserAnalysis(1001,1599704481000,2020-09-10 10:21:21,home.html,d341fd8b-333d-43ae-8ea2-795a5b544b6a,1)
//UserAnalysis(1001,1599704890000,2020-09-10 10:28:10,good_list.html,d341fd8b-333d-43ae-8ea2-795a5b544b6a,2)
//UserAnalysis(1001,1599705305000,2020-09-10 10:35:05,good_detail.html,d341fd8b-333d-43ae-8ea2-795a5b544b6a,3)
//UserAnalysis(1001,1599705775000,2020-09-10 10:42:55,cart.html,d341fd8b-333d-43ae-8ea2-795a5b544b6a,4)
//UserAnalysis(1001,1599708921000,2020-09-10 11:35:21,home.html,7de3b33f-a476-407d-9dd6-a763130130d2,1)
//UserAnalysis(1001,1599708970000,2020-09-10 11:36:10,cart.html,7de3b33f-a476-407d-9dd6-a763130130d2,2)
//UserAnalysis(1001,1599709092000,2020-09-10 11:38:12,trade.html,7de3b33f-a476-407d-9dd6-a763130130d2,3)
//UserAnalysis(1001,1599709135000,2020-09-10 11:38:55,payment.html,7de3b33f-a476-407d-9dd6-a763130130d2,4)
//UserAnalysis(1002,1599702000000,2020-09-10 09:40:00,home.html,e84903df-73d5-4fee-83e5-a11c186a8e27,1)
//UserAnalysis(1002,1599702060000,2020-09-10 09:41:00,mine.html,e84903df-73d5-4fee-83e5-a11c186a8e27,2)
//UserAnalysis(1002,1599702120000,2020-09-10 09:42:00,favor.html,e84903df-73d5-4fee-83e5-a11c186a8e27,3)
//UserAnalysis(1003,1599714600000,2020-09-10 13:10:00,home.html,292ff465-60e8-483f-908a-3d03eebca0f9,1)
//UserAnalysis(1003,1599714900000,2020-09-10 13:15:00,search.html,292ff465-60e8-483f-908a-3d03eebca0f9,2)
}
}
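//UUID (Universally Unique IDentifier): an identifier that any machine can generate independently, with a negligible chance of colliding with any other UUID
//UUID.randomUUID().toString (in the JDK since Java 1.5) returns a random (version 4) UUID: a 36-character string of 32 hexadecimal digits separated by 4 hyphens, often used to generate primary keys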
case class UserAnalysis(userid:String,time:Long,timestr:String,page:String,var session:String = UUID.randomUUID().toString,var step:Int=1)
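The mutable sliding-window approach above can also be written without var fields. The sketch below is a minimal plain-Scala variant and not the author's code.

The names Visit, sessionize and TimeoutMs are hypothetical. Inside the Spark job it could replace the sorting, sliding and foreach steps in the flatMap.

foldLeft carries the previous visit forward, so a user with a single record naturally gets step 1. Each new session draws a fresh `UUID.randomUUID().toString`.

```scala
import java.util.UUID

object ImmutableSessionSketch {
  // Hypothetical immutable counterpart of UserAnalysis (no var fields)
  final case class Visit(userid: String, time: Long, page: String, session: String, step: Int)

  // 30 minutes in milliseconds
  val TimeoutMs: Long = 30L * 60 * 1000

  // Sessionize one user's (epoch-millis time, page) records.
  // The accumulator is built newest-first, so its head is the previous visit.
  def sessionize(userid: String, events: Seq[(Long, String)]): List[Visit] =
    events.sortBy(_._1).foldLeft(List.empty[Visit]) { (acc, event) =>
      val (time, page) = event
      acc match {
        // Within 30 minutes of the previous visit: same session, next step
        case prev :: _ if time - prev.time <= TimeoutMs =>
          Visit(userid, time, page, prev.session, prev.step + 1) :: acc
        // First visit, or gap over 30 minutes: new session starting at step 1
        case _ =>
          Visit(userid, time, page, UUID.randomUUID().toString, 1) :: acc
      }
    }.reverse

  def main(args: Array[String]): Unit = {
    // Offsets in ms from 1001's first record: 10:21:21, 10:28:10, 11:35:21, 11:36:10
    val events = Seq((0L, "home.html"), (409000L, "good_list.html"),
                     (4440000L, "home.html"), (4489000L, "cart.html"))
    sessionize("1001", events).foreach(println)
  }
}
```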