聊聊flink的Triggers

本文主要研究一下flink的Triggers

Trigger

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/triggers/Trigger.java

@PublicEvolving
public abstract class Trigger<T, W extends Window> implements Serializable {

    private static final long serialVersionUID = -4104633972991191369L;

    public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception;

    public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception;

    public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception;

    public boolean canMerge() {
        return false;
    }

    public void onMerge(W window, OnMergeContext ctx) throws Exception {
        throw new UnsupportedOperationException("This trigger does not support merging.");
    }

    public abstract void clear(W window, TriggerContext ctx) throws Exception;

    // ------------------------------------------------------------------------

    public interface TriggerContext {

        long getCurrentProcessingTime();

        MetricGroup getMetricGroup();

        long getCurrentWatermark();

        void registerProcessingTimeTimer(long time);

        void registerEventTimeTimer(long time);

        void deleteProcessingTimeTimer(long time);

        void deleteEventTimeTimer(long time);

        <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor);

        @Deprecated
        <S extends Serializable> ValueState<S> getKeyValueState(String name, Class<S> stateType, S defaultState);

        @Deprecated
        <S extends Serializable> ValueState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState);
    }

    public interface OnMergeContext extends TriggerContext {
        <S extends MergingState<?, ?>> void mergePartitionedState(StateDescriptor<S, ?> stateDescriptor);
    }
}
  • Trigger接收两个泛型,一个是element类型,一个是窗口类型;它定义了onElement、onProcessingTime、onEventTime、canMerge、onMerge、clear几个方法,其中onElement、onProcessingTime、onEventTime均需要返回TriggerResult
  • onElement在每个element添加到window的时候会被回调;onProcessingTime在注册的event-time timer触发时会被回调;onEventTime在注册的processing-time timer触发时会被回调
  • canMerge用于标识是否支持trigger state的合并,默认返回false;onMerge在多个window合并的时候会被触发;clear用于清除TriggerContext中存储的相关state
  • Trigger还定义了TriggerContext及OnMergeContext;TriggerContext定义了注册及删除EventTimeTimer、ProcessingTimeTimer方法,同时还定义了getCurrentProcessingTime、getMetricGroup、getCurrentWatermark、getPartitionedState、getKeyValueState、getKeyValueState方法
  • OnMergeContext继承了TriggerContext,它多定义了mergePartitionedState方法

TriggerResult

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/triggers/TriggerResult.java

public enum TriggerResult {

    CONTINUE(false, false),

    FIRE_AND_PURGE(true, true),

    FIRE(true, false),

    PURGE(false, true);

    // ------------------------------------------------------------------------

    private final boolean fire;
    private final boolean purge;

    TriggerResult(boolean fire, boolean purge) {
        this.purge = purge;
        this.fire = fire;
    }

    public boolean isFire() {
        return fire;
    }

    public boolean isPurge() {
        return purge;
    }
}
  • TriggerResult用于表示trigger在onElement、onProcessingTime、onEventTime被回调时返回的action枚举,它有fire、purge两个属性,CONTINUE、FIRE_AND_PURGE、FIRE、PURGE五个枚举
  • fire表示是否要触发window的computation操作;而purge表示是否要清理window的窗口数据
  • CONTINUE表示不对window做任何操作;FIRE_AND_PURGE表示要触发window的computation操作然后清理window的窗口数据;FIRE表示仅仅触发window的computation操作但不清理window的窗口数据;PURGE表示不触发window的computation操作但是要清理window的窗口数据

EventTimeTrigger

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java

@PublicEvolving
public class EventTimeTrigger extends Trigger<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private EventTimeTrigger() {}

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            // if the watermark is already past the window fire immediately
            return TriggerResult.FIRE;
        } else {
            ctx.registerEventTimeTimer(window.maxTimestamp());
            return TriggerResult.CONTINUE;
        }
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ?
            TriggerResult.FIRE :
            TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(TimeWindow window,
            OnMergeContext ctx) {
        // only register a timer if the watermark is not yet past the end of the merged window
        // this is in line with the logic in onElement(). If the watermark is past the end of
        // the window onElement() will fire and setting a timer here would fire the window twice.
        long windowMaxTimestamp = window.maxTimestamp();
        if (windowMaxTimestamp > ctx.getCurrentWatermark()) {
            ctx.registerEventTimeTimer(windowMaxTimestamp);
        }
    }

    @Override
    public String toString() {
        return "EventTimeTrigger()";
    }

    public static EventTimeTrigger create() {
        return new EventTimeTrigger();
    }
}
  • EventTimeTrigger继承了Trigger,element类型为Object,窗口类型为TimeWindow;SlidingEventTimeWindows、TumblingEventTimeWindows、EventTimeSessionWindows、DynamicEventTimeSessionWindows默认都使用EventTimeTrigger
  • onElement在window.maxTimestamp()小于等于ctx.getCurrentWatermark()的时候,返回TriggerResult.FIRE,否则执行ctx.registerEventTimeTimer(window.maxTimestamp()),然后返回TriggerResult.CONTINUE;onEventTime在time等于window.maxTimestamp()的时候返回TriggerResult.FIRE,否则返回TriggerResult.CONTINUE;onProcessingTime则返回TriggerResult.CONTINUE
  • canMerge返回true;onMerge在window.maxTimestamp()大于ctx.getCurrentWatermark()的时候会执行ctx.registerEventTimeTimer(windowMaxTimestamp);clear则执行ctx.deleteEventTimeTimer(window.maxTimestamp())

ProcessingTimeTrigger

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java

@PublicEvolving
public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private ProcessingTimeTrigger() {}

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
        ctx.registerProcessingTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.FIRE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteProcessingTimeTimer(window.maxTimestamp());
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(TimeWindow window,
            OnMergeContext ctx) {
        // only register a timer if the time is not yet past the end of the merged window
        // this is in line with the logic in onElement(). If the time is past the end of
        // the window onElement() will fire and setting a timer here would fire the window twice.
        long windowMaxTimestamp = window.maxTimestamp();
        if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) {
            ctx.registerProcessingTimeTimer(windowMaxTimestamp);
        }
    }

    @Override
    public String toString() {
        return "ProcessingTimeTrigger()";
    }

    public static ProcessingTimeTrigger create() {
        return new ProcessingTimeTrigger();
    }

}
  • ProcessingTimeTrigger继承了Trigger,element类型为Object,窗口类型为TimeWindow;SlidingProcessingTimeWindows、TumblingProcessingTimeWindows、ProcessingTimeSessionWindows、DynamicProcessingTimeSessionWindows默认都使用ProcessingTimeTrigger
  • onElement执行ctx.registerProcessingTimeTimer(window.maxTimestamp()),然后返回TriggerResult.CONTINUE;onEventTime返回TriggerResult.CONTINUE;onProcessingTime则返回TriggerResult.FIRE
  • canMerge返回true;onMerge在window.maxTimestamp()大于ctx.getCurrentWatermark()的时候会执行ctx.registerProcessingTimeTimer(windowMaxTimestamp);clear则执行ctx.deleteProcessingTimeTimer(window.maxTimestamp())

NeverTrigger

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java

    @Internal
    public static class NeverTrigger extends Trigger<Object, GlobalWindow> {
        private static final long serialVersionUID = 1L;

        @Override
        public TriggerResult onElement(Object element, long timestamp, GlobalWindow window, TriggerContext ctx) {
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) {
            return TriggerResult.CONTINUE;
        }

        @Override
        public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {}

        @Override
        public void onMerge(GlobalWindow window, OnMergeContext ctx) {
        }
    }
  • NeverTrigger的onElement、onEventTime、onProcessingTime均返回TriggerResult.CONTINUE;GlobalWindows默认使用的是NeverTrigger

CountTrigger

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java

@PublicEvolving
public class CountTrigger<W extends Window> extends Trigger<Object, W> {
    private static final long serialVersionUID = 1L;

    private final long maxCount;

    private final ReducingStateDescriptor<Long> stateDesc =
            new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);

    private CountTrigger(long maxCount) {
        this.maxCount = maxCount;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
        ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
        count.add(1L);
        if (count.get() >= maxCount) {
            count.clear();
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(W window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(stateDesc).clear();
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(W window, OnMergeContext ctx) throws Exception {
        ctx.mergePartitionedState(stateDesc);
    }

    @Override
    public String toString() {
        return "CountTrigger(" +  maxCount + ")";
    }

    public static <W extends Window> CountTrigger<W> of(long maxCount) {
        return new CountTrigger<>(maxCount);
    }

    private static class Sum implements ReduceFunction<Long> {
        private static final long serialVersionUID = 1L;

        @Override
        public Long reduce(Long value1, Long value2) throws Exception {
            return value1 + value2;
        }

    }
}
  • CountTrigger继承了Trigger,指定了element类型为Object类型;它定义了maxCount及ReducingStateDescriptor;其中ReducingStateDescriptor用于窗口计数(它使用的是自己定义的Sum函数),在onElement方法里头,当计数大于等于maxCount时,则会清空计数,然后返回TriggerResult.FIRE,否则返回TriggerResult.CONTINUE;onEventTime、onProcessingTime均返回TriggerResult.CONTINUE;canMerge返回true;onMerge执行的是ctx.mergePartitionedState(stateDesc);clear执行的是ctx.getPartitionedState(stateDesc).clear()

PurgingTrigger

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java

@PublicEvolving
public class PurgingTrigger<T, W extends Window> extends Trigger<T, W> {
    private static final long serialVersionUID = 1L;

    private Trigger<T, W> nestedTrigger;

    private  PurgingTrigger(Trigger<T, W> nestedTrigger) {
        this.nestedTrigger = nestedTrigger;
    }

    @Override
    public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
        TriggerResult triggerResult = nestedTrigger.onElement(element, timestamp, window, ctx);
        return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult;
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
        TriggerResult triggerResult = nestedTrigger.onEventTime(time, window, ctx);
        return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult;
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
        TriggerResult triggerResult = nestedTrigger.onProcessingTime(time, window, ctx);
        return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult;
    }

    @Override
    public void clear(W window, TriggerContext ctx) throws Exception {
        nestedTrigger.clear(window, ctx);
    }

    @Override
    public boolean canMerge() {
        return nestedTrigger.canMerge();
    }

    @Override
    public void onMerge(W window, OnMergeContext ctx) throws Exception {
        nestedTrigger.onMerge(window, ctx);
    }

    @Override
    public String toString() {
        return "PurgingTrigger(" + nestedTrigger.toString() + ")";
    }

    public static <T, W extends Window> PurgingTrigger<T, W> of(Trigger<T, W> nestedTrigger) {
        return new PurgingTrigger<>(nestedTrigger);
    }

    @VisibleForTesting
    public Trigger<T, W> getNestedTrigger() {
        return nestedTrigger;
    }
}
  • PurgingTrigger是包装型的Trigger,它包装了nestedTrigger,其onElement、onEventTime、onProcessingTime根据nestedTrigger的返回结果,在triggerResult.isFire()为true的时候,包装返回TriggerResult.FIRE_AND_PURGE;canMerge、onMerge、clear等方法均是委托给nestedTrigger处理

小结

  • Trigger接收两个泛型,一个是element类型,一个是窗口类型;它定义了onElement、onProcessingTime、onEventTime、canMerge、onMerge、clear几个方法,其中onElement、onProcessingTime、onEventTime均需要返回TriggerResult;TriggerResult用于表示trigger在onElement、onProcessingTime、onEventTime被回调时返回的action枚举,它有fire、purge两个属性(fire表示是否要触发window的computation操作;而purge表示是否要清理window的窗口数据),CONTINUE、FIRE_AND_PURGE、FIRE、PURGE五个枚举
  • SlidingEventTimeWindows、TumblingEventTimeWindows、EventTimeSessionWindows、DynamicEventTimeSessionWindows默认都使用EventTimeTrigger;SlidingProcessingTimeWindows、TumblingProcessingTimeWindows、ProcessingTimeSessionWindows、DynamicProcessingTimeSessionWindows默认都使用ProcessingTimeTrigger;GlobalWindows默认使用的是NeverTrigger
  • CountTrigger主要用于计数的窗口类型,它使用ReducingStateDescriptor来进行窗口计数,在onElement方法里头,当计数大于等于maxCount时,则会清空计数,然后返回TriggerResult.FIRE,否则返回TriggerResult.CONTINUE;PurgingTrigger是包装型的Trigger,它包装了nestedTrigger,其onElement、onEventTime、onProcessingTime根据nestedTrigger的返回结果,在triggerResult.isFire()为true的时候,包装返回TriggerResult.FIRE_AND_PURGE;canMerge、onMerge、clear等方法均是委托给nestedTrigger处理

doc

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,547评论 6 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,399评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,428评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,599评论 1 274
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,612评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,577评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,941评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,603评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,852评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,605评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,693评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,375评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,955评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,936评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,172评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,970评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,414评论 2 342

推荐阅读更多精彩内容