疑问
The default Trigger of a WindowAssigner is appropriate for many use cases. For example, all the event-time window assigners have an EventTimeTrigger as default trigger. This trigger simply fires once the watermark passes the end of a window.
EventTimeTrigger是Flink中默认的处理event time窗口时候的trigger。其中onEventTime
方法的官方解释为:
The onEventTime() method is called when a registered event-time timer fires.
这里所说的timer就是在onElement
方法里面注册的,ctx.registerEventTimeTimer(window.maxTimestamp());
,下面就看一下onEventTime
方法的具体实现,很简单就一句话:
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
return time == window.maxTimestamp() ?
TriggerResult.FIRE :
TriggerResult.CONTINUE;
}
刚看到这个方法的时候会有一个疑问,注册timer的时候就是用的window.maxTimestamp()
,触发timer的时候应该time == window.maxTimestamp()
永远返回true
啊,什么情况下会返回false
呢,也就是这个方法返回的是TriggerResult.CONTINUE
?难道还有其他地方注册了timer?
分析
通过调用栈,可以发现在WindowOperator#processElement
方法里面调用了registerCleanupTimer
protected void registerCleanupTimer(W window) {
long cleanupTime = cleanupTime(window);
if (cleanupTime == Long.MAX_VALUE) {
// don't set a GC timer for "end of time"
return;
}
if (windowAssigner.isEventTime()) {
triggerContext.registerEventTimeTimer(cleanupTime);
} else {
triggerContext.registerProcessingTimeTimer(cleanupTime);
}
}
这里面注册了一个清理的timer,这个时间是cleanupTime
返回的
private long cleanupTime(W window) {
if (windowAssigner.isEventTime()) {
long cleanupTime = window.maxTimestamp() + allowedLateness;
return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE;
} else {
return window.maxTimestamp();
}
}
至此就可以回答上面的问题了,如果allowedLateness
不是0,那么就会有一个timer的时间不等于window.maxTimestamp()
而是window.maxTimestamp() + allowedLateness
。
总结
触发onEventTime
的timer有两种,一种就是在onElement
注册的timer,用于触发计算,一定会返回TriggerResult.FIRE
;另一种是清理窗口的timer,如果配置了allowedLateness
大于零,那么返回就是TriggerResult.CONTINUE
。