窗口实用触发器 ContinuousEventTimeTrigger

短窗口的计算由于其窗口期较短,那么很快就能获取到结果,但是对于长窗口来说短窗口时间比较长,如果等窗口期结束才能看到结果,那么这份数据就不具备实时性,大多数情况我们希望能够看到一个长窗口的结果不断变动的情况,
对此Flink提供了ContinuousEventTimeTrigger连续事件时间触发器与ContinuousProcessingTimeTrigger连续处理时间触发器,指定一个固定时间间隔interval,不需要等到窗口结束才能获取结果,能够在固定的interval获取到窗口的中间结果。

ContinuousEventTimeTrigger

该类表示连续事件时间触发器,用在EventTime属性的任务流中,以事件时间的进度来推动定期触发。
<1> 其中的onElement方法:

@Override
    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {

        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            // if the watermark is already past the window fire immediately
            return TriggerResult.FIRE;
        } else {
            ctx.registerEventTimeTimer(window.maxTimestamp());
        }

        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
        if (fireTimestamp.get() == null) {
            long start = timestamp - (timestamp % interval);
            long nextFireTimestamp = start + interval;
            ctx.registerEventTimeTimer(nextFireTimestamp);
            fireTimestamp.add(nextFireTimestamp);
        }

        return TriggerResult.CONTINUE;
    }

对于每一条数据都会经过onElement处理,


image.png

这部分是用于判断是否触发窗口函数或者注册一个窗口endTime的定时触发器,endTime定时器最终触发窗口函数,就能够得到一个最终的窗口结果。
一旦流水位线达到了窗口的endTime,那么就会触发最终的函数。

image.png
/** When merging we take the lowest of all fire timestamps as the new fire timestamp. */
    private final ReducingStateDescriptor<Long> stateDesc =
            new ReducingStateDescriptor<>("fire-time", new Min(), LongSerializer.INSTANCE);

这部分,ReducingState是context调用getPartitionedState方法,返回下一次的窗口函数触发时间
getPartitionedState:检索可用于与之交互的State对象容错状态,范围为当前触发器调用的窗口和键。
如果获取到保存下一次触发时间的状态为null,那么就会初始化,这里的初始化逻辑:
假设当前时间戳为110,调用函数的间隔时间即interval为25,那么
start=110-110%25=110-10=100
nextFireTimestamp=100+25=125
这就是距离当前时间戳最近的触发时间。

<2> 其中的onEventTime方法:

public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {

       //窗口结束的触发
        if (time == window.maxTimestamp()){
            return TriggerResult.FIRE;
        }
        ReducingState<Long> fireTimestampState = ctx.getPartitionedState(stateDesc);

        Long fireTimestamp = fireTimestampState.get();

        if (fireTimestamp != null && fireTimestamp == time) {
            fireTimestampState.clear();
            fireTimestampState.add(time + interval);
            ctx.registerEventTimeTimer(time + interval);
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }

在onEventTime 中会获取当前的触发时间fireTimestamp,然后注册下一个fireTimestamp+interval的触发器。可以看到反复的定时注册会导致其不断的循序下去,当窗口期结束肯定是需要结束该窗口的持续触发调用,那么是如何做到的呢?

在WindowOperator中onEventTime触发定时调用中会判断如果是窗口结束时间的触发调用会执行clearAllState方法,在该方法中会调用triggerContext.clear(),也就是会调用ContinuousEventTimeTrigger的clear方法,

public void clear(W window, TriggerContext ctx) throws Exception {
        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
        Long timestamp = fireTimestamp.get();
        if (timestamp != null) {
            ctx.deleteEventTimeTimer(timestamp);
            fireTimestamp.clear();
        }
    }

那么其会删除下一次该窗口器的触发并且清除对应的ReducingState状态数据。

注意点:

<1> 连续定时触发与第一条数据有关,例如第一条数据是2019-11-16 11:22:01, 10s触发一次,那么后续触发时间就分别是2019-11-16 11:22:10、2019-11-16 11:22:20、2019-11-16 11:22:30
<2> 如果数据时间间隔相对于定期触发的interval比较大,那么有可能会存在多次输出相同结果的场景,比喻说触发的interval是10s, 第一条数据时间是2019-11-16 11:22:00, 那么下一次的触发时间是2019-11-16 11:22:10, 如果此时来了一条2019-11-16 11:23:00 的数据,会导致其watermark直接提升了1min, 会直接触发5次连续输出,对于下游处理来说可能会需要做额外的操作。
<3> 窗口的每一个key的触发时间可能会不一致,是因为窗口的每一个key对应的第一条数据时间不一样,正如上述所描述定时规则。由于会注册一个窗口endTime的触发器,会触发窗口所有key的窗口函数,保证最终结果的正确性。

场景:

比如说每个区域的每小时的商品销售额,要求是每隔1min能够看到销售额变动情况。
使用ContinuousProcessingTimeTrigger
一般的套路就是:
keyby()
.timeWindow(TumblingProcessingTimeWindows.of(Time.seconds(120))) 或者 window()
.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(20))
...
这里以简单的WordCount为例,2min内每隔20s就统计下出现的word的次数。

public class ContinueTriggerDemo {

    public static void main(String[] args) throws Exception {

        String hostName="localhost";
        Integer port=Integer.parseInt("8801");

        final StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();

        //从指定socket获取输入数据
        DataStream<String> text=env.socketTextStream(hostName,port);

        text.flatMap(new LineSplitter())
                .keyBy(0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(120)))
                .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(20)))
                .sum(1)
                .map(new TimestampAdd())
                .print();

        env.execute("start demo!");

    }



    public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String,Integer>>{

        @Override
        public void flatMap(String s, Collector<Tuple2<String, Integer>> out) throws Exception {
            String[] tokens=s.toLowerCase().split("\\W+");

            for (String token:tokens){
                if (token.length()>0){
                    out.collect(new Tuple2<>(token,1));
                }
            }
        }
    }


    public static final class TimestampAdd implements MapFunction<Tuple2<String,Integer>, Tuple3<String,String,Integer>>{

        @Override
        public Tuple3<String, String, Integer> map(Tuple2<String, Integer> value) throws Exception {

            DateFormat format=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            String s=format.format(new Date());

            return new Tuple3<>(value.f0,s,value.f1);
        }
    }
}

陆续输入
image.png

输出结果分析:


image.png

(1) 首先,时间窗口为滚动窗口2min,所以以3点开始为例,时间窗口为:
[15:00:00 - 15:02:00)
[15:02:00 - 15:04:00)
[15:04:00 - 15:06:00)
[15:06:00 - 15:08:00)
...
(2) 在上述给定窗口中,发现有个重要的前提就是窗口内要有数据,在有数据的情况下就会以20s的间隔调用函数,否则也不会有任何输出。
以“ff”为例,“ff”第一次出现时间为15:12:20,所以其所处的时间窗口是[15:12:00,15:14:00),所以会发现每隔20s调用1次,直到15:13:40为止,之后就是一个新的窗口[15:14:00,15:16:00)

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,142评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,298评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,068评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,081评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,099评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,071评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,990评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,832评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,274评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,488评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,649评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,378评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,979评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,625评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,796评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,643评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,545评论 2 352

推荐阅读更多精彩内容