1. Event Time and Watermarks
Flink 支持三种time模式:
event time: 事件发生的时间
ingestion time:flink接收到事件的时间
processing time:operator实际处理事件的时间
使用event time模式时,flink根据Timestamp Extractor 和 Watermark Generator 追踪事件时间。
-
Watermarks
watermark是stream中的一个时间戳元素,用来标记这个时间戳以前的事件都已经到达,flink不必继续等待. watermark generator用来生产watermark。产生watermark的策略可以是多种多样的,flink认为事件的延迟会有一个最大值,也叫做有界乱序策略,所有的事件都会在这个最大延迟之内到达。watermark的时间设定是综合对延迟和数据完整性要求的一个权衡,在watermark之后到达的事件称为是迟到的事件。
DataStream<Event> stream = ...
WatermarkStrategy<Event> strategy = WatermarkStrategy
.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(20))
.withTimestampAssigner((event, timestamp) -> event.timestamp);
DataStream<Event> withTimestampsAndWatermarks =
stream.assignTimestampsAndWatermarks(strategy);
2. Windows
Flink的时间窗口分析依赖于两个抽象:Window Assigners创建一个新的window对象,并向window内分配event;Window Functions应用于window内的event进行分析。Triggers决定什么时候开始调用
window function, Evictors可以移除window内的元素。
stream.
.keyBy(<key selector>)
.window(<window assigner>)
.reduce|aggregate|process(<window function>);
# 不按key分组的stream
stream.
.windowAll(<window assigner>)
.reduce|aggregate|process(<window function>);
-
Window Assigners
Flink内置了多种window assigners类型:
TumblingEventTimeWindows.of(Time.minutes(1))
SlidingEventTimeWindows.of(Time.minutes(1),Time.seconds(10))
EventTimeSessionWindows.withGap(Time.minutes(30))
基于time的window assigners可以用event time也可以用processing time设定window。如果使用processing time会有很低的延迟, 但不会很好地处理历史数据,乱序的数据,结果也可能是不稳定的。
基于count的window,只有batch完整以后才会触发一个window, 而无法对一个不完整的window做处理。
global window把一个key的所有event放在一个window里,就需要开发自定义Trigger,这时候可以选择ProcessFunction.
-
Window Functions
如何处理window内的对象有三种方式:
批处理方式: 使用ProcessWindowFunction会有一个window内元素的Iterable作为入参。在window触发之前到达的event都要缓存在Flink的state中。
DataStream<SensorReading> input = ...
input
.keyBy(x -> x.key)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.process(new MyWastefulMax());
public static class MyWastefulMax extends ProcessWindowFunction<
SensorReading, // input type
Tuple3<String, Long, Integer>, // output type
String, // key type
TimeWindow> { // window type
@Override
public void process(
String key,
Context context,
Iterable<SensorReading> events,
Collector<Tuple3<String, Long, Integer>> out) {
int max = 0;
for (SensorReading event : events) {
max = Math.max(event.value, max);
}
out.collect(Tuple3.of(key, context.window().getEnd(), max));
}
}
process的第二个入参context中包含window的信息。
public abstract class Context implements java.io.Serializable {
public abstract W window();
public abstract long currentProcessingTime();
public abstract long currentWatermark();
public abstract KeyedStateStore windowState();
public abstract KeyedStateStore globalState();
}
windowState, globalState可以存储per-key, per-window 或global per-key即所有window中这个key的数据,如果你要在下一个window使用上一个window的计算结果时会有用。
增量方式:window内每到达一个元素就调用一次ReduceFunction或AggregateFunction
二者结合:window触发的时候向ProcessWindowFunction传递一个预先使用ReduceFunction或AggregateFunction聚合过的结果
input
.keyBy(x -> x.key)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.reduce(new MyReducingMax(), new MyWindowFunction());
private static class MyReducingMax implements ReduceFunction<SensorReading> {
public SensorReading reduce(SensorReading r1, SensorReading r2) {
return r1.value() > r2.value() ? r1 : r2;
}
}
private static class MyWindowFunction extends ProcessWindowFunction<
SensorReading, Tuple3<String, Long, SensorReading>, String, TimeWindow> {
@Override
public void process(
String key,
Context context,
Iterable<SensorReading> maxReading,
Collector<Tuple3<String, Long, SensorReading>> out) {
SensorReading max = maxReading.iterator().next();
out.collect(Tuple3.of(key, context.window().getEnd(), max));
}
}
在这里Iterable<SensorReading>里只有一个元素,就是由MyReducingMax计算出的最大值。
-
迟到事件
使用event time window时,迟到事件默认丢弃,但你也可以自定义处理方式。
- 指定一个late events的输出流
OutputTag<Event> lateTag = new OutputTag<Event>("late"){};
SingleOutputStreamOperator<Event> result = stream.
.keyBy(...)
.window(...)
.sideOutputLateData(lateTag)
.process(...);
DataStream<Event> lateStream = result.getSideOutput(lateTag);
- 指定一个可接受的延迟时间,迟到数据仍旧可以被放入window中计算
stream.
.keyBy(...)
.window(...)
.allowedLateness(Time.seconds(10))
.process(...);
- 二者结合,allowedLateness内到达的事件进入window, 之后的进入output stream
- 一些注意点
- 滑动窗口模式下,一个element可能会出现在多个window中
- time window 小时模式下默认只在整点时触发,如你的应用在12:05启动,第一个window将只有55m, 而在1:00关闭
- 一个window输出的event的时间戳是该window的结束时间
- 只有window内有event到达时才会触发,如果一个window是空的,则不会触发任何计算,也不会有结果输出
- session window是基于窗口合并实现的,开始时每一个element都在一个新的window内,如果两个window的间隔小于session设定的时间,会被合并。所以late event会导致迟到的合并。
