1 eventTime
一个基于eventTime的flink程序必须定义:每条数据的eventTime时间戳和如何生成watermark。一旦设置了eventTime则必须设置watermark。
Event Time:是事件创建的时间。它通常由事件中的时间戳描述,例如采集的日志数据中,每一条日志都会记录自己的生成时间,Flink 通过时间戳分配器访问事件时间戳。
(1)设定流的时间属性为eventtime
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 从调用时刻开始给 env 创建的每一个 stream 追加时间特征
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
(2)给每条数据设置时间戳
要让flink知道,流入的哪个字段是eventtime。
需要重载public abstract long extractTimestamp(T var1);
2.watermark
flink会将窗口内的事件缓存下来,直到接收到一个watermark。
(1) 数据流中的 Watermark 用于表示 timestamp 小于 Watermark 的数据,都已经到达了,因此, window 的执行也是由 Watermark 触发的。
每次系统会校验已经到达的数据中最大的 maxEventTime,然后认定 eventTime小于 maxEventTime - t 的所有数据都已经到达,如果有窗口的停止时间等于maxEventTime – t,那么这个窗口被触发执行。
(2)watermark是单调递增的,保证时间不会倒流。
一个时间戳为t的watermark意味着,它之后到达的事件时间戳都大于t。
(3)周期性的watermark和逐个生成watermark
为什么会引入周期性watermark?
如果某个分区的watermark迟迟不更新,这回导致算子的eventtime停滞,导致大量的数据积压。因此引入了根据系统时间周期性生成watermark的方式。
(4)watermark的引入
dataStream.assignTimestampsAndWatermarks//会自动生成周期watermarks,存在默认间隔,也可以自己设置间隔setAutoWatermarkInterval(long interval)
public abstract class BoundedOutOfOrdernessTimestampExtractor<T> implements AssignerWithPeriodicWatermarks<T> {//周期性生成watermark
private static final long serialVersionUID = 1L;
private long currentMaxTimestamp;
private long lastEmittedWatermark = -9223372036854775808L;
private final long maxOutOfOrderness;
public BoundedOutOfOrdernessTimestampExtractor(Time maxOutOfOrderness) {
if (maxOutOfOrderness.toMilliseconds() < 0L) {
throw new RuntimeException("Tried to set the maximum allowed lateness to " + maxOutOfOrderness + ". This parameter cannot be negative.");
} else {
this.maxOutOfOrderness = maxOutOfOrderness.toMilliseconds();
this.currentMaxTimestamp = -9223372036854775808L + this.maxOutOfOrderness;
}
}
public long getMaxOutOfOrdernessInMillis() {
return this.maxOutOfOrderness;
}
public abstract long extractTimestamp(T var1);
public final Watermark getCurrentWatermark() {
long potentialWM = this.currentMaxTimestamp - this.maxOutOfOrderness;
if (potentialWM >= this.lastEmittedWatermark) {
this.lastEmittedWatermark = potentialWM;
}
return new Watermark(this.lastEmittedWatermark);
}
//获取时间戳,最大时间戳是单调递增的
public final long extractTimestamp(T element, long previousElementTimestamp) {
long timestamp = this.extractTimestamp(element);
if (timestamp > this.currentMaxTimestamp) {
this.currentMaxTimestamp = timestamp;
}
return timestamp;
}
}