上一篇分享中介绍了Flink完成数据统计的例子,在最后提到了自定义的统计触发器,这一篇分享主要介绍一下自定义的触发器如何来实现。
一、触发器的作用
触发器的作用就是我们在窗口中,什么时候来触发我们的聚合方法。主要涉及到的就是聚合计算(AggregateFunction)中的
OUT getResult(ACC var1);
这两个方法
比如我们想要在1个小时为单位的时间窗口里,达到每分钟来刷新数据的目的,那我们就必须每分钟都要触发一次getResult方法,来把数据发送到下一个处理节点(一般来说都是Sink-->保存数据的节点)
二、触发器的的实现
实现很简单,只需要继承Trigger<Object, W>类,实现它的方法即可
例如,我们需要一个带步长的触发器:
class ProcessTimeTrigger<W extends Window> extends Trigger<Object, W>
private final long interval;
private ProcessTimeTrigger(long interval) {
this.interval = interval;
}
方法调用时机如下:
onElement()方法,每个元素被添加到窗口时调用
onEventTime()方法,当一个已注册的事件时间计时器启动时调用
onProcessingTime()方法,当一个已注册的处理时间计时器启动时调用
onMerge()方法,与状态性触发器相关,当使用会话窗口时,两个触发器对应的窗口合并时,合并两个触发器的状态。
*最后一个clear()方法执行任何需要清除的相应窗口
上面的方法中有两个需要注意的地方:
1)第一、三通过返回一个TriggerResult来决定如何操作调用他们的事件,这些操作可以是下面操作中的一个;
CONTINUE:什么也不做
FIRE:触发计算
PURGE:清除窗口中的数据
FIRE_AND_PURGE:触发计算并清除窗口中的数据
三、自定义注册定时触发器
我们在需要在onElement中注册一个定时触发的任务
@Override
public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
timestamp = ctx.getCurrentProcessingTime();
if (fireTimestamp.get() == null) {
long start = timestamp - (timestamp % interval);
long nextFireTimestamp = start + interval;
ctx.registerProcessingTimeTimer(nextFireTimestamp);
fireTimestamp.add(nextFireTimestamp);
return TriggerResult.CONTINUE;
}
return TriggerResult.CONTINUE;
}
根据步长来注册下次执行的时间
然后
@Override
public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
if (fireTimestamp.get().equals(time)) {
fireTimestamp.clear();
fireTimestamp.add(time + interval);
ctx.registerProcessingTimeTimer(time + interval);
return TriggerResult.FIRE;
} else if(window.maxTimestamp() == time) {
return TriggerResult.FIRE;
}
return TriggerResult.CONTINUE;
}
在onProcessingTime的时候如果步长和当前的执行时间一致,则触发计算
并再注册下一次的触发时间,直到窗口结束。