flink - 实时 - UV统计 - 布隆过滤器实现

1.知识点

  • scala输入输出样例类
  • keyBy并行度为1计算UV的技巧

map(data => ("uv", data.userId))..keyBy(_._1)

  • keyBy并行度>1 计算UV的技巧

自定义MapFunction,随机自定义key+"uv"

Random.nextString(10) + "uv"

  • WindowedStream.trigger的使用
    trigger触发器,每来一条数据直接清空窗口,放到redis进行计算
  • trigger返回WindowedStream,继续调用process(ProcessWindowFunction)
  • WindowedStream.process()的使用
    windowStream调用接口
  • 布隆过滤器的实现

2.业务目标

滚动输出最近1小时内的PV

窗口:1小时

指标:点击量

3.流程心法

总流程:创建输入输出类--->执行环境--->transform转换--->各类窗口函数的调用

主Object:

1.创建执行环境,设置时间语义,并行度等

2.transform api map转换为输入样例类,并设置watermark

3.key 定义成常量"v",那么keyBy就分为同一组,如果并行则可以自定义mapFunction

4.实现trigger

5.实现processWindowFunction

4.模块详解

4.1 创建输入输出样例类

4.2 主object实现

4.2.1 创建执行环境并添加数据源

val env = StreamExecutionEnvironment.getExecutionEnvironment    
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)    
  env.setParallelism(1)     
// 从文件中读取数据    
val resource = getClass.getResource("/UserBehavior.csv")    
val inputStream: DataStream[String] = env.readTextFile(resource.getPath)

4.2.2 Datastream map转换为输入样例类

 // 转换成样例类类型并提取时间戳和watermark    
val dataStream: DataStream[UserBehavior] = inputStream      
  .map(data => {        
    val arr = data.split(",")        
    UserBehavior(arr(0).toLong, arr(1).toLong, arr(2).toInt, arr(3), arr(4).toLong)      
  })      
  .assignAscendingTimestamps(_.timestamp * 1000L)

4.2.3 处理逻辑(1)----filter类型,timeWindow

val uvStream = dataStream      
    .filter(_.behavior == "pv")      
    .map( data => ("uv", data.userId) ) //如果要并行,并行自定义mymapper      
    .keyBy(_._1)      
    .timeWindow(Time.hours(1))  //滚动窗口      
    .trigger(new MyTrigger())  //trigger触发器,每来一条数据直接清空窗口,放到redis计算。      
    .process( new UvCountWithBloom() )

4.2.4 处理逻辑(2)----Trigger实现

class MyTrigger() extends Trigger[(String,Long),TimeWindow]{  
  override def onElement(t: (String, Long), l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = {     
    TriggerResult.FIRE_AND_PURGE   
  }   
  //系统时间有进展时做什么操作  
  override def onProcessingTime(l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = TriggerResult.CONTINUE   

  //watermark改变做什么操作  
  override def onEventTime(l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = TriggerResult.CONTINUE   

  override def clear(w: TimeWindow, triggerContext: Trigger.TriggerContext): Unit = {   
  }
}

4.2.5 处理逻辑(2)----ProcessWindowFunction实现

1.定义redis中存储位图的key ,本例为窗口结束时间

2.定义一个redis hash表,保存统计之后的每个窗口结束时间的uv count.

表名:uvcount

KEY: 窗口结束时间

VALUE:uv count值

3. 对userid进行hash, 从位图中查看hash后的偏移量是否窜在,若存在则uvcount不操作。若不存在则uvcount+1,位图也相应更新

class UvCountWithBloom() extends ProcessWindowFunction[(String, Long), UvCount, String, TimeWindow]{  
  // 定义redis连接以及布隆过滤器  
  lazy val jedis = new Jedis("localhost", 6379)  
  lazy val bloomFilter = new Bloom(1<<29)    // 2的29次方,1左移29位。 位的个数:2^6(64) * 2^20(1M) * 2^3(8bit) ,64MB   

  // 本来是收集齐所有数据、窗口触发计算的时候才会调用;现在每来一条数据都调用一次  
  override def process(key: String, context: Context, elements: Iterable[(String, Long)], out: Collector[UvCount]): Unit = {    
    // 先定义redis中存储位图的key    
    val storedBitMapKey = context.window.getEnd.toString      
    
    //另外将当前窗口的uv count值,作为状态保存到redis里,用一个叫做uvcount的hash表来保存(windowEnd,count)    
    val uvCountMap = "uvcount"    
    val currentKey = context.window.getEnd.toString    
    var count = 0L     

    // 从redis中取出当前窗口的uv count值    
    if(jedis.hget(uvCountMap, currentKey) != null)      
      count = jedis.hget(uvCountMap, currentKey).toLong     
    
    // 去重:判断当前userId的hash值对应的位图位置,是否为0    
    val userId = elements.last._2.toString    
    // 计算hash值,就对应着位图中的偏移量    
    val offset = bloomFilter.hash(userId, 61)    
    val isExist = jedis.getbit(storedBitMapKey, offset)     

    if(!isExist){      
      // 如果不存在,那么位图对应位置置1,并且将count值加1            
      jedis.setbit(storedBitMapKey, offset, true)      
      jedis.hset(uvCountMap, currentKey, (count + 1).toString)    
    }  
  }
}

4.2.6 处理逻辑(3)----布隆过滤器实现

也可以调用外部google等现成的布隆过滤器.

设计布隆过滤器的要点:
1.选好点的hash函数
2.不同userid经过hash到同一位上。不要那么稠密。
即1亿的user,我们给出2亿的位,出现碰撞的概率就特别小。
10B * 1亿,大概1GB, 用位来存,1bit * 1亿 大概10m,放redis放内存都是个很好的 选择。
即使我们扩大位防止碰撞,放6亿,也是68M,可以放到redis中。有可能出现hash碰撞

class Bloom(size: Long) extends Serializable{  
  private val cap = size    // 默认cap应该是2的整次幂   

  //hash函数 value即userid,seed随机数种子  
  def hash(value: String, seed: Int): Long = {    
    var result = 0    
    //遍历userid,对每一位进行随机数种子的处理    
    for( i <- 0 until value.length ){      
      result = result * seed + value.charAt(i)    
    }     
    // 返回hash值,要映射到cap范围内    
    (cap - 1) & result  
  }
}

4.3 完整代码






最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,099评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,828评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,540评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,848评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,971评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,132评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,193评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,934评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,376评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,687评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,846评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,537评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,175评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,887评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,134评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,674评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,741评论 2 351

推荐阅读更多精彩内容