本节与在事件时间上运行的程序相关。有关事件时间、处理时间和摄入时间的介绍,请参见事件时间简介。
为了处理事件时间,流程序需要设置相应的时间特性。
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
分配时间戳
为了处理事件时间,Flink需要知道事件的时间戳,这意味着流中的每个元素都需要分配它的事件时间戳。这通常是通过访问/提取元素中的某个字段的时间戳来完成的。
时间戳的分配与生成的水印是同步的,它告诉系统在事件时间内的进展。
有两种分配时间戳和生成水印的方法:
- 直接在数据流源中。
- 通过timestamp assigner / watermark generator:在Flink时间戳生成器中,也定义了要发出的水印。
注意:时间戳和水印都被指定为自1970-01- 01T00:00 - 00Z的Java时代以来的毫秒数。
具有时间戳和水印的原函数
流Source可以为它们生成的元素直接分配时间戳,并且发送水印。当这样做时,不需要时间戳生成器。注意,如果使用了时间戳生成器,由Source生成的时间戳和水印都会被覆盖。
要直接为源中的元素分配时间戳,Source必须使用SourceContext的collectWithTimestamp(...) 方法。为了生成水印,Source必须使用emitWatermark(Watermark)方法。
@Override
public void run(SourceContext<MyType> ctx) throws Exception {
while (/* condition */) {
MyType next = getNext();
ctx.collectWithTimestamp(next, next.getEventTimestamp());
if (next.hasWatermarkTime()) {
ctx.emitWatermark(new Watermark(next.getWatermarkTime()));
}
}
}
Timestamp Assigners / Watermark Generators
时间戳分配器作用在流上,并且生成一条具有时间戳的元素和水印的新流。如果原始流已经有时间戳和/或水印,时间戳分配器会覆盖它们。
时间戳分配器通常在数据源之后立即指定,但它并不是严格要求这样做的。例如,通用的模式是在解析(MapFunction) 和过滤(FilterFunction) 在时间戳分配器之前。在任何情况下,时间戳分配器都需要在基于事件时间的第一个操作(例如第一个窗口操作)之前指定。作为一种特殊情况,当使用Kafka作为流作业的数据源时,Flink允许在Source(或消费者)内部指定一个时间戳分配器/水印发射器。关于如何这样做的更多信息可以在Kafka连接器文档中找到。
注意:本节的其余部分介绍了一个程序员创建她自己的时间戳分配器/水印发射器必须实现的主要接口。若要查看使用Flink预定义的提取器,请参阅Pre-defined Timestamp Extractors / Watermark Emitters页面。
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<MyEvent> stream = env.readFile(
myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
FilePathFilter.createDefaultFilter(), typeInfo);
DataStream<MyEvent> withTimestampsAndWatermarks = stream
.filter( event -> event.severity() == WARNING )
.assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks());
withTimestampsAndWatermarks
.keyBy( (event) -> event.getGroup() )
.timeWindow(Time.seconds(10))
.reduce( (a, b) -> a.add(b) )
.addSink(...);
周期性水印
AssignerWithPeriodicWatermarks分配时间戳并且周期性的生成水印(可能取决于流元素,或者纯粹基于处理时间)。
通过ExecutionConfig.setAutoWatermarkInterval(…)定义水印生成的间隔(每n毫秒)。每次都会调用assigner 的getCurrentWatermark()方法,如果水印不为空,且大于之前的水印,则会发出一个新的水印。
下面是两个具有周期性水印的时间戳分配器的简单示例。
/**
* This generator generates watermarks assuming that elements arrive out of order,
* but only to a certain degree. The latest elements for a certain timestamp t will arrive
* at most n milliseconds after the earliest elements for timestamp t.
*/
public class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks<MyEvent> {
private final long maxOutOfOrderness = 3500; // 3.5 seconds
private long currentMaxTimestamp;
@Override
public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
long timestamp = element.getCreationTime();
currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
return timestamp;
}
@Override
public Watermark getCurrentWatermark() {
// return the watermark as current highest timestamp minus the out-of-orderness bound
return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
}
}
/**
* This generator generates watermarks that are lagging behind processing time by a fixed amount.
* It assumes that elements arrive in Flink after a bounded delay.
*/
public class TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks<MyEvent> {
private final long maxTimeLag = 5000; // 5 seconds
@Override
public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
return element.getCreationTime();
}
@Override
public Watermark getCurrentWatermark() {
// return the watermark as current time minus the maximum time lag
return new Watermark(System.currentTimeMillis() - maxTimeLag);
}
}
With Punctuated Watermarks
当使用一个特定的事件指出一个新的水印需要生成,使用AssignerWithPunctuatedWatermarks。在这种情况下,Flink首先会调用extractTimestamp(…)方法来分配元素一个时间戳,然后立即在该元素上执行checkAndGetNextWatermark(…)方法。
checkAndGetNextWatermark(…)方法通过在extractTimestamp(…)方法中分配的时间戳,并可以决定是否要生成一个水印。每当checkAndGetNextWatermark(…)方法返回非空的水印,并且水印大于最新的水印时,就会发出新的水印。
public class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks<MyEvent> {
@Override
public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
return element.getCreationTime();
}
@Override
public Watermark checkAndGetNextWatermark(MyEvent lastElement, long extractedTimestamp) {
return lastElement.hasWatermarkMarker() ? new Watermark(extractedTimestamp) : null;
}
}
注意:在每一个事件上都有可能产生水印。然而,由于每个水印都导致了一些下游的计算,过多的水印会降低性能。
Timestamps per Kafka Partition
当使用Apache Kafka作为数据源时,每个Kafka分区可能都有一个简单的事件时间模式(递增的时间戳或无序有界)。然而,当从Kafka消费流时,多个分区经常被并行地消费,会交错的从分区获取事件并且破坏预定义的模式(这是Kafka的消费者客户端工作的固有特性)。
在这种情况下,你可以使用Flink的Kafka-partition-aware水印生成。使用这个特性,水印在Kafka的消费者每个Kafka分区内部生成,并且每个分区的水印以与在流上合并水印相同的方式合并。
例如,如果时间时间戳严格按照Kafka分区递增,使用递增的时间戳水印生成器生成每个分区的水印会生成完美的整体水印。
下图展示了如何使用每个Kafka分区水印生成,以及这种情况下水印如何在流数据流中传递。
FlinkKafkaConsumer09<MyType> kafkaSource = new FlinkKafkaConsumer09<>("myTopic", schema, props);
kafkaSource.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MyType>() {
@Override
public long extractAscendingTimestamp(MyType element) {
return element.eventTimestamp();
}
});
DataStream<MyType> stream = env.addSource(kafkaSource);