registerEventTimeTimer

之前写了一篇文章介绍registerProcessingTimeTimer,有兴趣可以看下之前的文章。这篇文章介绍一下registerEventTimeTimer。

背景

  • 首先介绍一下processingtime和eventtime的区别。
    processingtime 指的时间是当前时间
    eventtime 指的是数据里的时间。

  • registerProcessingTimeTimer与registerEventTimeTimer 区别
    上边文章讨论的是,注册相同的当前时间的timer,那么应该如何触发?
    本片则要讨论,如果注册相同事件时间(eventtime)的timer,那么在数据时间相同和数据时间不同时,如何触发?

测试栗子

public static void eventTimeWindow() throws Exception {
        long ct=System.currentTimeMillis();
        StreamExecutionEnvironment e = StreamExecutionEnvironment.getExecutionEnvironment();
        e.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        e.getConfig().setAutoWatermarkInterval(1000);
        DataStreamSource<Long> source = e
                .addSource(new SourceFunction<Long>() {
                    private volatile boolean stop = false;
                    @Override
                    public void run(SourceContext<Long> ctx) throws Exception {
                            for(int j=0;j<10;j++) {
                                ctx.collectWithTimestamp(
                                        (long) j,
                                        ct);
                                Thread.sleep(500);
                            }
                        for(int j=0;j<10;j++) {
                            ctx.collectWithTimestamp(
                                    (long) j,
                                    System.currentTimeMillis());
                            Thread.sleep(500);
                        }
                    }
                    @Override
                    public void cancel() {
                        stop = true;
                    }
                }).setParallelism(1);
        source.assignTimestampsAndWatermarks(WatermarkStrategy
                .<Long>forBoundedOutOfOrderness(Duration.ofSeconds(1)))
                .keyBy(v->v/1000).process(new KeyedProcessFunction<Long, Long, Long>() {
            private ValueState<Integer> itemState;
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                ValueStateDescriptor<Integer> itemsStateDesc = new ValueStateDescriptor<>(
                        "itemState-state",
                        Integer.class);
                itemState = getRuntimeContext().getState(itemsStateDesc);
            }
            @Override
            public void processElement(Long value, Context ctx, Collector<Long> out) throws Exception {
                int val=(itemState.value()==null)?0:itemState.value();
                itemState.update(val+1);
             System.out.println(ctx.timerService().currentWatermark()+","+ctx.timestamp());
                ctx.timerService().registerEventTimeTimer(ct);
            }
            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out) throws Exception {
                super.onTimer(timestamp, ctx, out);
                System.out.println(itemState.value()+"---"+timestamp+"——"+ctx.getCurrentKey());
            }
            @Override
            public void close() throws Exception {
                super.close();
            }
        }).setParallelism(1);
        e.execute();
    }

代码讲解:这个测试里的栗子是:

  • 数据源: 10条ct的数据,10条当前时间戳的数据。
  • registertime 是ct。

结果:

-9223372036854775808,1623310821756     //current watermark,nowct
1623310821655,1623310821756
1623310821655,1623310821756
1623310821655,1623310821756
1623310821655,1623310821756
1623310821655,1623310821756
1623310821655,1623310821756
1623310821655,1623310821756
1623310821655,1623310821756
1623310821655,1623310821756
1623310821655,1623310829459
11---1623310821756——0                            //val,timer的时间戳
1623310829358,1623310829961
12---1623310821756——0
1623310829860,1623310830466
13---1623310821756——0
1623310830365,1623310830970
14---1623310821756——0
1623310830869,1623310831474
15---1623310821756——0
1623310831373,1623310831977
16---1623310821756——0
1623310831876,1623310832481
17---1623310821756——0
1623310832380,1623310832986
18---1623310821756——0
1623310832885,1623310833490
19---1623310821756——0
1623310833389,1623310833993
20---1623310821756——0

源码解析:

1. watermark生成方式

@Public
public class BoundedOutOfOrdernessWatermarks<T> implements WatermarkGenerator<T> {

    /** The maximum timestamp encountered so far. */
    private long maxTimestamp;

    /** The maximum out-of-orderness that this watermark generator assumes. */
    private final long outOfOrdernessMillis;

    /**
     * Creates a new watermark generator with the given out-of-orderness bound.
     *
     * @param maxOutOfOrderness The bound for the out-of-orderness of the event timestamps.
     */
    public BoundedOutOfOrdernessWatermarks(Duration maxOutOfOrderness) {
        checkNotNull(maxOutOfOrderness, "maxOutOfOrderness");
        checkArgument(!maxOutOfOrderness.isNegative(), "maxOutOfOrderness cannot be negative");

        this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();

        // start so that our lowest watermark would be Long.MIN_VALUE.
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }
    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }
    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
    }
}

如上可知,当前watermark=maxTimestamp - outOfOrdernessMillis - 1,也就是ct-1000s-1,允许延迟1s。
由于使用的onPeriodicEmit ,watermark会定时1s更新一次。

2. 注册

@Override
    public void registerEventTimeTimer(N namespace, long time) {
        eventTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));
    }

只是添加到队列里。如果time相同就不会添加成功,那么也就不会触发Timer 。具体参考上一篇

3. 触发

public void advanceWatermark(long time) throws Exception {
        currentWatermark = time;

        InternalTimer<K, N> timer;

        while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
            eventTimeTimersQueue.poll();
            keyContext.setCurrentKey(timer.getKey());
            triggerTarget.onEventTime(timer);
        }
    }

与processingtime不同,eventtime触发适合watermark有关的。当eventtimetimer队列不为空,且当前队列timer小于等于当前watermark就会触发。

结论:

综上:1. watermark是定时生成的,当前时间,间隔1000s生成一个,所以当有watermark且符合time<=watermark,才会触发timer。
2.因为相同timer会去重,所以当符合条件时,相同timer只会触发一次timer。
3.下次触发会在,watermark到来,且符合timer<=watermark时才会触发。

参考:watermark生成

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容