疑问
onElement
方法的具体实现如下
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
// if the watermark is already past the window fire immediately
return TriggerResult.FIRE;
} else {
ctx.registerEventTimeTimer(window.maxTimestamp());
return TriggerResult.CONTINUE;
}
}
下面是官方文档对于这个方法的解释
The onElement() method is called for each element that is added to a window.
也就是说每条数据加入这个窗口中都会调用一次这个方法,什么情况下这个方法会返回TriggerResult.FIRE
?
分析
关键问题在于WindowOperator#isWindowLate
方法
protected boolean isWindowLate(W window) {
return (windowAssigner.isEventTime()
&& (cleanupTime(window) <= internalTimerService.currentWatermark()));
}
如果allowedLateness
没有设置默认为0,那么第二个判断条件相当于window.maxTimestamp()<=internalTimerService.currentWatermark()
如果这个条件为true
,那么上层WindowOperator#processElement
方法中
// drop if the window is already late
if (isWindowLate(window)) {
continue;
}
会走到这个判断里面,那么就不会调用到EventTimeTrigger#onElement
,反过来说,能调用到EventTimeTrigger#onElement
方法的情况,window.maxTimestamp() <= ctx.getCurrentWatermark()
就不会成立。
但是当设置allowedLateness
大于0的情况,数据迟到的条件变成了window.maxTimestamp() + allowedLateness <= internalTimerService.currentWatermark()
,那么假设滚动窗口的size是30秒,设置allowedLateness为10秒这时候来了一条数据的时间戳为1573441910000,那么此时window.maxTimestamp()=1573441919999
,allowedLateness=10000
,internalTimerService.currentWatermark()=1573441924000
,不满足上面迟到的条件,进入EventTimeTrigger#onElement
,这时就满足了window.maxTimestamp() <= ctx.getCurrentWatermark()
,即返回值就是TriggerResult.FIRE
。
小结
上面的过程比较绕,简单的说,如果allowedLateness=0
那么进入EventTimeTrigger#onElement
后不可能返回TriggerResult.FIRE
,因为满足这个判断条件的数据在前面isWindowLate(window)
判断中已经过滤掉了。如果allowedLateness>0
那么满足迟到的数据进入EventTimeTrigger#onElement
后就会返回TriggerResult.FIRE
。有兴趣的读者可以运行demo进行测试。
总结
结合上篇文章《onEventTime方法分析》可以得到下面的结论
- 当没有设置allowedLateness的时候,即
allowedLateness=0
-
EventTimeTrigger#onElement
用来注册窗口触发的定时器 - 定时器触发之后回调
EventTimeTrigger#onEventTime
触发窗口的计算 - 定时器是在
InternalTimerServiceImpl#advanceWatermark
方法中触发的,关键在timer.getTimestamp() <= time
,这个time就是传进来的当前的watermark,也就是watermark的时间大于等于定时器的注册时间的时候就会调用triggerTarget.onEventTime
-
public void advanceWatermark(long time) throws Exception {
currentWatermark = time;
InternalTimer<K, N> timer;
while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
eventTimeTimersQueue.poll();
keyContext.setCurrentKey(timer.getKey());
triggerTarget.onEventTime(timer);
}
}
- 设置
allowedLateness>0
- 没有迟到的数据调用逻辑如上
- 窗口已经触发计算之后,在允许迟到时间范围内到来的数据,会在
EventTimeTrigger#onElement
中返回TriggerResult.FIRE
触发计算,每一条都会触发一次所在窗口的计算 - 迟到的数据不会在
EventTimeTrigger#onEventTime
触发计算,此时对于迟到的数据返回TriggerResult.CONTINUE