Adding a Maximum Wait Time to Count Windows in Flink

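Flink windows can be divided into time-based windows and count-based windows. Time-based windows are further divided into ProcessingTime and EventTime. In general, time-based windows cover more use cases, for example counting the records received per minute or handling out-of-order data.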

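Recently I ran into the following scenario: consume data from Kafka, assemble it into SQL, and batch-write it into MySQL. Because MySQL limits the size of a single request packet (max_allowed_packet), I considered using a count window to bound the length of each insert statement. In practice another problem surfaced: if the window never reaches the configured count, it never fires, which delays writing the data to the database.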
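To make the batching idea concrete, here is a minimal sketch of how one window's worth of rows might be folded into a single multi-row INSERT. The table name `events`, its columns, and the helper `buildInsert` are hypothetical and do not come from the original job; the sketch only shows that the statement grows with the number of rows, which is the quantity a count window bounds.

```java
import java.util.Collections;

final class BatchInsertSketch {
    // Builds "INSERT INTO <table> (<cols>) VALUES (?, ?), (?, ?), ..." for rowCount rows.
    static String buildInsert(String table, String[] columns, int rowCount) {
        if (rowCount <= 0) {
            throw new IllegalArgumentException("rowCount must be positive");
        }
        String columnList = String.join(", ", columns);
        // One "(?, ?, ...)" group per row, with one placeholder per column.
        String oneRow = "(" + String.join(", ", Collections.nCopies(columns.length, "?")) + ")";
        String allRows = String.join(", ", Collections.nCopies(rowCount, oneRow));
        return "INSERT INTO " + table + " (" + columnList + ") VALUES " + allRows;
    }

    public static void main(String[] args) {
        // Hypothetical table and columns, three rows in the batch.
        System.out.println(buildInsert("events", new String[] {"id", "payload"}, 3));
        // prints: INSERT INTO events (id, payload) VALUES (?, ?), (?, ?), (?, ?)
    }
}
```

Capping `rowCount` at the window size keeps each statement at a predictable length, although the bound values themselves also count toward max_allowed_packet.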

Below is the code that implements the count window, from the class org.apache.flink.streaming.api.datastream.KeyedStream:

    /**
     * Windows this {@code KeyedStream} into tumbling count windows.
     *
     * @param size The size of the windows in number of elements.
     */
    public WindowedStream<T, KEY, GlobalWindow> countWindow(long size) {
        return window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));
    }

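This creates a global window that buffers the data. A trigger then fires, and the window function processes the data held in that window. So how does CountTrigger implement the maximum count? Here is the CountTrigger code: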

/**
 * A {@link Trigger} that fires once the count of elements in a pane reaches the given count.
 *
 * @param <W> The type of {@link Window Windows} on which this trigger can operate.
 */
@PublicEvolving
public class CountTrigger<W extends Window> extends Trigger<Object, W> {
    private static final long serialVersionUID = 1L;

    private final long maxCount;

    private final ReducingStateDescriptor<Long> stateDesc =
            new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);

    private CountTrigger(long maxCount) {
        this.maxCount = maxCount;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
        ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
        count.add(1L);
        if (count.get() >= maxCount) {
            count.clear();
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(W window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(stateDesc).clear();
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(W window, OnMergeContext ctx) throws Exception {
        ctx.mergePartitionedState(stateDesc);
    }

    @Override
    public String toString() {
        return "CountTrigger(" +  maxCount + ")";
    }

    /**
     * Creates a trigger that fires once the number of elements in a pane reaches the given count.
     *
     * @param maxCount The count of elements at which to fire.
     * @param <W> The type of {@link Window Windows} on which this trigger can operate.
     */
    public static <W extends Window> CountTrigger<W> of(long maxCount) {
        return new CountTrigger<>(maxCount);
    }

    private static class Sum implements ReduceFunction<Long> {
        private static final long serialVersionUID = 1L;

        @Override
        public Long reduce(Long value1, Long value2) throws Exception {
            return value1 + value2;
        }

    }
}
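The counting logic in onElement can be modeled without Flink. The following is a simplified sketch, not the Flink implementation: a plain `long` field stands in for the ReducingState, and a boolean stands in for FIRE versus CONTINUE. The class name `CountTriggerSketch` is made up for illustration.

```java
final class CountTriggerSketch {
    private final long maxCount;
    private long count; // stands in for the ReducingState<Long> named "count"

    CountTriggerSketch(long maxCount) {
        this.maxCount = maxCount;
    }

    // Returns true where CountTrigger would return FIRE, false for CONTINUE.
    boolean onElement() {
        count++;
        if (count >= maxCount) {
            count = 0; // mirrors count.clear()
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        CountTriggerSketch trigger = new CountTriggerSketch(3);
        for (int i = 1; i <= 7; i++) {
            String result = trigger.onElement() ? "FIRE" : "CONTINUE";
            System.out.println("element " + i + " -> " + result);
        }
    }
}
```

With a maximum of 3, elements 3 and 6 fire, while element 7 only returns CONTINUE. If no further data arrives, that seventh element stays in the window indefinitely, which is the latency problem described at the start.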

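The onElement method is called every time an element arrives. The class uses a ReducingState to count the incoming elements; when the count reaches the maximum, it clears the count and triggers the window function. TriggerResult.FIRE is what triggers execution. The relevant part of the TriggerResult definition is shown below: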

    /**
     * No action is taken on the window.
     */
    CONTINUE,

    /**
     * {@code FIRE_AND_PURGE} evaluates the window function and emits the window
     * result.
     */
    FIRE_AND_PURGE,

    /**
     * On {@code FIRE}, the window is evaluated and results are emitted.
     * The window is not purged, though, all elements are retained.
     */
    FIRE,

    /**
     * All elements in the window are cleared and the window is discarded,
     * without evaluating the window function or emitting any elements.
     */
    PURGE;

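CONTINUE: take no action;
FIRE_AND_PURGE: evaluate the window function and clear the data accumulated in the window;
FIRE: evaluate the window function without clearing the data accumulated in the window;
PURGE: only clear the data in the window and discard the window, without evaluating the window function;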
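The difference between FIRE and FIRE_AND_PURGE is easiest to see on a small buffer. The sketch below is a toy model of the four results and makes no claim about how Flink's window operator is implemented internally; the class `TriggerResultSketch` and its nested enum are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

final class TriggerResultSketch {
    enum Result { CONTINUE, FIRE, FIRE_AND_PURGE, PURGE }

    private final List<String> buffer = new ArrayList<>();        // elements held by the window
    private final List<List<String>> emitted = new ArrayList<>(); // what the window function received

    void add(String element) {
        buffer.add(element);
    }

    void apply(Result result) {
        if (result == Result.FIRE || result == Result.FIRE_AND_PURGE) {
            emitted.add(new ArrayList<>(buffer)); // evaluate the window function on a snapshot
        }
        if (result == Result.PURGE || result == Result.FIRE_AND_PURGE) {
            buffer.clear(); // drop the accumulated elements
        }
        // CONTINUE falls through both checks and changes nothing.
    }

    List<List<String>> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        TriggerResultSketch fireOnly = new TriggerResultSketch();
        fireOnly.add("a");
        fireOnly.add("b");
        fireOnly.apply(Result.FIRE);
        fireOnly.add("c");
        fireOnly.add("d");
        fireOnly.apply(Result.FIRE);
        System.out.println(fireOnly.emitted()); // prints [[a, b], [a, b, c, d]]

        TriggerResultSketch fireAndPurge = new TriggerResultSketch();
        fireAndPurge.add("a");
        fireAndPurge.add("b");
        fireAndPurge.apply(Result.FIRE_AND_PURGE);
        fireAndPurge.add("c");
        fireAndPurge.add("d");
        fireAndPurge.apply(Result.FIRE_AND_PURGE);
        System.out.println(fireAndPurge.emitted()); // prints [[a, b], [c, d]]
    }
}
```

With FIRE alone, the second evaluation sees the earlier elements again, so a batch writer would insert them twice. This is why countWindow wraps CountTrigger in PurgingTrigger.of.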


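For CountTrigger to honor a maximum wait time, the existing onProcessingTime implementation has to change so that it returns FIRE. Why not return FIRE_AND_PURGE directly? Following the style of the Flink source, when FIRE_AND_PURGE is needed the trigger is wrapped from the outside with PurgingTrigger.of, which is more flexible.
The code of the new Trigger class is as follows: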

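import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;

/**
 * A trigger that fires when the window reaches a maximum element count or when a maximum
 * waiting time has elapsed, whichever comes first.
 *
 * @param <W> The type of {@link Window Windows} on which this trigger can operate.
 * @author mingbozhang
 */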
public class CountWithMaxTimeTrigger<W extends Window> extends Trigger<Object, W> {
    private static final long serialVersionUID = 1L;

    private final long maxCount;

    private final long maxWaitingTime;

    private final ReducingStateDescriptor<Long> stateDesc =
            new ReducingStateDescriptor<>("count", new CountWithMaxTimeTrigger.Sum(), LongSerializer.INSTANCE);
    /**
     * When merging we take the lowest of all fire timestamps as the new fire timestamp.
     */
    private final ReducingStateDescriptor<Long> fireTimeStateDesc =
            new ReducingStateDescriptor<>("fire-time", new CountWithMaxTimeTrigger.Min(), LongSerializer.INSTANCE);

    private CountWithMaxTimeTrigger(long maxCount, long maxWaitingTime) {
        this.maxCount = maxCount;
        this.maxWaitingTime = maxWaitingTime;
    }

    /**
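     * Creates a trigger that fires once the number of elements in a pane reaches the given count,
     * or once the maximum waiting time has elapsed since the first element of the current batch.
     *
     * @param maxCount       The count of elements at which to fire.
     * @param maxWaitingTime The maximum time to wait before firing, in milliseconds.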
     * @param <W>            The type of {@link Window Windows} on which this trigger can operate.
     */
    public static <W extends Window> CountWithMaxTimeTrigger<W> of(long maxCount, long maxWaitingTime) {
        return new CountWithMaxTimeTrigger<>(maxCount, maxWaitingTime);
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
        ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(fireTimeStateDesc);

        count.add(1L);
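        if (count.get() >= maxCount) {
            count.clear();
            // Cancel the pending timeout timer so it does not linger after a count-based fire.
            Long pendingFireTime = fireTimestamp.get();
            if (pendingFireTime != null) {
                ctx.deleteProcessingTimeTimer(pendingFireTime);
            }
            fireTimestamp.clear();
            return TriggerResult.FIRE;
        }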

        if (fireTimestamp.get() == null) {
            long triggerTime = System.currentTimeMillis() + maxWaitingTime;
            ctx.registerProcessingTimeTimer(triggerTime);
            fireTimestamp.add(triggerTime);
        }

        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(fireTimeStateDesc);

        if (fireTimestamp.get() != null && fireTimestamp.get().equals(time)) {
            ctx.getPartitionedState(stateDesc).clear();
            fireTimestamp.clear();
            return TriggerResult.FIRE;
        }

        return TriggerResult.CONTINUE;
    }

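    @Override
    public void clear(W window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(stateDesc).clear();
        // Also drop the pending timer and its state when the window is cleaned up.
        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(fireTimeStateDesc);
        Long pendingFireTime = fireTimestamp.get();
        if (pendingFireTime != null) {
            ctx.deleteProcessingTimeTimer(pendingFireTime);
        }
        fireTimestamp.clear();
    }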

    @Override
    public boolean canMerge() {
        return true;
    }

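    @Override
    public void onMerge(W window, OnMergeContext ctx) throws Exception {
        ctx.mergePartitionedState(stateDesc);
        // Merge the fire-time state as well and re-register the earliest timer for the merged window.
        ctx.mergePartitionedState(fireTimeStateDesc);
        Long nextFireTime = ctx.getPartitionedState(fireTimeStateDesc).get();
        if (nextFireTime != null) {
            ctx.registerProcessingTimeTimer(nextFireTime);
        }
    }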

    @Override
    public String toString() {
        return "CountWithMaxTimeTrigger(" + maxCount + ", " + maxWaitingTime + ")";
    }

    private static class Sum implements ReduceFunction<Long> {
        private static final long serialVersionUID = 1L;

        @Override
        public Long reduce(Long value1, Long value2) throws Exception {
            return value1 + value2;
        }

    }

    private static class Min implements ReduceFunction<Long> {
        private static final long serialVersionUID = 1L;

        @Override
        public Long reduce(Long value1, Long value2) throws Exception {
            return Math.min(value1, value2);
        }
    }
}

A timer is registered in onElement:

        if (fireTimestamp.get() == null) {
            long triggerTime = System.currentTimeMillis() + maxWaitingTime;
            ctx.registerProcessingTimeTimer(triggerTime);
            fireTimestamp.add(triggerTime);
        }

In onProcessingTime, the window function is triggered and the element counter is cleared:

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(fireTimeStateDesc);

        if (fireTimestamp.get() != null && fireTimestamp.get().equals(time)) {
            ctx.getPartitionedState(stateDesc).clear();
            fireTimestamp.clear();
            return TriggerResult.FIRE;
        }

        return TriggerResult.CONTINUE;
    }
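The combined behavior, firing on whichever of count or timeout comes first, can be checked outside Flink with a simulated clock. This is a simplified sketch under stated assumptions: the class `CountOrTimeoutSketch` is hypothetical, time is passed in by the caller instead of coming from a timer service, and the buffer is always cleared on fire, which corresponds to wrapping the trigger in PurgingTrigger.of.

```java
import java.util.ArrayList;
import java.util.List;

final class CountOrTimeoutSketch {
    private final long maxCount;
    private final long maxWaitMillis;
    private final List<String> buffer = new ArrayList<>();
    private final List<List<String>> batches = new ArrayList<>();
    private Long fireTime; // null means no timer is pending, like an empty fire-time state

    CountOrTimeoutSketch(long maxCount, long maxWaitMillis) {
        this.maxCount = maxCount;
        this.maxWaitMillis = maxWaitMillis;
    }

    // Called for each element with the current (simulated) processing time.
    void onElement(String element, long now) {
        buffer.add(element);
        if (buffer.size() >= maxCount) {
            flush(); // count reached: fire immediately
        } else if (fireTime == null) {
            fireTime = now + maxWaitMillis; // first element of a new batch starts the clock
        }
    }

    // Called when the simulated clock advances; fires if the pending timer is due.
    void onProcessingTime(long now) {
        if (fireTime != null && now >= fireTime) {
            flush();
        }
    }

    private void flush() {
        batches.add(new ArrayList<>(buffer));
        buffer.clear();  // the role PurgingTrigger plays
        fireTime = null; // mirrors fireTimestamp.clear()
    }

    List<List<String>> batches() {
        return batches;
    }

    public static void main(String[] args) {
        CountOrTimeoutSketch sketch = new CountOrTimeoutSketch(3, 1000);
        sketch.onElement("a", 0);
        sketch.onElement("b", 10);
        sketch.onElement("c", 20);     // third element: fires on count
        sketch.onElement("d", 30);     // starts a new batch, timer due at 1030
        sketch.onProcessingTime(500);  // too early, nothing happens
        sketch.onProcessingTime(1030); // timer due: fires on timeout
        System.out.println(sketch.batches()); // prints [[a, b, c], [d]]
    }
}
```

In a real job the trigger would presumably be wired in the same way countWindow does it, for example `window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountWithMaxTimeTrigger.of(maxCount, maxWaitingTime)))`, so that each fire also purges the buffered elements.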
