Flink - 当数据流入window时,会发生什么

上篇文章对Flink中常见的windowAssigner,如:TumblingEventTimeWindow, SlidingEventTimeWindow, EventTimeSessionWindows 等中的默认提供的trigger:EventTimeTrigger进行了剖析,讨论了trigger注册的回调函数的执行,trigger会触发怎样的triggerResult,如何处理迟到数据的trigger触发,以及提醒了需要注意的几点,明白flink中的EventTimeTrigger为什么这么写。

这篇文章就讨论下当数据流入window时,会发生什么。我们着重分析的类就是上篇文章中提到的WindowOperator类。
在讨论数据流入window会触发的一系列动作之前,我们需要明确一个window操作包括哪些部分。

  • window assigner 指明数据流中的数据属于哪个window
  • trigger 指明在哪些条件下触发window计算,基于处理数据时的时间以及事件的特定属性、
  • evictor 可选组件,在window执行计算前或后,将window中的数据移除,如使用globalWindow时,由于该window的默认trigger为永不触发,所以既需要实现自定义trigger,也需要实现evictor,移除部分已经计算完毕的数据。
  • window process flink默认提供的有 ReduceFunction,AggragateFunction.还可以自定义实现 windowProcessFunction。作用便是当trigger返回FIRE结果时,计算window的结果。

这篇文章,先不讨论window的early emit,即window在触发完整的计算之前,为减小延迟而进行的提前计算。不过通过上面的四个组件,也可以想明白,只要自定义的trigger定时或定量或根据某条件在window触发完整计算之前,产生FIRE结果,便可使用window process先行根据目前window中的不完整数据,提前计算一个结果。

以下例子中,我们使用以下前提:

  • 使用的window assigner实现的是 WindowAssigner 而不是 MergingWindowAssigner
  • 同时我们分析的 event-time 语义
  • window操作作用在 KeyedStream 上

接下来要分析的 WindowOperator 类可以看做是一个调度器,它持有window操作相关的所有组件,不包括evctor,因为含有evctor组件的window操作被封装为 EvictingWindowOperator 。 WindowOperator 定义了
window assigner, trigger, windowProcessFunction 的执行顺序如何,它们之间的执行逻辑等。

WindowOperator 类实现了 Triggerable 接口,为什么要实现这个接口呢?这是为了方便为 window 指派 window 过期时的回调函数,因此 WindowOperater 类中实现了 onEventTime 与 onProcessTime 两个方法,分别对应不用语义下 window 过期时的回调函数的执行逻辑,即:当flink 决定删除 window 时,都做了什么操作,删除了哪些东西。

WindowOperator 类也实现了 OneInputStreamOperator 接口,实现了其 processElement 方法,当新数据流入时,调用该方法。

在真正分析代码前,有必要先说明下 WindowOperator 中的几个内部类:

  • WindowContext 每个window都有自己的context,持有该window与window的state。在 WindowOperator 类中,该类对应 processContext 成员。
  • Context 实现了 Trigger.OnMergeContext 接口。作为一个处理window中trigger的公共类,该类中持有key与window两个成员,方便根据key与window处理特定的trigger。在 WindowOperator 类中,该类对应 triggerContext 成员。

另外,在 WindowOperator 中有一个 windowState 成员,以 window 为 namespace,以隔离不同的window的context。这里虽然叫做 windowState 。但是通过稍后的代码可以发现,该类存储的是不同window中的对应的原始数据(processWindowFunction情况)或结果(ReduceFunction/AggregateFunction情况)。

有了上面的基本认识,下面分析,当数据流入window时,发生了什么。

首先,根据刚刚所说,每一个流入的数据都会调用 processElement 方法:

public void processElement(StreamRecord<IN> element) throws Exception {
        final Collection<W> elementWindows = windowAssigner.assignWindows(
            element.getValue(), element.getTimestamp(), windowAssignerContext);

        //if element is handled by none of assigned elementWindows
        boolean isSkippedElement = true;

        final K key = this.<K>getKeyedStateBackend().getCurrentKey();

        if (windowAssigner instanceof MergingWindowAssigner) {

WindowOperator 类,首先使用用户选择的 windowAssigner 将流入的数据分配到响应的window中,有可能是1个,0个甚至多个window。
第二句的 isSkippedElement 变量,在我们的前提下,没有作用。
第三句获取当前数据所在的KeyedStream的那个key上。这个key在稍后的 triggerContext 成员中会用到。
再下面的if语句不会进入,在我们前提中,我们进入的是else语句:

} else {
            for (W window: elementWindows) {
                // drop if the window is already late
                if (isWindowLate(window)) {
                    continue;
                }
                isSkippedElement = false;

                windowState.setCurrentNamespace(window);
                windowState.add(element.getValue());

                triggerContext.key = key;
                triggerContext.window = window;

                TriggerResult triggerResult = triggerContext.onElement(element);

                if (triggerResult.isFire()) {
                    ACC contents = windowState.get();
                    if (contents == null) {
                        continue;
                    }
                    emitWindowContents(window, contents);
                }
                if (triggerResult.isPurge()) {
                    windowState.clear();
                }

                registerCleanupTimer(window);
            }
        }

在 else 语句中,对该流入数据所分配到的每个window执行以下逻辑:

  1. 判断该window是否已过期。判断条件如下:
protected boolean isWindowLate(W window) {
        return (windowAssigner.isEventTime() && (cleanupTime(window) <= internalTimerService.currentWatermark()));
}
private long cleanupTime(W window) {
        if (windowAssigner.isEventTime()) {
            long cleanupTime = window.maxTimestamp() + allowedLateness;
            return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE;
        } else {
            return window.maxTimestamp();
        }
    }

通过该判断可以看出,flink提供了 allowedLateness 变量用于指明允许迟到数据最多可以迟到多久,因此,window的过期时间不仅仅是其 maxTimestamp, 还需要加上等待迟到数据的时间。

  1. 获取该window的context,将数据加入。
windowState.setCurrentNamespace(window);
windowState.add(element.getValue());

这里需要指出,flink提供了两种 window process:

  • Incremental Aggregation Functions。ReduceFunction 与 AggregateFunction ,其特点是无需保存 window 中的所有数据,一旦新数据进入,便可与之前的中间结果进行计算,因此这种 window 中其状态仅需保存一个结果便可。
  • ProcessWindowFunction。用户实现 ProcessWindowFunction 的自定义处理逻辑,特点是保存了 window 的所有数据,只有触发了 trigger 后才可以执行计算。

因此这里 windowState 根据 window 获取到的 state 是不同的。针对第一种情况,返回的是 HeapReducingState, HeapAggregatingState ,当执行到 windowState.add(element.getValue());语句时,便直接得出结果。而第二种情况,返回的是 HeapListState ,当执行到 windowState.add(element.getValue());语句时,仅仅是将数据加入到list中。

  1. 继续往下走,获取该key下该 window 的trigger,并执行trigger的 onElement 方法,来确定需不需要触发计算。
triggerContext.key = key;
triggerContext.window = window;
TriggerResult triggerResult = triggerContext.onElement(element);

根据上篇的解释可知,在默认trigger下,仅当流入的是迟到数据才会在 onElement 中触发trigger。
因此,这里大家就可以实现自己的trigger,根据流入的每一个数据,判断是否需要触发trigger,达到提前触发计算的目的。

  1. 根据trigger的结果,执行不同的逻辑
                if (triggerResult.isFire()) {
                    ACC contents = windowState.get();
                    if (contents == null) {
                        continue;
                    }
                    emitWindowContents(window, contents);
                }
                if (triggerResult.isPurge()) {
                    windowState.clear();
                }
  • FIRE: 代表触发window的计算。首先从 windowState 中获取内容。由刚刚的分析知道,在 Incremental Aggregation Functions 情况下,返回的是一个常量 : 计算结果。在 ProcessWindowFunction 情况下,返回的是当前window中的数据,一个list的iterator对象。然后执行 emitWindowContents(window, contents); 语句
private void emitWindowContents(W window, ACC contents) throws Exception {
        timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
        processContext.window = window;
        userFunction.process(triggerContext.key, window, processContext, contents, timestampedCollector);
    }

该方法会调用用户实现的计算逻辑(ProcessWindowFunction实现类),将流入的数据 contents 经过计算,得到结果后写入 timestampedCollector。

  • PURGE: 代表需要清除window。这里就是执行 windowState.clear(); 语句。结果便是window的计算结果(Incremental Aggregation Functions 情况下)或者缓存的数据(ProcessWindwoFunction 情况下)清除,即:该window的状态被清除。但是此时window对象还未删除,相关的trigger中的自定义状态与 ProcessWindowFunction 中的状态还未删除。
  1. 最后,为该window注册失效后的回调函数,在window失效后,删除window并做其他收尾工作。
registerCleanupTimer(window);

前面说过了, WindowOperator 实现了 Triggerable 接口,且有 triggerContext 获取当前正在处理的window的trigger来注册回调函数,registerCleanupTimer(window)方法如下:

protected void registerCleanupTimer(W window) {
        long cleanupTime = cleanupTime(window);
        if (cleanupTime == Long.MAX_VALUE) {
            // don't set a GC timer for "end of time"
            return;
        }

        if (windowAssigner.isEventTime()) {
            triggerContext.registerEventTimeTimer(cleanupTime);
        } else {
            triggerContext.registerProcessingTimeTimer(cleanupTime);
        }
    }

public void registerEventTimeTimer(long time) {
            internalTimerService.registerEventTimeTimer(window, time);
        }

通过上面的两个方法可以看出,这里的回调函数并不是注册在当前window的trigger中,而是注册在 WindowOperator 内部持有的一个 internalTimerService 中。

那该window是在何时才会失效呢?

private long cleanupTime(W window) {
        if (windowAssigner.isEventTime()) {
            long cleanupTime = window.maxTimestamp() + allowedLateness;
            return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE;
        } else {
            return window.maxTimestamp();
        }
    }

window 在 watermark 的时间戳大于 maxTimestamp + allowedLateness 时,才会过期,这便是 flink 提供的除了 watermark 外的另一种处理迟到数据的机制。

我们再看看,window过期后,回调函数是怎么处理的。

public void onEventTime(InternalTimer<K, W> timer) throws Exception {
        triggerContext.key = timer.getKey();
        triggerContext.window = timer.getNamespace();

        MergingWindowSet<W> mergingWindows;

        if (windowAssigner instanceof MergingWindowAssigner) {
            mergingWindows = getMergingWindowSet();
            W stateWindow = mergingWindows.getStateWindow(triggerContext.window);
            if (stateWindow == null) {
                // Timer firing for non-existent window, this can only happen if a
                // trigger did not clean up timers. We have already cleared the merging
                // window and therefore the Trigger state, however, so nothing to do.
                return;
            } else {
                windowState.setCurrentNamespace(stateWindow);
            }
        } else {
            windowState.setCurrentNamespace(triggerContext.window);
            mergingWindows = null;
        }

        ACC contents = null;
        if (windowState != null) {
            contents = windowState.get();
        }

        if (contents != null) {
            TriggerResult triggerResult = triggerContext.onEventTime(timer.getTimestamp());
            if (triggerResult.isFire()) {
                emitWindowContents(triggerContext.window, contents);
            }
            if (triggerResult.isPurge()) {
                windowState.clear();
            }
        }

        if (windowAssigner.isEventTime() && isCleanupTime(triggerContext.window, timer.getTimestamp())) {
            clearAllState(triggerContext.window, windowState, mergingWindows);
        }

        if (mergingWindows != null) {
            // need to make sure to update the merging state in state
            mergingWindows.persist();
        }
    }

上面与 MergingWindowAssigner 相关的分支我们不进入分析。
为了方便分析,我们假设 window 的过期时间 maxTimestamp + allowedLateness = 2000 + 1500 = 3500。 当 watermark 的时间戳大于 3500 时,便会触发该回调函数,为了说明普遍性,假设 watermark 的时间戳为 4000。

将与 MergingWindowAssigner 无关的语句去掉后,该方法的前面部分如下:

triggerContext.key = timer.getKey();
triggerContext.window = timer.getNamespace();

windowState.setCurrentNamespace(triggerContext.window);
mergingWindows = null;

ACC contents = null;
if (windowState != null) {
    contents = windowState.get();
}

首先,将 triggerContext 根据key与window找到特定的trigger,同样 windowState 根据window找到特定的window中的context,该context中存储的是window的计算结果(Incremental Aggregation Functions 情况下)或者缓存的数据(ProcessWindwoFunction 情况下)。
接下来:

       if (contents != null) {
            TriggerResult triggerResult = triggerContext.onEventTime(timer.getTimestamp());
            if (triggerResult.isFire()) {
                emitWindowContents(triggerContext.window, contents);
            }
            if (triggerResult.isPurge()) {
                windowState.clear();
            }
        }

        if (windowAssigner.isEventTime() && isCleanupTime(triggerContext.window, timer.getTimestamp())) {
            clearAllState(triggerContext.window, windowState, mergingWindows);
        }

以上代码说明了当window过期,触发过期的回到函数时,都会做哪些操作。
可以看到,会先到该window的trigger中执行 onEventTme 方法。此时的 timer.getTimestamp() 的值为 3500。而一般我们自定义的trigger中一般不会注册一个时间为 maxTimestamp + allowedLateness 的回调函数。以flink的默认trigger - EventTimeTrigger 为例,其注册的回调函数最大时间便是 maxTimestamp 。因此,除非用户设置的 allowedLateness 为0,且在trigger中注册了时间为 maxTmestamp 的回调函数,否则此处不会有triggerResult。

假设此处确实有对应的回调函数且被执行,下面的两个if条件的逻辑与上面分析 processElement 时一样,不再赘述。

再往下走,调用了 clearAllState 方法,进入该方法:

private void clearAllState(W window,AppendingState<IN, ACC> windowState,MergingWindowSet<W> mergingWindows) throws Exception {
        windowState.clear();
        triggerContext.clear();
        processContext.window = window;
        processContext.clear();
        if (mergingWindows != null) {
            mergingWindows.retireWindow(window);
            mergingWindows.persist();
        }
}
  • windowState.clear() 将window中暂存的结果或原始数据删除。
  • triggerContext.clear() 调用该window的trigger的clear()方法,删除用户自定义trigger中的自定义状态,同时删除trigger的timer。需要用户在实现自定义trigger且使用自定义状态时,实现该方法,方便此时调用清除状态,避免内存问题。
  • processContext.clear(); 调用用户实现自定义逻辑的,ProcessWindwoFuncton接口实现类的clear()方法,目的同上。
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,864评论 6 494
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,175评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,401评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,170评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,276评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,364评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,401评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,179评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,604评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,902评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,070评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,751评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,380评论 3 319
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,077评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,312评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,924评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,957评论 2 351

推荐阅读更多精彩内容