时间语义
- Event Time:事件创建的时间
- Ingestion Time: 数据进入Flink的时间
- Processing Time: 执行操作算子的本地系统时间,与机器相关
不同的时间语义有不同的应用场合
我们往往更关心事件时间(Event Time)
Event TIme可以从日志数据的时间戳(timestamp)中提取
如果在日志中没有时间戳或者日志中的时间戳不准,这时可以使用处理时间(Process Time)
代码中默认使用处理时间(Process Time),需要在代码中设置Event Time
对运行环境调用setStreamTimeCharacteristic方法,设置流的时间特性,具体时间,还需要从数据中提取时间戳(timestamp)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
当使用Event Time模式处理数据时,乱序数据的影响
- 由于网络,分布式等原因,会导致乱序数据的产生
- 当Flink以Event time模式处理数据流式,它会根据数据里的时间戳来处理基于时间的算子
- 为避免乱序数据对Event Time模式处理数据的影响,引入Watermark(水位线)概念
Watermark(水位线)
- 避免乱序数据:遇到一个时间戳达到了窗口关闭时间,不应该立刻触发窗口计算,而是等待一段时间,等迟到的数据来了再关闭窗口
- Watermark是一种衡量Event Time进展的机制,可以设定延迟触发
- Watermark是用于处理乱序事件的,而正确的处理乱序事件,通常用Watermark机制结合window来实现
- 数据流中的Watermark用于表示timestamp小于Watermark的数据,都已经到达了,<u>因此,window的关闭执行是由Watermark触发的;不过触发的窗口的数据范围还是windows自己定义的范围</u>
- watermark用来让程序自己平衡延迟和结果正确性
watermark的特点:
- watermark是一条带着时间戳的特殊数据记录
- watermark必须单调递增,以确保任务的事件时间时钟在向前推进,而不是在后退
- watermark与数据的时间戳相关
watermark的传递
- 当重分区时,是把上游的watermark广播到下游所有分区中;因为上游可能是多个任务并行在跑,下游接收到的watermark会有多个,这个时候上游广播给下游的最小的watermark;(木桶原理)
watermark原理:表示在watermark之前时间数据都接收到了
watermark的设定
在Flink中,watermark由开发人员生成,通常需要开发人员对正在处理的数据和业务有一定的了解,可以知道设置watermark的延迟设置多少,能保证实时数据尽量正确;
watermark的延迟设定,需要先看数据情况,分析得出最佳延迟时长
如果还有数据丢失,可以使用旁路输出sideOutputLateData,这样当窗口关闭后,数据可以从侧输出流输出
watermark的生成
watermark生成(Generator)分为周期性生成(Period Generator)后标记生成(Punctuated Generator);
周期性生成的生成周期是由ExecutionConfig.getAutoWatermarkInterval()决定默认是200ms
标记生成是来一条数据,生成一个watermark
对于数据量过大,同一个时间戳中来大量数据,这时候也会生成很多重复的watermark,会多消耗资源,降低数据处理速度,这个时候建议使用周期性生成watermark;
在数据稀疏时和时效性强时,可以使用标记生成watermark
- 在大数据中常用选择是周期性生成watermark
Windows窗口起始点和偏移量
event time窗口起始点是根据TumblingEventTimeWindows中代码生成的,如下代码语句,是使用传入的时间戳和窗口偏移量计算得出
long start = TimeWindow.getWindowStartWithOffset(timestamp, (this.globalOffset +this.staggerOffset) % this.size, this.size);
具体计算公式如下:
public static long getWindowStartWithOffset(long timestamp, long offset,
long windowSize)
{
return timestamp - (timestamp - offset + windowSize) % windowSize;
}
偏移量一般用于配置时区问题,比如北京时间和伦敦时区,早8个小时,这个时候可以设置偏移量8小时