Flink-ContinuousProcessingTimeTrigger源码解析及一个小问题处理

背景

工作中遇到一个需求,需要按天划分窗口,并且每隔固定时间段触发一次窗口计算,时间语义为ProcessingTime。在测试过程中发现,使用ContinuousProcessingTimeTrigger会有一个问题:当窗口到达EndTime时并不会触发。

测试

在本地测试时使用自造数据:类别,数量,时间。然后统计每分钟的总量,每10秒钟触发一次窗口计算,并且触发窗口计算后立即清除已经计算过的所有数据,累计的总量值通过状态保存。

public class demo2 {
    private static class DataSource extends RichParallelSourceFunction<Tuple3<String,Integer,String>>{
        private volatile boolean isRunning=true;
        @Override
        public void run(SourceContext<Tuple3<String,Integer,String>> ctx) throws Exception{
            Random random=new Random();
            while(isRunning){
                Thread.sleep((getRuntimeContext().getIndexOfThisSubtask()+1)*1000*8);
                String key="类别"+(char)('A'+random.nextInt(1));
                int value=random.nextInt(10)+1;
                String dt=dateTime.transferLongToDate("yyyy-MM-dd HH:mm:ss.SSS",System.currentTimeMillis());
                System.out.println(String.format("Emits\t(%s,%d,%s)",key,value,dt));
                ctx.collect(new Tuple3<>(key,value,dt));
            }
        }
        @Override
        public void cancel(){
            isRunning=false;
        }
    }
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        DataStream<Tuple3<String,Integer,String>> ds =env.addSource(new DataSource());
        SingleOutputStreamOperator<String> res=ds
                .keyBy(
                (KeySelector<Tuple3<String, Integer,String>, String>) in -> in.f0
        )
                .window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
                .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10)))
                .evictor(CountEvictor.of(0,true))
                .process(new ProcessWindowFunction<Tuple3<String, Integer,String>, String, String, TimeWindow>() {
                    private static final long serialVersionUID = 3091075666113786631L;
                    private ValueState<Integer> valueState;
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        ValueStateDescriptor<Integer> desc=new ValueStateDescriptor<>("value_state",Integer.class);
                        valueState=getRuntimeContext().getState(desc);
                        super.open(parameters);
                    }
                    @Override
                    public void process(String tuple, Context context, Iterable<Tuple3<String, Integer,String>> iterable, Collector<String> collector) throws Exception {
                        //测试输出:窗口的每次触发时间
                        System.out.println("trigger:"+dateTime.transferLongToDate("yyyy-MM-dd HH:mm:ss.SSS",context.currentProcessingTime()));
                        int res=0;
                        if(valueState.value()!=null){
                            res=valueState.value();
                        }
                        for(Tuple3<String, Integer,String> val:iterable){
                            res+=val.f1;
                        }
                        valueState.update(res);
                        String out=dateTime.transferLongToDate("yyyy-MM-dd HH:mm:ss",context.window().getStart())+
                                ","+tuple.toString()+":"+valueState.value();
                        collector.collect(out);
                    }
                    @Override
                    public void clear(Context context) throws Exception {
                        //状态清理时间
                        System.out.println("Start Clear:"+dateTime.transferLongToDate("yyyy-MM-dd HH:mm:ss.SSS",System.currentTimeMillis()));
                        valueState.clear();
                        super.clear(context);
                    }
                });
        res.process(new ProcessFunction<String, Object>() {
            @Override
            public void processElement(String s, Context context, Collector<Object> collector) throws Exception {
                System.out.println(s);
            }
        });
        env.execute();
    }
}

程序执行后的输出结果如下:


image.png

从上图可以看到在30/40/50这三个节点,窗口都触发了计算,并输出了正确的累计结果,但是在窗口结束的时间点并未触发计算

问题定位

看源码

  • 属性声明
public class ContinuousProcessingTimeTrigger<W extends Window> extends Trigger<Object, W> {
    private static final long serialVersionUID = 1L;

    private final long interval;

    /** When merging we take the lowest of all fire timestamps as the new fire timestamp. */
    private final ReducingStateDescriptor<Long> stateDesc =
            new ReducingStateDescriptor<>("fire-time", new Min(), LongSerializer.INSTANCE);

interval为传入的触发时间间隔;stateDesc是定义的ReduceState状态描述符,Min()代表选择的ReduceFunction,表示选择多个时间戳中时间最小的。

  • onElement方法
    @Override
    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);

        timestamp = ctx.getCurrentProcessingTime();

        if (fireTimestamp.get() == null) {
            long start = timestamp - (timestamp % interval);
            long nextFireTimestamp = start + interval;

            ctx.registerProcessingTimeTimer(nextFireTimestamp);

            fireTimestamp.add(nextFireTimestamp);
            return TriggerResult.CONTINUE;
        }
        return TriggerResult.CONTINUE;
    }

onElement方法是用来初始化窗口的第一次的触发时间。

  • onProcessingTime方法
    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);

        if (fireTimestamp.get().equals(time)) {
            fireTimestamp.clear();
            fireTimestamp.add(time + interval);
            ctx.registerProcessingTimeTimer(time + interval);
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }

onProcessingTime方法是基于ProcessingTime的回调方法,首先从状态中获取当前的触发时间,然后跟定时器中时间进行比对,如果两者相等,则清除状态值并重新初始化,然后更新注册下一次的定时器触发时间,最后触发窗口计算。
由onProcessingTime的代码推测,最后一次fireTimestamp和ctx.registerProcessingTimeTimer注册的时间已经超出了窗口的结束时间,导致在窗口结束时并不会触发最后一次计算。

  • 测试代码验证
    根据ContinuousProcessingTimeTrigger的源码新建一个MyContinuousProcessingTimeTrigger的类,修改其中的onProcessingTime方法:
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
        if (fireTimestamp.get().equals(time)) {
            fireTimestamp.clear();
            fireTimestamp.add(time + interval);
            ctx.registerProcessingTimeTimer(time + interval);
            System.out.println("nextFireTime:"+dateTime.transferLongToDate("yyyy-MM-dd HH:mm:ss.SSS",time+this.interval));
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }

然后再测试代码中使用MyContinuousProcessingTimeTrigger,测试输出如下:


image.png

前两次注册的40&50秒两个时间点都会正确触发,但17:00:00这个时间点因为此时窗口以及关闭(窗口的关闭时间:16:59:59.999),导致不会触发。
问题的源头以及确认,那接下来就是解决这个问题了。

解决途径

解决这个问题,同样需要去翻源码,我们在窗口的process方法中找到如下代码:

        if (!windowAssigner.isEventTime() && isCleanupTime(triggerContext.window, timer.getTimestamp())) {
            clearAllState(triggerContext.window, evictingWindowState, mergingWindows);
        }
    private void clearAllState(
            W window,
            ListState<StreamRecord<IN>> windowState,
            MergingWindowSet<W> mergingWindows) throws Exception {
        windowState.clear();
        triggerContext.clear();
        processContext.window = window;
        processContext.clear();
        if (mergingWindows != null) {
            mergingWindows.retireWindow(window);
            mergingWindows.persist();
        }
    }

可以看到,会有一个CleanupTime,当满足这个条件时,会清除窗口的信息。继续翻isCleanupTime这个方法:

    /**
     * Returns {@code true} if the given time is the cleanup time for the given window.
     */
    protected final boolean isCleanupTime(W window, long time) {
        return time == cleanupTime(window);
    }
    /**
     * Returns the cleanup time for a window, which is
     * {@code window.maxTimestamp + allowedLateness}. In
     * case this leads to a value greater than {@link Long#MAX_VALUE}
     * then a cleanup time of {@link Long#MAX_VALUE} is
     * returned.
     *
     * @param window the window whose cleanup time we are computing.
     */
    private long cleanupTime(W window) {
        if (windowAssigner.isEventTime()) {
            long cleanupTime = window.maxTimestamp() + allowedLateness;
            return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE;
        } else {
            return window.maxTimestamp();
        }
    }

可以看到对于非EventTime的语义,cleanupTime就是窗口的结束时间window.maxTimestamp(),看到这里,解决问题的方法也就有了:
修改MyContinuousProcessingTimeTrigger中的onProcessingTime方法:

    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
        if(time==window.maxTimestamp()){
            return TriggerResult.FIRE;
        }
        if (fireTimestamp.get().equals(time)) {
            fireTimestamp.clear();
            fireTimestamp.add(time + interval);
            ctx.registerProcessingTimeTimer(time + interval);
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }

测试结果:


image.png

可以看到在窗口结束时会触发正确的统计结果。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,651评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,468评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,931评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,218评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,234评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,198评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,084评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,926评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,341评论 1 311
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,563评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,731评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,430评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,036评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,676评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,829评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,743评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,629评论 2 354

推荐阅读更多精彩内容