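Flink windows fall into two kinds: time-based windows and count-based windows. Time-based windows can in turn be driven by either ProcessingTime or EventTime. In general, time-based windows cover more use cases, for example counting the records received per minute or handling out-of-order data.
I recently ran into the following scenario: consume data from Kafka, assemble it into SQL, and write it to MySQL in batches. Because MySQL limits the size of a single request packet (max_allowed_packet), I decided to use a count window to bound the length of each insert statement. In practice this exposed another problem: if the window never reaches its target count, it never fires, which delays writing the buffered data to the database.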
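As a rough illustration of how a count-window size could be derived from the packet limit, here is a small sketch. The class name, the method name and every number in it are assumptions made for this example; a real job would measure its own statement prefix and worst-case row size.

```java
/** Hypothetical helper: how many rows fit in one multi-row INSERT under a packet limit. */
class InsertBatchSizing {

    /**
     * @param maxAllowedPacketBytes the packet limit, in bytes
     * @param prefixBytes           size of the fixed "INSERT INTO ... VALUES " part
     * @param maxRowBytes           assumed worst-case size of one "(...)" row
     * @return the largest row count whose statement stays within the limit
     */
    public static long maxRowsPerInsert(long maxAllowedPacketBytes, long prefixBytes, long maxRowBytes) {
        if (maxRowBytes <= 0) {
            throw new IllegalArgumentException("maxRowBytes must be positive");
        }
        long available = maxAllowedPacketBytes - prefixBytes;
        if (available < maxRowBytes) {
            return 0; // not even a single row fits
        }
        // n rows need n * maxRowBytes bytes plus (n - 1) one-byte comma separators.
        return (available + 1) / (maxRowBytes + 1);
    }

    public static void main(String[] args) {
        long packet = 4L * 1024 * 1024; // assumed limit, check the server's actual setting
        System.out.println("rows per insert: " + maxRowsPerInsert(packet, 128, 2048));
    }
}
```

The result is only an upper bound for the window size; leaving some headroom below it is probably wise, since row sizes are estimates.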
Below is the code that implements the count window, taken from the class org.apache.flink.streaming.api.datastream.KeyedStream:
/**
 * Windows this {@code KeyedStream} into tumbling count windows.
 *
 * @param size The size of the windows in number of elements.
 */
public WindowedStream<T, KEY, GlobalWindow> countWindow(long size) {
    return window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));
}
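This creates a global window that buffers the data. A trigger then decides when the window fires, and at that point the window function processes the data in the window. So how does CountTrigger enforce the maximum count? Here is the CountTrigger code: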
/**
 * A {@link Trigger} that fires once the count of elements in a pane reaches the given count.
 *
 * @param <W> The type of {@link Window Windows} on which this trigger can operate.
 */
@PublicEvolving
public class CountTrigger<W extends Window> extends Trigger<Object, W> {
    private static final long serialVersionUID = 1L;

    private final long maxCount;

    private final ReducingStateDescriptor<Long> stateDesc =
            new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);

    private CountTrigger(long maxCount) {
        this.maxCount = maxCount;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
        ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
        count.add(1L);
        if (count.get() >= maxCount) {
            count.clear();
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(W window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(stateDesc).clear();
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(W window, OnMergeContext ctx) throws Exception {
        ctx.mergePartitionedState(stateDesc);
    }

    @Override
    public String toString() {
        return "CountTrigger(" + maxCount + ")";
    }

    /**
     * Creates a trigger that fires once the number of elements in a pane reaches the given count.
     *
     * @param maxCount The count of elements at which to fire.
     * @param <W> The type of {@link Window Windows} on which this trigger can operate.
     */
    public static <W extends Window> CountTrigger<W> of(long maxCount) {
        return new CountTrigger<>(maxCount);
    }

    private static class Sum implements ReduceFunction<Long> {
        private static final long serialVersionUID = 1L;

        @Override
        public Long reduce(Long value1, Long value2) throws Exception {
            return value1 + value2;
        }
    }
}
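To make the counting logic in onElement above concrete, here is a plain-Java model of a count window with purging. It is not Flink code: the class and its fields are hypothetical stand-ins (a plain counter for the ReducingState, a list for the window contents), and it only aims to show why a partially filled window never fires.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of a purging count window; a simplification, not Flink's implementation. */
class CountWindowSketch {
    private final long maxCount;
    private long count = 0;                                       // stands in for the "count" state
    private final List<String> buffer = new ArrayList<>();        // stands in for the window contents
    private final List<List<String>> emitted = new ArrayList<>(); // batches handed to the window function

    public CountWindowSketch(long maxCount) {
        this.maxCount = maxCount;
    }

    /** Mirrors onElement: count the element, fire once the count reaches maxCount. */
    public void onElement(String element) {
        buffer.add(element);
        count++;
        if (count >= maxCount) {
            count = 0;                            // count.clear()
            emitted.add(new ArrayList<>(buffer)); // FIRE: evaluate the window
            buffer.clear();                       // purge, as the PurgingTrigger wrapper would
        }
    }

    public List<List<String>> emitted() {
        return emitted;
    }

    public List<String> pending() {
        return buffer;
    }

    public static void main(String[] args) {
        CountWindowSketch window = new CountWindowSketch(3);
        for (int i = 1; i <= 7; i++) {
            window.onElement("row-" + i);
        }
        System.out.println("emitted batches: " + window.emitted().size());
        System.out.println("still buffered:  " + window.pending());
    }
}
```

With a maximum count of 3 and 7 elements, two full batches are emitted and the seventh element stays in the buffer until more data arrives. This is the delay described at the start of the post.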
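The onElement method is called for every incoming element. The class uses a ReducingState to accumulate the number of elements that have arrived; once the count reaches the maximum, it clears the count and triggers the window function. Returning TriggerResult.FIRE is what triggers the evaluation. Here is the relevant part of the TriggerResult definition: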
/**
 * No action is taken on the window.
 */
CONTINUE,

/**
 * {@code FIRE_AND_PURGE} evaluates the window function and emits the window
 * result.
 */
FIRE_AND_PURGE,

/**
 * On {@code FIRE}, the window is evaluated and results are emitted.
 * The window is not purged, though, all elements are retained.
 */
FIRE,

/**
 * All elements in the window are cleared and the window is discarded,
 * without evaluating the window function or emitting any elements.
 */
PURGE;
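CONTINUE: take no action.
FIRE_AND_PURGE: evaluate the window, then clear the data accumulated in it.
FIRE: evaluate the window but keep the data accumulated in it.
PURGE: clear the data in the window and discard the window, without evaluating it.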
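For CountTrigger to honor a maximum waiting time, its onProcessingTime implementation has to change so that it returns FIRE. Why not return FIRE_AND_PURGE directly? This follows the style of the Flink source code: when FIRE_AND_PURGE is needed, the trigger is wrapped from the outside with PurgingTrigger.of, which is more flexible.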
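The relationship between the four results and the purging wrapper can be modeled with two independent flags, "fire" and "purge". The sketch below is a simplified stand-in written for this post, not Flink's own TriggerResult or PurgingTrigger; the names Result and withPurge are assumptions for illustration.

```java
/** Simplified model of trigger results and of what a purging wrapper does to them. */
class TriggerResultSketch {

    /** Each result is a combination of two flags: evaluate the window, and clear its contents. */
    enum Result {
        CONTINUE(false, false),
        FIRE(true, false),
        PURGE(false, true),
        FIRE_AND_PURGE(true, true);

        final boolean fire;
        final boolean purge;

        Result(boolean fire, boolean purge) {
            this.fire = fire;
            this.purge = purge;
        }
    }

    /** A purging wrapper upgrades every firing result to FIRE_AND_PURGE and passes the rest through. */
    public static Result withPurge(Result inner) {
        return inner.fire ? Result.FIRE_AND_PURGE : inner;
    }

    public static void main(String[] args) {
        for (Result r : Result.values()) {
            System.out.println(r + " -> " + withPurge(r));
        }
    }
}
```

Because the wrapper adds the purge step, the inner trigger only has to decide when to fire. The same trigger can then be used with or without purging, which is the flexibility mentioned above.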
The code for the new Trigger class is as follows:
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;

/**
 * A trigger that fires when the element count reaches a maximum or when a maximum
 * waiting time has elapsed, whichever comes first.
 *
 * @param <W> The type of {@link Window Windows} on which this trigger can operate.
 * @author mingbozhang
 */
public class CountWithMaxTimeTrigger<W extends Window> extends Trigger<Object, W> {
    private static final long serialVersionUID = 1L;

    private final long maxCount;
    private final long maxWaitingTime;

    private final ReducingStateDescriptor<Long> stateDesc =
            new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);

    /**
     * When merging we take the lowest of all fire timestamps as the new fire timestamp.
     */
    private final ReducingStateDescriptor<Long> fireTimeStateDesc =
            new ReducingStateDescriptor<>("fire-time", new Min(), LongSerializer.INSTANCE);

    private CountWithMaxTimeTrigger(long maxCount, long maxWaitingTime) {
        this.maxCount = maxCount;
        this.maxWaitingTime = maxWaitingTime;
    }

    /**
     * Creates a trigger that fires once the number of elements in a pane reaches the given count
     * or the maximum waiting time has elapsed.
     *
     * @param maxCount The count of elements at which to fire.
     * @param maxWaitingTime The maximum waiting time in milliseconds.
     * @param <W> The type of {@link Window Windows} on which this trigger can operate.
     */
    public static <W extends Window> CountWithMaxTimeTrigger<W> of(long maxCount, long maxWaitingTime) {
        return new CountWithMaxTimeTrigger<>(maxCount, maxWaitingTime);
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
        ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(fireTimeStateDesc);
        count.add(1L);
        if (count.get() >= maxCount) {
            // The count was reached first: the pending timer is no longer needed.
            Long pending = fireTimestamp.get();
            if (pending != null) {
                ctx.deleteProcessingTimeTimer(pending);
            }
            count.clear();
            fireTimestamp.clear();
            return TriggerResult.FIRE;
        }
        if (fireTimestamp.get() == null) {
            // First element of a new batch: start the max-wait timer on Flink's processing-time clock.
            long triggerTime = ctx.getCurrentProcessingTime() + maxWaitingTime;
            ctx.registerProcessingTimeTimer(triggerTime);
            fireTimestamp.add(triggerTime);
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(fireTimeStateDesc);
        // Only react to the timer that belongs to the current batch.
        if (fireTimestamp.get() != null && fireTimestamp.get().equals(time)) {
            ctx.getPartitionedState(stateDesc).clear();
            fireTimestamp.clear();
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(W window, TriggerContext ctx) throws Exception {
        // Remove the pending timer together with both pieces of state.
        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(fireTimeStateDesc);
        Long pending = fireTimestamp.get();
        if (pending != null) {
            ctx.deleteProcessingTimeTimer(pending);
        }
        fireTimestamp.clear();
        ctx.getPartitionedState(stateDesc).clear();
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(W window, OnMergeContext ctx) throws Exception {
        ctx.mergePartitionedState(stateDesc);
        ctx.mergePartitionedState(fireTimeStateDesc);
        // Re-register the earliest deadline for the merged window.
        Long pending = ctx.getPartitionedState(fireTimeStateDesc).get();
        if (pending != null) {
            ctx.registerProcessingTimeTimer(pending);
        }
    }

    @Override
    public String toString() {
        return "CountWithMaxTimeTrigger(" + maxCount + ", " + maxWaitingTime + ")";
    }

    private static class Sum implements ReduceFunction<Long> {
        private static final long serialVersionUID = 1L;

        @Override
        public Long reduce(Long value1, Long value2) throws Exception {
            return value1 + value2;
        }
    }

    private static class Min implements ReduceFunction<Long> {
        private static final long serialVersionUID = 1L;

        @Override
        public Long reduce(Long value1, Long value2) throws Exception {
            return Math.min(value1, value2);
        }
    }
}
A timer is registered in onElement when the first element of a new batch arrives:
if (fireTimestamp.get() == null) {
    long triggerTime = ctx.getCurrentProcessingTime() + maxWaitingTime;
    ctx.registerProcessingTimeTimer(triggerTime);
    fireTimestamp.add(triggerTime);
}
onProcessingTime fires the window and clears the element counter:
@Override
public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
    ReducingState<Long> fireTimestamp = ctx.getPartitionedState(fireTimeStateDesc);
    if (fireTimestamp.get() != null && fireTimestamp.get().equals(time)) {
        ctx.getPartitionedState(stateDesc).clear();
        fireTimestamp.clear();
        return TriggerResult.FIRE;
    }
    return TriggerResult.CONTINUE;
}
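The combined behavior, fire on count or on timeout, whichever comes first, can be checked without a Flink cluster using a plain-Java model with an explicit clock. Everything in this sketch is hypothetical and simplified: time is passed in as a parameter instead of coming from Flink's timer service, and the deadline check uses "at or after" rather than an exact timer callback.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of "fire on count or after a maximum wait"; not Flink code. */
class CountOrTimeoutSketch {
    private final long maxCount;
    private final long maxWaitMs;
    private final List<String> buffer = new ArrayList<>();
    private final List<List<String>> emitted = new ArrayList<>();
    private long deadline = -1; // -1 means no timer is registered

    public CountOrTimeoutSketch(long maxCount, long maxWaitMs) {
        this.maxCount = maxCount;
        this.maxWaitMs = maxWaitMs;
    }

    /** An element arrives at processing time nowMs. */
    public void onElement(String element, long nowMs) {
        buffer.add(element);
        if (buffer.size() >= maxCount) {
            flush(); // count reached first
            return;
        }
        if (deadline < 0) {
            deadline = nowMs + maxWaitMs; // first element of a batch starts the timer
        }
    }

    /** The clock has advanced to nowMs; flush if the deadline has passed. */
    public void onTime(long nowMs) {
        if (deadline >= 0 && nowMs >= deadline) {
            flush(); // timeout reached first
        }
    }

    private void flush() {
        emitted.add(new ArrayList<>(buffer));
        buffer.clear();
        deadline = -1; // cancel the pending timer
    }

    public List<List<String>> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        CountOrTimeoutSketch window = new CountOrTimeoutSketch(3, 1000);
        window.onElement("a", 0);
        window.onElement("b", 100);
        window.onTime(1000); // only two elements, but the wait is over
        System.out.println("batches after timeout: " + window.emitted());
    }
}
```

In an actual job, the trigger from this post would presumably be wired the same way countWindow does it, that is `window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountWithMaxTimeTrigger.of(maxCount, maxWaitingTime)))`, so that each fired batch is also purged from the window.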