时间概念
在做实时计算的时候, 首先就需要搞清楚一个问题, 这个实时到底是怎么样的一个时间概念. 在 Flink 中, 总共有3种时间概念, 分别是 事件时间 ( Event time ) / 处理时间 ( Processing time ) / 接入时间 ( Ingestion time).
事件时间 ( Event time )就是真实的用户发生操作的时候所产生的时间, 对应到 flink 中, 需要用户 显示 的告诉 flink 到底每个输入中的哪一个字段代表这个事件时间。
接入时间 ( Ingestion time) 和处理时间 ( Processing time )是不需要用户去指定的, flink自己会去处理这个时间. 接入时间的代表的是一个事件通过 source Operator 的时间, 相比于 event time, ingestion time 不能处理乱序事件, 因此也就不用生成对应的watermark. 处理时间是指事件在操作算子计算过程中获取到的所在主机的时间. processing time 适合用于时间计算精度要求不是特别高的计算场景, 例如统计某些延时非常高的日志数据.
水位线机制 watermark
1, 解释 watermark
watermark 这个概念在 flink 中是与 event time 这个时间概念相互依存的, 其目的是为了解决数据乱序到达和系统延迟的问题. flink会把读取进系统的最新事件时间减去固定的时间间隔作为 watermark. 还是用一张图来解释watermark 的作用.
当事件进入 flink 中的时候, 根据提取的 event time 产生 watermark 时间戳, 记为 X, 进入 flink 中的 event time 记为 Y. 当窗口的 end time < X 的时候, 则触发窗口计算结果并输出. 只要 X < end time, 那么 事件就可以 一直进入到当前窗口中, 这样的话即便发生乱序, 也可以在窗口中调整. 调整的方法就是按照 Y.
动图见Blog gif
2, 使用 watermark
a. 在 Source Function 中 直接指定 Timestamps 和 Watermark
用户需要复写 SourceFunction 接口中 run( ) 方法实现数据逻辑, 同时调用 SourceContext 的 collectWithTimestamp( ) 方法生成 event time 时间戳, 调用 emitWatermark( ) 方法生成 watermark.
DataStream<String> text = env.addSource(new SourceFunction<String>() {
@Override
public void run(SourceContext<String> ctx) throws Exception {
for (String s : elementInput) {
// 切割每一条数据
String[] inp = s.split(",");
Long timestamp = new Long(inp[1]);
// 生成 event time 时间戳
ctx.collectWithTimestamp(s, timestamp);
// 调用 emitWatermark() 方法生成 watermark, 最大延迟设定为 2
ctx.emitWatermark(new Watermark(timestamp - 2));
}
// 设定默认 watermark
ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
}
@Override
public void cancel() {
}
});
b. 通过 Flink 自带的 Timestamp Assigner 指定 Timestamp 和 生成 watermark
在使用了 flink 定义的外部数据源( 如 kafka) 之后, 就不能通过自定义 sourcefunction 的方式来生成 watermark 和 event time 了, 这个时候可以使用 Timestamp Assigner, 其需要在第一个时间相关的 Operator前使用. Flink 有自己定义好的 Timestamp Assigner 可以直接使用 (包括直接指定的方式和固定时间延迟的方式 ).Flink 将 watermark 分为 Periodic Watermarks (根据设定的时间间隔周期性的生成) 和 Punctuated Watermarks (根据接入数量生成), 用户也可以继承对应的类实现这两种 watermark.
b.1 使用 Ascending Timestamp Assigner 指定 Timestamps 和 Watermark
// 首先需要指定系统时间概念为 event time
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// 使用 Ascending 分配 时间信息和 watermark
DataStream<Tuple2<String, Long>> text = env.fromCollection(collectionInput);
text.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple2<String, Long>>() {
@Override
public long extractAscendingTimestamp(Tuple2<String, Long> element) {
return element.f1;
}
});
b.2 使用固定时延间隔的 Timestamp Assigner 指定
// 使用 Ascending 分配 时间信息和 watermark 设定10s 代表最长的时延
DataStream<Tuple2<String, Long>> text = env.fromCollection(collectionInput);
text.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>>(Time.seconds(10)) {
@Override
public long extractTimestamp(Tuple2<String, Long> element) {
return element.f1;
}
});
c. 自定义 Timestamp Assigner 和 Watermark Generator
用户可以自定义实现 AssignerWithPeriodicWatermarks 和 AssignerWithPunctuatedWatermarks 两个接口来分别生成对应的两种 watermark. 这一块用的比较少, 以后有机会再细写.