stream-window

介绍

window(窗口)是Flink流处理中非常重要的概念,本篇我们来对窗口相关的概念以及关联的实现进行解析。本篇的内容主要集中在package org.apache.flink.streaming.api.windowing下。

Window

一个Window代表有限对象的集合。一个窗口有一个最大的时间戳,该时间戳意味着在其代表的某时间点——所有应该进入这个窗口的元素都已经到达。
Flink的根窗口对象是一个抽象类,只提供了一个抽象方法:

public abstract long maxTimestamp();

用于获取最大的时间戳。Flink提供了两个窗口的具体实现。在实现Window时,子类应该override equals和hashCode这两个方法,以使得在逻辑上两个相等的window被认为是同一个。

GlobalWindow

GlobalWindow是一个全局窗口,被实现为单例模式。其maxTimestamp被设置为Long.MAX_VALUE。
该类内部有一个静态类定义了GlobalWindow的序列化器:Serializer。

TimeWindow

TimeWindow表示一个时间间隔窗口,这体现在其构造器需要注入的两个属性:

  • start : 时间间隔的起始
  • end : 时间间隔的截止

TimeWindow表示的时间间隔为[start, end)。其maxTimestamp的实现为:

public long maxTimestamp() {
return end - 1;
}

其equals的实现中,除了常规比较(比较引用,比较Class的实例),还会比较start,end这两个属性。

TimeWindow也在内部实现了序列化器,该序列化器主要针对start和end两个属性。

WindowAssigner

元素的窗口分配器。用于将元素分配给一个或者多个窗口。该抽象类定义了三个抽象方法:

  • assignWindows :将某个带有时间戳timestamp的元素element分配给一个或多个窗口,并返回窗口集合
  • getDefaultTrigger :返回跟WindowAssigner关联的默认触发器
  • getWindowSerializer :返回WindowAssigner分配的窗口的序列化器

内置的WindowAssigner

整个类型继承图如下:


下面会谈到很多基于时间的窗口,这里有两个概念,分别是时间类型和窗口类型:

时间类型:

  • eventTime :用户赋予的自定义的时间戳(事件时间戳)
  • processingTime : 执行当前task的subtask主机的本地时间戳(系统时间戳)

窗口类型:

  • Sliding:滑动窗口,可能会重叠(某个元素可能会身处多个窗口中)
  • Tumbling:非重叠窗口(在assignWindows方法中返回的一般都是Collections.singletonList())

GlobalWindows

该分配器对应于窗口GlobalWindow,它将所有的元素分配给同一个GlobalWindow(本质上而言,GlobalWindow也只有一个实例)。跟GlobalWindow的实现方式一样,GlobalWindows也被实现为单例模式。

方法实现:

  • assignWindows :方法的实现即返回存放GlobalWindow单实例的集合对象
  • getDefaultTrigger :的实现是返回一个不做任何动作的NerverTrigger

TumblingEventTimeWindows

依据给定的窗口大小,结合event-time,返回存储TimeWindow单实例的集合。getDefaultTrigger方法返回EventTimeTrigger类型的实例。

TumblingProcessingTimeWindows

依据给定窗口的大小,结合processing-time,返回存储TimeWindow单实例的集合。需要注意的是,这里依据的是运行当前任务所在主机的本地时间戳。getDefaultTrigger方法返回的是ProcessingTimeTrigger类型的实例。

SlidingProcessingTimeWindows

Sliding窗口不同于Tumbling窗口,它除了指定窗口的大小,还要指定一个滑动值,即slide。所谓的滑动窗口可以这么理解,比如:一分钟里每十秒钟。这里一分钟是窗口大小,每十秒即为滑动值。

在Sliding窗口中,assignWindows方法返回的就不再是单个窗口了,而是窗口的集合。首先计算出窗口的个数:size/slide,然后循环初始化给定的size内不同slide的窗口对象。

SlidingEventTimeWindows

类似SlidingProcessingTimeWindows只不过窗口的start参数的计算方式依赖于系统时间戳。

EventTimeSessionWindows

继承MergingWindowAssigner类

public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
return Collections.singletonList(new TimeWindow(timestamp, timestamp + sessionTimeout));
}
public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
return EventTimeTrigger.create();
}
public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
return new TimeWindow.Serializer();
}

ProcessingTimeSessionWindows

继承MergingWindowAssigner类

public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
long currentProcessingTime = context.getCurrentProcessingTime();
return Collections.singletonList(new TimeWindow(currentProcessingTime, currentProcessingTime + sessionTimeout));
}
public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
return ProcessingTimeTrigger.create();
}
public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
return new TimeWindow.Serializer();
}

SlidingAlignedProcessingTimeWindows

继承自BaseAlignedWindowAssigner.

简单说,就是要废弃的。

更详细的看:

*这是一个特殊的窗口分配器,用于告诉系统使用
 * <i>“快速对齐处理时间窗口运算符”</ i>进行窗口化。
 *
 * <p>以前的Flink版本使用该操作符自动进行简单的处理时间
 *窗口(翻滚和滑动),没有指定自定义触发和没有驱逐器。
 *在当前的Flink版本中,该运算符仅在程序明确使用时才使用
 *指定此窗口分配器。这仅适用于程序依赖的特殊情况
 *快速对齐窗口操作员的性能更好,并愿意接受缺点
 *支持各种功能,如下所示:
 *
 * <ul>
 * <li>无法选择自定义状态后端,操作员始终将数据存储在Java堆上。</li>
 * <li>运算符不支持键组,这意味着它不能改变并行性。</ li>
 * <li> Flink的未来版本可能无法从此采取的检查点/保存点恢复
 *操作员。</ li>
 * </ ul>
 *
 * <p>未来实施计划:我们计划添加该运算符使用的一些优化
 *一般窗口操作符,以便将来版本的Flink不会具有性能/功能
 *权衡更多。

TumblingAlignedProcessingTimeWindows

继承自BaseAlignedWindowAssigner。

简单说,就是要废弃的。

同上。

evictors

evitor : 中文译为驱逐者;顾名思义其用于剔除窗口中的某些元素
它剔除元素的时机是:在触发器触发之后,在窗口被处理(apply windowFunction)之前
该接口只定义了一个方法:

int evict(Iterable<StreamRecord<T>> elements, int size, W window);

接口的返回值即表示要剔除元素的个数。

内置的Evitor

Flink内置实现了三个Evitor:

  • TimeEvitor
  • CountEvitor
  • DeltaEvitor

TimeEvitor

这个Evitor基于给定的保留时间(keep time)作为剔除规则,大致的实现如下:


public int evict(Iterable<StreamRecord<Object>> elements, int size, W window) {
int toEvict = 0;
long currentTime = Iterables.getLast(elements).getTimestamp();
long evictCutoff = currentTime - windowSize;
for (StreamRecord<Object> record: elements) {
if (record.getTimestamp() > evictCutoff) {
break;
}
toEvict++;
}
return toEvict;
}

大致的逻辑是,先取出最后一个元素的时间戳作为“当前”时间,然后减去期望中的“窗口大小”,得到一个基准时间戳(只需要比基准时间戳大的元素)。
然后从第一个元素开始循环比较每一个元素,如果比基准时间戳小,则累加剔除统计数,一旦发现某个元素的时间戳大于基准时间戳,则直接跳出循环,不再累加了(因为本地窗口中元素是基于时间有序的,这一点由Flink运行时来保证,如果从某个元素开始其时间戳大于基准时间戳,则后续的所有元素都满足这一条件,因此也就没必要再循环下去了)。

CountEvictor

基于容量的Evictor,它通过比对evict方法的第二个参数size来判断应该剔除多少个元素。具体的实现:

public int evict(Iterable<StreamRecord<Object>> elements, int size, W window) {
if (size > maxCount) {
return (int) (size - maxCount);
} else {
return 0;
}
}

DeltaEvictor

基于给定的阈值threshold和deltaFunction来进行判断。也是拿当前元素跟最后一个元素一起计算delta跟阈值做对比。

Time

Flink中仅有一个类Time来定义窗口的时间间隔。该时间默认指执行环境下的时间。创建一个Time对象,需要两个参数:

  • size : 时间间隔的大小(数值)
  • unit : TimeUnit的实例,表示时间间隔的单位
    该类提供的很多静态方法提供对不同unit的设置。

Trigger

Trigger(触发器)用于决定某个窗口的元素集合什么时候触发计算以及结果什么时候被emit。

以粗粒度来看,Flink主要提供了三种形式的触发方式:

  • 按元素
  • 按系统时间
  • 按事件时间

这体现为Trigger的三个主要的抽象方法:

  • onElement :针对每个元素触发,这主要针对于那些基于元素的触发器,比如后面我们将看到的CountTrigger
  • onProcessingTime :被processing-time(Flink系统时间时间戳)定时器触发
  • onEventTime :被event-time(事件时间戳)定时器触发
    以上这些方法中都有一个共同的参数:TriggerContext。

TriggerContext

顾名思义,它提供触发器执行时的上下文信息,但它只是Trigger的内部接口:

  • getCurrentWatermark :返回当前的watermark
  • registerProcessingTimeTimer :注册一个系统时间的定时器,触发onProcessingTime
  • registerEventTimeTimer :注册一个事件时间的定时器,触发onEventTime
  • deleteProcessingTimeTimer :删除系统时间的定时器
  • deleteEventTimeTimer :删除事件时间的定时器
  • getPartitionedState :用于失败恢复的获取状态的接口

其中,registerXXX/deleteXXX模式对主要针对上面两种基于时间的触发器。而最后一个方法getKeyValueState也是非常重要的,因为它用于获取窗口相关的状态,比如后面谈到的一些触发器是依赖于一些上下文状态的,那些状态的获取就是依靠这个方法。

TrigerResult

Trigger中定义的三个触发方法被调用后,最终要返回一个结果以决定触发之后产生的行为(比如是调用window function还是将窗口丢弃),这个定义触发器触发结果行为是通过TriggerResult来表达的。它是一个枚举类型,有这么几个枚举值:

  • FIRE :window将会被应用window Function进行计算,然后将结果emit出去,但元素并没有被清洗,仍然在window中
  • PURGE :清除window中的元素
  • FIRE_AND_PURGE :同时具备FIRE和PURGE两种属性产生的行为
  • CONTINUE :不做任何操作

内置的Trigger

Flink内置实现了很多触发器,完整的类图如下:
[图片上传失败...(image-4f804c-1511100517497)]

这些触发器都具有一些共性,这里一并说明:

  • 由于Flink在Trigger中已事先将各种触发器类型的回调封装为不同的方法(onXXX),所以后续各种不同的触发器类型的核心逻辑将主要在其特定相关的onXXX方法中,而无关的onXXX方法将直接返回TriggerResult.CONTINUE(这种设计方式有欠妥当,因为不利于扩展
  • 因为有不少触发类型依赖于上下文的某些状态值(比如下文典型的ContinuousXXXTrigger),这些状态值将通过TriggerContext的getPartitionedState方法进行存取

EventTimeTrigger

基于事件时间的触发器,对应onEventTime

ProcessingTimeTrigger

基于当前系统时间的触发器,对应onProcessingTime

ContinuousEventTimeTrigger

该触发器是基于事件时间的按照指定时间间隔持续触发的触发器,它的首次触发取决于Watermark。首次触发的判断位于onElement中,它注册下一次(也是首次)触发eventTime 定时器的时机,然后将其first状态标识为false。具体实现如下:

public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
ValueState<Boolean> first = ctx.getPartitionedState(stateDesc);
if (first.value()) {
long start = timestamp - (timestamp % interval);
long nextFireTimestamp = start + interval;
ctx.registerEventTimeTimer(nextFireTimestamp);
first.update(false);
return TriggerResult.CONTINUE;
}
return TriggerResult.CONTINUE;
}

持续的触发依赖于在onEventTime中不断注册下一次触发的定时器:

public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
ctx.registerEventTimeTimer(time + interval);
return TriggerResult.FIRE;
}

ContinuousProcessingTimeTrigger

基于系统时间的按照指定时间间隔持续触发的触发器,它也是基于保存的状态值fire-timestamp来判断是否需要触发,不过它的循环注册过程是在onElement中。
CountTrigger
基于一个给定的累加值触发,由于累加值不是基于时间而是基于元素的,所有其触发机制实现在onElement中,逻辑很简单,先累加如果大于给定的阈值则触发:

public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws IOException {
ValueState<Long> count = ctx.getPartitionedState(stateDesc);
long currentCount = count.value() + 1;
count.update(currentCount);
if (currentCount >= maxCount) {
count.update(0L);
return TriggerResult.FIRE;
}
return TriggerResult.CONTINUE;
}

PurgingTrigger

该触发器类似于一个包装器,用于将任何给定的触发器转变成purging触发器。它的实现机制是,它接收一个trigger实例,然后在各个onXXX回调上执行该实例的相应的onXXX并获得TriggerResult的实例,进行相应的判断,最后返回FIRE_AND_PURGE枚举值。

DeltaTrigger

基于DeltaFunction和一个给定的阈值触发,该触发器在最后一个到达元素和当前元素之间计算一个delta值跟给定的阈值比较,如果高于给定的阈值,则触发。因为是基于元素的,所以主要逻辑实现在onElement中。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,332评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,508评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,812评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,607评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,728评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,919评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,071评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,802评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,256评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,576评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,712评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,389评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,032评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,798评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,026评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,473评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,606评论 2 350

推荐阅读更多精彩内容

  • Flink的 Window 操作 Window是无限数据流处理的核心,Window将一个无限的stream拆分成有...
    写Bug的张小天阅读 55,498评论 4 54
  • 介绍 Window也即窗口,是Flink流处理的特性之一。前一篇文章我们谈到了Winodw的相关概念及其实现。窗口...
    苗栋栋阅读 1,078评论 0 1
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,637评论 18 139
  • 问答题47 /72 常见浏览器兼容性问题与解决方案? 参考答案 (1)浏览器兼容问题一:不同浏览器的标签默认的外补...
    _Yfling阅读 13,737评论 1 92
  • 寿教育管理"领航"人才影子跟岗培训 简 报 第4期 仁寿教育体育局“领航”人才1班 2017 年11月...
    大化幼儿园阅读 166评论 0 0