Flink 双流Join原理

  • Window Join
    Tumbling Window Join
    Sliding Window Join
    Session Window Join
  • Interval Join
  • CoGroup

Window Join and CoGroup

  • Window Join 是基于时间窗口对两个流进行关联操作。
  • 相比于 Join 操作, CoGroup 提供了一个更为通用的方式来处理两个流在相同的窗口内匹配的元素。 Join 复用了 CoGroup 的实现逻辑。它们的使用方式如下:
//join
stream.join(otherStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(<WindowAssigner>)
    .apply(<JoinFunction>)
//coGroup
stream.coGroup(otherStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(<WindowAssigner>)
    .apply(<CoGroupFunction>)

从 JoinFunction 和 CogroupFunction 接口的定义中可以大致看出它们的区别:

public interface JoinFunction<IN1, IN2, OUT> extends Function, Serializable {
    OUT join(IN1 first, IN2 second) throws Exception;
}

public interface CoGroupFunction<IN1, IN2, O> extends Function, Serializable {
    void coGroup(Iterable<IN1> first, Iterable<IN2> second, Collector<O> out) throws Exception;
}

可以看出来,JoinFunction 主要关注的是两个流中按照 key 匹配的每一对元素,而 CoGroupFunction 的参数则是两个中 key 相同的所有元素。JoinFunction 的逻辑更类似于 INNER JOIN,而 CoGroupFunction 除了可以实现 INNER JOIN,也可以实现 OUTER JOIN
Window Join分为三种, Tumbing Window joinSliding Window joinSession Window Join
Window 类型的join的实现机制,通过将数据缓存在Window State中,当窗口触发计算是,执行join操作

public class JoinedStreams<T1, T2> {
    public static class WithWindow<T1, T2, KEY, W extends Window> {
        public <T> DataStream<T> apply(JoinFunction<T1, T2, T> function, TypeInformation<T> resultType) {
            //clean the closure
            function = input1.getExecutionEnvironment().clean(function);

            //Join 操作被转换为 CoGroup
            coGroupedWindowedStream = input1.coGroup(input2)
                .where(keySelector1)
                .equalTo(keySelector2)
                .window(windowAssigner)
                .trigger(trigger)
                .evictor(evictor)
                .allowedLateness(allowedLateness);
            //JoinFunction 被包装为 CoGroupFunction
            return coGroupedWindowedStream
                    .apply(new JoinCoGroupFunction<>(function), resultType);
        }
    }

    /**
     * CoGroup function that does a nested-loop join to get the join result.
     */
    private static class JoinCoGroupFunction<T1, T2, T>
            extends WrappingFunction<JoinFunction<T1, T2, T>>
            implements CoGroupFunction<T1, T2, T> {
        private static final long serialVersionUID = 1L;

        public JoinCoGroupFunction(JoinFunction<T1, T2, T> wrappedFunction) {
            super(wrappedFunction);
        }

        @Override
        public void coGroup(Iterable<T1> first, Iterable<T2> second, Collector<T> out) throws Exception {
            for (T1 val1: first) {
                for (T2 val2: second) {
                    //每一个匹配的元素对
                    out.collect(wrappedFunction.join(val1, val2));
                }
            }
        }
    }
}

那么 CoGroup 又是怎么实现两个流的操作的呢?Flink 其实是通过一个变换,将两个流转换成一个流进行处理,转换之后数据流中的每一条消息都有一个标记来记录这个消息是属于左边的流还是右边的流,这样窗口的操作就和单个流的实现一样了。等到窗口被触发的时候,再按照标记将窗口内的元素分为左边的一组右边的一组,然后交给 CoGroupFunction 进行处理

public class CoGroupedStreams<T1, T2> {
    public static class WithWindow<T1, T2, KEY, W extends Window> {
        public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) {
            //clean the closure
            function = input1.getExecutionEnvironment().clean(function);
            UnionTypeInfo<T1, T2> unionType = new UnionTypeInfo<>(input1.getType(), input2.getType());
            UnionKeySelector<T1, T2, KEY> unionKeySelector = new UnionKeySelector<>(keySelector1, keySelector2);
            DataStream<TaggedUnion<T1, T2>> taggedInput1 = input1
                    .map(new Input1Tagger<T1, T2>())
                    .setParallelism(input1.getParallelism())
                    .returns(unionType); //左边流
            DataStream<TaggedUnion<T1, T2>> taggedInput2 = input2
                    .map(new Input2Tagger<T1, T2>())
                    .setParallelism(input2.getParallelism())
                    .returns(unionType); //右边流
            
            //合并成一个数据流
            DataStream<TaggedUnion<T1, T2>> unionStream = taggedInput1.union(taggedInput2);
            windowedStream =
                    new KeyedStream<TaggedUnion<T1, T2>, KEY>(unionStream, unionKeySelector, keyType)
                    .window(windowAssigner);
            if (trigger != null) {
                windowedStream.trigger(trigger);
            }
            if (evictor != null) {
                windowedStream.evictor(evictor);
            }
            if (allowedLateness != null) {
                windowedStream.allowedLateness(allowedLateness);
            }
            return windowedStream.apply(new CoGroupWindowFunction<T1, T2, T, KEY, W>(function), resultType);
        }
    }

    //将 CoGroupFunction 封装为 WindowFunction
    private static class CoGroupWindowFunction<T1, T2, T, KEY, W extends Window>
            extends WrappingFunction<CoGroupFunction<T1, T2, T>>
            implements WindowFunction<TaggedUnion<T1, T2>, T, KEY, W> {
        public CoGroupWindowFunction(CoGroupFunction<T1, T2, T> userFunction) {
            super(userFunction);
        }

        @Override
        public void apply(KEY key,
                W window,
                Iterable<TaggedUnion<T1, T2>> values,
                Collector<T> out) throws Exception {
            List<T1> oneValues = new ArrayList<>();
            List<T2> twoValues = new ArrayList<>();
            //窗口内的所有元素按标记重新分为左边的一组和右边的一组
            for (TaggedUnion<T1, T2> val: values) {
                if (val.isOne()) {
                    oneValues.add(val.getOne());
                } else {
                    twoValues.add(val.getTwo());
                }
            }
            //调用 CoGroupFunction
            wrappedFunction.coGroup(oneValues, twoValues, out);
        }
    }
}

Connected Streams

Window Join 可以方便地对两个数据流进行关联操作。但有些使用场景中,我们需要的并非关联操作,ConnectedStreams 提供了更为通用的双流操作

ConnectedStreams 配合 CoProcessFunctionKeyedCoProcessFunction 使用,KeyedCoProcessFunction 要求连接的两个 stream 都是 KeyedStream,并且 key 的类型一致。

ConnectedStreams 配合 CoProcessFunction 生成 CoProcessOperator,在运行时被调度为 TwoInputStreamTask,从名字也可以看书来,这个 Task 处理的是两个输入。我们简单看一下 CoProcessOperator 的实现

public class CoProcessOperator<IN1, IN2, OUT>
        extends AbstractUdfStreamOperator<OUT, CoProcessFunction<IN1, IN2, OUT>>
        implements TwoInputStreamOperator<IN1, IN2, OUT> {
    @Override
    public void processElement1(StreamRecord<IN1> element) throws Exception {
        collector.setTimestamp(element);
        context.element = element;
        userFunction.processElement1(element.getValue(), context, collector);
        context.element = null;
    }

    @Override
    public void processElement2(StreamRecord<IN2> element) throws Exception {
        collector.setTimestamp(element);
        context.element = element;
        userFunction.processElement2(element.getValue(), context, collector);
        context.element = null;
    }
}

CoProcessOperator 内部区分了两个流的处理,分别调用 CoProcessFunction.processElement1() 和 userFunction.processElement2() 进行处理。对于 KeyedCoProcessOperator 也是类似的机制。

通过内部的共享状态,可以在双流上实现很多复杂的操作。接下来我们就介绍 Flink 基于 Connected Streams 实现的另一种双流关联操作 - Interval Join。

Interval Join

image.png

默认情况下,这些是包含边界的,但是可以通过.lowerboundexclusive()和. upperboundexclusive()进行设置,如果设置了,则不包含边界

stream
    .keyBy(<KeySelector>)
    .intervalJoin(otherStream.keyBy(<KeySelector>))
    .between(<Time>,<Time>)
    .process(<ProcessJoinFunction>)

Interval Join 是基于 ConnectedStreams 实现的:

public class KeyedStream<T, KEY> extends DataStream<T> {
    public static class IntervalJoined<IN1, IN2, KEY> {
        public <OUT> SingleOutputStreamOperator<OUT> process(
                ProcessJoinFunction<IN1, IN2, OUT> processJoinFunction,
                TypeInformation<OUT> outputType) {
            Preconditions.checkNotNull(processJoinFunction);
            Preconditions.checkNotNull(outputType);

            final ProcessJoinFunction<IN1, IN2, OUT> cleanedUdf = left.getExecutionEnvironment().clean(processJoinFunction);

            final IntervalJoinOperator<KEY, IN1, IN2, OUT> operator =
                new IntervalJoinOperator<>(
                    lowerBound,
                    upperBound,
                    lowerBoundInclusive,
                    upperBoundInclusive,
                    left.getType().createSerializer(left.getExecutionConfig()),
                    right.getType().createSerializer(right.getExecutionConfig()),
                    cleanedUdf
                );

            return left
                .connect(right)
                .keyBy(keySelector1, keySelector2)
                .transform("Interval Join", outputType, operator);
        }
    }
}

在 IntervalJoinOperator 中,使用两个 MapState 分别保存两个数据流到达的消息,MapState 的 key 是消息的时间。当一个数据流有新消息到达时,就会去另一个数据流的状态中查找时间落在匹配范围内的消息,然后进行关联处理。每一条消息会注册一个定时器,在时间越过该消息的有效范围后从状态中清除该消息。


public class IntervalJoinOperator<K, T1, T2, OUT>
        extends AbstractUdfStreamOperator<OUT, ProcessJoinFunction<T1, T2, OUT>>
        implements TwoInputStreamOperator<T1, T2, OUT>, Triggerable<K, String> {
    //左流的状态buffer
    private transient MapState<Long, List<BufferEntry<T1>>> leftBuffer;
        //右流的状态buffer
    private transient MapState<Long, List<BufferEntry<T2>>> rightBuffer;
            
    @Override
    public void processElement1(StreamRecord<T1> record) throws Exception {
             //处理左流元素,processElement参数列表最后一位代表是否是左流元素,用于区分
        processElement(record, leftBuffer, rightBuffer, lowerBound, upperBound, true);
    }

    @Override
    public void processElement2(StreamRecord<T2> record) throws Exception {
              //处理左流元素
        processElement(record, rightBuffer, leftBuffer, -upperBound, -lowerBound, false);
    }

    private <THIS, OTHER> void processElement(
            final StreamRecord<THIS> record,
            final MapState<Long, List<IntervalJoinOperator.BufferEntry<THIS>>> ourBuffer,
            final MapState<Long, List<IntervalJoinOperator.BufferEntry<OTHER>>> otherBuffer,
            final long relativeLowerBound,
            final long relativeUpperBound,
            final boolean isLeft) throws Exception {

        final THIS ourValue = record.getValue();
                 //获取数据的eventtime时间
        final long ourTimestamp = record.getTimestamp();

        if (ourTimestamp == Long.MIN_VALUE) {
            throw new FlinkException("Long.MIN_VALUE timestamp: Elements used in " +
                    "interval stream joins need to have timestamps meaningful timestamps.");
        }
                   // 判断数据的event time是否小于水印,小于丢弃
        if (isLate(ourTimestamp)) {
            return;
        }

        //将消息加入状态中,MapState的key为当前消息的时间戳
        addToBuffer(ourBuffer, ourValue, ourTimestamp);

        //从另一个数据流的状态中查找匹配的记录,遍历mapstate的数据
        for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket: otherBuffer.entries()) {
            final long timestamp  = bucket.getKey();
                            //判断bucket的时间是否在
                        消息时间+LowerBound < key<消息时间+UpperBound
            if (timestamp < ourTimestamp + relativeLowerBound ||
                    timestamp > ourTimestamp + relativeUpperBound) {
                continue;
            }
                         //将bucket中的数据取出,传递到下游
            for (BufferEntry<OTHER> entry: bucket.getValue()) {
                if (isLeft) {
                    collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp);
                } else {
                    collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp);
                }
            }
        }

        //注册清理状态的timer,水印超过cleanupTime 触发
        long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;
        if (isLeft) {
            internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime);
        } else {
            internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime);
        }
    }
}

  //定时器触发的回调函数
@Override
    public void onEventTime(InternalTimer<K, String> timer) throws Exception {

        long timerTimestamp = timer.getTimestamp();
        String namespace = timer.getNamespace();

        logger.trace("onEventTime @ {}", timerTimestamp);
              // 通过namespace判断是左流的状态还是右流的状态
             //  注意区分左右的清除逻辑,因为左右流的到来是有先后顺序的
        switch (namespace) {
            case CLEANUP_NAMESPACE_LEFT: {
                               //左流先到,定时upperBound时间后清理
                long timestamp = (upperBound <= 0L) ? timerTimestamp : timerTimestamp - upperBound;
                logger.trace("Removing from left buffer @ {}", timestamp);
                leftBuffer.remove(timestamp);
                break;
            }
            case CLEANUP_NAMESPACE_RIGHT: {
                               //右流是晚来的数据不需要等待,当watermark大于数据时间就可以清理掉
                long timestamp = (lowerBound <= 0L) ? timerTimestamp + lowerBound : timerTimestamp;
                logger.trace("Removing from right buffer @ {}", timestamp);
                rightBuffer.remove(timestamp);
                break;
            }
            default:
                throw new RuntimeException("Invalid namespace " + namespace);
        }
    }

参考
https://blog.csdn.net/u013516966/article/details/102952239
https://blog.jrwang.me/2019/flink-source-code-two-stream-join/
https://mp.weixin.qq.com/s/MoIS0qQlvk6N_hnQU6r2SA

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,324评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,356评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,328评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,147评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,160评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,115评论 1 296
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,025评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,867评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,307评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,528评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,688评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,409评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,001评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,657评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,811评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,685评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,573评论 2 353