Evictor
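Why use an Evictor
An Evictor removes elements from a window before (or after) Flink runs the window computation.
Consider this scenario: every time an element enters the stream (a CountTrigger with maxCount set to 1), we want the data from the last 2 hours. Flink's EventTimeSessionWindows fits this case. By default EventTimeSessionWindows uses EventTimeTrigger, which fires once the watermark reaches the window's maxTimestamp. The window assigner takes a gap parameter when it is created. As elements arrive one after another, they are merged into the same window as long as the interval between neighboring elements does not exceed gap. If two elements are more than gap apart, the earlier window is cut off and the later element goes into a new window.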
val stream = ...
stream.keyBy(0)
    .window(EventTimeSessionWindows.withGap(Time.hours(2)))
    .trigger(CountTrigger.of(1))
    .evictor(TimeEvictor.of(Time.hours(2)))
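Inside Flink, the data of each window can be thought of, loosely, as key-value pairs stored in the CopyOnWriteStateTable behind HeapListState. The key is the window object itself (strictly speaking, the window acts as the state namespace alongside the record key) and the value is the data in that window. Merging windows takes the union of the windows and merges their value collections as well. As a result, a merged window may well contain data older than 2 hours. We need an evictor to exclude that data before the computation runs. In this example, TimeEvictor clears out the data older than 2 hours.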
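The merge behavior described above can be illustrated with a small plain-Java simulation. This is only a sketch under stated assumptions: `SessionMergeSketch`, `sessions` and `stale` are hypothetical names, timestamps are bare `long` values in minutes, and none of it is Flink's actual merging code. It shows that a session can keep growing as long as neighboring elements stay within the gap, so the merged window ends up holding elements older than the 2-hour retention.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical simulation; not a Flink class.
class SessionMergeSketch {

    // Groups ascending timestamps into sessions: an element joins the current
    // session when it is at most `gap` later than the previous element.
    static List<List<Long>> sessions(long[] ascendingTimestamps, long gap) {
        List<List<Long>> result = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long ts : ascendingTimestamps) {
            if (!current.isEmpty() && ts - current.get(current.size() - 1) > gap) {
                result.add(current);          // gap exceeded: close the session
                current = new ArrayList<>();
            }
            current.add(ts);
        }
        if (!current.isEmpty()) {
            result.add(current);
        }
        return result;
    }

    // Elements of a non-empty session that a time-based evictor with the
    // given retention would drop (timestamp <= newest timestamp - retention).
    static List<Long> stale(List<Long> session, long retention) {
        long cutoff = Collections.max(session) - retention;
        List<Long> old = new ArrayList<>();
        for (long ts : session) {
            if (ts <= cutoff) {
                old.add(ts);
            }
        }
        return old;
    }

    public static void main(String[] args) {
        long gap = 120; // minutes, standing in for Time.hours(2)
        List<List<Long>> s = sessions(new long[] {0, 90, 180, 270, 500}, gap);
        System.out.println(s);                    // [[0, 90, 180, 270], [500]]
        System.out.println(stale(s.get(0), gap)); // [0, 90]
    }
}
```

The first session spans 270 minutes even though the gap is only 120, which is why the elements at 0 and 90 have to be evicted before the computation.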
public interface Evictor<T, W extends Window> extends Serializable {

    /**
     * Optionally evicts elements. Called before windowing function.
     *
     * @param elements The elements currently in the pane.
     * @param size The current number of elements in the pane.
     * @param window The {@link Window}
     * @param evictorContext The context for the Evictor
     */
    // Runs the eviction before the window computation is executed
    void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);

    /**
     * Optionally evicts elements. Called after windowing function.
     *
     * @param elements The elements currently in the pane.
     * @param size The current number of elements in the pane.
     * @param window The {@link Window}
     * @param evictorContext The context for the Evictor
     */
    // Runs the eviction after the window computation has finished
    void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);

    /**
     * A context object that is given to {@link Evictor} methods.
     */
    interface EvictorContext {

        /**
         * Returns the current processing time.
         */
        long getCurrentProcessingTime();

        /**
         * Returns the metric group for this {@link Evictor}. This is the same metric
         * group that would be returned from {@link RuntimeContext#getMetricGroup()} in a user
         * function.
         *
         * <p>You must not call methods that create metric objects
         * (such as {@link MetricGroup#counter(int)}) multiple times but instead call once
         * and store the metric object in a field.
         */
        MetricGroup getMetricGroup();

        /**
         * Returns the current watermark time.
         */
        long getCurrentWatermark();
    }
}
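Normally the class that operates on windows is WindowOperator, which does not use an evictor. Once an Evictor is specified for a window, the EvictingWindowOperator class takes over instead.
evictBefore and evictAfter are called at the following points: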
// EvictingWindowOperator.java
private void emitWindowContents(W window, Iterable<StreamRecord<IN>> contents, ListState<StreamRecord<IN>> windowState) throws Exception {
    timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());

    // Work around type system restrictions...
    FluentIterable<TimestampedValue<IN>> recordsWithTimestamp = FluentIterable
        .from(contents)
        .transform(new Function<StreamRecord<IN>, TimestampedValue<IN>>() {
            @Override
            public TimestampedValue<IN> apply(StreamRecord<IN> input) {
                return TimestampedValue.from(input);
            }
        });

    // Call evictBefore ahead of the window function
    evictorContext.evictBefore(recordsWithTimestamp, Iterables.size(recordsWithTimestamp));

    FluentIterable<IN> projectedContents = recordsWithTimestamp
        .transform(new Function<TimestampedValue<IN>, IN>() {
            @Override
            public IN apply(TimestampedValue<IN> input) {
                return input.getValue();
            }
        });

    processContext.window = triggerContext.window;
    // Apply the user's window function
    userFunction.process(triggerContext.key, triggerContext.window, processContext, projectedContents, timestampedCollector);

    // Call evictAfter once the window function has run
    evictorContext.evictAfter(recordsWithTimestamp, Iterables.size(recordsWithTimestamp));

    //work around to fix FLINK-4369, remove the evicted elements from the windowState.
    //this is inefficient, but there is no other way to remove elements from ListState, which is an AppendingState.
    windowState.clear();
    for (TimestampedValue<IN> record : recordsWithTimestamp) {
        windowState.add(record.getStreamRecord());
    }
}
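The order of steps in emitWindowContents can be mimicked without Flink. The following is a minimal sketch, assuming a hypothetical `SimpleEvictor` interface over plain integers and a sum as the "window function"; it is not the operator's real code. It shows that elements removed in evictBefore never reach the computation, while elements removed in evictAfter are still computed once but are gone from the state afterwards.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical simulation; not a Flink class.
class EmitOrderSketch {

    // Simplified stand-in for Flink's Evictor interface (an assumption).
    interface SimpleEvictor {
        void evictBefore(Iterable<Integer> elements);
        void evictAfter(Iterable<Integer> elements);
    }

    // Same order as described above: evictBefore, window function,
    // evictAfter, then rewrite the state with the surviving elements.
    static int emit(List<Integer> windowState, SimpleEvictor evictor) {
        List<Integer> contents = new ArrayList<>(windowState);
        evictor.evictBefore(contents);
        int sum = 0; // the "window function" here is a plain sum
        for (int v : contents) {
            sum += v;
        }
        evictor.evictAfter(contents);
        windowState.clear();
        windowState.addAll(contents);
        return sum;
    }

    // Removes every element matching the predicate through the iterator.
    static void drop(Iterable<Integer> elements, Predicate<Integer> p) {
        for (Iterator<Integer> it = elements.iterator(); it.hasNext(); ) {
            if (p.test(it.next())) {
                it.remove();
            }
        }
    }

    // Example evictor: drops negatives before, values above 10 after.
    static SimpleEvictor example() {
        return new SimpleEvictor() {
            @Override
            public void evictBefore(Iterable<Integer> e) { drop(e, v -> v < 0); }
            @Override
            public void evictAfter(Iterable<Integer> e) { drop(e, v -> v > 10); }
        };
    }

    public static void main(String[] args) {
        List<Integer> state = new ArrayList<>(Arrays.asList(-1, 5, 20));
        System.out.println(emit(state, example())); // 25
        System.out.println(state);                  // [5]
    }
}
```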
Flink provides the following three Evictor implementations:
- TimeEvictor
- CountEvictor
- DeltaEvictor
TimeEvictor
Decides whether an element is removed based on its timestamp.
private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {
    // If the elements carry no timestamp, return immediately
    if (!hasTimestamp(elements)) {
        return;
    }

    // Find the largest timestamp among the elements (the most recent element by timestamp)
    long currentTime = getMaxTimestamp(elements);
    // Cutoff = largest timestamp - window size (i.e. how far back elements are kept)
    long evictCutoff = currentTime - windowSize;

    for (Iterator<TimestampedValue<Object>> iterator = elements.iterator(); iterator.hasNext(); ) {
        TimestampedValue<Object> record = iterator.next();
        // Remove every element whose timestamp is less than or equal to the cutoff
        if (record.getTimestamp() <= evictCutoff) {
            iterator.remove();
        }
    }
}
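A worked example makes the cutoff rule concrete. The sketch below replays the same logic on bare `long` timestamps; `TimeEvictSketch` is a hypothetical name and the list of longs is a stand-in for the `TimestampedValue` elements. Note that the comparison is `<=`, so an element sitting exactly on the cutoff is evicted too.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical simulation of the cutoff rule; not a Flink class.
class TimeEvictSketch {

    // Keeps only timestamps strictly greater than (largest timestamp - windowSize).
    static void evict(List<Long> timestamps, long windowSize) {
        if (timestamps.isEmpty()) {
            return;
        }
        long currentTime = Long.MIN_VALUE;
        for (long ts : timestamps) {
            currentTime = Math.max(currentTime, ts);
        }
        long evictCutoff = currentTime - windowSize;
        for (Iterator<Long> it = timestamps.iterator(); it.hasNext(); ) {
            if (it.next() <= evictCutoff) {
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        List<Long> ts = new ArrayList<>(Arrays.asList(1000L, 4000L, 5000L, 9000L));
        evict(ts, 5000L);       // cutoff = 9000 - 5000 = 4000
        System.out.println(ts); // [5000, 9000]
    }
}
```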
CountEvictor
Decides whether an element is removed based on the element count.
private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {
    if (size <= maxCount) {
        // No more than maxCount elements: nothing to do
        return;
    } else {
        int evictedCount = 0;
        for (Iterator<TimestampedValue<Object>> iterator = elements.iterator(); iterator.hasNext();){
            iterator.next();
            evictedCount++;
            if (evictedCount > size - maxCount) {
                break;
            } else {
                // Remove the first size - maxCount elements so that only the last maxCount remain
                iterator.remove();
            }
        }
    }
}
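The same counting rule can be tried on an ordinary list. This is a sketch with a slightly restructured loop, assuming a hypothetical `CountEvictSketch` class; it behaves like the method above for this purpose (drop the first size - maxCount elements) but is not Flink's code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical simulation of count-based eviction; not a Flink class.
class CountEvictSketch {

    // Removes elements from the front until at most maxCount remain.
    static <T> void evict(List<T> elements, int maxCount) {
        int toEvict = elements.size() - maxCount; // <= 0 means nothing to evict
        Iterator<T> it = elements.iterator();
        for (int i = 0; i < toEvict && it.hasNext(); i++) {
            it.next();
            it.remove();
        }
    }

    public static void main(String[] args) {
        List<String> window = new ArrayList<>(Arrays.asList("a", "b", "c", "d", "e"));
        evict(window, 2);
        System.out.println(window); // [d, e]
    }
}
```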
DeltaEvictor
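DeltaEvictor evaluates a DeltaFunction for each element in turn, passing in that element and the last element, and compares the result with threshold. If the result is greater than or equal to threshold, the element is removed.
The evict method is as follows: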
private void evict(Iterable<TimestampedValue<T>> elements, int size, EvictorContext ctx) {
    // Get the last element
    TimestampedValue<T> lastElement = Iterables.getLast(elements);
    for (Iterator<TimestampedValue<T>> iterator = elements.iterator(); iterator.hasNext();){
        TimestampedValue<T> element = iterator.next();
        // Compute the delta between each element and the last element, and compare it with threshold
        // If the result is greater than or equal to threshold, the element is removed
        if (deltaFunction.getDelta(element.getValue(), lastElement.getValue()) >= this.threshold) {
            iterator.remove();
        }
    }
}
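To see the threshold comparison in action, here is a minimal sketch on plain values. `DeltaEvictSketch` is a hypothetical name, and `ToDoubleBiFunction` from the JDK stands in for Flink's DeltaFunction; the empty-list guard is an addition of this sketch. The example uses the absolute difference to the last element as the delta.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.ToDoubleBiFunction;

// Hypothetical simulation of delta-based eviction; not a Flink class.
class DeltaEvictSketch {

    // Removes every element whose delta to the last element is >= threshold.
    static <T> void evict(List<T> elements, double threshold, ToDoubleBiFunction<T, T> delta) {
        if (elements.isEmpty()) {
            return; // guard added for this sketch
        }
        T last = elements.get(elements.size() - 1);
        for (Iterator<T> it = elements.iterator(); it.hasNext(); ) {
            T element = it.next();
            if (delta.applyAsDouble(element, last) >= threshold) {
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        List<Double> values = new ArrayList<>(Arrays.asList(1.0, 8.0, 9.0, 10.0));
        // Deltas to the last element (10.0): 9.0, 2.0, 1.0, 0.0
        evict(values, 2.0, (a, b) -> Math.abs(b - a));
        System.out.println(values); // [9.0, 10.0]
    }
}
```

The element 8.0 has a delta of exactly 2.0 and is still removed, because the comparison is `>=`. The last element always has a delta of 0 to itself under this distance, so it survives whenever threshold is positive.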