前言
- 对于流计算来说,最核心的概念就是无穷数据集,而用来处理无穷数据集的计算就可以称为流计算。面对无穷数据集,有多种多样的处理方式,但是大致上可以分为四类:
- 1、时间无关:最基础的场景就是Filter,我们只关心我们想要的数据,这跟数据源是否是无穷的、失序都没有关系了。
- 2、近似算法:比如近似Top-N、流K-means聚类等。他们都以无穷数据为输入,并计算出差不多你想要的结果。
- 3、窗口:而窗口其实就是对无穷数据集进行分片,一种化无穷为又穷的抽象概念。
- 显然,无穷数据集有N多分片的方式,因此也就对应着N多的窗口。而其中最为引人注目的就是按时间划分的窗口了,是的,没有比时间窗口更有吸引力的划分方式了。而在时间窗口中,核心的一个概念就是时间,在流计算中一般可以分为处理时间和事件时间,当然还可以定义更多时间的概念,这完全看你自己喽。在Flink中就有这么一个东西:摄入时间。在这里,想说的是,只有事件时间才能保证正确性,程序进行回放也能保证一致性。
为什么需要WaterMark
- 对于无穷数据集,我们缺乏一种有效的方式来判断数据完整性,因此就有了WaterMark。watermark是建立在事件时间上的一个概念,用来刻画数据流的完整性。如果按照处理时间来衡量事件,一切都是有序的,完美的,自然而然也就无需watermark了。
- 换句话说事件时间引入了乱序的问题,而watermark就是用来解决乱序问题。所谓的乱序,其实就是有事件迟到了,对于迟到的元素,我们不可能无限期的等下去,必须要有一种机制来保证一个特定的时间后,必须触发window进行计算了。这个特别的机制,就是watermark,它告诉了算子时间不大于 WaterMark 的消息不应该再被接收(如果出现意味着延迟到达)。
WaterMark如何触发窗口计算
- 在Flink中,window是按照自然时间进行划分的,如果window大小是3秒,那么1分钟内会把window划分为如下的形式:
[00:00:00,00:00:03)
[00:00:03,00:00:06)
...
[00:00:57,00:01:00)
如果window大小是10秒,则window会被分为如下的形式:当然还有一个offset值可以控制window的起始值不是整点。
[00:00:00,00:00:10)
[00:00:10,00:00:20)
...
[00:00:50,00:01:00)
- 到EventTimeTrigger的onElement中看看:EventTimeTrigger中当ctx.getCurrentWatermark >= window.maxTimestamp时立刻触发窗口计算。
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
// if the watermark is already past the window fire immediately
return TriggerResult.FIRE;
} else {
ctx.registerEventTimeTimer(window.maxTimestamp());
return TriggerResult.CONTINUE;
}
}
window.maxTimestamp = 窗口结束时间 - 1,flink时间窗口的单位为ms,也就是时间戳,也就是说就差一毫秒,也不会触发窗口。
public long maxTimestamp() {
return end - 1;
}
然后到调用Evictor的地方看看:没有内容是不会触发计算的
TriggerResult triggerResult = triggerContext.onElement(element);
if (triggerResult.isFire()) {
Iterable<StreamRecord<IN>> contents = evictingWindowState.get();
if (contents == null) {
// if we have no state, there is nothing to do
continue;
}
emitWindowContents(actualWindow, contents, evictingWindowState);
}
输入的数据中,根据自身的Event Time,将数据划分到不同的window中,如果window中有数据,则当watermark时间 >= window maxTimestamp时,就符合了window触发的条件了,最终决定window触发,还是由数据本身的Event Time所属的window中的window maxTimestamp决定。
waterMark,checkpoint其实都是上游节点广播消息给下游节点来处理的行为(都是在流中插入一种特殊的数据结构来做处理)
Flink中WaterMarker的类型
周期水位线(Periodic Watermark)
- 即按照固定的时间间隔周期的生成水位线。这个时间间隔可以通过
ExecutionConfig.setAutoWatermarkInterval(...)
进行设置。当然只有新生成的水位线不为空并且大于上一次生成的水位线,新水位线才会被发出。 - 生成新的水位线的逻辑完全是由用户自己定义的。最简单的水位线生成算法就是取目前为止最大的事件时间。当然这种算法比较暴力,容易水位线提升突涨(这个最大时间戳可能过大),因此该算法对乱序事件的容忍程度比较低,容易出现大量迟到事件。当然我们用的最多的是KeyedWindow,一个Window往往有多个输入,而Window算子会选择其中最小的一个。
标点水位线(Punctuated Watermark)
通过数据流中某些特殊标记事件来触发新水位线的生成。
迟到事件
虽然水位线指示着早于它的事件不应该再出现,但是在实际情况中,水位线生成算法,往往生不成完美水位线,接收到水位线以前的的消息是不可避免的,这就是所谓的迟到事件。实际上迟到事件是乱序事件的特例,和一般乱序事件不同的是它们的乱序程度超出了水位线的预计,导致窗口在它们到达之前已经关闭。
-
迟到事件出现时窗口已经关闭并产出了计算结果,因此处理的方法有3种:
- 1、重新激活已经关闭的窗口并重新计算以修正结果。
- 2、将迟到事件收集起来另外处理。
- 3、将迟到事件视为错误消息并丢弃。
-
Flink 默认的处理方式是第3种直接丢弃,其他两种方式分别使用Side Output和Allowed Lateness。
- Side Output机制可以将迟到事件单独放入一个数据流分支,这会作为 window 计算结果的副产品,以便用户获取并对其进行特殊处理。
- Allowed Lateness机制允许用户设置一个允许的最大迟到时长。Flink 会再窗口关闭后一直保存窗口的状态直至超过允许迟到时长,这期间的迟到事件不会被丢弃,而是默认会触发窗口重新计算。因为保存窗口状态需要额外内存,并且如果窗口计算使用了 ProcessWindowFunction API 还可能使得每个迟到事件触发一次窗口的全量计算,代价比较大,所以允许迟到时长不宜设得太长,迟到事件也不宜过多,否则应该考虑降低水位线提高的速度或者调整算法。