短窗口的计算由于其窗口期较短,那么很快就能获取到结果,但是对于长窗口来说短窗口时间比较长,如果等窗口期结束才能看到结果,那么这份数据就不具备实时性,大多数情况我们希望能够看到一个长窗口的结果不断变动的情况,
对此Flink提供了ContinuousEventTimeTrigger连续事件时间触发器与ContinuousProcessingTimeTrigger连续处理时间触发器,指定一个固定时间间隔interval,不需要等到窗口结束才能获取结果,能够在固定的interval获取到窗口的中间结果。
ContinuousEventTimeTrigger
该类表示连续事件时间触发器,用在EventTime属性的任务流中,以事件时间的进度来推动定期触发。
<1> 其中的onElement方法:
@Override
public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
// if the watermark is already past the window fire immediately
return TriggerResult.FIRE;
} else {
ctx.registerEventTimeTimer(window.maxTimestamp());
}
ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
if (fireTimestamp.get() == null) {
long start = timestamp - (timestamp % interval);
long nextFireTimestamp = start + interval;
ctx.registerEventTimeTimer(nextFireTimestamp);
fireTimestamp.add(nextFireTimestamp);
}
return TriggerResult.CONTINUE;
}
对于每一条数据都会经过onElement处理,
这部分是用于判断是否触发窗口函数或者注册一个窗口endTime的定时触发器,endTime定时器最终触发窗口函数,就能够得到一个最终的窗口结果。
一旦流水位线达到了窗口的endTime,那么就会触发最终的函数。
/** When merging we take the lowest of all fire timestamps as the new fire timestamp. */
private final ReducingStateDescriptor<Long> stateDesc =
new ReducingStateDescriptor<>("fire-time", new Min(), LongSerializer.INSTANCE);
这部分,ReducingState是context调用getPartitionedState方法,返回下一次的窗口函数触发时间
getPartitionedState:检索可用于与之交互的State对象容错状态,范围为当前触发器调用的窗口和键。
如果获取到保存下一次触发时间的状态为null,那么就会初始化,这里的初始化逻辑:
假设当前时间戳为110,调用函数的间隔时间即interval为25,那么
start=110-110%25=110-10=100
nextFireTimestamp=100+25=125
这就是距离当前时间戳最近的触发时间。
<2> 其中的onEventTime方法:
public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
//窗口结束的触发
if (time == window.maxTimestamp()){
return TriggerResult.FIRE;
}
ReducingState<Long> fireTimestampState = ctx.getPartitionedState(stateDesc);
Long fireTimestamp = fireTimestampState.get();
if (fireTimestamp != null && fireTimestamp == time) {
fireTimestampState.clear();
fireTimestampState.add(time + interval);
ctx.registerEventTimeTimer(time + interval);
return TriggerResult.FIRE;
}
return TriggerResult.CONTINUE;
}
在onEventTime 中会获取当前的触发时间fireTimestamp,然后注册下一个fireTimestamp+interval的触发器。可以看到反复的定时注册会导致其不断的循序下去,当窗口期结束肯定是需要结束该窗口的持续触发调用,那么是如何做到的呢?
在WindowOperator中onEventTime触发定时调用中会判断如果是窗口结束时间的触发调用会执行clearAllState方法,在该方法中会调用triggerContext.clear(),也就是会调用ContinuousEventTimeTrigger的clear方法,
public void clear(W window, TriggerContext ctx) throws Exception {
ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
Long timestamp = fireTimestamp.get();
if (timestamp != null) {
ctx.deleteEventTimeTimer(timestamp);
fireTimestamp.clear();
}
}
那么其会删除下一次该窗口器的触发并且清除对应的ReducingState状态数据。
注意点:
<1> 连续定时触发与第一条数据有关,例如第一条数据是2019-11-16 11:22:01, 10s触发一次,那么后续触发时间就分别是2019-11-16 11:22:10、2019-11-16 11:22:20、2019-11-16 11:22:30
<2> 如果数据时间间隔相对于定期触发的interval比较大,那么有可能会存在多次输出相同结果的场景,比喻说触发的interval是10s, 第一条数据时间是2019-11-16 11:22:00, 那么下一次的触发时间是2019-11-16 11:22:10, 如果此时来了一条2019-11-16 11:23:00 的数据,会导致其watermark直接提升了1min, 会直接触发5次连续输出,对于下游处理来说可能会需要做额外的操作。
<3> 窗口的每一个key的触发时间可能会不一致,是因为窗口的每一个key对应的第一条数据时间不一样,正如上述所描述定时规则。由于会注册一个窗口endTime的触发器,会触发窗口所有key的窗口函数,保证最终结果的正确性。
场景:
比如说每个区域的每小时的商品销售额,要求是每隔1min能够看到销售额变动情况。
使用ContinuousProcessingTimeTrigger
一般的套路就是:
keyby()
.timeWindow(TumblingProcessingTimeWindows.of(Time.seconds(120))) 或者 window()
.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(20))
...
这里以简单的WordCount为例,2min内每隔20s就统计下出现的word的次数。
public class ContinueTriggerDemo {
public static void main(String[] args) throws Exception {
String hostName="localhost";
Integer port=Integer.parseInt("8801");
final StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
//从指定socket获取输入数据
DataStream<String> text=env.socketTextStream(hostName,port);
text.flatMap(new LineSplitter())
.keyBy(0)
.window(TumblingProcessingTimeWindows.of(Time.seconds(120)))
.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(20)))
.sum(1)
.map(new TimestampAdd())
.print();
env.execute("start demo!");
}
public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String,Integer>>{
@Override
public void flatMap(String s, Collector<Tuple2<String, Integer>> out) throws Exception {
String[] tokens=s.toLowerCase().split("\\W+");
for (String token:tokens){
if (token.length()>0){
out.collect(new Tuple2<>(token,1));
}
}
}
}
public static final class TimestampAdd implements MapFunction<Tuple2<String,Integer>, Tuple3<String,String,Integer>>{
@Override
public Tuple3<String, String, Integer> map(Tuple2<String, Integer> value) throws Exception {
DateFormat format=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String s=format.format(new Date());
return new Tuple3<>(value.f0,s,value.f1);
}
}
}
陆续输入 输出结果分析:
(1) 首先,时间窗口为滚动窗口2min,所以以3点开始为例,时间窗口为:
[15:00:00 - 15:02:00)
[15:02:00 - 15:04:00)
[15:04:00 - 15:06:00)
[15:06:00 - 15:08:00)
...
(2) 在上述给定窗口中,发现有个重要的前提就是窗口内要有数据,在有数据的情况下就会以20s的间隔调用函数,否则也不会有任何输出。
以“ff”为例,“ff”第一次出现时间为15:12:20,所以其所处的时间窗口是[15:12:00,15:14:00),所以会发现每隔20s调用1次,直到15:13:40为止,之后就是一个新的窗口[15:14:00,15:16:00)