介绍
window(窗口)是Flink流处理中非常重要的概念,本篇我们来对窗口相关的概念以及关联的实现进行解析。本篇的内容主要集中在package org.apache.flink.streaming.api.windowing下。
Window
一个Window代表有限对象的集合。一个窗口有一个最大的时间戳,该时间戳意味着在其代表的某时间点——所有应该进入这个窗口的元素都已经到达。
Flink的根窗口对象是一个抽象类,只提供了一个抽象方法:
public abstract long maxTimestamp();
用于获取最大的时间戳。Flink提供了两个窗口的具体实现。在实现Window时,子类应该override equals和hashCode这两个方法,以使得在逻辑上两个相等的window被认为是同一个。
GlobalWindow
GlobalWindow是一个全局窗口,被实现为单例模式。其maxTimestamp被设置为Long.MAX_VALUE。
该类内部有一个静态类定义了GlobalWindow的序列化器:Serializer。
TimeWindow
TimeWindow表示一个时间间隔窗口,这体现在其构造器需要注入的两个属性:
- start : 时间间隔的起始
- end : 时间间隔的截止
TimeWindow表示的时间间隔为[start, end)。其maxTimestamp的实现为:
public long maxTimestamp() {
return end - 1;
}
其equals的实现中,除了常规比较(比较引用,比较Class的实例),还会比较start,end这两个属性。
TimeWindow也在内部实现了序列化器,该序列化器主要针对start和end两个属性。
WindowAssigner
元素的窗口分配器。用于将元素分配给一个或者多个窗口。该抽象类定义了三个抽象方法:
- assignWindows :将某个带有时间戳timestamp的元素element分配给一个或多个窗口,并返回窗口集合
- getDefaultTrigger :返回跟WindowAssigner关联的默认触发器
- getWindowSerializer :返回WindowAssigner分配的窗口的序列化器
内置的WindowAssigner
整个类型继承图如下:
下面会谈到很多基于时间的窗口,这里有两个概念,分别是时间类型和窗口类型:
时间类型:
- eventTime :用户赋予的自定义的时间戳(事件时间戳)
- processingTime : 执行当前task的subtask主机的本地时间戳(系统时间戳)
窗口类型:
- Sliding:滑动窗口,可能会重叠(某个元素可能会身处多个窗口中)
- Tumbling:非重叠窗口(在assignWindows方法中返回的一般都是Collections.singletonList())
GlobalWindows
该分配器对应于窗口GlobalWindow,它将所有的元素分配给同一个GlobalWindow(本质上而言,GlobalWindow也只有一个实例)。跟GlobalWindow的实现方式一样,GlobalWindows也被实现为单例模式。
方法实现:
- assignWindows :方法的实现即返回存放GlobalWindow单实例的集合对象
- getDefaultTrigger :的实现是返回一个不做任何动作的NerverTrigger
TumblingEventTimeWindows
依据给定的窗口大小,结合event-time,返回存储TimeWindow单实例的集合。getDefaultTrigger方法返回EventTimeTrigger类型的实例。
TumblingProcessingTimeWindows
依据给定窗口的大小,结合processing-time,返回存储TimeWindow单实例的集合。需要注意的是,这里依据的是运行当前任务所在主机的本地时间戳。getDefaultTrigger方法返回的是ProcessingTimeTrigger类型的实例。
SlidingProcessingTimeWindows
Sliding窗口不同于Tumbling窗口,它除了指定窗口的大小,还要指定一个滑动值,即slide。所谓的滑动窗口可以这么理解,比如:一分钟里每十秒钟。这里一分钟是窗口大小,每十秒即为滑动值。
在Sliding窗口中,assignWindows方法返回的就不再是单个窗口了,而是窗口的集合。首先计算出窗口的个数:size/slide,然后循环初始化给定的size内不同slide的窗口对象。
SlidingEventTimeWindows
类似SlidingProcessingTimeWindows只不过窗口的start参数的计算方式依赖于系统时间戳。
EventTimeSessionWindows
继承MergingWindowAssigner类
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
return Collections.singletonList(new TimeWindow(timestamp, timestamp + sessionTimeout));
}
public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
return EventTimeTrigger.create();
}
public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
return new TimeWindow.Serializer();
}
ProcessingTimeSessionWindows
继承MergingWindowAssigner类
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
long currentProcessingTime = context.getCurrentProcessingTime();
return Collections.singletonList(new TimeWindow(currentProcessingTime, currentProcessingTime + sessionTimeout));
}
public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
return ProcessingTimeTrigger.create();
}
public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
return new TimeWindow.Serializer();
}
SlidingAlignedProcessingTimeWindows
继承自BaseAlignedWindowAssigner.
简单说,就是要废弃的。
更详细的看:
*这是一个特殊的窗口分配器,用于告诉系统使用
* <i>“快速对齐处理时间窗口运算符”</ i>进行窗口化。
*
* <p>以前的Flink版本使用该操作符自动进行简单的处理时间
*窗口(翻滚和滑动),没有指定自定义触发和没有驱逐器。
*在当前的Flink版本中,该运算符仅在程序明确使用时才使用
*指定此窗口分配器。这仅适用于程序依赖的特殊情况
*快速对齐窗口操作员的性能更好,并愿意接受缺点
*支持各种功能,如下所示:
*
* <ul>
* <li>无法选择自定义状态后端,操作员始终将数据存储在Java堆上。</li>
* <li>运算符不支持键组,这意味着它不能改变并行性。</ li>
* <li> Flink的未来版本可能无法从此采取的检查点/保存点恢复
*操作员。</ li>
* </ ul>
*
* <p>未来实施计划:我们计划添加该运算符使用的一些优化
*一般窗口操作符,以便将来版本的Flink不会具有性能/功能
*权衡更多。
TumblingAlignedProcessingTimeWindows
继承自BaseAlignedWindowAssigner。
简单说,就是要废弃的。
同上。
evictors
evitor : 中文译为驱逐者;顾名思义其用于剔除窗口中的某些元素
它剔除元素的时机是:在触发器触发之后,在窗口被处理(apply windowFunction)之前
该接口只定义了一个方法:
int evict(Iterable<StreamRecord<T>> elements, int size, W window);
接口的返回值即表示要剔除元素的个数。
内置的Evitor
Flink内置实现了三个Evitor:
- TimeEvitor
- CountEvitor
- DeltaEvitor
TimeEvitor
这个Evitor基于给定的保留时间(keep time)作为剔除规则,大致的实现如下:
public int evict(Iterable<StreamRecord<Object>> elements, int size, W window) {
int toEvict = 0;
long currentTime = Iterables.getLast(elements).getTimestamp();
long evictCutoff = currentTime - windowSize;
for (StreamRecord<Object> record: elements) {
if (record.getTimestamp() > evictCutoff) {
break;
}
toEvict++;
}
return toEvict;
}
大致的逻辑是,先取出最后一个元素的时间戳作为“当前”时间,然后减去期望中的“窗口大小”,得到一个基准时间戳(只需要比基准时间戳大的元素)。
然后从第一个元素开始循环比较每一个元素,如果比基准时间戳小,则累加剔除统计数,一旦发现某个元素的时间戳大于基准时间戳,则直接跳出循环,不再累加了(因为本地窗口中元素是基于时间有序的,这一点由Flink运行时来保证,如果从某个元素开始其时间戳大于基准时间戳,则后续的所有元素都满足这一条件,因此也就没必要再循环下去了)。
CountEvictor
基于容量的Evictor,它通过比对evict方法的第二个参数size来判断应该剔除多少个元素。具体的实现:
public int evict(Iterable<StreamRecord<Object>> elements, int size, W window) {
if (size > maxCount) {
return (int) (size - maxCount);
} else {
return 0;
}
}
DeltaEvictor
基于给定的阈值threshold和deltaFunction来进行判断。也是拿当前元素跟最后一个元素一起计算delta跟阈值做对比。
Time
Flink中仅有一个类Time来定义窗口的时间间隔。该时间默认指执行环境下的时间。创建一个Time对象,需要两个参数:
- size : 时间间隔的大小(数值)
- unit : TimeUnit的实例,表示时间间隔的单位
该类提供的很多静态方法提供对不同unit的设置。
Trigger
Trigger(触发器)用于决定某个窗口的元素集合什么时候触发计算以及结果什么时候被emit。
以粗粒度来看,Flink主要提供了三种形式的触发方式:
- 按元素
- 按系统时间
- 按事件时间
这体现为Trigger的三个主要的抽象方法:
- onElement :针对每个元素触发,这主要针对于那些基于元素的触发器,比如后面我们将看到的CountTrigger
- onProcessingTime :被processing-time(Flink系统时间时间戳)定时器触发
- onEventTime :被event-time(事件时间戳)定时器触发
以上这些方法中都有一个共同的参数:TriggerContext。
TriggerContext
顾名思义,它提供触发器执行时的上下文信息,但它只是Trigger的内部接口:
- getCurrentWatermark :返回当前的watermark
- registerProcessingTimeTimer :注册一个系统时间的定时器,触发onProcessingTime
- registerEventTimeTimer :注册一个事件时间的定时器,触发onEventTime
- deleteProcessingTimeTimer :删除系统时间的定时器
- deleteEventTimeTimer :删除事件时间的定时器
- getPartitionedState :用于失败恢复的获取状态的接口
其中,registerXXX/deleteXXX模式对主要针对上面两种基于时间的触发器。而最后一个方法getKeyValueState也是非常重要的,因为它用于获取窗口相关的状态,比如后面谈到的一些触发器是依赖于一些上下文状态的,那些状态的获取就是依靠这个方法。
TrigerResult
Trigger中定义的三个触发方法被调用后,最终要返回一个结果以决定触发之后产生的行为(比如是调用window function还是将窗口丢弃),这个定义触发器触发结果行为是通过TriggerResult来表达的。它是一个枚举类型,有这么几个枚举值:
- FIRE :window将会被应用window Function进行计算,然后将结果emit出去,但元素并没有被清洗,仍然在window中
- PURGE :清除window中的元素
- FIRE_AND_PURGE :同时具备FIRE和PURGE两种属性产生的行为
- CONTINUE :不做任何操作
内置的Trigger
Flink内置实现了很多触发器,完整的类图如下:
[图片上传失败...(image-4f804c-1511100517497)]
这些触发器都具有一些共性,这里一并说明:
- 由于Flink在Trigger中已事先将各种触发器类型的回调封装为不同的方法(onXXX),所以后续各种不同的触发器类型的核心逻辑将主要在其特定相关的onXXX方法中,而无关的onXXX方法将直接返回TriggerResult.CONTINUE(这种设计方式有欠妥当,因为不利于扩展)
- 因为有不少触发类型依赖于上下文的某些状态值(比如下文典型的ContinuousXXXTrigger),这些状态值将通过TriggerContext的getPartitionedState方法进行存取
EventTimeTrigger
基于事件时间的触发器,对应onEventTime
ProcessingTimeTrigger
基于当前系统时间的触发器,对应onProcessingTime
ContinuousEventTimeTrigger
该触发器是基于事件时间的按照指定时间间隔持续触发的触发器,它的首次触发取决于Watermark。首次触发的判断位于onElement中,它注册下一次(也是首次)触发eventTime 定时器的时机,然后将其first状态标识为false。具体实现如下:
public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
ValueState<Boolean> first = ctx.getPartitionedState(stateDesc);
if (first.value()) {
long start = timestamp - (timestamp % interval);
long nextFireTimestamp = start + interval;
ctx.registerEventTimeTimer(nextFireTimestamp);
first.update(false);
return TriggerResult.CONTINUE;
}
return TriggerResult.CONTINUE;
}
持续的触发依赖于在onEventTime中不断注册下一次触发的定时器:
public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
ctx.registerEventTimeTimer(time + interval);
return TriggerResult.FIRE;
}
ContinuousProcessingTimeTrigger
基于系统时间的按照指定时间间隔持续触发的触发器,它也是基于保存的状态值fire-timestamp来判断是否需要触发,不过它的循环注册过程是在onElement中。
CountTrigger
基于一个给定的累加值触发,由于累加值不是基于时间而是基于元素的,所有其触发机制实现在onElement中,逻辑很简单,先累加如果大于给定的阈值则触发:
public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws IOException {
ValueState<Long> count = ctx.getPartitionedState(stateDesc);
long currentCount = count.value() + 1;
count.update(currentCount);
if (currentCount >= maxCount) {
count.update(0L);
return TriggerResult.FIRE;
}
return TriggerResult.CONTINUE;
}
PurgingTrigger
该触发器类似于一个包装器,用于将任何给定的触发器转变成purging触发器。它的实现机制是,它接收一个trigger实例,然后在各个onXXX回调上执行该实例的相应的onXXX并获得TriggerResult的实例,进行相应的判断,最后返回FIRE_AND_PURGE枚举值。
DeltaTrigger
基于DeltaFunction和一个给定的阈值触发,该触发器在最后一个到达元素和当前元素之间计算一个delta值跟给定的阈值比较,如果高于给定的阈值,则触发。因为是基于元素的,所以主要逻辑实现在onElement中。