Flink源码分析系列文档目录
请点击:Flink 源码分析系列文档目录
WindowOperator
WindowOperator负责window中元素存储和计算流程。
WindowOperator包含如下几个重要方法:
- open:operator初始化的逻辑
- processElement: 新元素进入window的时候调用
- onEventTime: event time计算触发时候的逻辑
- onProcessingTime:processing time计算触发时候的逻辑
open方法
open方法的调用栈如图所示:
由图可知,open方法在Task Manager中的Task -> StreamTask的openAllOperator得到调用。
在分析open方法之前,我们需要先分析下WindowOperator类的继承结构。
WindowOperator继承自AbstractUdfStreamOperator,AbstractStreamOperator继承自AbstractStreamOperator。
AbstractStreamOperator
的open方法:
/**
* This method is called immediately before any elements are processed, it should contain the
* operator's initialization logic, e.g. state initialization.
*
* <p>The default implementation does nothing.
*
* @throws Exception An exception in this method causes the operator to fail.
*/
@Override
public void open() throws Exception {}
根据英文注释,可以总结open方法的功能如下:
- open方法在operator处理任何元素之前调用
- open方法负责operator的初始化工作,例如状态初始化
- 如果该方法抛出异常,operator会失败,无法进行后续处理数据的流程
AbstractUdfStreamOperator的open代码如下所示:
@Override
public void open() throws Exception {
super.open();
FunctionUtils.openFunction(userFunction, new Configuration());
}
FunctionUtils这一行对userFunction进行了处理,稍后分析。userFunction为用户自定义处理方法的封装。比如apply或者process函数中传入的自定义处理逻辑,会被封装为userFunction。
FunctionUtils.openFunction方法:
public static void openFunction(Function function, Configuration parameters) throws Exception{
if (function instanceof RichFunction) {
RichFunction richFunction = (RichFunction) function;
richFunction.open(parameters);
}
}
如果userFunction使用的是RichFunction类型的话,会调用richFunction的open方法。
接下来分析下WindowOperator的open方法。代码如下:
public void open() throws Exception {
super.open();
// 打开迟到被丢弃数据条数统计的监控
this.numLateRecordsDropped = metrics.counter(LATE_ELEMENTS_DROPPED_METRIC_NAME);
// 创建一个基于时间戳的数据收集器,用于输出窗口数据到下游
timestampedCollector = new TimestampedCollector<>(output);
// 获取时间服务,用于向windowAssignerContext传递当前的processing time
internalTimerService =
getInternalTimerService("window-timers", windowSerializer, this);
// 创建triggerContext和processContext,后面用到时候分析
triggerContext = new Context(null, null);
processContext = new WindowContext(null);
// 创建WindowAssignerContext,主要用于获取processing time
windowAssignerContext = new WindowAssigner.WindowAssignerContext() {
@Override
public long getCurrentProcessingTime() {
return internalTimerService.currentProcessingTime();
}
};
// create (or restore) the state that hold the actual window contents
// NOTE - the state may be null in the case of the overriding evicting window operator
// 创建windowState,用于储存窗口中的数据
if (windowStateDescriptor != null) {
windowState = (InternalAppendingState<K, W, IN, ACC, ACC>) getOrCreateKeyedState(windowSerializer, windowStateDescriptor);
}
// create the typed and helper states for merging windows
// 如果windowAssigner是MergingWindowAssigner子类,即使用的是SessionWindow的话,执行if内的内容
if (windowAssigner instanceof MergingWindowAssigner) {
// store a typed reference for the state of merging windows - sanity check
if (windowState instanceof InternalMergingState) {
windowMergingState = (InternalMergingState<K, W, IN, ACC, ACC>) windowState;
}
// TODO this sanity check should be here, but is prevented by an incorrect test (pending validation)
// TODO see WindowOperatorTest.testCleanupTimerWithEmptyFoldingStateForSessionWindows()
// TODO activate the sanity check once resolved
// else if (windowState != null) {
// throw new IllegalStateException(
// "The window uses a merging assigner, but the window state is not mergeable.");
// }
// 以下逻辑为创建储存window合并的状态变量mergingSetsState
@SuppressWarnings("unchecked")
final Class<Tuple2<W, W>> typedTuple = (Class<Tuple2<W, W>>) (Class<?>) Tuple2.class;
final TupleSerializer<Tuple2<W, W>> tupleSerializer = new TupleSerializer<>(
typedTuple,
new TypeSerializer[] {windowSerializer, windowSerializer});
final ListStateDescriptor<Tuple2<W, W>> mergingSetsStateDescriptor =
new ListStateDescriptor<>("merging-window-set", tupleSerializer);
// get the state that stores the merging sets
mergingSetsState = (InternalListState<K, VoidNamespace, Tuple2<W, W>>)
getOrCreateKeyedState(VoidNamespaceSerializer.INSTANCE, mergingSetsStateDescriptor);
mergingSetsState.setCurrentNamespace(VoidNamespace.INSTANCE);
}
}
processElement方法
有数据到达window的时候,系统会调用processElement方法。代码如下所示:
public void processElement(StreamRecord<IN> element) throws Exception {
// 获取需要分配给该元素的window。以eventtime时间窗口为例,element的eventtime位于window的起止时间之中,则该窗口需要指派给此元素
final Collection<W> elementWindows = windowAssigner.assignWindows(
element.getValue(), element.getTimestamp(), windowAssignerContext);
//if element is handled by none of assigned elementWindows
// 返回元素是否被处理,如果元素经过处理,返回false
boolean isSkippedElement = true;
// 此处获取到key的值,即keyBy方法字段的值
// 该KeyedStateBackend在StreamTaskStateInitializerImpl中创建
final K key = this.<K>getKeyedStateBackend().getCurrentKey();
// 判断Window是否是MergingWindowAssigner(合并窗口)的子类。比如SessionWindow属于MergingWindowAssigner
if (windowAssigner instanceof MergingWindowAssigner) {
// MergingWindowAssigner部分的处理逻辑,代码在后面分析,此处省略
} else {
// 非MergingWindowAssigner部分的处理逻辑,代码在后面分析,此处省略
}
// side output input event if
// element not handled by any window
// late arriving tag has been set
// windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
// 如果元素没有被window处理,并且元素来迟,会加入到旁路输出
// 否则此数据被丢弃,迟到被丢弃数据条数监控会增加1
if (isSkippedElement && isElementLate(element)) {
if (lateDataOutputTag != null){
sideOutput(element);
} else {
this.numLateRecordsDropped.inc();
}
}
}
MergingWindowAssigner部分的处理逻辑
在分析MergingWindowAssigner之前我们需要提前看下MergingWindowSet类。该类负责处理整个合并窗口的过程,以及存放合并窗口的结果。
MergingWindowSet中用一个重要的变量为mapping,定义为:
private final Map<W, W> mapping;
该类负责存放一个window到window的映射。具体这个映射有什么作用呢?
这里先提前介绍下windowState,windowState是一个HeapListState。它的内部具有三个值:key,namespace和value。对于给定的key和namespace,可以获取到属于他们的一组值。在windowOperator中,key为keyBy函数对应的元素字段的值,namspace为当前的window。
可以认为HeapListState是一个Map<K, Map<NS, V>>
的数据结构,对于给定的key,每个window和它里面的数据以Map<NS, V>
形式储存。Window对象本身作为Namespace,相当于Map<NS, V>
这个map中的key,维护了window和数据之间的关系。
但是问题来了,对于MergingWindowAssigner(SessionWindow),窗口是一直被合并的,合并之后的窗口和原先作为key保存数据对应关系的窗口肯定是不同的。该窗口对应的数据无法被合并之后的窗口获取到。
为了解决这个问题,有如下两种方案:
- 每次window合并之后,将原先window下的数据转移到合并后window下。这样涉及到很频繁的HashMap操作,性能开销较大。
- 选择一个固定的window(这里叫做状态window),来代表以后每次合并后的window,作为key来维护合并后window内的数据。同时再建立一个映射,维护状态window和合并后窗口之间的关系。Flink采用了这种做法。上述的
mapping
变量正是用于维护这个对应关系的。mapping
的key为合并后的窗口,value为状态窗口。
Flink使用一组被合并窗口中的第一个窗口,作为他们合并之后窗口的状态窗口。每次合并窗口操作发生之后,将mapping
中被合并的窗口的映射关系移除,同时将合并后窗口和状态窗口的对应关系写入mapping
。
还有一个变量是initialMapping
,它和mapping
相同,只不过是在创建MergingWindowSet的时候被赋值,和mapping
的值对比可用于判断mapping
是否发生了变更,是否需要持久化。
接下来分析下它的addWindow
方法,如下所示:
public W addWindow(W newWindow, MergeFunction<W> mergeFunction) throws Exception {
List<W> windows = new ArrayList<>();
// mapping 在创建MergingWindowSet对象的时候会读取ListState。该State保存了已合并的window
windows.addAll(this.mapping.keySet());
// 将新window增加进来
windows.add(newWindow);
// 此变量保存窗口合并的结果
final Map<W, Collection<W>> mergeResults = new HashMap<>();
// 调用windowAssigner的mergeWindows方法,合并后的结果被放入了mergeResults中
windowAssigner.mergeWindows(windows,
new MergingWindowAssigner.MergeCallback<W>() {
@Override
public void merge(Collection<W> toBeMerged, W mergeResult) {
if (LOG.isDebugEnabled()) {
LOG.debug("Merging {} into {}", toBeMerged, mergeResult);
}
mergeResults.put(mergeResult, toBeMerged);
}
});
W resultWindow = newWindow;
boolean mergedNewWindow = false;
// perform the merge
for (Map.Entry<W, Collection<W>> c: mergeResults.entrySet()) {
// 合并之后的窗口
W mergeResult = c.getKey();
// 被合并的窗口
Collection<W> mergedWindows = c.getValue();
// if our new window is in the merged windows make the merge result the
// result window
// 如果被合并的window中包含有当前需要加入set的window(newWindow),那么窗口合并的结果就是mergeResult,将它赋给resultWindow
if (mergedWindows.remove(newWindow)) {
mergedNewWindow = true;
resultWindow = mergeResult;
}
// pick any of the merged windows and choose that window's state window
// as the state window for the merge result
// 获取第一个被合并的窗口作为合并后窗口的状态窗口
W mergedStateWindow = this.mapping.get(mergedWindows.iterator().next());
// figure out the state windows that we are merging
// 逐个寻找mapping的key中是否有本次操作已合并的window。
// 这里将他们转移到mergedStateWindows变量中
List<W> mergedStateWindows = new ArrayList<>();
for (W mergedWindow: mergedWindows) {
W res = this.mapping.remove(mergedWindow);
if (res != null) {
mergedStateWindows.add(res);
}
}
// 在mapping中设置合并后的窗口和它的状态窗口的对应关系
this.mapping.put(mergeResult, mergedStateWindow);
// don't put the target state window into the merged windows
// mergedStateWindows去掉状态窗口
mergedStateWindows.remove(mergedStateWindow);
// don't merge the new window itself, it never had any state associated with it
// i.e. if we are only merging one pre-existing window into itself
// without extending the pre-existing window
// 如果被合并的窗口不包含已合并窗口,或者被合并窗口列表大小不为1的时候
// 此处条件一定会满足,因为TimeWindow的mergeWindows方法保证了mergedWindows的size大于1的时候才会调用MergeCallback回调函数
if (!(mergedWindows.contains(mergeResult) && mergedWindows.size() == 1)) {
mergeFunction.merge(mergeResult,
mergedWindows,
this.mapping.get(mergeResult),
mergedStateWindows);
}
}
// the new window created a new, self-contained window without merging
// 如果一个window合并过后还是他自己(第一次调用addWindow,或者是开始了一个新的session),会进入这个if分支
if (mergeResults.isEmpty() || (resultWindow.equals(newWindow) && !mergedNewWindow)) {
this.mapping.put(resultWindow, resultWindow);
}
// 返回合并后的window
return resultWindow;
}
让我们回到processElement
方法处理MergingWindowAssigner的分支部分。代码如下:
// 获取MergingWindowSet对象,负责存放合并的窗口
MergingWindowSet<W> mergingWindows = getMergingWindowSet();
for (W window: elementWindows) {
// adding the new window might result in a merge, in that case the actualWindow
// is the merged window and we work with that. If we don't merge then
// actualWindow == window
// 调用addWindow方法,返回被合并的window
W actualWindow = mergingWindows.addWindow(window, new MergingWindowSet.MergeFunction<W>() {
@Override
public void merge(W mergeResult,
Collection<W> mergedWindows, W stateWindowResult,
Collection<W> mergedStateWindows) throws Exception {
// 检测两种异常情况,在新元素加入window的时候,合并后窗口的最大时间戳不可能比当前waterwark或者处理时间还小的
// 如果是event time类型,并且合并后的窗口的maxTimestamp+allowedLateness小于等于当前的watermark会报错
if ((windowAssigner.isEventTime() && mergeResult.maxTimestamp() + allowedLateness <= internalTimerService.currentWatermark())) {
throw new UnsupportedOperationException("The end timestamp of an " +
"event-time window cannot become earlier than the current watermark " +
"by merging. Current watermark: " + internalTimerService.currentWatermark() +
" window: " + mergeResult);
} else if (!windowAssigner.isEventTime()) {
long currentProcessingTime = internalTimerService.currentProcessingTime();
// 如果是基于processing time的窗口,合并后窗口的maxTimestamp小于等于当前处理时间的话,程序会报错
if (mergeResult.maxTimestamp() <= currentProcessingTime) {
throw new UnsupportedOperationException("The end timestamp of a " +
"processing-time window cannot become earlier than the current processing time " +
"by merging. Current processing time: " + currentProcessingTime +
" window: " + mergeResult);
}
}
// 设置triggerContext的key和window
triggerContext.key = key;
triggerContext.window = mergeResult;
// 调用具体trigger实现类的注册定时器方法
// 注册的timer触发事件为mergeResult窗口的maxTimestamp
triggerContext.onMerge(mergedWindows);
// 移除所有触发时间为被合并window的maxTimestamp的定时器
for (W m: mergedWindows) {
triggerContext.window = m;
triggerContext.clear();
deleteCleanupTimer(m);
}
// merge the merged state windows into the newly resulting state window
// 将所有被合并的状态window对应的数据写入到合并后的window对应的状态window中
windowMergingState.mergeNamespaces(stateWindowResult, mergedStateWindows);
}
});
// drop if the window is already late
// 如果window迟到,则清除迟到的window。isWindowLate方法后面会分析
if (isWindowLate(actualWindow)) {
mergingWindows.retireWindow(actualWindow);
continue;
}
// 接下来将处理此元素,需要设置为false
isSkippedElement = false;
// 获取状态window,状态window作为key,标记存储合并窗口内的数据
W stateWindow = mergingWindows.getStateWindow(actualWindow);
if (stateWindow == null) {
throw new IllegalStateException("Window " + window + " is not in in-flight window set.");
}
// 储存元素的值
windowState.setCurrentNamespace(stateWindow);
windowState.add(element.getValue());
triggerContext.key = key;
triggerContext.window = actualWindow;
// 调用trigger的onElement
TriggerResult triggerResult = triggerContext.onElement(element);
if (triggerResult.isFire()) {
ACC contents = windowState.get();
if (contents == null) {
continue;
}
// 如果触发计算,则在此提取出合并后窗口的内容,传入userFunction进行计算
emitWindowContents(actualWindow, contents);
}
// 如果触发了purge操作,则需要清空window状态
if (triggerResult.isPurge()) {
windowState.clear();
}
// 注册window的cleanup timer 稍后分析
registerCleanupTimer(actualWindow);
}
// need to make sure to update the merging state in state
// 将已合并的window持久化到ListState中
mergingWindows.persist();
非MergingWindowAssigner部分的处理逻辑
for (W window: elementWindows) {
// drop if the window is already late
// 判断window是否迟到,后面分析isWindowLate
if (isWindowLate(window)) {
continue;
}
// 标记该元素得到了处理
isSkippedElement = false;
// windowState为HeapListState
// HeapListState为内存中存储的分区化的链表状态(ListState),使用namespace区分不同窗口的数据。可以理解为一个Map,key为window对象,value为元素的值
windowState.setCurrentNamespace(window);
windowState.add(element.getValue());
// 设置trigger上下文对象的key和window
triggerContext.key = key;
triggerContext.window = window;
// 调用trigger的onElement方法,询问trigger新element到来的时候需要作出什么动作。
TriggerResult triggerResult = triggerContext.onElement(element);
// isFire表示需要触发计算
if (triggerResult.isFire()) {
// 取出windowState当前namespace下所有的元素。即当前window下所有的元素。
ACC contents = windowState.get();
if (contents == null) {
continue;
}
// 使用用户传入的处理函数来计算window内数据,稍后分析
emitWindowContents(window, contents);
}
// 如果触发器返回需要清空数据,则删除window中所有的数据
if (triggerResult.isPurge()) {
windowState.clear();
}
// 注册timer,当前时间已经过了window的cleanup时间(后面有cleanup time的含义),会根据窗口的类型调用对应的onProcessingTime方法或者是onEventTime方法
registerCleanupTimer(window);
}
isWindowLate函数,负责判断window是否迟到。迟到的window会被忽略。
protected boolean isWindowLate(W window) {
// 如果window类型为eventtime并且window的cleanup时间比当前watermark早,说明window已经迟到
return (windowAssigner.isEventTime() && (cleanupTime(window) <= internalTimerService.currentWatermark()));
}
cleanupTime方法,用来获取window的cleanup time
private long cleanupTime(W window) {
if (windowAssigner.isEventTime()) {
// cleanup time为window的end时间 + 允许迟到的时间 - 1
long cleanupTime = window.maxTimestamp() + allowedLateness;
// 如果cleanupTime溢出(返回负值),则使用Long.MAX_VALUE
return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE;
} else {
return window.maxTimestamp();
}
}
registerCleanupTimer方法,清除window在cleanup time时刻注册的定时器
protected void registerCleanupTimer(W window) {
long cleanupTime = cleanupTime(window);
if (cleanupTime == Long.MAX_VALUE) {
// don't set a GC timer for "end of time"
return;
}
// 增加对应onProcessingTime或onEventTime定时器
if (windowAssigner.isEventTime()) {
// 当前的watermark如果大于cleanupTime之时,会调用trigger的onElementTime方法
triggerContext.registerEventTimeTimer(cleanupTime);
} else {
// 当前的系统时间大于cleanupTime之时,会调用trigger的onProcessingTime方法
triggerContext.registerProcessingTimeTimer(cleanupTime);
}
}
emitWindowContents,收集window中的元素,调用用户编写的处理函数。代码如下:
private void emitWindowContents(W window, ACC contents) throws Exception {
timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
processContext.window = window;
// 此处调用用户编写的处理函数
// 此处用户的函数,比如ProcessWindowFunction被InternalIterableProcessWindowFunction包装了
userFunction.process(triggerContext.key, window, processContext, contents, timestampedCollector);
}
onEventTime
下面是onEventTime方法的代码。当event time触发计算的时候会调用该方法。
public void onEventTime(InternalTimer<K, W> timer) throws Exception {
// 获取key和window
triggerContext.key = timer.getKey();
triggerContext.window = timer.getNamespace();
MergingWindowSet<W> mergingWindows;
// 如果是MergingWindowAssigner(Session Window)
if (windowAssigner instanceof MergingWindowAssigner) {
mergingWindows = getMergingWindowSet();
// 获取状态window
W stateWindow = mergingWindows.getStateWindow(triggerContext.window);
if (stateWindow == null) {
// Timer firing for non-existent window, this can only happen if a
// trigger did not clean up timers. We have already cleared the merging
// window and therefore the Trigger state, however, so nothing to do.
return;
} else {
// 设置当前window为triggerContext.window,方便以后调用windowState.get()方法获取window中的元素
windowState.setCurrentNamespace(stateWindow);
}
} else {
// 设置当前window为triggerContext.window,方便以后调用windowState.get()方法获取window中的元素
windowState.setCurrentNamespace(triggerContext.window);
mergingWindows = null;
}
// 调用trigger的onEventTime方法
TriggerResult triggerResult = triggerContext.onEventTime(timer.getTimestamp());
// 此处为触发计算的逻辑
if (triggerResult.isFire()) {
ACC contents = windowState.get();
if (contents != null) {
emitWindowContents(triggerContext.window, contents);
}
}
// 如果触发了purge操作,则清空window中的内容
if (triggerResult.isPurge()) {
windowState.clear();
}
// 如果是event time类型,并且定时器触发时间是window的cleanup时间的时候,意味着该窗口的数据已经处理完毕,需要清除该窗口的所有状态
if (windowAssigner.isEventTime() && isCleanupTime(triggerContext.window, timer.getTimestamp())) {
clearAllState(triggerContext.window, windowState, mergingWindows);
}
// 持久化window合并状态
if (mergingWindows != null) {
// need to make sure to update the merging state in state
mergingWindows.persist();
}
}
onProcessingTime
onProcessingTime
方法和onEventTime
方法相比,除了调用trigger
的onProcessingTime
方法这一处不同外,其他的的逻辑基本类似,不再赘述。