An earlier article covered registerProcessingTimeTimer; have a look at it if you are interested. This article covers registerEventTimeTimer.
Background
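First, the difference between processing time and event time:
processingtime is the current system time of the machine at the moment the record is processed.
eventtime is the time carried inside the record itself.
registerProcessingTimeTimer vs. registerEventTimeTimer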
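The earlier article asked how timers fire when several of them are registered for the same processing time.
This article asks: if timers are registered for the same event time, how do they fire when the records' own timestamps are identical, and how do they fire when those timestamps differ?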
Test example
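import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;

public class EventTimeTimerTest {

    public static void main(String[] args) throws Exception {
        eventTimeWindow();
    }

    public static void eventTimeWindow() throws Exception {
        // Fixed timestamp: used by the first 10 records and by every timer registration.
        long ct = System.currentTimeMillis();
        StreamExecutionEnvironment e = StreamExecutionEnvironment.getExecutionEnvironment();
        // Deprecated since Flink 1.12, where event time is already the default.
        e.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Emit a periodic watermark every 1000 ms.
        e.getConfig().setAutoWatermarkInterval(1000);
        DataStreamSource<Long> source = e
            .addSource(new SourceFunction<Long>() {
                private volatile boolean stop = false;

                @Override
                public void run(SourceContext<Long> ctx) throws Exception {
                    // Phase 1: 10 records that all carry the same event time ct.
                    for (int j = 0; j < 10 && !stop; j++) {
                        ctx.collectWithTimestamp((long) j, ct);
                        Thread.sleep(500);
                    }
                    // Phase 2: 10 records stamped with the current time.
                    for (int j = 0; j < 10 && !stop; j++) {
                        ctx.collectWithTimestamp((long) j, System.currentTimeMillis());
                        Thread.sleep(500);
                    }
                }

                @Override
                public void cancel() {
                    stop = true; // checked by both loops in run()
                }
            }).setParallelism(1);
        // No timestamp assigner is set, so the timestamps from collectWithTimestamp are kept.
        source.assignTimestampsAndWatermarks(WatermarkStrategy
                .<Long>forBoundedOutOfOrderness(Duration.ofSeconds(1)))
            // Values are 0..9, so v / 1000 sends every record to key 0.
            .keyBy(v -> v / 1000)
            .process(new KeyedProcessFunction<Long, Long, Long>() {
                private ValueState<Integer> itemState; // number of records seen for this key

                @Override
                public void open(Configuration parameters) throws Exception {
                    super.open(parameters);
                    ValueStateDescriptor<Integer> itemsStateDesc = new ValueStateDescriptor<>(
                        "itemState-state",
                        Integer.class);
                    itemState = getRuntimeContext().getState(itemsStateDesc);
                }

                @Override
                public void processElement(Long value, Context ctx, Collector<Long> out) throws Exception {
                    int val = (itemState.value() == null) ? 0 : itemState.value();
                    itemState.update(val + 1);
                    // Print: current watermark, event timestamp of this record
                    System.out.println(ctx.timerService().currentWatermark() + "," + ctx.timestamp());
                    // Every record registers a timer for the same event time ct.
                    ctx.timerService().registerEventTimeTimer(ct);
                }

                @Override
                public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out) throws Exception {
                    super.onTimer(timestamp, ctx, out);
                    // Print: record count, timer timestamp, current key
                    System.out.println(itemState.value() + "---" + timestamp + "——" + ctx.getCurrentKey());
                }
            }).setParallelism(1);
        e.execute();
    }
}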
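Code walkthrough: in this test,
- Source: 10 records with timestamp ct, followed by 10 records stamped with the current time.
- Every record registers an event-time timer for ct.
- All values are between 0 and 9, so keyBy(v -> v / 1000) puts every record under key 0.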
Output:
-9223372036854775808,1623310821756 // current watermark, record timestamp
1623310821655,1623310821756
1623310821655,1623310821756
1623310821655,1623310821756
1623310821655,1623310821756
1623310821655,1623310821756
1623310821655,1623310821756
1623310821655,1623310821756
1623310821655,1623310821756
1623310821655,1623310821756
1623310821655,1623310829459
11---1623310821756——0 // record count (itemState), timer timestamp, key
1623310829358,1623310829961
12---1623310821756——0
1623310829860,1623310830466
13---1623310821756——0
1623310830365,1623310830970
14---1623310821756——0
1623310830869,1623310831474
15---1623310821756——0
1623310831373,1623310831977
16---1623310821756——0
1623310831876,1623310832481
17---1623310821756——0
1623310832380,1623310832986
18---1623310821756——0
1623310832885,1623310833490
19---1623310821756——0
1623310833389,1623310833993
20---1623310821756——0
Source walkthrough:
1. How the watermark is generated
@Public
public class BoundedOutOfOrdernessWatermarks<T> implements WatermarkGenerator<T> {
/** The maximum timestamp encountered so far. */
private long maxTimestamp;
/** The maximum out-of-orderness that this watermark generator assumes. */
private final long outOfOrdernessMillis;
/**
* Creates a new watermark generator with the given out-of-orderness bound.
*
* @param maxOutOfOrderness The bound for the out-of-orderness of the event timestamps.
*/
public BoundedOutOfOrdernessWatermarks(Duration maxOutOfOrderness) {
checkNotNull(maxOutOfOrderness, "maxOutOfOrderness");
checkArgument(!maxOutOfOrderness.isNegative(), "maxOutOfOrderness cannot be negative");
this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();
// start so that our lowest watermark would be Long.MIN_VALUE.
this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
}
@Override
public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
}
}
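So the current watermark = maxTimestamp - outOfOrdernessMillis - 1. While only the ct records have arrived, that is ct - 1000 ms - 1, which means records may arrive up to 1 s late.
Because the watermark is emitted from onPeriodicEmit, it is refreshed periodically; here that happens every 1 s (setAutoWatermarkInterval(1000)).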
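To make the arithmetic concrete, here is a minimal plain-Java sketch of the same formula, with no Flink dependency. The class name BoundedWatermarkSketch is hypothetical. It only mirrors the maxTimestamp bookkeeping shown above, and its currentWatermark() stands in for the value onPeriodicEmit would emit.

```java
// Hypothetical, simplified re-implementation of the formula used by
// BoundedOutOfOrdernessWatermarks; plain Java, not the Flink class itself.
final class BoundedWatermarkSketch {
    private final long outOfOrdernessMillis;
    private long maxTimestamp;

    BoundedWatermarkSketch(long outOfOrdernessMillis) {
        this.outOfOrdernessMillis = outOfOrdernessMillis;
        // Same starting point as above: the first watermark is exactly Long.MIN_VALUE.
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    // Called for every record: remember the largest timestamp seen so far.
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    // What a periodic emit would send downstream at this moment.
    long currentWatermark() {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }

    public static void main(String[] args) {
        BoundedWatermarkSketch gen = new BoundedWatermarkSketch(1000); // 1 s bound
        System.out.println(gen.currentWatermark()); // prints -9223372036854775808
        gen.onEvent(1623310821756L);                 // ct from the run above
        System.out.println(gen.currentWatermark()); // prints 1623310820755
    }
}
```

Because only the maximum timestamp is tracked, a late record never moves the watermark backwards. The watermark always trails the largest timestamp seen by the bound plus 1 ms.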
2. Registration
@Override
public void registerEventTimeTimer(N namespace, long time) {
eventTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));
}
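Registration only adds the timer to the queue. If a timer with the same timestamp already exists for the same key and namespace, the add has no effect, so no additional timer fires. See the previous article for details.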
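Below is a simplified sketch of why a second registration is a no-op. It assumes the queue deduplicates timers by (timestamp, key, namespace), which as far as I know are the fields TimerHeapInternalTimer compares in equals(). SketchTimer and DedupTimerQueue are hypothetical names, and Flink's real heap and RocksDB timer queues are more involved.

```java
import java.util.Comparator;
import java.util.HashSet;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Set;

// Hypothetical timer identity: (timestamp, key, namespace).
final class SketchTimer {
    final long timestamp;
    final Object key;
    final Object namespace;

    SketchTimer(long timestamp, Object key, Object namespace) {
        this.timestamp = timestamp;
        this.key = key;
        this.namespace = namespace;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof SketchTimer)) return false;
        SketchTimer other = (SketchTimer) o;
        return timestamp == other.timestamp
            && key.equals(other.key)
            && namespace.equals(other.namespace);
    }

    @Override
    public int hashCode() {
        return Objects.hash(timestamp, key, namespace);
    }
}

// Hypothetical deduplicating priority queue: a min-heap ordered by timestamp
// plus a set that rejects timers that are already queued.
final class DedupTimerQueue {
    private final PriorityQueue<SketchTimer> heap =
        new PriorityQueue<>(Comparator.comparingLong((SketchTimer t) -> t.timestamp));
    private final Set<SketchTimer> queued = new HashSet<>();

    // Returns true if the timer was added, false if an equal timer is already queued.
    boolean add(SketchTimer timer) {
        if (!queued.add(timer)) {
            return false;
        }
        heap.add(timer);
        return true;
    }

    SketchTimer peek() {
        return heap.peek();
    }

    SketchTimer poll() {
        SketchTimer head = heap.poll();
        if (head != null) {
            queued.remove(head); // once removed, the same timer can be registered again
        }
        return head;
    }

    int size() {
        return heap.size();
    }

    public static void main(String[] args) {
        DedupTimerQueue queue = new DedupTimerQueue();
        long ct = 1623310821756L;
        for (int i = 0; i < 10; i++) {
            // Ten registrations for the same key; "void-namespace" is a stand-in value.
            queue.add(new SketchTimer(ct, 0L, "void-namespace"));
        }
        System.out.println(queue.size()); // prints 1
    }
}
```

The set keeps the duplicate check O(1). The heap keeps the earliest timer at the head, which is what the firing loop inspects.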
3. Firing
public void advanceWatermark(long time) throws Exception {
currentWatermark = time;
InternalTimer<K, N> timer;
while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
eventTimeTimersQueue.poll();
keyContext.setCurrentKey(timer.getKey());
triggerTarget.onEventTime(timer);
}
}
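Unlike processing time, event-time timers are driven by the watermark. As long as the event-time timer queue is non-empty and the timer at its head has a timestamp less than or equal to the current watermark, that timer is removed from the queue and fired.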
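The loop above can be modelled for a single key and namespace with a TreeSet<Long>, which is both ordered and deduplicating. EventTimeTimerSketch and its methods are hypothetical illustrations, not Flink classes. The LongConsumer callback stands in for triggerTarget.onEventTime.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;
import java.util.function.LongConsumer;

// Hypothetical single-key, single-namespace model of the advanceWatermark loop.
// A TreeSet<Long> stands in for the deduplicating event-time timer queue.
final class EventTimeTimerSketch {
    private final TreeSet<Long> timers = new TreeSet<>();
    private long currentWatermark = Long.MIN_VALUE;

    // Returns false when a timer with the same timestamp is already queued.
    boolean registerEventTimeTimer(long time) {
        return timers.add(time);
    }

    // Same shape as advanceWatermark: remove and fire every timer with timestamp <= time,
    // smallest first.
    void advanceWatermark(long time, LongConsumer onEventTime) {
        currentWatermark = time;
        while (!timers.isEmpty() && timers.first() <= time) {
            long timestamp = timers.pollFirst();
            onEventTime.accept(timestamp);
        }
    }

    long currentWatermark() {
        return currentWatermark;
    }

    public static void main(String[] args) {
        EventTimeTimerSketch service = new EventTimeTimerSketch();
        List<Long> fired = new ArrayList<>();
        service.registerEventTimeTimer(100);
        service.registerEventTimeTimer(100); // duplicate, ignored
        service.registerEventTimeTimer(200);
        service.advanceWatermark(150, ts -> fired.add(ts));
        System.out.println(fired); // prints [100]
        service.advanceWatermark(250, ts -> fired.add(ts));
        System.out.println(fired); // prints [100, 200]
    }
}
```

In this model, a timer registered for a time already behind the watermark does not fire at registration. It waits in the queue until the next advanceWatermark call.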
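Conclusion:
1. Watermarks are generated periodically, here every 1000 ms of processing time, so a timer fires only once a watermark has arrived with time <= watermark. In the output, the watermark stays at 1623310821655 while the ct records are processed. That value is below ct (1623310821756), so nothing fires yet.
2. Identical timers are deduplicated, so the 10 registrations for ct made by the first 10 records collapse into one timer. That timer fires exactly once, when the first watermark past ct arrives. By then the 11th record has already been counted, which is why the first onTimer line prints 11.
3. A timer fires again only when a new watermark arrives and timer <= watermark. Each later record re-registers ct, which is already behind the watermark, so the timer fires at the next watermark. This produces the counts 12 through 20.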
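To check these conclusions against the printed output, the whole test can be replayed in plain Java under several simplifying assumptions:
- there is one key;
- exactly one periodic watermark is emitted after each record;
- phase-2 timestamps are the hypothetical values ct + 8000 + 500 * j, whereas the real run used System.currentTimeMillis().

TimerReplaySketch and replay are hypothetical names. The method returns the itemState value seen by each onTimer call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical plain-Java replay of the test (no Flink involved).
// Assumptions: single key, one watermark emitted after every record,
// made-up phase-2 timestamps ct + 8000 + 500 * j.
final class TimerReplaySketch {

    static List<Integer> replay(long ct, long outOfOrdernessMillis) {
        TreeSet<Long> timers = new TreeSet<>();      // ordered, deduplicating timer queue
        long maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
        int itemState = 0;                           // plays the role of the ValueState counter
        List<Integer> firedWith = new ArrayList<>(); // itemState value seen by each onTimer call

        for (int i = 0; i < 20; i++) {
            long ts = (i < 10) ? ct : ct + 8000 + 500L * (i - 10);
            // processElement: count the record and register a timer for ct.
            itemState++;
            timers.add(ct);
            // Watermark generator: track the max timestamp, then emit once.
            maxTimestamp = Math.max(maxTimestamp, ts);
            long watermark = maxTimestamp - outOfOrdernessMillis - 1;
            // advanceWatermark: fire every timer whose timestamp <= watermark.
            while (!timers.isEmpty() && timers.first() <= watermark) {
                timers.pollFirst();
                firedWith.add(itemState);
            }
        }
        return firedWith;
    }

    public static void main(String[] args) {
        System.out.println(replay(1623310821756L, 1000));
        // prints [11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
    }
}
```

The replay yields the same sequence of counts as the onTimer lines in the output above. The first ten registrations merge into one timer that fires at 11, and every later registration fires once at the following watermark.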
Reference: watermark generation