2.4 Apache Flink Time 与 Window

1. Time

在Flink的流式处理中,会涉及到时间的不同概念,如下图所示:

图 Flink 时间概念
  • Event Time:是事件创建的时间。它通常由事件中的时间戳描述,例如采集的日志数据中,每一条日志都会记录自己的生成时间,Flink通过时间戳分配器访问事件时间戳。

  • Ingestion Time:是数据进入Flink的时间。

  • Processing Time:是每一个执行基于时间操作的算子的本地系统时间,与机器相关,默认的时间属性就是Processing Time。

例如,一条日志进入Flink的时间为2018-11-12 10:00:00.123,到达Window的系统时间为2018-11-12 10:00:01.234,日志的内容如下:
2018-11-02 18:37:15.624 INFO Fail over to rm2

对于业务来说,要统计1min内的故障日志个数,哪个时间是最有意义的?—— eventTime,因为我们要根据日志的生成时间进行统计。

2. Window

2.1 Window 概述

streaming流式计算是一种被设计用于处理无限数据集的数据处理引擎,而无限数据集是指一种不断增长的本质上无限的数据集,而window是一种切割无限数据为有限块进行处理的手段。

Window是无限数据流处理的核心,Window将一个无限的stream拆分成有限大小的”buckets”桶,我们可以在这些桶上做计算操作。

2.2 Window 类型

Window可以分成两类:

  • CountWindow:按照指定的数据条数生成一个Window,与时间无关。
  • TimeWindow:按照时间生成Window。

对于TimeWindow 和 CountWindow,可以根据窗口实现原理的不同分成:滚动窗口(Tumbling Window)、滑动窗口(Sliding Window),另外还有会话窗口(Session Window)。

(1) 滚动窗口(Tumbling Windows)

将数据依据固定的窗口长度对数据进行切片。

特点:时间对齐,窗口长度固定,没有重叠。

滚动窗口分配器将每个元素分配到一个指定窗口大小的窗口中,滚动窗口有一个固定的大小,并且不会出现重叠。例如:如果你指定了一个5分钟大小的滚动窗口,窗口的创建如下图所示:

图 滚动窗口

适用场景:适合做BI统计等(做每个时间段的聚合计算)。

(2)滑动窗口(Sliding Windows)

滑动窗口是固定窗口的更广义的一种形式,滑动窗口由固定的窗口长度和滑动间隔组成。

特点:时间对齐,窗口长度固定,有重叠。

滑动窗口分配器将元素分配到固定长度的窗口中,与滚动窗口类似,窗口的大小由窗口大小参数来配置,另一个窗口滑动参数控制滑动窗口开始的频率。因此,滑动窗口如果滑动参数小于窗口大小的话,窗口是可以重叠的,在这种情况下元素会被分配到多个窗口中。

例如,你有10分钟的窗口和5分钟的滑动,那么每个窗口中5分钟的窗口里包含着上个10分钟产生的数据,如下图所示:

图 滑动窗口

适用场景:对最近一个时间段内的统计(求某接口最近5min的失败率来决定是否要报警)。

(3)会话窗口(Session Windows)

由一系列事件组合一个指定时间长度的timeout间隙组成,类似于web应用的session,也就是一段时间没有接收到新数据就会生成新的窗口。

特点:时间无对齐。

session窗口分配器通过session活动来对元素进行分组,session窗口跟滚动窗口和滑动窗口相比,不会有重叠和固定的开始时间和结束时间的情况,相反,当它在一个固定的时间周期内不再收到元素,即非活动间隔产生,那个这个窗口就会关闭。一个session窗口通过一个session间隔来配置,这个session间隔定义了非活跃周期的长度,当这个非活跃周期产生,那么当前的session将关闭并且后续的元素将被分配到新的session窗口中去。

图 会话窗口

3. Window API

3.1 CountWindow

CountWindow根据窗口中相同key元素的数量来触发执行,执行时只计算元素数量达到窗口大小的key对应的结果。

注意:CountWindow的window_size指的是相同Key的元素的个数,不是输入的所有元素的总数。

(1)滚动窗口(Tumbling Windows)

默认的CountWindow是一个滚动窗口,只需要指定窗口大小即可,当元素数量达到窗口大小时,就会触发窗口的执行。

// 获取执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment

// 创建SocketSource
val stream = env.socketTextStream("localhost", 11111)

// 对Stream进行处理并按key聚合
val streamKeyBy = stream.map(item => (item.split(" ")(0), item.split(" ")(1).toLong)).keyBy(0)

// 引入滚动窗口
// 这里的5指的是5个相同key的元素计算一次
val streamWindow = streamKeyBy.countWindow(5)

// 执行聚合操作
val streamReduce = streamWindow.reduce(
  (item1, item2) => (item1._1, item1._2 + item2._2)
)

// 将聚合操作写入文件
streamReduce.print()

// 执行程序
env.execute("TumbingWindow")
(2)滑动窗口(Sliding Windows)

滑动窗口和滚动窗口的函数名是完全一致的,只是在传参数时需要传入两个参数,一个是window_size,一个是sliding_size。

下面代码中的sliding_size设置为了2,也就是说,每收到两个相同key的数据就计算一次,每一次计算的window范围是5个元素。

 // 获取执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
 
// 创建SocketSource
val stream = env.socketTextStream("localhost", 11111)
 
// 对stream进行处理并按key聚合
val streamKeyBy = stream.map(item => (item.split(" ")(0), item.split(" ")(1).toLong)).keyBy(0)
 
// 引入滚动窗口
// 当相同key的元素个数达到2个时,触发窗口计算,计算的窗口范围为5
val streamWindow = streamKeyBy.countWindow(5,2)
 
// 执行聚合操作
val streamReduce = streamWindow.reduce(
  (item1, item2) => (item1._1, item1._2 + item2._2)
)
 
// 将聚合数据写入文件
streamReduce.print()
 
// 执行程序
env.execute("TumblingWindow")

3.2 TimeWindow

TimeWindow是将指定时间范围内的所有数据组成一个window,一次对一个window里面的所有数据进行计算。

(1)滚动窗口(Tumbling Windows)

Flink默认的时间窗口根据Processing Time 进行窗口的划分,将Flink获取到的数据根据进入Flink的时间划分到不同的窗口中。

// 获取执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment

// 创建SocketSource
val stream = env.socketTextStream("localhost", 11111)

// 对stream进行处理并按key聚合
val streamKeyBy = stream.map(item => (item, 1)).keyBy(0)

// 引入时间窗口
val streamWindow = streamKeyBy.timeWindow(Time.seconds(5))

// 执行聚合操作
val streamReduce = streamWindow.reduce(
  (item1, item2) => (item1._1, item1._2 + item2._2)
)

// 将聚合数据写入文件
streamReduce.print()

// 执行程序
env.execute("TimeWindow")

时间间隔可以通过Time.milliseconds(x),Time.seconds(x),Time.minutes(x)等其中的一个来指定。

(2) 滑动窗口(SlidingEventTimeWindows)

滑动窗口和滚动窗口的函数名是完全一致的,只是在传参数时需要传入两个参数,一个是window_size,一个是sliding_size。

下面代码中的sliding_size设置为了2s,也就是说,窗口每2s就计算一次,每一次计算的window范围是5s内的所有元素。

// 获取执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
 
// 创建SocketSource
val stream = env.socketTextStream("localhost", 11111)
 
// 对stream进行处理并按key聚合
val streamKeyBy = stream.map(item => (item, 1)).keyBy(0)
 
// 引入滚动窗口
val streamWindow = streamKeyBy.timeWindow(Time.seconds(5), Time.seconds(2))
 
// 执行聚合操作
val streamReduce = streamWindow.reduce(
  (item1, item2) => (item1._1, item1._2 + item2._2)
)
 
// 将聚合数据写入文件
streamReduce.print()
 
// 执行程序
env.execute("TumblingWindow")

时间间隔可以通过Time.milliseconds(x),Time.seconds(x),Time.minutes(x)等其中的一个来指定。

3.3 Window Reduce

WindowedStream → DataStream:给window赋一个reduce功能的函数,并返回一个聚合的结果。

// 获取执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
 
// 创建SocketSource
val stream = env.socketTextStream("localhost", 11111)
 
// 对stream进行处理并按key聚合
val streamKeyBy = stream.map(item => (item, 1)).keyBy(0)
 
// 引入时间窗口
val streamWindow = streamKeyBy.timeWindow(Time.seconds(5))
 
// 执行聚合操作
val streamReduce = streamWindow.reduce(
  (item1, item2) => (item1._1, item1._2 + item2._2)
)
 
// 将聚合数据写入文件
streamReduce.print()
 
// 执行程序
env.execute("TumblingWindow")

3.4 Window Fold

WindowedStream → DataStream:给窗口赋一个fold功能的函数,并返回一个fold后的结果。

// 获取执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
 
// 创建SocketSource
val stream = env.socketTextStream("localhost", 11111,'\n',3)
 
// 对stream进行处理并按key聚合
val streamKeyBy = stream.map(item => (item, 1)).keyBy(0)
 
// 引入滚动窗口
val streamWindow = streamKeyBy.timeWindow(Time.seconds(5))
 
// 执行fold操作
val streamFold = streamWindow.fold(100){
  (begin, item) =>
     begin + item._2
}
 
// 将聚合数据写入文件
streamFold.print()
 
// 执行程序
env.execute("TumblingWindow")

3.5 Aggregation on Window

WindowedStream → DataStream:对一个window内的所有元素做聚合操作。min和 minBy的区别是min返回的是最小值,而minBy返回的是包含最小值字段的元素(同样的原理适用于 max 和 maxBy)。

// 获取执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
 
// 创建SocketSource
val stream = env.socketTextStream("localhost", 11111)
 
// 对stream进行处理并按key聚合
val streamKeyBy = stream.map(item => (item.split(" ")(0), item.split(" ")(1))).keyBy(0)
 
// 引入滚动窗口
val streamWindow = streamKeyBy.timeWindow(Time.seconds(5))
 
// 执行聚合操作
val streamMax = streamWindow.max(1)
 
// 将聚合数据写入文件
streamMax.print()
 
// 执行程序
env.execute("TumblingWindow")
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,542评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,596评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,021评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,682评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,792评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,985评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,107评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,845评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,299评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,612评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,747评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,441评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,072评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,828评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,069评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,545评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,658评论 2 350