spark练习:求得用户每次会话的行为轨迹--解决数据倾斜

数据:

1001,2020-09-10 10:21:21,home.html

1001,2020-09-10 10:28:10,good_list.html

1002,2020-09-10 09:40:00,home.html

1001,2020-09-10 10:35:05,good_detail.html

1002,2020-09-10 09:42:00,favor.html

1001,2020-09-10 10:42:55,cart.html

1001,2020-09-10 10:43:55,11.html

1001,2020-09-10 10:44:55,22.html

1001,2020-09-10 10:45:55,33.html

1001,2020-09-10 10:46:55,44.html

1001,2020-09-10 10:47:55,55.html

1001,2020-09-10 10:48:55,66.html

1001,2020-09-10 10:49:55,77.html

1002,2020-09-10 09:41:00,mine.html

1001,2020-09-10 11:35:21,home.html

1001,2020-09-10 11:36:10,cart.html

1003,2020-09-10 13:10:00,home.html

1001,2020-09-10 11:38:12,trade.html

1001,2020-09-10 11:39:12,aa.html

1001,2020-09-10 11:40:12,bb.html

1001,2020-09-10 11:41:12,cc.html

1001,2020-09-10 11:42:12,dd.html

1001,2020-09-10 11:43:12,ee.html

1001,2020-09-10 11:44:12,ff.html

1001,2020-09-10 11:45:12,gg.html

1001,2020-09-10 11:46:12,hh.html

1001,2020-09-10 11:47:12,ll.html

1001,2020-09-10 11:38:55,payment.html

1003,2020-09-10 13:15:00,search.html

需求:求得用户每次会话的行为轨迹--解决数据倾斜

import java.text.SimpleDateFormat

import java.util

import java.util.UUID

import org.apache.spark.{Partition, RangePartitioner}

import org.apache.spark.rdd.RDD

import org.apache.spark.sql.Dataset

object SessionTest2 {

  def main(args: Array[String]): Unit = {

    //需求:求得用户每次会话的行为轨迹--解决数据倾斜

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[4]").appName("test").getOrCreate()

    import spark.implicits._

    //1、读取数据

    val ds = spark.read.csv("datas/session2.txt").toDF("user_id", "page_time", "page").as[(String, String, String)]

    //获取一个集合累加器

    val acc = spark.sparkContext.collectionAccumulator[(String, UserAnalysis)]("acc")

    //2、转换数据类型--样例类(转成样例类方便修改值)

    val ds2: Dataset[(String, UserAnalysis)] = ds.map {

      case (userid, timestr, page) =>

        //获取时间戳

        val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

        val time = formatter.parse(timestr).getTime

        //获得user对象

        val user = UserAnalysis(userid, time, timestr, page)

        //返回kv键值对--k为user_id和time或timestr拼接起来的字符串,方便后续分组排序

        (s"${userid}_${time}", user)

    }

    //3、转成rdd

    val rdd = ds2.rdd

    //4、用RangePartitioner进行重分区--得到的结果在每个分区内是有序的,在分区间也是有序的

    val rdd2: RDD[(String, UserAnalysis)] = rdd.repartitionAndSortWithinPartitions(new RangePartitioner(5, rdd))

    //repartitionAndSortWithinPartitions():根据给定的分区器对RDD进行重新分区,并在每个结果分区中根据key进行排序。

    //          def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)] ={...}

    //class RangePartitioner[K : Ordering : ClassTag, V](partitions: Int,rdd: RDD[_ <: Product2[K, V]],...)extends Partitioner {...}

    //RangePartitioner分区规则:首先对rdd采样出 分区数-1 个key,通过这些key确定 分区数个 边界,

    //        这些边界就是每个分区的边界,后续key会与每个分区的边界对比,如果在范围内,则数据放入该分区

    //5、对每个分区中每个用户的两两数据进行判断,看是否属于同一个会话

    val rdd3 = rdd2.mapPartitionsWithIndex((index, it) => {

      val list = it.toList

      //当前list中的数据:

      //List(

      //(1001_1599704481000,UserAnalysis(1001,1599704481000,2020-09-10 10:21:21,home.html,2837d947-9a35-44b6-a90d-69f1c03f25d3,1)),

      //(1001_1599704890000,UserAnalysis(1001,1599704890000,2020-09-10 10:28:10,good_list.html,249c5312-4f34-48db-9ed8-0de6c2348e69,1)),

      //(1001_1599705305000,UserAnalysis(1001,1599705305000,2020-09-10 10:35:05,good_detail.html,618b4ff1-63f7-499c-9757-d7091ad8aa47,1)),

      //(1001_1599705775000,UserAnalysis(1001,1599705775000,2020-09-10 10:42:55,cart.html,0571dccb-0904-49b9-ab31-2e0418b44c34,1)),

      //(1001_1599705835000,UserAnalysis(1001,1599705835000,2020-09-10 10:43:55,11.html,7a7e5d8f-a04e-44ec-8180-8f521cb4b3bd,1)),

      //(1001_1599705895000,UserAnalysis(1001,1599705895000,2020-09-10 10:44:55,22.html,e21f5d1f-5105-4ab4-a2b0-8068192d45d4,1)))

      //...

      // 滑窗

      val slidingList: Iterator[List[(String, UserAnalysis)]] = list.sliding(2)

      //slidingList中的数据:

      //Iterator(

      //    List( (1001_1599704481000,UserAnalysis(1001,1599704481000,2020-09-10 10:21:21,home.html,2837d947-9a35-44b6-a90d-69f1c03f25d3,1)) ,(1001_1599704890000,UserAnalysis(1001,1599704890000,2020-09-10 10:28:10,good_list.html,249c5312-4f34-48db-9ed8-0de6c2348e69,1)) )

      //  ...

      // )

      slidingList.foreach(x => {

        //取出窗口中的第一个数据中的对象

        val before = x.head._2

        //取出窗口中的第二个数据中的对象

        val next = x.last._2

        //判断--如果是同一个用户,并且时间在半小时内就属于同一个会话

        if (next.userid == before.userid && next.time - before.time <= 30 * 60 * 1000) {

          //修改session和step

          next.session = before.session

          next.step = before.step + 1

        }

      })

      //此时分区内有轨迹顺序正确,但是分区间的轨迹还有问题--通过累加器解决

      // 使用集合累加器记录每个分区的第一条数据和最后一条数据

      // 目的是用第一条数据和上一个分区的最后一条数据比较,判断是否为同一会话,如果是就修改成相同的session,并且这个分区的第一条数据的step+1

      val head = list.head //第一条数据

      val last = list.last //最后一条数据

      //将本分区第一条与最后一条数据放入累加器--通过action算子触发

      acc.add((s"${index}#head", head._2))

      acc.add((s"${index}#last", last._2))

      list.iterator

    })

    //后续rdd3可能会在多个job中使用,所以缓存一下

    rdd3.cache()

    rdd3.collect()

    //获取累加器结果--获取的结果是java的List,不能toMap,所以需要导入以下内容,将java的集合转成scala的集合,也可将scala的集合转成java的集合

    import scala.collection.JavaConverters._

    val userMap = acc.value.asScala.toMap

    //根据分区号遍历--0号分区不用处理,因为0号分区里的都是同一个会话

    for (i <- 1 until (userMap.size / 2)) {

      //获取前一个分区的最后一条数据

      val beforePartitonLast = userMap.get(s"${i - 1}#last").get

      //获取当前分区的第一条数据

      val currentPartitionHead = userMap.get(s"${i}#head").get

      //获取当前分区的最后一条数据

      val currentPartitionLast = userMap.get(s"${i}#last").get

      //判断当前分区第一条与前一个分区的最后一条数据是否是同一个session,如果是则同步修改session和step

      if (currentPartitionHead.userid == beforePartitonLast.userid && currentPartitionHead.time - beforePartitonLast.time <= 30 * 60 * 1000) {

        //注意同一个会话可能跨分区,所以要先判断当前分区的最后一条数据和当前分区的第一条数据是否为同一session,如果是则同步修改session和step

        if (currentPartitionLast.session == currentPartitionHead.session) {

          currentPartitionLast.session = currentPartitionHead.session

          currentPartitionLast.step = beforePartitonLast.step + currentPartitionLast.step

        }

        currentPartitionHead.session = beforePartitonLast.session

        currentPartitionHead.step = beforePartitonLast.step + 1

        //0号分区

        //...

        //(1001_1599705895000,UserAnalysis(1001,1599705895000,2020-09-10 10:44:55,22.html,2826f01e-1294-429b-90e4-877439fc345a,6))

        //1号分区

        //(1001_1599705955000,UserAnalysis(1001,1599705955000,2020-09-10 10:45:55,33.html,5865170d-bb4c-439a-b0aa-f0a10d307bc7,1))

        //此处的session应该等于0号分区的最后一条数据的session,step=0号分区Last.step+1=7

        //...

        //(1001_1599708921000,UserAnalysis(1001,1599708921000,2020-09-10 11:35:21,home.html,7c60b41c-0b06-4bfe-ad80-29fc75ce1d24,1))

        //2号分区

        //(1001_1599708970000,UserAnalysis(1001,1599708970000,2020-09-10 11:36:10,cart.html,9076e207-4ad4-4aa2-ab3a-ef966d23f09a,1))

        //此处的session应该等于1号分区的最后一条数据的session,step=1号分区Last.step+1=2

        //...

        //(1001_1599709272000,UserAnalysis(1001,1599709272000,2020-09-10 11:41:12,cc.html,9076e207-4ad4-4aa2-ab3a-ef966d23f09a,6))

        //该分区的最后一条数据和第一条数据是同一个会话,所以此处的session也应该等于1号分区的最后一条数据的session,step=1号分区Last.step+当前分区的Last.step=7

        //3号分区

        //(1001_1599709332000,UserAnalysis(1001,1599709332000,2020-09-10 11:42:12,dd.html,8c793999-5d8c-469c-b6c7-689d592e9713,1))

        //此处的session应该等于2号分区的最后一条数据的session,step=1号分区Last.step+1=8

        //...

        //(1001_1599709632000,UserAnalysis(1001,1599709632000,2020-09-10 11:47:12,ll.html,8c793999-5d8c-469c-b6c7-689d592e9713,6))

        //该分区的最后一条数据和第一条数据是同一个会话,所以此处的session也应该等于2号分区的最后一条数据的session,step=2号分区Last.step+当前分区的Last.step=13

      }

    }

    //userMap的结果:

    //1号分区: (UserAnalysis(1001,1599708921000,2020-09-10 11:35:21,home.html,e48d4e68-dbd0-4f56-af9e-8ca55b1bfacc,1))

    //1号分区: (UserAnalysis(1001,1599705955000,2020-09-10 10:45:55,33.html,81e0175d-7422-41e1-a20b-54e447bbb809,7))

    //...

    //2号分区:(UserAnalysis(1001,1599708970000,2020-09-10 11:36:10,cart.html,e48d4e68-dbd0-4f56-af9e-8ca55b1bfacc,2))

    //2号分区: (UserAnalysis(1001,1599709272000,2020-09-10 11:41:12,cc.html,978710f3-0a61-4a40-8e7c-61e7b0b1a2cb,7))

    //...

    //3号分区: (UserAnalysis(1001,1599709332000,2020-09-10 11:42:12,dd.html,978710f3-0a61-4a40-8e7c-61e7b0b1a2cb,8))

    //3号分区:(UserAnalysis(1001,1599709632000,2020-09-10 11:47:12,ll.html,225b0961-1603-4e31-a84a-454d0025e79c,13))

    //此时得到的结果中,每个分区内的第一条和最后一条的session和step已经修正,但分区内的其他数据的还未修正

    //根据修复过的每个分区的第一条和最后一条数据修复分区内所有数据--将userMap广播出去

    //广播变量

    val bc = spark.sparkContext.broadcast(userMap)

    val rdd4 = rdd3.mapPartitionsWithIndex((index, it) => {

      val list = it.toList

      //获取当前分区第一条数据

      val currentPartitionUser = list.head._2

      //从广播变量中取出修复过的当前分区的第一条数据

      val repairCurrentPartitionUser = bc.value.get(s"${index}#head").get

      //取出当前分区原来数据的session

      val oldSession = currentPartitionUser.session

      //广播变量中的数据是被修复过的,所以如果修复过的数据不等于当前分区原来的数据,就说明确实被修复过了

      if (repairCurrentPartitionUser.session != currentPartitionUser.session) {

        //过滤出还没有被修复的数据

        list.filter(x => x._2.session == oldSession).foreach(x => {

          x._2.session = repairCurrentPartitionUser.session

          x._2.step = repairCurrentPartitionUser.step + x._2.step - 1

        })

      }

      list.foreach(x => println(s"index = ${index} ${x._2}"))

      list.iterator

    })

    rdd4.collect()

  }

}

//定义样例类

case class UserAnalysis(userid: String, time: Long, timestr: String, page: String, var session: String = UUID.randomUUID().toString, var step: Int = 1)

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 220,367评论 6 512
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,959评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,750评论 0 357
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,226评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,252评论 6 397
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,975评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,592评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,497评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,027评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,147评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,274评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,953评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,623评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,143评论 0 23
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,260评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,607评论 3 375
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,271评论 2 358

推荐阅读更多精彩内容