Spark+Hbase 亿级流量分析实战( PV/UV )

作为一个亿级的流量分析统计系统怎么能没有 PV / UV 这两经典的超级玛丽亚指标呢,话说五百年前它俩可是鼻祖,咳咳...,不好意思没忍住,回归正文,大猪在上一篇已经介绍了 小巧高性能ETL程序设计与实现 了,到现在,我们的数据已经落地到 Hbase 上了,而且日志的时间也已经写到 Mysql 了,万事都已经具备了,接下来我们就要撸指标了,先从两个经典的指标开始撸。

我们先理一下整个程序的计算流程,请看大图:


  1. 开始计算是我们的 Driver 程序入口

  2. 开始计算之前检查监听 Redis 有没有收到程序退出通知,如果有程序结束,否则往下执行

  3. 首先去查询我们上篇文章的 ETL loghub 日志的进度的平均时间点

  4. Switch 处是判断 loghub 的时间距离我们上次计算的指标时间是否相差足够时间,一般定义为3分钟时间之后,因为 loghub 的时间会有少量的波动情况

  5. 不满足则睡眠 30秒,可以自己控制Sleep范围。

  6. 满足则计算 上次指标计算结束时间 ~ (loghub时间 - 3分钟日志波动)

  7. 计算完成更新指标结果并且更新指标计算时间,然后回到第 2 点。

先从 DriverMain 入口开始撸起

//监听redis退出消息
while (appRunning) {
      val dbClient = new DBJdbc(props.getProperty("jdbcUrl"))
      //日志offset
      val loghubTime = dbClient.query("loghub").toLocalDateTime.minusMinutes(3)
      //指标计算offset
      val indicatorTime =dbClient.query("indicator").toLocalDateTime
      //两个时间相差(分)
      val betweenTimeMinutes = Duration.between(indicatorTime, loghubTime).toMinutes

      val format = DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS")
      //相差足够时间则进行指标运行,否则睡眠
      if (betweenTimeMinutes >= 1) {
        app.run(spark, indicatorTime, loghubTime)
        //计算完成更新指标时间
        dbClient.upsert(Map("offsetName" -> "indicator"), Update(sets = Map("time" -> loghubTime.toString)), "offset")
      } else {
        //让我们的老大哥睡会,别太累了
        TimeUnit.SECONDS.sleep(30)
      }
    }

从注释上看,整体思路还是比较清晰的。

接下来我们跟着往下看run里面的方法做了什么有意思的操作

conf.set(TableInputFormat.INPUT_TABLE, Tables.LOG_TABLE)
conf.set("TableInputFormat.SCAN_ROW_START", start)
conf.set("TableInputFormat.SCAN_ROW_START", end)
val logDS = sc.newAPIHadoopRDD(
      conf,
      classOf[TableInputFormat2],
      classOf[ImmutableBytesWritable],
      classOf[Result]
    )
      .map(tp2 => HbaseUtil.resultToMap(tp2._2))
      .map(map => {
        LogCase(
          //子case类,存放多种格式的时间
          dt = DT(
            map.get("time").toLocalDateTimeStr(),
            map.get("time").toLocalDate().toString
          ),
          `type` = map.get("type"),
          aid = map.get("aid"),
          uid = map.get("uid"),
          tid = map.get("tid"),
          ip = map.get("ip")
        )
      }).toDS()

    logDS.cache()
    logDS.createTempView("log")
    //各类指标
    new PV().run()
    new UV().run()

startend 就是上面传下来需要查询的日志时间范围

简要说明:就是把Hbase的时间范围数据转成SparkSQL中的一张log

UVPV 指标计算里面就可以使用这张 log 表了

我们看看这两个经典的指标里面到底有什么乾坤:

spark.sql(
      """
        |SELECT
        |    aid,
        |    dt.date,
        |    COUNT(1) as pv
        |FROM
        |    log
        |GROUP BY
        |    aid,
        |    dt.date
      """.stripMargin)
      .rdd
      .foreachPartition(rows => {
        val props = PropsUtils.properties("db")
        val dbClient = new DBJdbc(props.getProperty("jdbcUrl"))
        rows.foreach(row => {
          dbClient.upsert(
            Map(
              "time" -> row.getAs[String]("date"),
              "aid" -> row.getAs[String]("aid")
            ),
            Update(incs = Map("pv" -> row.getAs[Long]("pv").toString)),
            "common_report"
          )
        })
        dbClient.close()
      })

哇然一看,大哥你这也写得太简单了吧

不就是一个普通的 PV 算法,再加上分区foreachPartition操作把更到的每一行聚合的结果数据upsert到我们的common_report指标表

group by后面跟上要聚合的维度,以上是想统计每篇文章每天的PV

从这个方法我们就能推算出common_report长什么样了,至少有time+aid这两个唯一索引字段,还有pv这个字段,默认值肯定是 0

百闻不如一见,看看表的 DDL 是不是这样子:

create table common_report
(
    id bigint auto_increment primary key,
    aid bigint not null,
    pv int default 0 null,
    uv int default 0 null,
    time date not null,
    constraint common_report_aid_time_uindex unique (aid, time)
);

果然一点都没错。

再看 dbClient.upsert 里面大概也能猜到是实现了mysql的upsert功能,大概的sql就会生成下面格式:

INSERT INTO common_report (time, aid, pv)
VALUES ('2019-03-26', '10000', 1) ON DUPLICATE KEY UPDATE pv = pv + 1;

大猪 那 UV 是怎么实现咧?一个用户在今天来过第一次之后再来就不能重复计算了噢。

来啦小弟:这个简单简单,可以使用Redis去重嘛,但是我们使用的都是Hbase了,还使用它做啥子咧,具体我们看一下 UV 里面到底是如何实现的:

val logDS = spark.table("log").as(ExpressionEncoder[LogCase])
    import spark.implicits._
    logDS
      .mapPartitions(partitionT => {
        val hbaseClient = DBHbaseHelper.getDBHbase(Tables.CACHE_TABLE)
        val md5 = (log: LogCase) => MD5Hash.getMD5AsHex(s"${log.dt.date}|${log.aid}|${log.uid}|uv".getBytes)
        partitionT
          .grouped(Consts.BATCH_MAPPARTITIONS)
          .flatMap { tList =>
            tList
              .zip(hbaseClient.incrments(tList.map(md5)))
              .map(tp2 => {
                val log = tp2._1
                log.copy(ext = EXT(tp2._2))
              })
          }
      }).createTempView("uvTable")

    spark.sql(
      """
        |SELECT
        |    aid,
        |    dt.date,
        |    COUNT(1) as uv
        |FROM
        |    uvTable
        |WHERE
        |    ext.render = 1
        |GROUP BY
        |    aid,
        |    dt.date
      """.stripMargin)
      .rdd
      .foreachPartition(rows => {
        val props = PropsUtils.properties("db")
        val dbClient = new DBJdbc(props.getProperty("jdbcUrl"))
        rows.foreach(row => {
          dbClient.upsert(
            Map(
              "time" -> row.getAs[String]("date"),
              "aid" -> row.getAs[String]("aid")
            ),
            Update(incs = Map("uv" -> row.getAs[Long]("uv").toString)),
            "common_report"
          )
        })
        dbClient.close()
      })

spark.sql 这里跟PV一样嘛,就是多了一句条件ext.render = 1,但是上面那一大堆是啥子咧?

大猪 CACHE_TABLE 是什么来的,是Hbase一张中间表,用户存用户UV标记的,建表语句如下,因为维度都是按天,所以我们TTL设计3天就可以了,两天也可以。

create 'CACHE_FOR_TEST',{NAME => 'info',TTL => '3 DAYS',CONFIGURATION => {'SPLIT_POLICY' => 'org.apache.hadoop.hbase.regionserver.KeyPrefixRegionSplitPolicy','KeyPrefixRegionSplitPolicy.prefix_length'=>'2'},COMPRESSION=>'SNAPPY'},SPLITS => ['20', '40', '60', '80', 'a0', 'c0', 'e0']

那还有其它的呢?

莫慌莫慌,大猪这就慢慢解释道:

val logDS = spark.table("log").as(ExpressionEncoder[LogCase])

上面这句的意思就是就是把log表给取出来,当然也可以通过参数传递。

下面的mapPartitions挺有意思的:

partitionT
    .grouped(1000)
        .flatMap { tList =>
          tList
            .zip(hbaseClient.incrments(tList.map(md5)))
            .map(tp2 => {
              val log = tp2._1
              log.copy(ext = EXT(tp2._2))
            })
        }

实际 上面是处理每个分区的数据,也就是转换数据,我们每来一条数据就要去Hbase那incrment一次,返回来的结果就是 render ,用户今天来多少次就incrment 相应的次数。

那有 什么用?我直接从Hbase GET取出数据,再判断有没有,如果没有这个用户就是今天第一次来,再把这个用户PUT进Hbase打一个标记,so easy。

其实 当初我们也是这么做的,后面发现业务的东西还是放在SQL里面一起写比较好,容易维护,而且incrment好处多多,因为它是带事务的,可以多线程进行修改。

而且 你们也发现了GETPUT是两次请求操作,保证不了事务的,指标几千万的数据少了那么几条,你们都不知道我当初找它们有辛苦。

你们 有没有发现render = 1的时候是代表UV(刚好等于1的时候为什么是UV?这里大家要慢慢地品尝一下了,其实就是实现了GETPUT操作),如果render = 2的时候又可以代表今天来过两次以上的用户指标,随时扩展,就问你撸这样的代码结构爽不爽?

看看 incrments 方法实现了啥子

def incrments(incs: Seq[String], family: String = "info", amount: Int = 1): Seq[Long] = {
    if (incs.isEmpty) {
      Seq[Long]()
    } else {
      require(incs.head.length == 32, "pk require 32 length")
      val convertIncs = incs map { pk => new Increment(Bytes.toBytes(pk.take(8))).addColumn(Bytes.toBytes(family), Bytes.toBytes(pk.takeRight(24)), amount) }
      val results = new Array[Object](convertIncs.length)
      table.batch(convertIncs.asJava, results)
      results.array.indices.map(
        ind =>
          Bytes.toLong(
            results(ind)
              .asInstanceOf[Result]
              .getValue(
                Bytes.toBytes(family),
                Bytes.toBytes(incs(ind).takeRight(24))
              )
          )
      )
    }
  }

这个方法就是实现了 incrment 的批量处理,因为我们在线上生产环境的时候测试过,批量处理比单条处理性能高了上百倍,所以这也就是为什么要写在mapPartitions里面的原因了,因为只有在这个方法里面才有批量数据转换操作,foreachPartition是批量处理操作,foreach,与map是一条一条操作不能使用,我们在输出报表到Mysql的地方已经用到过了。

不知不觉已经写了那么长的文章了


关闭计算程序只需要给redis发一条stop消息就可以啦

RedisUtil().getResource.publish("computeListenerMessage", "stop")

不能再复制代码了,不能显得文章是靠代码撑起来的。

福利 完整项目源码

心明眼亮的你、从此刻开始。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,463评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,868评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,213评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,666评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,759评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,725评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,716评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,484评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,928评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,233评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,393评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,073评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,718评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,308评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,538评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,338评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,260评论 2 352