FlinkCEP with EventTime

在之前的Hello FlinkCEP文章中已经简单介绍了FlinkCEP的使用,只是为了简化逻辑并没有加入时间概念。那么在实际业务场景中,都是会要求在特定的时间内发生某种事件。在Flink中“时间”是一个非常重要的概念,可以参考官网对时间的介绍。本文只是基于CEP场景介绍迟到或者乱序的事件,是如何进行条件匹配的。本文基于flink 1.9版本。

In CEP the order in which elements are processed matters. To guarantee that elements are processed in the correct order when working in event time, an incoming element is initially put in a buffer where elements are sorted in ascending order based on their timestamp, and when a watermark arrives, all the elements in this buffer with timestamps smaller than that of the watermark are processed. This implies that elements between watermarks are processed in event-time order.

业务场景

Hello FlinkCEP中的业务场景加入一个时间限制,即只有在10秒中内连续发生两笔交易,并且第一笔交易额小于10,第二笔有效交易额大于100,就要触发告警。

业务实现

  1. 交易抽象为SubEvent.java,增加事件时间,其他部分请参见源码
public class SubEvent extends Event {
    private String date;

    public SubEvent(String id, EventType type, double volume, String date) {
        super(id, type, volume);
        this.date = date;
    }
}
  1. 样例CEPWithTimeExample.java
public class CEPWithTimeExample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "host-10-1-236-139:6667");
        properties.setProperty("group.id", "cepG");
        DataStream<String> stream = env
                .addSource(new FlinkKafkaConsumer010<>("foo", new SimpleStringSchema(), properties));


        DataStream<SubEvent> input = stream.map(new MapFunction<String, SubEvent>() {
            @Override
            public SubEvent map(String value) throws Exception {
                String[] v = value.split(",");
                return new SubEvent(v[0], EventType.valueOf(v[1]), Double.parseDouble(v[2]), v[3]);
            }
        }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator());

        Pattern<SubEvent, ?> pattern = Pattern.<SubEvent>begin("start").where(
                new SimpleCondition<SubEvent>() {
                    @Override
                    public boolean filter(SubEvent subEvent) {
                        System.out.println(subEvent + " from start at " + StringUtilsPlus.stampToDate(System.currentTimeMillis()));
                        return subEvent.getType() == EventType.VALID && subEvent.getVolume() < 10;
                    }
                }
        ).next("end").where(
                new SimpleCondition<SubEvent>() {
                    @Override
                    public boolean filter(SubEvent subEvent) {
                        System.out.println(subEvent + " from end");
                        return subEvent.getType() == EventType.VALID && subEvent.getVolume() > 100;
                    }
                }
        ).within(Time.seconds(10));

        PatternStream<SubEvent> patternStream = CEP.pattern(input, pattern);

        DataStream<Alert> result = patternStream.process(
                new PatternProcessFunction<SubEvent, Alert>() {
                    @Override
                    public void processMatch(
                            Map<String, List<SubEvent>> pattern,
                            Context ctx,
                            Collector<Alert> out) throws Exception {

                        System.out.println(pattern);

                        out.collect(new Alert("111", "CRITICAL"));
                    }
                });

        result.print();

        env.execute("Flink cep example");
    }

    private static class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<SubEvent> {

        private final long maxOutOfOrderness = 5000;

        private long currentMaxTimestamp;

        @Override
        public long extractTimestamp(SubEvent subEvent, long previousElementTimestamp) {
            System.out.println("SubEvent is " + subEvent);
            long timestamp = StringUtilsPlus.dateToStamp(subEvent.getDate());
            currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
            System.out.println("watermark:" + StringUtilsPlus.stampToDate(String.valueOf(currentMaxTimestamp - maxOutOfOrderness)));
            return timestamp;
        }

        @Override
        public Watermark getCurrentWatermark() {
            return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
        }
    }
}
  1. 样例说明
    使用env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);声明使用事件时间,并通过BoundedOutOfOrdernessGenerator指定允许事件迟到5秒,关于watermark的介绍可以参见官网

源码流程分析

  1. 首先,如果是使用EventTime,会进入flink源码CepOperator.javaprocessElement方法,当事件时间大于上一次的Watermark时,会把当前的event加入到elementQueueState队列中,不符合条件的默认会直接丢弃,关键代码如下:
long timestamp = element.getTimestamp();
IN value = element.getValue();

// In event-time processing we assume correctness of the watermark.
// Events with timestamp smaller than or equal with the last seen watermark are considered late.
// Late events are put in a dedicated side output, if the user has specified one.

if (timestamp > lastWatermark) {

    // we have an event with a valid timestamp, so
    // we buffer it until we receive the proper watermark.

    saveRegisterWatermarkTimer();

    bufferEvent(value, timestamp);//event加入缓存队列

} else if (lateDataOutputTag != null) {
    output.collect(lateDataOutputTag, element);
}
  1. 满足条件的event加入队列以后,会在CepOperator.javaonEventTime方法中判断是否执行触发计算,这个方法非常的重要,里面大概分为了5个步骤,源码如下:
@Override
public void onEventTime(InternalTimer<KEY, VoidNamespace> timer) throws Exception {

    // 1) get the queue of pending elements for the key and the corresponding NFA,
    // 2) process the pending elements in event time order and custom comparator if exists
    //      by feeding them in the NFA
    // 3) advance the time to the current watermark, so that expired patterns are discarded.
    // 4) update the stored state for the key, by only storing the new NFA and MapState iff they
    //      have state to be used later.
    // 5) update the last seen watermark.

    // STEP 1
    PriorityQueue<Long> sortedTimestamps = getSortedTimestamps();
    NFAState nfaState = getNFAState();

    // STEP 2
    while (!sortedTimestamps.isEmpty() && sortedTimestamps.peek() <= timerService.currentWatermark()) {
        long timestamp = sortedTimestamps.poll();
        advanceTime(nfaState, timestamp);
        try (Stream<IN> elements = sort(elementQueueState.get(timestamp))) {
            elements.forEachOrdered(
                event -> {
                    try {
                        processEvent(nfaState, event, timestamp);
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }
            );
        }
        elementQueueState.remove(timestamp);
    }

    // STEP 3
    advanceTime(nfaState, timerService.currentWatermark());
    // STEP 4
    updateNFA(nfaState);

    if (!sortedTimestamps.isEmpty() || !partialMatches.isEmpty()) {
        saveRegisterWatermarkTimer();
    }

    // STEP 5
    updateLastSeenWatermark(timerService.currentWatermark());
}

STEP 1:队列中的事件可能时序是乱序的,所以需要先根据event time进行排序;

STEP 2:从事件时间戳最小的开始遍历,只有时间戳小于等于当前watermark时才会计算;

其他剩下的步骤就是更新相关的状态和Watermark

测试数据

id type volume timestamp
1 VALID 1 2019-10-18 01:00:30
2 VALID 1 2019-10-18 01:00:24
3 VALID 1 2019-10-18 01:00:28
4 VALID 200 2019-10-18 01:00:35
5 VALID 1 2019-10-18 01:00:45

结果分析

  1. id为1的event初始化会进入elementQueueState队列中,此时Watermark=2019-10-18 01:00:25
  2. 由于id为2的event,时间戳小于上次的Watermark,即2019-10-18 01:00:24 < 2019-10-18 01:00:25,所以这个事件不会进入elementQueueState队列,此时Watermark=2019-10-18 01:00:25
  3. id为3的event时间戳大于上一次的Watermark所以正常进入elementQueueState队列,此时Watermark=2019-10-18 01:00:25,队列中时序是乱的,在onEventTime方法先排序,排序之后元素顺序为event3,event1,这两个事件的时间戳都大于当前的Watermark=2019-10-18 01:00:25,所以这时不会触发计算;
  4. id为4的event符合条件,进入elementQueueState队列,并更新Watermark为2019-10-18 01:00:30,队列排序后结果为event3,event1,event4,但是event4的时间戳大于当前的Watermark,所以只有event3和event1触发计算,这时event1满足start模式的条件;
  5. 同上,id为5的event也会触发计算,并更新Watermark为2019-10-18 01:00:40,这时队列中为event4,event5,只有event4满足时间戳小于当前的Watermark触发计算,并且event4也满足end模式的条件,所以最终触发了一次告警,即event1-->event4

其他

以上的测试数据主要是说明何时会触发计算,触发计算之后才会判断两个事件是否在指定的时间内发生。可以把本例中的maxOutOfOrderness改为20000,再使用下面的数据进行测试即可,第一组数据最终会触发告警,第二组数据不会。
第一组

1,VALID,1,2019-10-18 01:00:30
2,VALID,300,2019-10-18 01:00:39
3,VALID,300,2019-10-18 03:00:00

第二组

1,VALID,1,2019-10-18 01:00:30
2,VALID,300,2019-10-18 01:00:41
3,VALID,300,2019-10-18 03:00:00

总结

本文基于Hello FlinkCEP文章进一步通过一个样例和一些源码的说明,演示了带有EventTime的事件是如何触发计算以及模式匹配的。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,658评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,482评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,213评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,395评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,487评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,523评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,525评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,300评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,753评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,048评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,223评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,905评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,541评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,168评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,417评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,094评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,088评论 2 352