Flink - 当数据流入window时,会发生什么

上篇文章对Flink中常见的windowAssigner,如:TumblingEventTimeWindow, SlidingEventTimeWindow, EventTimeSessionWindows 等中的默认提供的trigger:EventTimeTrigger进行了剖析,讨论了trigger注册的回调函数的执行,trigger会触发怎样的triggerResult,如何处理迟到数据的trigger触发,以及提醒了需要注意的几点,明白flink中的EventTimeTrigger为什么这么写。

这篇文章就讨论下当数据流入window时,会发生什么。我们着重分析的类就是上篇文章中提到的WindowOperator类。
在讨论数据流入window会触发的一系列动作之前,我们需要明确一个window操作包括哪些部分。

  • window assigner 指明数据流中的数据属于哪个window
  • trigger 指明在哪些条件下触发window计算,基于处理数据时的时间以及事件的特定属性、
  • evictor 可选组件,在window执行计算前或后,将window中的数据移除,如使用globalWindow时,由于该window的默认trigger为永不触发,所以既需要实现自定义trigger,也需要实现evictor,移除部分已经计算完毕的数据。
  • window process flink默认提供的有 ReduceFunction,AggragateFunction.还可以自定义实现 windowProcessFunction。作用便是当trigger返回FIRE结果时,计算window的结果。

这篇文章,先不讨论window的early emit,即window在触发完整的计算之前,为减小延迟而进行的提前计算。不过通过上面的四个组件,也可以想明白,只要自定义的trigger定时或定量或根据某条件在window触发完整计算之前,产生FIRE结果,便可使用window process先行根据目前window中的不完整数据,提前计算一个结果。

以下例子中,我们使用以下前提:

  • 使用的window assigner实现的是 WindowAssigner 而不是 MergingWindowAssigner
  • 同时我们分析的 event-time 语义
  • window操作作用在 KeyedStream 上

接下来要分析的 WindowOperator 类可以看做是一个调度器,它持有window操作相关的所有组件,不包括evctor,因为含有evctor组件的window操作被封装为 EvictingWindowOperator 。 WindowOperator 定义了
window assigner, trigger, windowProcessFunction 的执行顺序如何,它们之间的执行逻辑等。

WindowOperator 类实现了 Triggerable 接口,为什么要实现这个接口呢?这是为了方便为 window 指派 window 过期时的回调函数,因此 WindowOperater 类中实现了 onEventTime 与 onProcessTime 两个方法,分别对应不用语义下 window 过期时的回调函数的执行逻辑,即:当flink 决定删除 window 时,都做了什么操作,删除了哪些东西。

WindowOperator 类也实现了 OneInputStreamOperator 接口,实现了其 processElement 方法,当新数据流入时,调用该方法。

在真正分析代码前,有必要先说明下 WindowOperator 中的几个内部类:

  • WindowContext 每个window都有自己的context,持有该window与window的state。在 WindowOperator 类中,该类对应 processContext 成员。
  • Context 实现了 Trigger.OnMergeContext 接口。作为一个处理window中trigger的公共类,该类中持有key与window两个成员,方便根据key与window处理特定的trigger。在 WindowOperator 类中,该类对应 triggerContext 成员。

另外,在 WindowOperator 中有一个 windowState 成员,以 window 为 namespace,以隔离不同的window的context。这里虽然叫做 windowState 。但是通过稍后的代码可以发现,该类存储的是不同window中的对应的原始数据(processWindowFunction情况)或结果(ReduceFunction/AggregateFunction情况)。

有了上面的基本认识,下面分析,当数据流入window时,发生了什么。

首先,根据刚刚所说,每一个流入的数据都会调用 processElement 方法:

public void processElement(StreamRecord<IN> element) throws Exception {
        final Collection<W> elementWindows = windowAssigner.assignWindows(
            element.getValue(), element.getTimestamp(), windowAssignerContext);

        //if element is handled by none of assigned elementWindows
        boolean isSkippedElement = true;

        final K key = this.<K>getKeyedStateBackend().getCurrentKey();

        if (windowAssigner instanceof MergingWindowAssigner) {

WindowOperator 类,首先使用用户选择的 windowAssigner 将流入的数据分配到响应的window中,有可能是1个,0个甚至多个window。
第二句的 isSkippedElement 变量,在我们的前提下,没有作用。
第三句获取当前数据所在的KeyedStream的那个key上。这个key在稍后的 triggerContext 成员中会用到。
再下面的if语句不会进入,在我们前提中,我们进入的是else语句:

} else {
            for (W window: elementWindows) {
                // drop if the window is already late
                if (isWindowLate(window)) {
                    continue;
                }
                isSkippedElement = false;

                windowState.setCurrentNamespace(window);
                windowState.add(element.getValue());

                triggerContext.key = key;
                triggerContext.window = window;

                TriggerResult triggerResult = triggerContext.onElement(element);

                if (triggerResult.isFire()) {
                    ACC contents = windowState.get();
                    if (contents == null) {
                        continue;
                    }
                    emitWindowContents(window, contents);
                }
                if (triggerResult.isPurge()) {
                    windowState.clear();
                }

                registerCleanupTimer(window);
            }
        }

在 else 语句中,对该流入数据所分配到的每个window执行以下逻辑:

  1. 判断该window是否已过期。判断条件如下:
protected boolean isWindowLate(W window) {
        return (windowAssigner.isEventTime() && (cleanupTime(window) <= internalTimerService.currentWatermark()));
}
private long cleanupTime(W window) {
        if (windowAssigner.isEventTime()) {
            long cleanupTime = window.maxTimestamp() + allowedLateness;
            return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE;
        } else {
            return window.maxTimestamp();
        }
    }

通过该判断可以看出,flink提供了 allowedLateness 变量用于指明允许迟到数据最多可以迟到多久,因此,window的过期时间不仅仅是其 maxTimestamp, 还需要加上等待迟到数据的时间。

  1. 获取该window的context,将数据加入。
windowState.setCurrentNamespace(window);
windowState.add(element.getValue());

这里需要指出,flink提供了两种 window process:

  • Incremental Aggregation Functions。ReduceFunction 与 AggregateFunction ,其特点是无需保存 window 中的所有数据,一旦新数据进入,便可与之前的中间结果进行计算,因此这种 window 中其状态仅需保存一个结果便可。
  • ProcessWindowFunction。用户实现 ProcessWindowFunction 的自定义处理逻辑,特点是保存了 window 的所有数据,只有触发了 trigger 后才可以执行计算。

因此这里 windowState 根据 window 获取到的 state 是不同的。针对第一种情况,返回的是 HeapReducingState, HeapAggregatingState ,当执行到 windowState.add(element.getValue());语句时,便直接得出结果。而第二种情况,返回的是 HeapListState ,当执行到 windowState.add(element.getValue());语句时,仅仅是将数据加入到list中。

  1. 继续往下走,获取该key下该 window 的trigger,并执行trigger的 onElement 方法,来确定需不需要触发计算。
triggerContext.key = key;
triggerContext.window = window;
TriggerResult triggerResult = triggerContext.onElement(element);

根据上篇的解释可知,在默认trigger下,仅当流入的是迟到数据才会在 onElement 中触发trigger。
因此,这里大家就可以实现自己的trigger,根据流入的每一个数据,判断是否需要触发trigger,达到提前触发计算的目的。

  1. 根据trigger的结果,执行不同的逻辑
                if (triggerResult.isFire()) {
                    ACC contents = windowState.get();
                    if (contents == null) {
                        continue;
                    }
                    emitWindowContents(window, contents);
                }
                if (triggerResult.isPurge()) {
                    windowState.clear();
                }
  • FIRE: 代表触发window的计算。首先从 windowState 中获取内容。由刚刚的分析知道,在 Incremental Aggregation Functions 情况下,返回的是一个常量 : 计算结果。在 ProcessWindowFunction 情况下,返回的是当前window中的数据,一个list的iterator对象。然后执行 emitWindowContents(window, contents); 语句
private void emitWindowContents(W window, ACC contents) throws Exception {
        timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
        processContext.window = window;
        userFunction.process(triggerContext.key, window, processContext, contents, timestampedCollector);
    }

该方法会调用用户实现的计算逻辑(ProcessWindowFunction实现类),将流入的数据 contents 经过计算,得到结果后写入 timestampedCollector。

  • PURGE: 代表需要清除window。这里就是执行 windowState.clear(); 语句。结果便是window的计算结果(Incremental Aggregation Functions 情况下)或者缓存的数据(ProcessWindwoFunction 情况下)清除,即:该window的状态被清除。但是此时window对象还未删除,相关的trigger中的自定义状态与 ProcessWindowFunction 中的状态还未删除。
  1. 最后,为该window注册失效后的回调函数,在window失效后,删除window并做其他收尾工作。
registerCleanupTimer(window);

前面说过了, WindowOperator 实现了 Triggerable 接口,且有 triggerContext 获取当前正在处理的window的trigger来注册回调函数,registerCleanupTimer(window)方法如下:

protected void registerCleanupTimer(W window) {
        long cleanupTime = cleanupTime(window);
        if (cleanupTime == Long.MAX_VALUE) {
            // don't set a GC timer for "end of time"
            return;
        }

        if (windowAssigner.isEventTime()) {
            triggerContext.registerEventTimeTimer(cleanupTime);
        } else {
            triggerContext.registerProcessingTimeTimer(cleanupTime);
        }
    }

public void registerEventTimeTimer(long time) {
            internalTimerService.registerEventTimeTimer(window, time);
        }

通过上面的两个方法可以看出,这里的回调函数并不是注册在当前window的trigger中,而是注册在 WindowOperator 内部持有的一个 internalTimerService 中。

那该window是在何时才会失效呢?

private long cleanupTime(W window) {
        if (windowAssigner.isEventTime()) {
            long cleanupTime = window.maxTimestamp() + allowedLateness;
            return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE;
        } else {
            return window.maxTimestamp();
        }
    }

window 在 watermark 的时间戳大于 maxTimestamp + allowedLateness 时,才会过期,这便是 flink 提供的除了 watermark 外的另一种处理迟到数据的机制。

我们再看看,window过期后,回调函数是怎么处理的。

public void onEventTime(InternalTimer<K, W> timer) throws Exception {
        triggerContext.key = timer.getKey();
        triggerContext.window = timer.getNamespace();

        MergingWindowSet<W> mergingWindows;

        if (windowAssigner instanceof MergingWindowAssigner) {
            mergingWindows = getMergingWindowSet();
            W stateWindow = mergingWindows.getStateWindow(triggerContext.window);
            if (stateWindow == null) {
                // Timer firing for non-existent window, this can only happen if a
                // trigger did not clean up timers. We have already cleared the merging
                // window and therefore the Trigger state, however, so nothing to do.
                return;
            } else {
                windowState.setCurrentNamespace(stateWindow);
            }
        } else {
            windowState.setCurrentNamespace(triggerContext.window);
            mergingWindows = null;
        }

        ACC contents = null;
        if (windowState != null) {
            contents = windowState.get();
        }

        if (contents != null) {
            TriggerResult triggerResult = triggerContext.onEventTime(timer.getTimestamp());
            if (triggerResult.isFire()) {
                emitWindowContents(triggerContext.window, contents);
            }
            if (triggerResult.isPurge()) {
                windowState.clear();
            }
        }

        if (windowAssigner.isEventTime() && isCleanupTime(triggerContext.window, timer.getTimestamp())) {
            clearAllState(triggerContext.window, windowState, mergingWindows);
        }

        if (mergingWindows != null) {
            // need to make sure to update the merging state in state
            mergingWindows.persist();
        }
    }

上面与 MergingWindowAssigner 相关的分支我们不进入分析。
为了方便分析,我们假设 window 的过期时间 maxTimestamp + allowedLateness = 2000 + 1500 = 3500。 当 watermark 的时间戳大于 3500 时,便会触发该回调函数,为了说明普遍性,假设 watermark 的时间戳为 4000。

将与 MergingWindowAssigner 无关的语句去掉后,该方法的前面部分如下:

triggerContext.key = timer.getKey();
triggerContext.window = timer.getNamespace();

windowState.setCurrentNamespace(triggerContext.window);
mergingWindows = null;

ACC contents = null;
if (windowState != null) {
    contents = windowState.get();
}

首先,将 triggerContext 根据key与window找到特定的trigger,同样 windowState 根据window找到特定的window中的context,该context中存储的是window的计算结果(Incremental Aggregation Functions 情况下)或者缓存的数据(ProcessWindwoFunction 情况下)。
接下来:

       if (contents != null) {
            TriggerResult triggerResult = triggerContext.onEventTime(timer.getTimestamp());
            if (triggerResult.isFire()) {
                emitWindowContents(triggerContext.window, contents);
            }
            if (triggerResult.isPurge()) {
                windowState.clear();
            }
        }

        if (windowAssigner.isEventTime() && isCleanupTime(triggerContext.window, timer.getTimestamp())) {
            clearAllState(triggerContext.window, windowState, mergingWindows);
        }

以上代码说明了当window过期,触发过期的回到函数时,都会做哪些操作。
可以看到,会先到该window的trigger中执行 onEventTme 方法。此时的 timer.getTimestamp() 的值为 3500。而一般我们自定义的trigger中一般不会注册一个时间为 maxTimestamp + allowedLateness 的回调函数。以flink的默认trigger - EventTimeTrigger 为例,其注册的回调函数最大时间便是 maxTimestamp 。因此,除非用户设置的 allowedLateness 为0,且在trigger中注册了时间为 maxTmestamp 的回调函数,否则此处不会有triggerResult。

假设此处确实有对应的回调函数且被执行,下面的两个if条件的逻辑与上面分析 processElement 时一样,不再赘述。

再往下走,调用了 clearAllState 方法,进入该方法:

private void clearAllState(W window,AppendingState<IN, ACC> windowState,MergingWindowSet<W> mergingWindows) throws Exception {
        windowState.clear();
        triggerContext.clear();
        processContext.window = window;
        processContext.clear();
        if (mergingWindows != null) {
            mergingWindows.retireWindow(window);
            mergingWindows.persist();
        }
}
  • windowState.clear() 将window中暂存的结果或原始数据删除。
  • triggerContext.clear() 调用该window的trigger的clear()方法,删除用户自定义trigger中的自定义状态,同时删除trigger的timer。需要用户在实现自定义trigger且使用自定义状态时,实现该方法,方便此时调用清除状态,避免内存问题。
  • processContext.clear(); 调用用户实现自定义逻辑的,ProcessWindwoFuncton接口实现类的clear()方法,目的同上。
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容