使用EventTime类型的窗口时,由于分布式架构的限制,即使数据能按EventTime顺序进入Flink,但数据在Flink中通常会分布到多个分区上,导致EventTime全局无序,因此某个EventTime无法直接表明在此EventTime前的数据已经全部到达
watermark描述
使用watermark来标记数据EventTime的水位线,流数据通常分布到多个分区并行任务中处理,所以每个并行任务中都会有各自的watermark,Flink根据watermark来决定关闭窗口的时机,如对于[0,10)的时间窗口,当watermark大于等于10时就会关闭该窗口
watermark计算方式
在一个并行任务中,根据当前的EventTime来确定watermark。默认情况下,是直接把EventTime赋值到watermark,如当前数据中的EventTime为t,则标记watermark也为t,表示t之前的数据已经全部到达。如果发现新的watermark没有大于旧值,则新生成的watermark取值为旧值。==curWatermark = max(curEventTime, preWatermark)==
但是即使在同一个并行任务中,数据也不一定会按EventTime先后顺序进来,如果直接根据EventTime来赋值为watermark,此时Flink判断关闭当前窗口的话,会导致晚到的数据丢失。
为了解决此问题,flink允许在设置窗口时声明一个watermark延迟计算时间t,即watermar=EventTime-t。t值需要自行按实际延迟情况进行预估,以让Flink等待更长时间再来关闭窗口,尽量使对应数据都能进到对应窗口中。如对于[0,10)的时间窗口,假设预设延迟t为3,当EventTime等于时10时,watermark=10-3=7,此时仍然不会关闭该窗口,直到接收到EventTime大于等于13的数据时,watermark才大于等于10,此时才会关闭该窗口。==curWatermark = max(curEventTime-t, preWatermark)==
watermark生成频率
watermark生成有两种频率,周期生成和与数据逐个对应生成,周期生成是指间隔一段固定时间生成一个watermark,与数据逐个对应生成是指每接收到一个数据生成一个watermark。周期生成适合于数据密集的场景,性能较好,但是有可能会导致watermark生成不及时导致窗口延迟关闭;逐个对应生成适合数据稀疏的场景,性能较差,但是watermark生成及时。
watermark在算子之间的传递
- 无shuffle算子间的传递:watermark描述的是一个算子的每个并行任务的EventTime水位线,不同算子的并行任务中的EventTime水位线自然也各不相同,但上下游的算子之间对于同个分区的数据会通过broadcast进行watermark传递,除此之外,每个算子有一个全局的watermark,其取值为所有并行任务的最小watermark。
如算子A为算子B的上游算子,算子AB都有3个并行任务且一一对应,假设算子A的任务1中watermark为1,任务2的watermark为2,任务3的watermark为3,则算子A的整体watermark为1。当算子A的任务1处理完watermark为1前的数据时,此waterwark传递到算子B的任务1,算子B的任务1的watermark为1,此时算子B的任务2和任务3还没有接收到上游数据,算子B的整体watermar也为1。 - 有shuffle算子之间的传递:下游每个并行子任务的watermark等于上游多个task之中传递过来的最小的watermark
如算子A的task_A_1和task_A_2均发送数据到算子B的task_B_1,且task_A_1的watermark为3,task_A_2的watermark为4,则task_B_1的watermark为3
使用watermark的窗口开启和关闭时机
- 窗口的开启和关闭以算子为粒度,窗口的开启和关闭也以算子整体waterwark来判断
- 算子任一子任务接收到数据时,算子就开启窗口,第一个窗口的左区间时间计算方式为firstEventTime - (firstEventTime - offset + windowSize) % windowSize,其中firstEventTime表示第一个数据的EventTime,offset表示窗口设置的偏移量(offset=5s表示延后5s开启窗口),windowSize表示窗口的时间长度;在滚动窗口中,右区间的值为左区间 + 窗口长度;在滑动窗口中,右区间的值为第一次将要滑动后的左区间值;左区间为开区间,右区间为闭区间
- 算子的整体watermark等于所有子任务的最小watermark,当算子的watermark大于等于窗口的右区间时窗口就会触发关闭并且输出窗口的统计结果。
- 如果watermark设置了最大乱序时间,则在右区间的基础上加上该最大乱序时间才会触发关闭窗口输出窗口统计结果
- 如果窗口设置了允许数据迟到最大时间,窗口还是会按原来的时间输出统计结果,但会等到原来时间的基础上加上允许数据迟到最大时间才会触发关闭窗口,并且在原来时间输出了第一次的统计结果后,在允许数据迟到的时间内,每接收到一条数据,就会输出一次窗口统计结果
- 如果设置了迟到数据输出到侧输出流,则迟到数据在窗口关闭后会输出到侧输出流