Flink source code analysis series
See the series table of contents: Flink Source Code Analysis Series Index
Preface
This article analyzes the source code of Flink windows. It covers:
- The Window class
  - GlobalWindow and TimeWindow
- WindowAssigner
  - GlobalWindows
  - TumblingEventTimeWindows
  - SlidingEventTimeWindows
  - EventTimeSessionWindows
  - DynamicEventTimeSessionWindows
  - The processing-time versions of the windows above
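The Window class
Here, a window can be understood as a way of grouping elements.
Window is an abstract class that defines only one method, maxTimestamp(). It returns the largest point in time, as a timestamp, that still falls within the window's time span.
Window has two subclasses: GlobalWindow and TimeWindow. GlobalWindow is the global window. TimeWindow is a window over a time interval with a start and an end.
The code of Window is as follows: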
public abstract class Window {
/**
* Gets the largest timestamp that still belongs to this window.
*
* @return The largest timestamp that still belongs to this window.
*/
public abstract long maxTimestamp();
}
GlobalWindow
The first window type is GlobalWindow. As the name suggests, GlobalWindow is a global window. Only one instance of it exists (singleton pattern).
Part of the GlobalWindow code is shown below:
private static final GlobalWindow INSTANCE = new GlobalWindow();
private GlobalWindow() { }
// The private constructor plus the static INSTANCE show that GlobalWindow is a singleton
public static GlobalWindow get() {
return INSTANCE;
}
// GlobalWindow has a single instance and, by definition, every element belongs to it, so maxTimestamp returns Long.MAX_VALUE
@Override
public long maxTimestamp() {
return Long.MAX_VALUE;
}
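The singleton structure above can be reproduced in a few lines. The sketch below uses GlobalWindowSketch, a hypothetical class written only for illustration. It is not Flink's GlobalWindow, and it omits that class's serializer and equals/hashCode.

```java
// Hypothetical illustration of the singleton pattern used by GlobalWindow (not the Flink class).
final class GlobalWindowSketch {
    // The one and only instance, created eagerly when the class is loaded.
    private static final GlobalWindowSketch INSTANCE = new GlobalWindowSketch();

    // Private constructor: no other code can create a second instance.
    private GlobalWindowSketch() { }

    static GlobalWindowSketch get() {
        return INSTANCE;
    }

    // Every element belongs to the global window, so its end is unbounded.
    long maxTimestamp() {
        return Long.MAX_VALUE;
    }

    public static void main(String[] args) {
        System.out.println(get() == get());       // true: always the same instance
        System.out.println(get().maxTimestamp()); // 9223372036854775807
    }
}
```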
TimeWindow
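Unlike GlobalWindow, TimeWindow has an explicit start and end (start and end), so it spans a definite period of time.
TimeWindow also defines operations on windows, such as checking whether two windows intersect and computing the window that covers both. The code is as follows:
// Checks whether two time windows intersect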
public boolean intersects(TimeWindow other) {
return this.start <= other.end && this.end >= other.start;
}
/**
* Returns the minimal window covers both this window and the given window.
*/
// Returns the smallest window that covers both windows (their union when they overlap)
public TimeWindow cover(TimeWindow other) {
return new TimeWindow(Math.min(start, other.start), Math.max(end, other.end));
}
[Figure: example of the intersects method]
[Figure: the cover method]
[Figure: comparison of GlobalWindow and TimeWindow]
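The intersects and cover methods can be tried out without Flink using the simplified stand-in below. TimeWindowSketch is a hypothetical class that copies the two method bodies quoted above. Its maxTimestamp() body (end - 1, because end is exclusive) follows Flink's TimeWindow, which this article does not quote.

```java
// Hypothetical stand-in for Flink's TimeWindow: the interval [start, end), end exclusive.
final class TimeWindowSketch {
    final long start;
    final long end;

    TimeWindowSketch(long start, long end) {
        this.start = start;
        this.end = end;
    }

    // Largest timestamp still inside the window; end itself is excluded.
    long maxTimestamp() {
        return end - 1;
    }

    // Same condition as the quoted intersects(): comparisons are inclusive,
    // so windows that merely touch, e.g. [0, 5) and [5, 10), count as intersecting.
    boolean intersects(TimeWindowSketch other) {
        return this.start <= other.end && this.end >= other.start;
    }

    // Smallest window covering both windows, as in the quoted cover().
    TimeWindowSketch cover(TimeWindowSketch other) {
        return new TimeWindowSketch(Math.min(start, other.start), Math.max(end, other.end));
    }

    @Override
    public String toString() {
        return "[" + start + ", " + end + ")";
    }

    public static void main(String[] args) {
        TimeWindowSketch a = new TimeWindowSketch(0, 5);
        TimeWindowSketch b = new TimeWindowSketch(3, 8);
        TimeWindowSketch c = new TimeWindowSketch(10, 12);
        System.out.println(a.intersects(b));  // true
        System.out.println(a.intersects(c));  // false
        System.out.println(a.cover(b));       // [0, 8)
        System.out.println(a.maxTimestamp()); // 4
    }
}
```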
WindowAssigner
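When a stream is windowed, elements are grouped both by key (as specified with keyBy) and by the window they belong to. A group of elements that share the same key and the same window is called a pane.
In Flink, windows and their data are stored as key-value pairs. This store is the windowState defined in WindowOperator; with the heap state backend it is, for example, a HeapListState. Each time Flink receives an element, it obtains the set of windows that contain the element (the assignWindows method) and adds the element to the state table under each of those windows.
For example, suppose the current state table is as follows
(the key is the window and the value is the elements that window contains):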
{w1: [v1, v2]}
{w2: [v3]}
{w3: [v4]}
Now element v5 arrives. The windows containing v5 are w1 and w2, so the updated state table is:
{w1: [v1, v2, v5]}
{w2: [v3, v5]}
{w3: [v4]}
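The state-table example above can be modeled with an ordinary map from window to element list. WindowStateSketch is a hypothetical in-memory model in which windows and elements are plain strings. In Flink the data lives in keyed state inside WindowOperator, with the window used as the state namespace.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of the window -> elements table for one key.
class WindowStateSketch {
    // LinkedHashMap keeps windows in insertion order so the printout matches the example.
    final Map<String, List<String>> state = new LinkedHashMap<>();

    // Append the element to every window the assigner returned for it.
    void add(String element, List<String> assignedWindows) {
        for (String window : assignedWindows) {
            state.computeIfAbsent(window, w -> new ArrayList<>()).add(element);
        }
    }

    public static void main(String[] args) {
        WindowStateSketch s = new WindowStateSketch();
        s.add("v1", Arrays.asList("w1"));
        s.add("v2", Arrays.asList("w1"));
        s.add("v3", Arrays.asList("w2"));
        s.add("v4", Arrays.asList("w3"));
        // v5 is contained in both w1 and w2
        s.add("v5", Arrays.asList("w1", "w2"));
        System.out.println(s.state); // {w1=[v1, v2, v5], w2=[v3, v5], w3=[v4]}
    }
}
```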
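A WindowAssigner determines, for a given element, the set of windows that contain it (the assignWindows method). A WindowAssigner also provides the window's default trigger, which decides when the window contents are evaluated, and its time type (event time or processing time).
The WindowAssigner source is as follows:
// T is the element type, W is the window type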
@PublicEvolving
public abstract class WindowAssigner<T, W extends Window> implements Serializable {
private static final long serialVersionUID = 1L;
/**
* Returns a {@code Collection} of windows that should be assigned to the element.
*
* @param element The element to which windows should be assigned.
* @param timestamp The timestamp of the element.
* @param context The {@link WindowAssignerContext} in which the assigner operates.
*/
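// Returns the set of windows that contain the element
// Parameters: element: the element entering the window. timestamp: the element's timestamp, i.e. its event time. context: the WindowAssigner context, used to supply the processing time.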
public abstract Collection<W> assignWindows(T element, long timestamp, WindowAssignerContext context);
/**
* Returns the default trigger associated with this {@code WindowAssigner}.
*/
// Returns the default trigger
public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment env);
/**
* Returns a {@link TypeSerializer} for serializing windows that are assigned by
* this {@code WindowAssigner}.
*/
// Returns the serializer for the windows
public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig executionConfig);
/**
* Returns {@code true} if elements are assigned to windows based on event time,
* {@code false} otherwise.
*/
// Returns true if the windows are based on event time, false otherwise
public abstract boolean isEventTime();
/**
* A context provided to the {@link WindowAssigner} that allows it to query the
* current processing time.
*
* <p>This is provided to the assigner by its containing
* {@link org.apache.flink.streaming.runtime.operators.windowing.WindowOperator},
* which, in turn, gets it from the containing
* {@link org.apache.flink.streaming.runtime.tasks.StreamTask}.
*/
public abstract static class WindowAssignerContext {
/**
* Returns the current processing time.
*/
// Returns the current processing time; used by processing-time windows
public abstract long getCurrentProcessingTime();
}
}
GlobalWindows
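GlobalWindows is actually an assigner for GlobalWindow. It assigns each element to the GlobalWindow.
One application of GlobalWindows is the count window, which fires each time n elements have accumulated. The corresponding source (KeyedStream.countWindow) is: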
public WindowedStream<T, KEY, GlobalWindow> countWindow(long size) {
// CountTrigger fires once every size elements; wrapping it in PurgingTrigger clears the window contents after each firing
return window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));
}
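The combination PurgingTrigger.of(CountTrigger.of(size)) can be simulated on a single key as shown below. CountWindowSketch is hypothetical. It folds three things into one class: the GlobalWindow contents, the trigger's count state, and the fire-and-purge behaviour. It assumes, as CountTrigger does, that the count is reset each time the trigger fires.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of countWindow(size) for one key:
// a single GlobalWindow + CountTrigger (fires every size elements) + PurgingTrigger (clears on fire).
class CountWindowSketch {
    private final long size;
    // Contents of the single GlobalWindow for this key.
    private final List<Integer> windowContents = new ArrayList<>();
    // Element counter, playing the role of CountTrigger's state.
    private long count = 0;
    // Snapshot of the window contents at every firing.
    final List<List<Integer>> firings = new ArrayList<>();

    CountWindowSketch(long size) {
        this.size = size;
    }

    void onElement(int value) {
        windowContents.add(value);
        count++;
        if (count >= size) {
            count = 0;                                    // the count restarts after firing
            firings.add(new ArrayList<>(windowContents)); // FIRE: evaluate the window
            windowContents.clear();                       // PURGE: drop the window contents
        }
    }

    public static void main(String[] args) {
        CountWindowSketch sketch = new CountWindowSketch(3);
        for (int i = 1; i <= 7; i++) {
            sketch.onElement(i);
        }
        System.out.println(sketch.firings); // [[1, 2, 3], [4, 5, 6]]; element 7 is still waiting
    }
}
```

Without the purge step, each firing would re-emit every element seen so far, because the single GlobalWindow is never cleared.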
For CountTrigger and PurgingTrigger, see the article "Flink Source Code: Trigger".
The source of GlobalWindows is as follows:
@PublicEvolving
public class GlobalWindows extends WindowAssigner<Object, GlobalWindow> {
private static final long serialVersionUID = 1L;
private GlobalWindows() {}
@Override
public Collection<GlobalWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
// GlobalWindow is a singleton, so assigning a window simply returns the GlobalWindow instance
return Collections.singletonList(GlobalWindow.get());
}
@Override
public Trigger<Object, GlobalWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
// By default, no computation is ever triggered
return new NeverTrigger();
}
@Override
public String toString() {
return "GlobalWindows()";
}
/**
* Creates a new {@code GlobalWindows} {@link WindowAssigner} that assigns
* all elements to the same {@link GlobalWindow}.
*
* @return The global window policy.
*/
public static GlobalWindows create() {
return new GlobalWindows();
}
/**
* A trigger that never fires, as default Trigger for GlobalWindows.
*/
@Internal
// NeverTrigger returns TriggerResult.CONTINUE in every case, so it never fires any computation
public static class NeverTrigger extends Trigger<Object, GlobalWindow> {
private static final long serialVersionUID = 1L;
@Override
public TriggerResult onElement(Object element, long timestamp, GlobalWindow window, TriggerContext ctx) {
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) {
return TriggerResult.CONTINUE;
}
@Override
public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {}
@Override
public void onMerge(GlobalWindow window, OnMergeContext ctx) {
}
}
@Override
public TypeSerializer<GlobalWindow> getWindowSerializer(ExecutionConfig executionConfig) {
return new GlobalWindow.Serializer();
}
@Override
public boolean isEventTime() {
return false;
}
}
EventTime Windows
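An event-time window is a window based on event time. Event time is the time at which an element was produced, and this time information is usually carried in the element itself, for example: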
{
"message": "Server memory is running low",
"timestamp": 1280977330000
}
Official documentation on event time: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/event_time.html
TumblingEventTimeWindows
A tumbling window has no overlap between adjacent windows, so each element belongs to exactly one window.
TumblingEventTimeWindows is shown below:
@PublicEvolving
public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
private static final long serialVersionUID = 1L;
// Window size
private final long size;
// Window offset
private final long offset;
protected TumblingEventTimeWindows(long size, long offset) {
// In this version, offset must satisfy 0 <= offset < size
if (offset < 0 || offset >= size) {
throw new IllegalArgumentException("TumblingEventTimeWindows parameters must satisfy 0 <= offset < size");
}
this.size = size;
this.offset = offset;
}
@Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
if (timestamp > Long.MIN_VALUE) {
// Long.MIN_VALUE is currently assigned when no timestamp is present
// start is the greatest value not greater than timestamp of the form k * size + offset, i.e. window boundaries are multiples of size shifted by offset
long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);
return Collections.singletonList(new TimeWindow(start, start + size));
} else {
throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
"Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
"'DataStream.assignTimestampsAndWatermarks(...)'?");
}
}
@Override
public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
return EventTimeTrigger.create();
}
@Override
public String toString() {
return "TumblingEventTimeWindows(" + size + ")";
}
/**
* Creates a new {@code TumblingEventTimeWindows} {@link WindowAssigner} that assigns
* elements to time windows based on the element timestamp.
*
* @param size The size of the generated windows.
* @return The time policy.
*/
public static TumblingEventTimeWindows of(Time size) {
return new TumblingEventTimeWindows(size.toMilliseconds(), 0);
}
/**
* Creates a new {@code TumblingEventTimeWindows} {@link WindowAssigner} that assigns
* elements to time windows based on the element timestamp and offset.
*
* <p>For example, if you want window a stream by hour,but window begins at the 15th minutes
* of each hour, you can use {@code of(Time.hours(1),Time.minutes(15))},then you will get
* time windows start at 0:15:00,1:15:00,2:15:00,etc.
*
* <p>Rather than that,if you are living in somewhere which is not using UTC±00:00 time,
* such as China which is using UTC+08:00,and you want a time window with size of one day,
* and window begins at every 00:00:00 of local time,you may use {@code of(Time.days(1),Time.hours(-8))}.
* The parameter of offset is {@code Time.hours(-8))} since UTC+08:00 is 8 hours earlier than UTC time.
*
* @param size The size of the generated windows.
* @param offset The offset which window start would be shifted by.
* @return The time policy.
*/
public static TumblingEventTimeWindows of(Time size, Time offset) {
return new TumblingEventTimeWindows(size.toMilliseconds(), offset.toMilliseconds());
}
@Override
public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
return new TimeWindow.Serializer();
}
@Override
public boolean isEventTime() {
return true;
}
}
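The sketch below shows how a window start is computed. The formula is the one found in TimeWindow.getWindowStartWithOffset in Flink 1.x sources of roughly this era. This is an assumption for non-negative timestamps; behaviour for negative timestamps may differ between versions, so check the version you use. TumblingSketch is a hypothetical helper. The second example reproduces the UTC+08:00 daily-window case from the Javadoc above, using the timestamp from the event-time JSON example.

```java
// Hypothetical helper reproducing the tumbling-window start computation.
final class TumblingSketch {
    // Formula as in TimeWindow.getWindowStartWithOffset (Flink 1.x era, assumption);
    // intended for non-negative timestamps with |offset| < size.
    static long windowStart(long timestamp, long offset, long size) {
        return timestamp - (timestamp - offset + size) % size;
    }

    // Like TumblingEventTimeWindows.assignWindows: exactly one window [start, start + size).
    static long[] assign(long timestamp, long offset, long size) {
        long start = windowStart(timestamp, offset, size);
        return new long[] {start, start + size};
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long hour = 60 * minute;
        long day = 24 * hour;

        // Hourly windows shifted by 15 minutes: 10:20 falls into [10:15, 11:15).
        long[] w = assign(10 * hour + 20 * minute, 15 * minute, hour);
        System.out.println((w[0] - 10 * hour) / minute); // 15

        // Daily windows starting at local midnight in UTC+08:00 (offset -8h).
        // 1280977330000 is 2010-08-05 03:02:10 UTC, i.e. 11:02:10 local time.
        long start = windowStart(1280977330000L, -8 * hour, day);
        System.out.println(start); // 1280937600000 (2010-08-04 16:00 UTC = 2010-08-05 00:00 local)
    }
}
```

The constructor quoted above rejects negative offsets, even though its own Javadoc suggests Time.hours(-8). The sketch calls the formula directly and so bypasses that check.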
SlidingEventTimeWindows
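A sliding window has three properties: the window length (size), the slide interval (slide), and the offset (offset). Sliding windows can overlap, which means one element can belong to several windows at the same time, as shown in the figure below.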
@PublicEvolving
public class SlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
private static final long serialVersionUID = 1L;
// Window size (span)
private final long size;
// Slide interval
private final long slide;
// Offset
private final long offset;
protected SlidingEventTimeWindows(long size, long slide, long offset) {
// The absolute value of offset must be less than slide, and size must be greater than 0
if (Math.abs(offset) >= slide || size <= 0) {
throw new IllegalArgumentException("SlidingEventTimeWindows parameters must satisfy " +
"abs(offset) < slide and size > 0");
}
this.size = size;
this.slide = slide;
this.offset = offset;
}
@Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
if (timestamp > Long.MIN_VALUE) {
// Sliding windows overlap: a given point in time is contained in about size / slide windows, which is used as the initial list capacity
List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
// lastStart is the latest window start not greater than timestamp: the greatest value <= timestamp of the form k * slide + offset (note: slide, not size)
long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
for (long start = lastStart;
start > timestamp - size;
start -= slide) {
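// Starting from lastStart and stepping back by slide each time, every window visited contains the element and must be returned
// The loop requires start > timestamp - size: if start <= timestamp - size, then end = start + size <= timestamp, and since end is exclusive the window cannot contain the element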
windows.add(new TimeWindow(start, start + size));
}
return windows;
} else {
// timestamp is Long.MIN_VALUE if the environment was not set to use event time or no timestamps were assigned
throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
"Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
"'DataStream.assignTimestampsAndWatermarks(...)'?");
}
}
public long getSize() {
return size;
}
public long getSlide() {
return slide;
}
// Uses the event-time trigger by default
@Override
public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
return EventTimeTrigger.create();
}
@Override
public String toString() {
return "SlidingEventTimeWindows(" + size + ", " + slide + ")";
}
/**
* Creates a new {@code SlidingEventTimeWindows} {@link WindowAssigner} that assigns
* elements to sliding time windows based on the element timestamp.
*
* @param size The size of the generated windows.
* @param slide The slide interval of the generated windows.
* @return The time policy.
*/
public static SlidingEventTimeWindows of(Time size, Time slide) {
return new SlidingEventTimeWindows(size.toMilliseconds(), slide.toMilliseconds(), 0);
}
/**
* Creates a new {@code SlidingEventTimeWindows} {@link WindowAssigner} that assigns
* elements to time windows based on the element timestamp and offset.
*
* <p>For example, if you want window a stream by hour,but window begins at the 15th minutes
* of each hour, you can use {@code of(Time.hours(1),Time.minutes(15))},then you will get
* time windows start at 0:15:00,1:15:00,2:15:00,etc.
*
* <p>Rather than that,if you are living in somewhere which is not using UTC±00:00 time,
* such as China which is using UTC+08:00,and you want a time window with size of one day,
* and window begins at every 00:00:00 of local time,you may use {@code of(Time.days(1),Time.hours(-8))}.
* The parameter of offset is {@code Time.hours(-8))} since UTC+08:00 is 8 hours earlier than UTC time.
*
* @param size The size of the generated windows.
* @param slide The slide interval of the generated windows.
* @param offset The offset which window start would be shifted by.
* @return The time policy.
*/
public static SlidingEventTimeWindows of(Time size, Time slide, Time offset) {
return new SlidingEventTimeWindows(size.toMilliseconds(), slide.toMilliseconds(), offset.toMilliseconds());
}
@Override
public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
return new TimeWindow.Serializer();
}
@Override
public boolean isEventTime() {
return true;
}
}
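The loop in assignWindows can be checked by hand with the sketch below. It reuses the same start formula as for tumbling windows, under the same version assumption. SlidingSketch is hypothetical and represents each window as a {start, end} array.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper reproducing SlidingEventTimeWindows.assignWindows.
final class SlidingSketch {
    // Same start formula as TimeWindow.getWindowStartWithOffset (Flink 1.x era, assumption).
    static long windowStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    // Each long[] is {start, end}; end is exclusive.
    static List<long[]> assign(long timestamp, long size, long slide, long offset) {
        List<long[]> windows = new ArrayList<>((int) (size / slide));
        // Starts are aligned to slide, not size.
        long lastStart = windowStart(timestamp, offset, slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        // size 10, slide 5: the element at t=12 belongs to [10, 20) and [5, 15)
        for (long[] w : assign(12, 10, 5, 0)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
    }
}
```

With size 10 and slide 3, the same element at t=12 falls into four windows ([12, 22), [9, 19), [6, 16), [3, 13)). So size / slide (integer division, 3 here) is only the initial list capacity, not an exact count.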
MergingWindowAssigner
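MergingWindowAssigner extends WindowAssigner. Unlike a plain WindowAssigner, it supports merging windows. In Flink, the session windows implement MergingWindowAssigner and therefore support window merging. By the definition of a session window, two adjacent elements whose time difference does not exceed the gap belong to the same window. The length of a session window therefore keeps changing, unlike the two window types above, whose size is fixed. As time goes on, session windows must be merged repeatedly to hold more elements. The merge logic lives in the mergeWindows method, which receives a MergeCallback object. The callback reports each merge so that additional logic can run.
The source of MergingWindowAssigner is as follows: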
public abstract class MergingWindowAssigner<T, W extends Window> extends WindowAssigner<T, W> {
private static final long serialVersionUID = 1L;
/**
* Determines which windows (if any) should be merged.
*
* @param windows The window candidates.
* @param callback A callback that can be invoked to signal which windows should be merged.
*/
// Window merge logic
// Parameters: windows: the candidate windows to merge; callback: invoked to report which windows are merged
public abstract void mergeWindows(Collection<W> windows, MergeCallback<W> callback);
/**
* Callback to be used in {@link #mergeWindows(Collection, MergeCallback)} for specifying which
* windows should be merged.
*/
public interface MergeCallback<W> {
/**
* Specifies that the given windows should be merged into the result window.
*
* @param toBeMerged The list of windows that should be merged into one window.
* @param mergeResult The resulting merged window.
*/
// toBeMerged: the windows to be merged
// mergeResult: the resulting merged window
void merge(Collection<W> toBeMerged, W mergeResult);
}
}
EventTimeSessionWindows
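A session window has one property: the session timeout (or gap).
In a session window, any two consecutive elements are at most the gap apart in time. If a newly arriving element is within the gap of the previous element, it joins that element's window. Otherwise, it starts a new window. See the figure in the official documentation (the original image is too small to reproduce): https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/windows.html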
public class EventTimeSessionWindows extends MergingWindowAssigner<Object, TimeWindow> {
private static final long serialVersionUID = 1L;
// Session timeout: by definition, if two consecutive elements are further apart than this, the later element goes into a new window
protected long sessionTimeout;
protected EventTimeSessionWindows(long sessionTimeout) {
if (sessionTimeout <= 0) {
throw new IllegalArgumentException("EventTimeSessionWindows parameters must satisfy 0 < size");
}
this.sessionTimeout = sessionTimeout;
}
@Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
// Simply returns the window [timestamp, timestamp + sessionTimeout). Merging happens in WindowOperator, which produces a window spanning all elements of the same session; this will be analyzed in a later post
return Collections.singletonList(new TimeWindow(timestamp, timestamp + sessionTimeout));
}
@Override
public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
return EventTimeTrigger.create();
}
@Override
public String toString() {
return "EventTimeSessionWindows(" + sessionTimeout + ")";
}
/**
* Creates a new {@code SessionWindows} {@link WindowAssigner} that assigns
* elements to sessions based on the element timestamp.
*
* @param size The session timeout, i.e. the time gap between sessions
* @return The policy.
*/
public static EventTimeSessionWindows withGap(Time size) {
return new EventTimeSessionWindows(size.toMilliseconds());
}
/**
* Creates a new {@code SessionWindows} {@link WindowAssigner} that assigns
* elements to sessions based on the element timestamp.
*
* @param sessionWindowTimeGapExtractor The extractor to use to extract the time gap from the input elements
* @return The policy.
*/
// Dynamic EventTimeSessionWindows: the gap can change with each incoming element
@PublicEvolving
public static <T> DynamicEventTimeSessionWindows<T> withDynamicGap(SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor) {
return new DynamicEventTimeSessionWindows<>(sessionWindowTimeGapExtractor);
}
@Override
public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
return new TimeWindow.Serializer();
}
@Override
public boolean isEventTime() {
return true;
}
/**
* Merge overlapping {@link TimeWindow}s.
*/
// Merges overlapping windows; the method is analyzed below
@Override
public void mergeWindows(Collection<TimeWindow> windows, MergingWindowAssigner.MergeCallback<TimeWindow> c) {
TimeWindow.mergeWindows(windows, c);
}
}
The TimeWindow.mergeWindows method defines how time windows are merged:
public static void mergeWindows(Collection<TimeWindow> windows, MergingWindowAssigner.MergeCallback<TimeWindow> c) {
// sort the windows by the start time and then merge overlapping windows
List<TimeWindow> sortedWindows = new ArrayList<>(windows);
// Sort the windows by start time
Collections.sort(sortedWindows, new Comparator<TimeWindow>() {
@Override
public int compare(TimeWindow o1, TimeWindow o2) {
return Long.compare(o1.getStart(), o2.getStart());
}
});
// All merge groups found so far; each entry has the same type as currentMerge below
List<Tuple2<TimeWindow, Set<TimeWindow>>> merged = new ArrayList<>();
// The merge group being built: f0 is the merged (covering) window, f1 is the set of windows merged into it
Tuple2<TimeWindow, Set<TimeWindow>> currentMerge = null;
// Merge the windows one by one
for (TimeWindow candidate: sortedWindows) {
// On the first iteration, currentMerge is null
if (currentMerge == null) {
currentMerge = new Tuple2<>();
currentMerge.f0 = candidate;
currentMerge.f1 = new HashSet<>();
currentMerge.f1.add(candidate);
// If the window merged so far (currentMerge.f0) intersects the candidate, replace f0 with the window covering both and add the candidate to the set in f1
} else if (currentMerge.f0.intersects(candidate)) {
currentMerge.f0 = currentMerge.f0.cover(candidate);
currentMerge.f1.add(candidate);
// The candidate does not intersect the current merged window: close the current group and start a new one
} else {
merged.add(currentMerge);
currentMerge = new Tuple2<>();
currentMerge.f0 = candidate;
currentMerge.f1 = new HashSet<>();
currentMerge.f1.add(candidate);
}
}
// After the loop, add the last group to merged
if (currentMerge != null) {
merged.add(currentMerge);
}
for (Tuple2<TimeWindow, Set<TimeWindow>> m: merged) {
// Only groups containing more than one window need a merge notification
if (m.f1.size() > 1) {
// Notify the callback which windows are merged
// and what the merge result is
c.merge(m.f1, m.f0);
}
}
}
[Figure: contents of the merged variable]
Dynamic session window
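The sweep performed by TimeWindow.mergeWindows can be reproduced in plain Java. SessionMergeSketch, its Win class and its Callback interface are hypothetical stand-ins for TimeWindow and MergeCallback. The algorithm follows the code above: sort by start, extend the current group while windows intersect, and report only groups with more than one member. The example gives elements at timestamps 1, 3 and 10 an initial session window with a gap of 4 each, as EventTimeSessionWindows.assignWindows would.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class SessionMergeSketch {
    // Hypothetical stand-in for TimeWindow: [start, end), end exclusive.
    static final class Win {
        final long start;
        final long end;
        Win(long start, long end) { this.start = start; this.end = end; }
        boolean intersects(Win o) { return start <= o.end && end >= o.start; }
        Win cover(Win o) { return new Win(Math.min(start, o.start), Math.max(end, o.end)); }
        @Override public String toString() { return "[" + start + ", " + end + ")"; }
    }

    // Hypothetical stand-in for MergingWindowAssigner.MergeCallback.
    interface Callback {
        void merge(Set<Win> toBeMerged, Win mergeResult);
    }

    // One merge group: the covering window (like Tuple2.f0) and its members (like Tuple2.f1).
    private static final class Group {
        Win result;
        final Set<Win> members = new HashSet<>();
        Group(Win first) { result = first; members.add(first); }
    }

    static void mergeWindows(List<Win> windows, Callback c) {
        List<Win> sorted = new ArrayList<>(windows);
        sorted.sort(Comparator.comparingLong((Win w) -> w.start)); // sort by start time

        List<Group> merged = new ArrayList<>();
        Group current = null;
        for (Win candidate : sorted) {
            if (current != null && current.result.intersects(candidate)) {
                // Overlaps (or touches) the current group: extend the covering window.
                current.result = current.result.cover(candidate);
                current.members.add(candidate);
            } else {
                // First window, or a gap: close the current group and start a new one.
                if (current != null) merged.add(current);
                current = new Group(candidate);
            }
        }
        if (current != null) merged.add(current);

        // Only groups of two or more windows are reported.
        for (Group g : merged) {
            if (g.members.size() > 1) c.merge(g.members, g.result);
        }
    }

    public static void main(String[] args) {
        long gap = 4;
        List<Win> windows = new ArrayList<>();
        for (long ts : new long[] {1, 3, 10}) {
            windows.add(new Win(ts, ts + gap)); // initial session window per element
        }
        mergeWindows(windows, (toBeMerged, result) ->
                System.out.println(toBeMerged.size() + " windows -> " + result));
        // prints: 2 windows -> [1, 7)
    }
}
```

Because intersects uses inclusive comparisons, two elements exactly one gap apart (for example [1, 5) and [5, 9)) also end up in the same session.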
In the (non-dynamic) session window above, the session timeout is fixed and never changes once the window is created. A dynamic session window differs in that its session timeout can vary. It holds a SessionWindowTimeGapExtractor object, which defines the session timeout as a function of the element entering the window. Its source is as follows:
public interface SessionWindowTimeGapExtractor<T> extends Serializable {
/**
* Extracts the session time gap.
* @param element The input element.
* @return The session time gap in milliseconds.
*/
// Defines the session timeout as a function of the element entering the window
long extract(T element);
}
The assignWindows method of DynamicEventTimeSessionWindows is shown below. Each time it assigns a window, it calls sessionWindowTimeGapExtractor.extract to obtain the session timeout.
public Collection<TimeWindow> assignWindows(T element, long timestamp, WindowAssignerContext context) {
// Calls the extractor to obtain sessionTimeout; the rest of the logic is the same as for a regular session window
long sessionTimeout = sessionWindowTimeGapExtractor.extract(element);
if (sessionTimeout <= 0) {
throw new IllegalArgumentException("Dynamic session time gap must satisfy 0 < gap");
}
return Collections.singletonList(new TimeWindow(timestamp, timestamp + sessionTimeout));
}
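The sketch below mirrors the assignWindows method above, with a lambda as the gap extractor. GapExtractor is a hypothetical local copy of the SessionWindowTimeGapExtractor shape, so the sketch compiles without Flink. The Click type and the VIP rule are invented for illustration. After assignment, merging proceeds exactly as for fixed-gap sessions.

```java
import java.io.Serializable;

// Hypothetical local copy of the SessionWindowTimeGapExtractor shape quoted above.
interface GapExtractor<T> extends Serializable {
    long extract(T element);
}

final class DynamicGapSketch {
    // Hypothetical element type: a user id and the event timestamp.
    static final class Click {
        final String user;
        final long timestamp;
        Click(String user, long timestamp) { this.user = user; this.timestamp = timestamp; }
    }

    // Same steps as DynamicEventTimeSessionWindows.assignWindows: ask the extractor for the gap,
    // validate it, and return the initial window {timestamp, timestamp + gap}.
    static <T> long[] assign(T element, long timestamp, GapExtractor<T> extractor) {
        long gap = extractor.extract(element);
        if (gap <= 0) {
            throw new IllegalArgumentException("Dynamic session time gap must satisfy 0 < gap");
        }
        return new long[] {timestamp, timestamp + gap};
    }

    public static void main(String[] args) {
        // Hypothetical rule: users whose id starts with "vip" get a 10-minute gap, everyone else 1 minute.
        GapExtractor<Click> extractor = c -> c.user.startsWith("vip") ? 600_000L : 60_000L;
        Click click = new Click("vip-42", 1_000L);
        long[] w = assign(click, click.timestamp, extractor);
        System.out.println(w[0] + " " + w[1]); // 1000 601000
    }
}
```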
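Processing-time windows
Processing-time windows work much like their event-time counterparts, except that assignWindows obtains the current processing time with
long currentProcessingTime = context.getCurrentProcessingTime();
instead of using the element's timestamp. The rest of the logic is essentially the same and is not repeated here.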
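Below is a sketch of the processing-time variant of tumbling assignment. It assumes, as in Flink's TumblingProcessingTimeWindows, that the element timestamp is ignored and the same start formula is applied to the current processing time. ProcessingTimeSketch is hypothetical. Its LongSupplier parameter stands in for WindowAssignerContext, which also makes the clock easy to fix in tests.

```java
import java.util.function.LongSupplier;

// Hypothetical helper: tumbling window assignment driven by processing time.
final class ProcessingTimeSketch {
    // Same start formula as the event-time sketches (Flink 1.x era, assumption).
    static long windowStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    // elementTimestamp is deliberately unused: the clock, standing in for
    // WindowAssignerContext.getCurrentProcessingTime(), decides the window.
    static long[] assign(Object element, long elementTimestamp, LongSupplier currentProcessingTime,
                         long size, long offset) {
        long now = currentProcessingTime.getAsLong();
        long start = windowStart(now, offset, size);
        return new long[] {start, start + size};
    }

    public static void main(String[] args) {
        // 5-second windows based on the machine's wall clock.
        long[] w = assign("event", 0L, System::currentTimeMillis, 5_000L, 0L);
        System.out.println("[" + w[0] + ", " + w[1] + ")");
    }
}
```

The same element can therefore land in different windows depending on when it is processed. This is the main practical difference from the event-time assigners.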