窗口是流式计算中非常重要的一个概念, 很多常见的功能都是通过各种窗口实现的, 比如每5分钟统计一下刚去1小时的热度. Flink DataStream API 将窗口独立成 Operator. 每个窗口算子包含了以下几个部分:
Windows Assigner
指定窗口的类型, 定义如何将数据流分配到一个或者多个窗口
Windows Trigger
指定窗口触发的时机, 定义窗口满足什么样的条件触发计算
Evictor
用户数据剔除
Lateness
标记是否处理迟到的数据, 当迟到数据到达窗口中是否触发计算
Output Tag
标记输出标签, 然后再通过 getSideOutput 将窗口中的数据根据标签输出
Windows Function
定义窗口上的数据处理的逻辑, 例如对数据进行sum
2. Window Assigner
首先最需要了解的就是 windows Assigner了, 我们想要一个什么样的窗口划分, 主要就是通过他来实现的.
根据 flink 上游的数据集是否为 KeyedStream 类型 来做分别的处理. 如果使用了keyBy( ) 则对应使用window( ) 来处理, 否则可以使用 windowAll( )来使用
Flink 可以支持两种类型的窗口, 分别是基于时间的窗口和基于数量的窗口.基于时间的意思就是按照时间去划分窗口,同理,基于数量的也是根据窗口中的数量来做切分的. 对应的分别就是 timeWindow() 和 countWindow() 来使用, 下面的示例主要使用 timeWindow() 来演示.
对于不同的 Window Assigner, 还可以把窗口划分为4大类, 分别是 滚动窗口(Tumbling Windows) / 滑动窗口(Sliding Window) / 会话窗口(Session Window) 和 全局窗口(Global Window).
滚动窗口
DataStream API 提供基于 EventTime 和 ProcessingTime 的两种类型的 Tumbling window.对应的 Assigner 分别是 TumblingEventTimeWindow 和 ProcessingEventTimeWindow . 举例如下,完整代码见Github.
// 使用ProcessTime的滚动时间窗口, 长度为10s
stream.keyBy(x -> x.f1)
.window(TumblingProcessingTimeWindows.of(Time.seconds(10))).process(...)
// 使用ProcessTime的滚动时间窗口, 长度为10s
stream.keyBy(x ->x.f1).window(TumblingEventTimeWindows.of(Time.seconds(10))).process(...)
使用 window(TumblingProcessingTimeWindows.of(Time.seconds(10))) 的方法有点啰嗦, Flink 还提供了timeWindow( ) 的 API 来简化这一行代码.
// 直接使用 timeWindow API 便可实现滚动窗口的操作, 参数依旧是窗口的长度
// 窗口类型的时间由 time characteristic 确定, 如果指定为 event time,那么窗口也会自动用这个时间
input.keyBy(x -> x.f1).timeWindow(Time.seconds(10));
滑动窗口
滑动窗口顾名思义就是一个在不断往后滑动的窗口, 比如说 每5分钟 统计一个 最近一小时的时间, 那么就需要用滑动窗口来做处理. 滑动窗口主要是依靠 window size 和 slide time 来确定. 与滚动窗口类似的, flink 也提供了对应不同时间的 Assigner API(SlidingEventTimeWindow / SlidingEventTimeWindow), 语法基本类似, 只是由原本的一个参数(窗口长度) 变为了两个参数(窗口长度和滑动时间), 同样的, 为了简化代码, 依然可以使用timeWindow() 来简化.
// 两个参数分别是 窗口长度 和 滑动时间, 窗口时间类型依旧通过time characteristic 确定
input.keyBy(x -> x.f1).timeWindow(Time.seconds(10), Time.seconds(1))
会话窗口
会话窗口主要是将某段时间内活跃度较高的数据聚合成一个窗口计算. 触发条件是 Session Gap. 在规定的时间内没有数据接入则认为这个窗口结束,然后触发窗口计算. Session Gap 除了固定间隔的方式, 也可以动态抽取.
// 创建 Session Window, 间隔为 3s
DataStream<Tuple3<String, Long, Integer>> aggregated = source
.keyBy(0)
.window(EventTimeSessionWindows.withGap(Time.seconds(3L)))
.sum(2);
全局窗口
全局窗口将所有key的数据分配到单个窗口中计算结果.
// 创建 GlobalWindow
input.keyBy(1)
.window(GlobalWindows.create())
.sum(1);
上面就是构建不同的窗口的方法了, 下文会介绍在有了窗口之后怎样对窗口中的数据做处理