flink自定义窗口分配器 周、月

关于分配器介绍内容来自官网

窗口分配的概念

窗口分配程序(Window Assigners)定义如何将元素分配给窗口。
通过window(...) (for keyed streams)windowAll()for non-keyed streams)指定需要的WindowAssigner。

WindowAssigner负责将每个传入元素分配给一个或多个窗口。

Flink为最常见的用例提供了预定义的窗口分配程序如:tumbling windows, sliding windows, session windows and global windows.

同时还可以通过扩展WindowAssigner类来实现自定义窗口assigner

所有内置的窗口分配程序(除了global windows)都根据时间将元素分配给窗口,时间可以是处理时间,也可以是事件时间。

基于时间的窗口有一个开始时间戳(包括)和一个结束时间戳(不包括),它们一起描述窗口的大小。[starTimestamp,endTimestamp):左闭右开

Flink预定义的窗口分配程序

Tumbling Windows (滚动窗口)

翻转窗口分配程序将每个元素分配给指定窗口大小的窗口,滚动窗口有一个固定的大小并且元素之间不重叠。


Tumbling Windows.png
val input: DataStream[T] = ...

// tumbling event-time windows
input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>)

// tumbling processing-time windows
input
    .keyBy(<key selector>)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>)

//偏移量的一个重要用例是将窗口调整到UTC-0以外的时区。例如,在中国,您必须指定时间偏移量(-8)。
//事件时间窗口偏移-8小时
input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
    .<windowed transformation>(<window function>)

Sliding Windows (滑动窗口)

滑动窗口分配程序将元素分配给固定长度的窗口。类似于滚动窗口分配程序,窗口的大小由窗口大小参数配置。
同时还有一个附加的窗口滑动距离参数控制滑动窗口启动的频率。

滑动距离与窗口大小的的不同会导致数据元素是否重叠、丢失、恰好一次

具体情况如下:

  1. slideSize>windowSize 丢失数据
  2. slideSize<windowSize 数据重叠
  3. slideSize=windowSize 恰好一次(此时等同TumblingWindows)
Sliding Windows.png
val input: DataStream[T] = ...

// sliding event-time windows
input
    .keyBy(<key selector>)
    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>)

// sliding processing-time windows
input
    .keyBy(<key selector>)
    .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>)

// sliding processing-time windows offset by -8 hours
input
    .keyBy(<key selector>)
    .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
    .<windowed transformation>(<window function>)

Session Windows(会话窗口)

会话窗口分配程序根据活动的会话对元素进行分组。
与滚动、滑动窗口不同的是,会话窗口没有数据重叠,也没有固定的开始和结束时间。
当某个会话窗口在一段时间内没有接收到元素时,它将关闭窗口

Session Windows.png
val input: DataStream[T] = ...

// event-time session windows with static gap
input
    .keyBy(<key selector>)
    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>)

// event-time session windows with dynamic gap
input
    .keyBy(<key selector>)
    .window(EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] {
      override def extract(element: String): Long = {
        // determine and return session gap
      }
    }))
    .<windowed transformation>(<window function>)

// processing-time session windows with static gap
input
    .keyBy(<key selector>)
    .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>)


// processing-time session windows with dynamic gap
input
    .keyBy(<key selector>)
    .window(DynamicProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] {
      override def extract(element: String): Long = {
        // determine and return session gap
      }
    }))
    .<windowed transformation>(<window function>)

Global Windows(全局窗口)

全局窗口分配程序将具有相同键的所有元素分配给同一个全局窗口。

全局窗口模式仅在指定自定义触发器时才有用。否则,将不执行任何计算,因为全局窗口没有一个可以处理聚合元素的自然末端。(窗口没有结束,没有出发计算的条件,除非自定义触发器)


Global Windows.png
val input: DataStream[T] = ...

input
    .keyBy(<key selector>)
    .window(GlobalWindows.create())
    .<windowed transformation>(<window function>)

上面就是flink提供一些窗口分配程序,基本能满足大多数情况。

但是对于某些特殊情况flink提供的分配成程序没法满足要求,此时就需要根据需求自定义分配程序。

实现自定的分配程序需要实现org.apache.flink.streaming.api.windowing.assigners.WindowAssigner


自定义 WindowAssigner

如果我们定义按天、小时、分钟的滚动窗口都很容易实现。

但是如果我们要定义一周(周日开始或周一),一个月(1号开始)的滚动窗口,那么现有API基本没法实现或很难实现。

对此就需要我们实现一个自定义的窗口分配器。

import java.text.SimpleDateFormat
import java.util
import java.util.{Calendar, Collections, Date}

import com.meda.utils.DateHelper
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner
import org.apache.flink.streaming.api.windowing.triggers.{EventTimeTrigger, Trigger}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

/**
 * 实现根据周或月的窗口划分窗口
 * 比如按照周日的00:00:00到下一个周六23:59:59
 * 或者每个月第一天的00:00:00到最后一天的23:59:59
 * 实现参考了 TumblingEventTimeWindows
 *
 * @param tag 标签 month  or  week
 * @tparam T 需要划分窗口的数据类型(输入类型)
 */
class CustomWindowAssigner[T](tag: String) extends WindowAssigner[T, TimeWindow] {
  //窗口分配的主要方法,需要为每一个元素指定所属的分区
  override def assignWindows(element: T, timestamp: Long, context: WindowAssigner.WindowAssignerContext): util.Collection[TimeWindow] = {
    var offset: (Long, Long) = null
    tag match {
      case "month" => offset = getTimestampFromMon(timestamp)
      case "week" => offset = getTimestampFromWeek(timestamp)
    }
    //分配窗口
    Collections.singletonList(new TimeWindow(offset._1, offset._2))
  }

  //注意此处需要进行类型的转换,否则或编译出错,java版本好像没问题,但是java对于上面的offset处理有点难搞,所以放弃了
  override def getDefaultTrigger(env: StreamExecutionEnvironment): Trigger[T, TimeWindow] = EventTimeTrigger.create().asInstanceOf[Trigger[T, TimeWindow]]

  override def getWindowSerializer(executionConfig: ExecutionConfig): TypeSerializer[TimeWindow] = new TimeWindow.Serializer

  //是否使用事件时间
  override def isEventTime: Boolean = true


  /**
   * 获取指定时间戳当月时间戳范围
   * eg:2020-03-12 11:35:13 (timestamp=1583984113960l)
   * 结果为:(1582992000000,1585670399999)=>(2020-03-01 00:00:00,2020-03-31 23:59:59)
   *
   * @param timestamp 时间戳
   * @return
   */
  def getTimestampFromMon(timestamp: Long): (Long, Long) = {
    val calendar = Calendar.getInstance()
    calendar.setTime(DateHelper.getInstance().getDateFromStr(new SimpleDateFormat("yyyyMM01000000").format(new Date(timestamp)), "yyyyMMddHHmmss"))
    val numsOfMon: Long = calendar.getActualMaximum(Calendar.DAY_OF_MONTH)
    calendar.set(Calendar.DAY_OF_MONTH, 1)
    val start: Long = calendar.getTimeInMillis
    val end: Long = start + numsOfMon * 24 * 60 * 60 * 1000 - 1
    (start, end)
  }

  /**
   * 获取指定时间戳本周时间范围(从周日开始)
   * eg:2020-03-14 23:59:59 (timestamp=1583895064000l)
   * 结果为:(1583596800000,1584201599999)=>(2020-03-08 00:00:00,2020-03-14 23:59:59)
   *
   * @param timestamp 时间戳
   * @return
   */
  def getTimestampFromWeek(timestamp: Long): (Long, Long) = {
    val calendar = Calendar.getInstance()
    calendar.setTime(DateHelper.getInstance().getDateFromStr(new SimpleDateFormat("yyyyMMdd000000").format(new Date(timestamp)), "yyyyMMddHHmmss"))
    //    calendar.setFirstDayOfWeek(Calendar.SUNDAY)//设置周日为首日  默认值,一般不用设置
    calendar.set(Calendar.DAY_OF_WEEK, Calendar.SUNDAY)
    val start: Long = calendar.getTimeInMillis
    (start, start + 7 * 24 * 60 * 60 * 1000l - 1)
  }
}

//输入数据
case class Top100Input(event_id: String, date_d: String, timeStamp: Long, uid: Long, weekTag: String, monthTag: String)
//调用
val dStream: DataStream[Top100Input] = ...

dStream
      .keyBy(_.weekTag)
      .window(new CustomWindowAssigner[Top100Input]("week"))

dStream
      .keyBy(_.monthTag)
      .window(new CustomWindowAssigner[Top100Input]("month"))
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,125评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,293评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,054评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,077评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,096评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,062评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,988评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,817评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,266评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,486评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,646评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,375评论 5 342
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,974评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,621评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,796评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,642评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,538评论 2 352

推荐阅读更多精彩内容