Flink提供了接口,允许编程者自定义 timesatmp extractor 与 watermark emitter。更确切的说,可以根据情况,实现 AssignerWithPeriodicWatermarks 或 AssignerWithPunctuatedWatermarks接口。简单来说,AssignerWithPeriodicWatermarks 会周期性的发射watermark,AssignerWithPunctuatedWatermarks 会根据数据的某些属性来决定是否发射watermark。
为了更大程度的简化这个编程过程,flink自带了一些默认实现的timestamp assigner。这一部分会介绍它们。除了它们开箱即用的特性,它们的实现也是一个实现自定义类的很好的参考。
Assigners with ascending timestamps
Periodic Watermarks最简单的场景便是source中数据的timestamp是升序流入的。这种情况下,当前流入数据的timestamp就可以当做是watermark,因为不会有乱序的情况发生,也就是不会有小于当前数据时间戳的数据在当前数据之后流入。
注意,仅仅需要在每个并发的source task中保持上述特性就可以。例如:通过设置后,每个Kafka partiton都在一个独立的source task中读取数据,这时,只要求每个partition的数据的时间戳是升序的就可以。Flink的watermark的merge机制会在并发流的shuffled,unionconnected,merged时生成正确的watermark。
DataStream<MyEvent> stream = ...
DataStream<MyEvent> withTimestampsAndWatermarks =
stream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MyEvent>() {
@Override
public long extractAscendingTimestamp(MyEvent element) {
return element.getCreationTime();
}
});
Assingers allowing a fixed amount of lateness
另一个使用 Periodic Watermarks 的场景是watermark始终落后当前流中最大时间戳一个固定的时间。也适用于事前就知道流中数据的最大迟到时间的情况,如:在测试时,使用自定义source读取在一定时间范围内乱序的数据。(注:最大迟到时间不是指数据的时间戳与系统时间的差值,而是迟到数据与该数据之前流入的数据的最大时间戳的差值)
对于这种情况,flink提供了 BoundedOutOfOrdernessTimestampExtractor ,它的入参为 maxOutOfOrderness,也就是,对于给定的window来说,它需要在event time坐标系下等待多久的迟到数据之后再触发计算(超过等待时间的数据会被忽略)。数据是否迟到取决于 lateness= t - t_w 的值,其中t代表event time坐标系下数据的时间戳,t_w代表该数据之前的最大watermark。如果 lateness > 0 ,那么该数据会被认为是迟到了,默认情况下,当进行window计算时,该数据会被忽略,不参与计算。(注:根据这个公式,我觉得是 lateness < 0 的情况下,才是迟到数据)
DataStream<MyEvent> stream = ...
DataStream<MyEvent> withTimestampsAndWatermarks =
stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10)) {
@Override
public long extractTimestamp(MyEvent element) {
return element.getCreationTime();
}
});