正如在timestamp 和 watermark处理中所描述的一样,Flink 提供了抽象概念来允许程序员指定自己的timestamp和发射他们自己的watermark。更具体地说,用户可以根据自己的需要来实现AssignerWithPeriodicWatermarks和AssignerWithPunctuatedWatermarks中的一个接口来指定自己的timestamp和发射自己的watermark。
为了更进一步的简化这些任务的编程工作,Flink还提供了一些预实现的timestamp分配器,这个章节提供了这些预实现timestamp分配器的列表。除了它们拆箱即用的功能外,它们的实现也可作为自定义实现的例子。
递增时间戳分配器
周期性水印生成的最简单的特殊例子是时间戳被给定的源任务按递增顺序产生,在这种情况下,当前的时间戳永远可以作为水印,因为没有更早的时间戳将到达。
注意:每个并行数据源任务中的timestamp是递增的,这是很必要的,例如:如果指定了一个Kafka分区被一个并行数据源实例读取,那么每个Kafka分区的timestamp是递增的,这是很有必要的。Flink的watermark合并机制将会在并行数据流shuffled、unioned、connected 或者 merged的时候产生正确的水印。
Java 代码:
DataStream<MyEvent> stream = ...
DataStream<MyEvent> withTimestampsAndWatermarks =
stream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MyEvent>() {
@Override
public long extractAscendingTimestamp(MyEvent element) {
return element.getCreationTime();
}
});
Scala 代码:
val stream: DataStream[MyEvent] = ...
val withTimestampsAndWatermarks = stream.assignAscendingTimestamps( _.getCreationTime )
允许固定数量延迟的分配器
周期性水印生成的另一个例子是当水印滞后的最大时间戳在数据流中被认为是一个固定的时间,在这种情况下,在数据流中遇到的最大延迟是已知的,例如,创建一个带时间戳的并在一个固定的时间内传播的元素的测试源。对于这些情况,Flink 提供了BoundedOutOfOrdernessTimestampExtractor,以maxOutOfOrderness作为参数,这个maxOutOfOrderness是指在窗口计算的最后,一个元素允许的最大延迟时间。延迟与t-t_w的结果相对应,这里t指的是元素的timestamp,而t_w指的是上个水印。如果延迟>0 那么这个元素被认为是延迟的,默认情况下,这个元素不计入窗口的最终计算中。请参考允许时延来获取更多关于延迟元素如何工作的信息。
Java代码:
DataStream<MyEvent> stream = ...
DataStream<MyEvent> withTimestampsAndWatermarks =
stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10)) {
@Override
public long extractTimestamp(MyEvent element) {
return element.getCreationTime();
}
});
Scala代码:
val stream: DataStream[MyEvent] = ...
val withTimestampsAndWatermarks = stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[MyEvent](Time.seconds(10))( _.getCreationTime ))