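The earlier Hello FlinkCEP article gave a brief introduction to FlinkCEP, but to keep the logic simple it left time out of the picture. In real business scenarios, however, a rule usually requires certain events to happen within a specific period. Time is a central concept in Flink; see the official documentation for an introduction. This article focuses only on the CEP case: how late or out-of-order events are matched against the pattern conditions. It is based on Flink 1.9.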
In CEP the order in which elements are processed matters. To guarantee that elements are processed in the correct order when working in event time, an incoming element is initially put in a buffer where elements are sorted in ascending order based on their timestamp, and when a watermark arrives, all the elements in this buffer with timestamps smaller than that of the watermark are processed. This implies that elements between watermarks are processed in event-time order.
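Business scenario
We add a time constraint to the scenario from Hello FlinkCEP: an alert is triggered only when two consecutive transactions occur within 10 seconds, the first being a valid transaction with a volume below 10 and the second a valid transaction with a volume above 100.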
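To pin the rule down before looking at the Flink job, here is a minimal plain-Java sketch of the same condition applied to a pair of consecutive trades. `AlertRule`, `Trade`, `Type` and `isAlert` are hypothetical names, not part of the article's source. The sketch assumes timestamps in epoch milliseconds and treats the 10-second window as half-open (a gap of exactly 10 s does not match); that boundary is my assumption about how the CEP window behaves.

```java
// Hypothetical plain-Java version of the alert rule; not part of the Flink job.
public class AlertRule {

    // Hypothetical stand-in for the article's EventType
    public enum Type { VALID, INVALID }

    // Hypothetical trade: type, volume and event time in epoch milliseconds
    public static final class Trade {
        final Type type;
        final double volume;
        final long timestampMillis;

        public Trade(Type type, double volume, long timestampMillis) {
            this.type = type;
            this.volume = volume;
            this.timestampMillis = timestampMillis;
        }
    }

    // Length of the window from the scenario: 10 seconds
    public static final long WINDOW_MILLIS = 10_000L;

    // True when two consecutive trades should raise an alert.
    // Assumption: the window is half-open, so a gap of exactly 10 s does not match.
    public static boolean isAlert(Trade first, Trade second) {
        boolean firstMatches = first.type == Type.VALID && first.volume < 10;
        boolean secondMatches = second.type == Type.VALID && second.volume > 100;
        long gap = second.timestampMillis - first.timestampMillis;
        return firstMatches && secondMatches && gap >= 0 && gap < WINDOW_MILLIS;
    }

    public static void main(String[] args) {
        Trade small = new Trade(Type.VALID, 1, 0L);
        System.out.println(isAlert(small, new Trade(Type.VALID, 300, 9_000L)));  // true
        System.out.println(isAlert(small, new Trade(Type.VALID, 300, 11_000L))); // false
    }
}
```

In the Flink job the same rule is split across the `start` and `end` conditions plus `within(Time.seconds(10))`, and "consecutive" is expressed by `next`.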
Implementation
- The transaction is modeled as SubEvent.java, which adds an event-time field; see the source code for the remaining classes.
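```java
// Extends Event (id, type, volume) from Hello FlinkCEP with an event-time field
public class SubEvent extends Event {
    // Event time as a string, e.g. "2019-10-18 01:00:30"
    private String date;

    public SubEvent(String id, EventType type, double volume, String date) {
        super(id, type, volume);
        this.date = date;
    }

    // Read by BoundedOutOfOrdernessGenerator to extract the event timestamp
    public String getDate() {
        return date;
    }
}
```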
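- The main program, `CEPWithTimeExample.java`. `Event`, `EventType`, `Alert` and the `StringUtilsPlus` date helpers come from the source code and are not shown:
```java
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.util.Collector;

public class CEPWithTimeExample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Use event time: timestamps come from the records, progress is driven by watermarks
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "host-10-1-236-139:6667");
        properties.setProperty("group.id", "cepG");
        DataStream<String> stream = env
                .addSource(new FlinkKafkaConsumer010<>("foo", new SimpleStringSchema(), properties));

        // Each Kafka record is a CSV line: id,type,volume,date (e.g. 1,VALID,1,2019-10-18 01:00:30)
        DataStream<SubEvent> input = stream.map(new MapFunction<String, SubEvent>() {
            @Override
            public SubEvent map(String value) throws Exception {
                String[] v = value.split(",");
                return new SubEvent(v[0], EventType.valueOf(v[1]), Double.parseDouble(v[2]), v[3]);
            }
        }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator());

        // "start": a valid trade below 10, immediately followed ("next" = strict contiguity)
        // by "end": a valid trade above 100, both within 10 seconds of event time
        Pattern<SubEvent, ?> pattern = Pattern.<SubEvent>begin("start").where(
                new SimpleCondition<SubEvent>() {
                    @Override
                    public boolean filter(SubEvent subEvent) {
                        System.out.println(subEvent + " from start at " + StringUtilsPlus.stampToDate(System.currentTimeMillis()));
                        return subEvent.getType() == EventType.VALID && subEvent.getVolume() < 10;
                    }
                }
        ).next("end").where(
                new SimpleCondition<SubEvent>() {
                    @Override
                    public boolean filter(SubEvent subEvent) {
                        System.out.println(subEvent + " from end");
                        return subEvent.getType() == EventType.VALID && subEvent.getVolume() > 100;
                    }
                }
        ).within(Time.seconds(10));

        PatternStream<SubEvent> patternStream = CEP.pattern(input, pattern);
        DataStream<Alert> result = patternStream.process(
                new PatternProcessFunction<SubEvent, Alert>() {
                    @Override
                    public void processMatch(
                            Map<String, List<SubEvent>> pattern,
                            Context ctx,
                            Collector<Alert> out) throws Exception {
                        // Called once for every complete start -> end match
                        System.out.println(pattern);
                        out.collect(new Alert("111", "CRITICAL"));
                    }
                });
        result.print();
        env.execute("Flink cep example");
    }

    // Periodic watermark assigner: the watermark trails the largest timestamp seen so far by 5 s
    private static class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<SubEvent> {
        private final long maxOutOfOrderness = 5000; // 5 seconds
        private long currentMaxTimestamp;

        @Override
        public long extractTimestamp(SubEvent subEvent, long previousElementTimestamp) {
            System.out.println("SubEvent is " + subEvent);
            long timestamp = StringUtilsPlus.dateToStamp(subEvent.getDate());
            currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
            System.out.println("watermark:" + StringUtilsPlus.stampToDate(String.valueOf(currentMaxTimestamp - maxOutOfOrderness)));
            return timestamp;
        }

        @Override
        public Watermark getCurrentWatermark() {
            return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
        }
    }
}
```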
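- Notes on the example
`env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);` declares that the job uses event time, and `BoundedOutOfOrdernessGenerator` allows events to be up to 5 seconds out of order: the watermark it produces is always the largest timestamp seen so far minus 5 seconds. For more on watermarks, see the official documentation.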
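To check the watermark values used later in the result analysis, the following plain-Java sketch replays the arithmetic of `BoundedOutOfOrdernessGenerator` without Flink. `WatermarkReplay`, `onEvent` and the UTC interpretation of the dates are assumptions for illustration; `java.time` stands in for the source's `StringUtilsPlus` helpers, which are not shown. The sketch computes the watermark after every event, whereas the real job polls `getCurrentWatermark()` periodically (every 200 ms by default once event time is enabled); when events are sent by hand one at a time, the values are the same.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Plain-Java replica of BoundedOutOfOrdernessGenerator's arithmetic (no Flink needed).
// Dates are interpreted as UTC, an assumption: only differences between them matter.
public class WatermarkReplay {

    public static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Same bound as the example: the watermark trails the max timestamp by 5 s
    public static final long MAX_OUT_OF_ORDERNESS = 5000L;

    private long currentMaxTimestamp; // starts at 0, as in the original generator

    public static long toMillis(String date) {
        return LocalDateTime.parse(date, FMT).toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    public static String toDate(long millis) {
        return LocalDateTime.ofEpochSecond(Math.floorDiv(millis, 1000L), 0, ZoneOffset.UTC).format(FMT);
    }

    // Equivalent of extractTimestamp() followed by getCurrentWatermark()
    public long onEvent(long timestamp) {
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return currentMaxTimestamp - MAX_OUT_OF_ORDERNESS;
    }

    public static void main(String[] args) {
        // Timestamps of events 1 to 5 from the test data, in arrival order
        String[] dates = {
            "2019-10-18 01:00:30", "2019-10-18 01:00:24", "2019-10-18 01:00:28",
            "2019-10-18 01:00:35", "2019-10-18 01:00:45"
        };
        WatermarkReplay generator = new WatermarkReplay();
        for (String d : dates) {
            long watermark = generator.onEvent(toMillis(d));
            System.out.println(d + " -> watermark " + toDate(watermark));
        }
    }
}
```

Running it prints watermarks of 01:00:25 for the first three events, then 01:00:30 and 01:00:40: a late event such as 01:00:24 never moves the watermark backwards, because only the running maximum matters.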
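Source code walkthrough
- First, when event time is used, each event enters the `processElement` method of Flink's `CepOperator.java`. If the event's timestamp is greater than the last watermark, the event is added to the `elementQueueState` queue; otherwise it is late and, by default, simply dropped (it is emitted to a side output only if a late-data output tag has been configured). The key code is: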
```java
long timestamp = element.getTimestamp();
IN value = element.getValue();
// In event-time processing we assume correctness of the watermark.
// Events with timestamp smaller than or equal with the last seen watermark are considered late.
// Late events are put in a dedicated side output, if the user has specified one.
if (timestamp > lastWatermark) {
    // we have an event with a valid timestamp, so
    // we buffer it until we receive the proper watermark.
    saveRegisterWatermarkTimer();
    bufferEvent(value, timestamp); // add the event to the buffer queue
} else if (lateDataOutputTag != null) {
    output.collect(lateDataOutputTag, element);
}
```
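The branch above boils down to a three-way decision per event. The sketch below models it in plain Java with timestamps in seconds; `LateEventGate` and its fields are hypothetical names, not Flink classes. Note that an event whose timestamp equals the last watermark is also late, because the check is a strict `>`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the timestamp check at the top of CepOperator#processElement.
public class LateEventGate {

    private long lastWatermark = Long.MIN_VALUE; // last watermark recorded by onEventTime
    private final boolean hasLateDataOutput;      // models lateDataOutputTag != null

    public final List<Long> buffered = new ArrayList<>();   // stands in for elementQueueState
    public final List<Long> sideOutput = new ArrayList<>(); // stands in for the late-data side output
    public int dropped;                                      // late events with no side output

    public LateEventGate(boolean hasLateDataOutput) {
        this.hasLateDataOutput = hasLateDataOutput;
    }

    public void processElement(long timestamp) {
        if (timestamp > lastWatermark) {
            buffered.add(timestamp);   // valid: wait for a watermark to release it
        } else if (hasLateDataOutput) {
            sideOutput.add(timestamp); // late, but the user asked to keep late data
        } else {
            dropped++;                 // late and silently discarded
        }
    }

    // Mirrors updateLastSeenWatermark(), which runs at the end of onEventTime
    public void updateLastSeenWatermark(long watermark) {
        lastWatermark = watermark;
    }

    public static void main(String[] args) {
        LateEventGate gate = new LateEventGate(false);
        gate.updateLastSeenWatermark(25);
        gate.processElement(24); // late
        gate.processElement(25); // equal to the watermark: also late
        gate.processElement(28); // buffered
        System.out.println(gate.buffered + " dropped=" + gate.dropped); // [28] dropped=2
    }
}
```

In the real job, late events are kept only if a tag is configured with `PatternStream#sideOutputLateData(OutputTag)` and read back with `getSideOutput` on the result stream; otherwise they take the `dropped` path.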
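- Once a qualifying event is buffered, `saveRegisterWatermarkTimer()` registers an event-time timer at the current watermark plus one, so `onEventTime` in `CepOperator.java` runs as soon as the watermark advances and decides which buffered events to process. This method is central to event-time CEP and consists of roughly five steps: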
```java
@Override
public void onEventTime(InternalTimer<KEY, VoidNamespace> timer) throws Exception {

    // 1) get the queue of pending elements for the key and the corresponding NFA,
    // 2) process the pending elements in event time order and custom comparator if exists
    //      by feeding them in the NFA
    // 3) advance the time to the current watermark, so that expired patterns are discarded.
    // 4) update the stored state for the key, by only storing the new NFA and MapState iff they
    //      have state to be used later.
    // 5) update the last seen watermark.

    // STEP 1
    PriorityQueue<Long> sortedTimestamps = getSortedTimestamps();
    NFAState nfaState = getNFAState();

    // STEP 2
    while (!sortedTimestamps.isEmpty() && sortedTimestamps.peek() <= timerService.currentWatermark()) {
        long timestamp = sortedTimestamps.poll();
        advanceTime(nfaState, timestamp);
        try (Stream<IN> elements = sort(elementQueueState.get(timestamp))) {
            elements.forEachOrdered(
                event -> {
                    try {
                        processEvent(nfaState, event, timestamp);
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }
            );
        }
        elementQueueState.remove(timestamp);
    }

    // STEP 3
    advanceTime(nfaState, timerService.currentWatermark());

    // STEP 4
    updateNFA(nfaState);

    if (!sortedTimestamps.isEmpty() || !partialMatches.isEmpty()) {
        saveRegisterWatermarkTimer();
    }

    // STEP 5
    updateLastSeenWatermark(timerService.currentWatermark());
}
```
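- STEP 1: the buffered events may be out of order, so their timestamps are first put into a priority queue sorted by event time;
- STEP 2: starting from the smallest timestamp, only events whose timestamp is less than or equal to the current watermark are fed into the NFA; the rest stay buffered;
- STEP 3 to STEP 5: advance the NFA to the current watermark, which discards partial matches that have exceeded the `within` window; then update the NFA state, re-register the timer if buffered events or partial matches remain, and record the last seen watermark, which `processElement` uses for the late-event check.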
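Steps 1 and 2 can be reduced to a small plain-Java sketch: events are stored per timestamp, the timestamps are ordered with a `PriorityQueue`, and only those at or below the watermark are drained. `OnEventTimeSketch` is a hypothetical class; a `HashMap` stands in for `elementQueueState`, and the NFA calls (`advanceTime`, `processEvent`) are replaced by collecting the released events into a list. Timestamps are seconds past 01:00:00 for readability.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical sketch of STEP 1 and STEP 2 of CepOperator#onEventTime.
public class OnEventTimeSketch {

    // timestamp -> events carrying that timestamp (stands in for elementQueueState)
    private final Map<Long, List<String>> elementQueue = new HashMap<>();

    public void bufferEvent(String event, long timestamp) {
        elementQueue.computeIfAbsent(timestamp, k -> new ArrayList<>()).add(event);
    }

    // Returns the events handed to the pattern matcher, in processing order
    public List<String> onEventTime(long currentWatermark) {
        // STEP 1: order the buffered timestamps
        PriorityQueue<Long> sortedTimestamps = new PriorityQueue<>(elementQueue.keySet());
        List<String> processed = new ArrayList<>();
        // STEP 2: drain every timestamp <= watermark, oldest first
        while (!sortedTimestamps.isEmpty() && sortedTimestamps.peek() <= currentWatermark) {
            long timestamp = sortedTimestamps.poll();
            processed.addAll(elementQueue.remove(timestamp)); // stands in for processEvent(...)
        }
        return processed;
    }

    public int pendingTimestamps() {
        return elementQueue.size();
    }

    public static void main(String[] args) {
        OnEventTimeSketch op = new OnEventTimeSketch();
        op.bufferEvent("event1", 30);
        op.bufferEvent("event3", 28);
        op.bufferEvent("event4", 35);
        System.out.println(op.onEventTime(30)); // [event3, event1]
        System.out.println(op.onEventTime(40)); // [event4]
    }
}
```

Keying the buffer by timestamp, as Flink does, means events that share a timestamp are released together; Flink then orders them with the optional custom comparator, which this sketch omits.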
Test data
id | type | volume | timestamp |
---|---|---|---|
1 | VALID | 1 | 2019-10-18 01:00:30 |
2 | VALID | 1 | 2019-10-18 01:00:24 |
3 | VALID | 1 | 2019-10-18 01:00:28 |
4 | VALID | 200 | 2019-10-18 01:00:35 |
5 | VALID | 1 | 2019-10-18 01:00:45 |
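Result analysis
- Event 1 is the first event, so it goes straight into the `elementQueueState` queue; the watermark becomes 2019-10-18 01:00:25.
- Event 2's timestamp is earlier than the last watermark (2019-10-18 01:00:24 < 2019-10-18 01:00:25), so it is treated as late and never enters `elementQueueState`; the watermark stays at 2019-10-18 01:00:25.
- Event 3's timestamp is later than the last watermark, so it enters `elementQueueState` normally; the watermark is still 2019-10-18 01:00:25. The queue is now out of event-time order; `onEventTime` sorts it by timestamp into event3, event1, but both timestamps are still later than the current watermark (2019-10-18 01:00:25), so no computation is triggered.
- Event 4 is accepted into `elementQueueState` and advances the watermark to 2019-10-18 01:00:30. After sorting, the queue is event3, event1, event4. Event 4's timestamp is later than the watermark, so only event3 and event1 are evaluated. Both satisfy the `start` condition, but the partial match beginning with event3 is discarded, because the event right after it, event1, fails the `end` condition and `next` requires strict contiguity; the partial match beginning with event1 remains.
- Likewise, event 5 triggers computation and advances the watermark to 2019-10-18 01:00:40. The queue now holds event4, event5, and only event4's timestamp is not later than the watermark, so only it is evaluated. Event4 satisfies the `end` condition and occurred 5 seconds after event1, inside the 10-second window, so exactly one alert fires: event1-->event4.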
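The matching part of this analysis, including why event3 produces no alert, can be checked with a toy matcher for `begin("start").next("end").within(10 s)`. This is a simplified stand-in, not Flink's NFA: `StrictNextMatcher` and `Ev` are hypothetical, the VALID type check is omitted because every test event is VALID, and timestamps are milliseconds past 01:00:00. It assumes, based on my reading of `NFA#advanceTime` in Flink 1.9, that a partial match is pruned once the gap reaches the window length.

```java
import java.util.ArrayList;
import java.util.List;

// Toy matcher for begin("start").next("end").within(10 s), fed with events in the
// order onEventTime releases them. It keeps at most one partial match, which is
// enough because "start" is a single event and `next` demands strict contiguity.
public class StrictNextMatcher {

    public static final class Ev {
        final int id;
        final double volume;
        final long timestampMillis;

        public Ev(int id, double volume, long timestampMillis) {
            this.id = id;
            this.volume = volume;
            this.timestampMillis = timestampMillis;
        }
    }

    private static final long WINDOW_MILLIS = 10_000L;

    private Ev pendingStart; // partial match waiting for its "end" event

    public List<String> process(Ev e) {
        List<String> matches = new ArrayList<>();
        // Time-out check first, assuming advanceTime() runs before the event is processed
        if (pendingStart != null && e.timestampMillis - pendingStart.timestampMillis >= WINDOW_MILLIS) {
            pendingStart = null;
        }
        if (pendingStart != null && e.volume > 100) {
            matches.add("event" + pendingStart.id + "-->event" + e.id);
        }
        // The old partial match is finished either way: it matched, or the event right
        // after it was not an "end", which breaks strict contiguity.
        // The current event may open a new partial match.
        pendingStart = e.volume < 10 ? e : null;
        return matches;
    }

    public static void main(String[] args) {
        StrictNextMatcher matcher = new StrictNextMatcher();
        // Release order from the analysis: event3, event1 (watermark 01:00:30), then event4 (watermark 01:00:40)
        Ev[] released = {
            new Ev(3, 1, 28_000L), new Ev(1, 1, 30_000L), new Ev(4, 200, 35_000L)
        };
        for (Ev e : released) {
            for (String m : matcher.process(e)) {
                System.out.println(m); // event1-->event4
            }
        }
    }
}
```

Feeding event3 and then event1 shows the strict-contiguity effect: event1 is not an `end`, so event3's partial match dies and event1 starts a fresh one, which event4 completes.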
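Further notes
The test data above mainly shows when computation is triggered; whether the two events occurred within the specified time is only checked after computation has been triggered. To verify this, change `maxOutOfOrderness` in this example to 20000 and test with the data below: the first group eventually triggers an alert and the second does not. In both groups the third event (03:00:00) only serves to push the watermark past the first two; what decides the outcome is the event-time gap between events 1 and 2, which is 9 seconds in the first group and 11 seconds in the second, measured against the 10-second window.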
Group 1
```
1,VALID,1,2019-10-18 01:00:30
2,VALID,300,2019-10-18 01:00:39
3,VALID,300,2019-10-18 03:00:00
```
Group 2
```
1,VALID,1,2019-10-18 01:00:30
2,VALID,300,2019-10-18 01:00:41
3,VALID,300,2019-10-18 03:00:00
```
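The timing behind the two groups can be spelled out in a few lines. `WithinCheck` is a hypothetical helper; it uses the time of day only (all events fall on 2019-10-18) and assumes that a partial match times out once the gap reaches the window length, which is my reading of `NFA#advanceTime` in Flink 1.9.

```java
import java.time.LocalTime;

// Worked arithmetic for the two groups with maxOutOfOrderness = 20000 ms.
public class WithinCheck {

    public static final long MAX_OUT_OF_ORDERNESS = 20_000L;
    public static final long WINDOW_MILLIS = 10_000L;

    // Milliseconds since midnight for a time such as "01:00:30"
    public static long ms(String timeOfDay) {
        return LocalTime.parse(timeOfDay).toSecondOfDay() * 1000L;
    }

    // Watermark once the generator's running maximum is maxTimestamp
    public static long watermark(long maxTimestamp) {
        return maxTimestamp - MAX_OUT_OF_ORDERNESS;
    }

    // Assumed rule: a partial match started at startTs is pruned before an event
    // at ts is processed once ts - startTs >= window
    public static boolean timedOut(long startTs, long ts) {
        return ts - startTs >= WINDOW_MILLIS;
    }

    public static void main(String[] args) {
        long start = ms("01:00:30");
        for (String second : new String[] {"01:00:39", "01:00:41"}) {
            long end = ms(second);
            // After event 2 the watermark is still below event 1, so nothing is evaluated yet
            System.out.println("watermark after event 2: " + LocalTime.ofSecondOfDay(watermark(end) / 1000));
            // Event 3 (03:00:00) moves the watermark far enough to release events 1 and 2
            System.out.println("released by event 3: " + (watermark(ms("03:00:00")) >= end));
            // The window is judged on event timestamps, not on the (much later) watermark
            System.out.println("alert: " + !timedOut(start, end));
        }
    }
}
```

For the first group it reports a watermark of 01:00:19 after event 2, release by event 3, and an alert; for the second group the watermark after event 2 is 01:00:21 and the 11-second gap makes the partial match time out. The watermark of 02:59:40 never enters the window decision.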
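Summary
Building on the Hello FlinkCEP article, this article used an example and a walk through the relevant source code to show how events carrying event time trigger computation and pattern matching.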