Flink DataStream 合并双流、实现双流 Join 的主要方式有 union、coGroup、join(Window Join)、connect 和 intervalJoin,以下从源码角度介绍其使用和实现。
-
Union
1.1 使用
用户需保证左右两流数据类型相同,对两流进行合并操作。
stream
.union(otherStream)
1.2 原理
新建 UnionTransformation ,并且取左右 DataStream 的 Transformation 作为 inputs。若 DataStream 的数据类型不同,则报错。
/**
 * Merges this stream with one or more other streams of the same element type.
 *
 * <p>Collects this stream's transformation followed by the transformation of every stream in
 * {@code streams} and wraps them in a single {@code UnionTransformation}.
 *
 * @param streams the streams to union with this stream
 * @return a new {@code DataStream} backed by the {@code UnionTransformation}
 * @throws IllegalArgumentException if the type of any given stream differs from this stream's
 *     type
 */
@SafeVarargs // the varargs array is only read, never stored or written to
public final DataStream<T> union(DataStream<T>... streams) {
    List<Transformation<T>> unionedTransforms = new ArrayList<>();
    unionedTransforms.add(this.transformation);

    for (DataStream<T> newStream : streams) {
        // Reject any input whose type information differs from this stream's.
        if (!getType().equals(newStream.getType())) {
            throw new IllegalArgumentException(
                    "Cannot union streams of different types: "
                            + getType()
                            + " and "
                            + newStream.getType());
        }
        unionedTransforms.add(newStream.getTransformation());
    }
    return new DataStream<>(this.environment, new UnionTransformation<>(unionedTransforms));
}
-
Cogroup
用户自定义 join key、窗口以及 CoGroupFunction,对左右流中相同 key、相同窗口的数据进行处理。CoGroupFunction 的输入是左流和右流在当前窗口中的数据。
1.1 使用
stream.coGroup(otherStream)
.where(<KeySelector>)
.equalTo(<KeySelector>)
.window(<WindowAssigner>)
.apply(<CoGroupFunction>)
/**
 * User-defined function invoked by a coGroup operation.
 *
 * @param <IN1> element type of the first input
 * @param <IN2> element type of the second input
 * @param <O> type of the emitted results
 */
public interface CoGroupFunction<IN1, IN2, O> extends Function, Serializable {
/**
 * Processes the grouped elements of both inputs and emits results through {@code out}.
 *
 * @param first elements from the first input
 * @param second elements from the second input
 * @param out collector used to emit results
 * @throws Exception if the user code fails
 */
void coGroup(Iterable<IN1> first, Iterable<IN2> second, Collector<O> out) throws Exception;
}
1.2 原理
1.2.1 DataStream 对象演变
DataStream 调用 coGroup 方法生成 CoGroupedStreams。
/**
 * Starts a coGroup between this stream and {@code otherStream}.
 *
 * @param otherStream the second input of the coGroup
 * @return a {@code CoGroupedStreams} wrapping this stream and {@code otherStream}
 */
public <T2> CoGroupedStreams<T, T2> coGroup(DataStream<T2> otherStream) {
return new CoGroupedStreams<>(this, otherStream);
}
CoGroupedStreams 调用 where 方法生成 CoGroupedStreams.Where(以下简称 Where),
Where 调用 equalTo 方法生成 CoGroupedStreams.Where.EqualTo(以下简称 EqualTo),
EqualTo 调用 window 方法生成 CoGroupedStreams.WithWindow(以下简称 WithWindow),WithWindow 调用 trigger 方法生成 WithWindow【可选】,WithWindow 调用 evictor 方法生成 WithWindow【可选】,
WithWindow 调用 allowedLateness 方法生成 WithWindow【可选】,WithWindow 调用 apply 方法生成
DataStream。结论:cogroup 基于 union 和 window 实现,window 的实现参考 Flink 源码解读(三) Timer & WaterMark & Window。
/**
 * Builder for a coGroup of two streams: {@code where} sets the first input's key selector,
 * {@code equalTo} sets the second input's key selector, and {@code window} sets the window
 * assigner. The fields {@code input1} and {@code input2} are not shown in this excerpt.
 */
public class CoGroupedStreams<T1, T2> {
/**
 * Sets the key selector for the first input, deriving the key type from the selector and
 * the first input's type.
 */
public <KEY> Where<KEY> where(KeySelector<T1, KEY> keySelector) {
Preconditions.checkNotNull(keySelector);
final TypeInformation<KEY> keyType = TypeExtractor.getKeySelectorTypes(keySelector, input1.getType());
return where(keySelector, keyType);
}
/** Sets the key selector for the first input together with an explicitly given key type. */
public <KEY> Where<KEY> where(KeySelector<T1, KEY> keySelector, TypeInformation<KEY> keyType) {
Preconditions.checkNotNull(keySelector);
Preconditions.checkNotNull(keyType);
return new Where<>(input1.clean(keySelector), keyType);
}
/** Intermediate step holding the first input's key selector and the key type. */
public class Where<KEY> {
private final KeySelector<T1, KEY> keySelector1;
private final TypeInformation<KEY> keyType;
Where(KeySelector<T1, KEY> keySelector1, TypeInformation<KEY> keyType) {
this.keySelector1 = keySelector1;
this.keyType = keyType;
}
/**
 * Sets the key selector for the second input, deriving its key type from the selector and
 * the second input's type.
 */
public EqualTo equalTo(KeySelector<T2, KEY> keySelector) {
Preconditions.checkNotNull(keySelector);
final TypeInformation<KEY> otherKey =
TypeExtractor.getKeySelectorTypes(keySelector, input2.getType());
return equalTo(keySelector, otherKey);
}
/**
 * Sets the key selector for the second input.
 *
 * <p>NOTE(review): {@code keyType} is not used in this excerpt; only the cleaned selector is
 * passed on. Confirm against the full source whether a key-type check was elided here.
 */
public EqualTo equalTo(KeySelector<T2, KEY> keySelector, TypeInformation<KEY> keyType) {
return new EqualTo(input2.clean(keySelector));
}
/** Intermediate step holding the second input's key selector. */
@Public
public class EqualTo {
private final KeySelector<T2, KEY> keySelector2;
EqualTo(KeySelector<T2, KEY> keySelector2) {
this.keySelector2 = requireNonNull(keySelector2);
}
/**
 * Sets the window assigner and returns a {@code WithWindow} carrying both inputs, both key
 * selectors and the key type.
 */
@PublicEvolving
public <W extends Window> WithWindow<T1, T2, KEY, W> window(
WindowAssigner<? super TaggedUnion<T1, T2>, W> assigner) {
// The three trailing nulls are the trigger, evictor and allowed lateness, which are unset
// at this point and can be supplied later through the WithWindow methods.
return new WithWindow<>(
input1,
input2,
keySelector1,
keySelector2,
keyType,
assigner,
null,
null,
null);
}
}
}
}
public class CoGroupedStreams<T1, T2> {
public static class WithWindow<T1, T2, KEY, W extends Window> {
@PublicEvolving
public WithWindow<T1, T2, KEY, W> trigger(
Trigger<? super TaggedUnion<T1, T2>, ? super W> newTrigger) {
return new WithWindow<>(
input1,
input2,
keySelector1,
keySelector2,
keyType,
windowAssigner,
newTrigger,
evictor,
allowedLateness);
}
@PublicEvolving
public WithWindow<T1, T2, KEY, W> evictor(
Evictor<? super TaggedUnion<T1, T2>, ? super W> newEvictor) {
return new WithWindow<>(
input1,
input2,
keySelector1,
keySelector2,
keyType,
windowAssigner,
trigger,
newEvictor,
allowedLateness);
}
@PublicEvolving
public WithWindow<T1, T2, KEY, W> allowedLateness(Time newLateness) {
return new WithWindow<>(
input1,
input2,
keySelector1,
keySelector2,
keyType,
windowAssigner,
trigger,
evictor,
newLateness);
}
public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function) {
TypeInformation<T> resultType =
TypeExtractor.getCoGroupReturnTypes(
function, input1.getType(), input2.getType(), "CoGroup", false);
return apply(function, resultType);
}
public <T> DataStream<T> apply(
CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) {
function = input1.getExecutionEnvironment().clean(function);
UnionTypeInfo<T1, T2> unionType =
new UnionTypeInfo<>(input1.getType(), input2.getType());
UnionKeySelector<T1, T2, KEY> unionKeySelector =
new UnionKeySelector<>(keySelector1, keySelector2);
DataStream<TaggedUnion<T1, T2>> taggedInput1 =
input1.map(new Input1Tagger<T1, T2>())
.setParallelism(input1.getParallelism())
.returns(unionType);
DataStream<TaggedUnion<T1, T2>> taggedInput2 =
input2.map(new Input2Tagger<T1, T2>())
.setParallelism(input2.getParallelism())
.returns(unionType);
DataStream<TaggedUnion<T1, T2>> unionStream = taggedInput1.union(taggedInput2);
windowedStream =new KeyedStream<TaggedUnion<T1, T2>, KEY>(unionStream, unionKeySelector, keyType).window(windowAssigner);
if (trigger != null) {
windowedStream.trigger(trigger);
}
if (evictor != null) {
windowedStream.evictor(evictor);
}
if (allowedLateness != null) {
windowedStream.allowedLateness(allowedLateness);
}
return windowedStream.apply(new CoGroupWindowFunction<T1, T2, T, KEY, W>(function), resultType);
}
}
public class CoGroupedStreams<T1, T2> {
/**
 * Window function that adapts a {@code CoGroupFunction} to a window over
 * {@code TaggedUnion<T1, T2>} elements.
 */
private static class CoGroupWindowFunction<T1, T2, T, KEY, W extends Window>
extends WrappingFunction<CoGroupFunction<T1, T2, T>>
implements WindowFunction<TaggedUnion<T1, T2>, T, KEY, W> {
private static final long serialVersionUID = 1L;
public CoGroupWindowFunction(CoGroupFunction<T1, T2, T> userFunction) {
super(userFunction);
}
/**
 * Splits the window contents by input side and passes both lists to the wrapped
 * {@code CoGroupFunction}.
 *
 * @param key the key of the window (not used by this method)
 * @param window the window (not used by this method)
 * @param values the tagged elements of the window
 * @param out collector handed to the wrapped function
 */
@Override
public void apply(KEY key, W window, Iterable<TaggedUnion<T1, T2>> values, Collector<T> out)
throws Exception {
List<T1> oneValues = new ArrayList<>();
List<T2> twoValues = new ArrayList<>();
// Route each tagged element back to the list of the side it came from.
for (TaggedUnion<T1, T2> val : values) {
if (val.isOne()) {
oneValues.add(val.getOne());
} else {
twoValues.add(val.getTwo());
}
}
// The wrapped function is called even when one of the two lists is empty.
wrappedFunction.coGroup(oneValues, twoValues, out);
}
}
}
1.2.2 Transformation 对象演变
UnionTransformation --> PartitionTransformation --> OneInputTransformation(SimpleOperatorFactory(WindowOperator(
InternalIterableWindowFunction(
WindowFunction(CoGroupFunction)))/EvictingWindowOperator))
-
Join
2.1 使用
stream.join(otherStream)
.where(<KeySelector>)
.equalTo(<KeySelector>)
.window(<WindowAssigner>)
.apply(<JoinFunction>)
2.2 原理
2.2.1 DataStream 对象演变
DataStream 调用 join 方法生成 JoinedStreams。
/**
 * Starts a join between this stream and {@code otherStream}.
 *
 * @param otherStream the second input of the join
 * @return a {@code JoinedStreams} wrapping this stream and {@code otherStream}
 */
public <T2> JoinedStreams<T, T2> join(DataStream<T2> otherStream) {
return new JoinedStreams<>(this, otherStream);
}
JoinedStreams 调用 where 方法生成 JoinedStreams.Where(以下简称 Where),
Where 调用 equalTo 方法生成 JoinedStreams.Where.EqualTo(以下简称 EqualTo),
EqualTo 调用 window 方法生成 JoinedStreams.WithWindow(以下简称 WithWindow),WithWindow 调用 trigger 方法生成 WithWindow,WithWindow 调用 evictor 方法生成 WithWindow,
WithWindow 调用 allowedLateness 方法生成 WithWindow,WithWindow 调用 apply 方法生成
DataStream。结论:join 基于 cogroup 实现,cogroup 的实现参考本文 Cogroup 一节的"1.2 原理"。
/**
 * Builder for a windowed join of two streams: {@code where} sets the first input's key
 * selector, {@code equalTo} sets the second input's key selector, and {@code window} sets the
 * window assigner. The fields {@code input1} and {@code input2} are not shown in this excerpt.
 */
public class JoinedStreams<T1, T2> {
/**
 * Sets the key selector for the first input, deriving the key type from the selector and
 * the first input's type.
 */
public <KEY> Where<KEY> where(KeySelector<T1, KEY> keySelector) {
requireNonNull(keySelector);
final TypeInformation<KEY> keyType =
TypeExtractor.getKeySelectorTypes(keySelector, input1.getType());
return where(keySelector, keyType);
}
/** Sets the key selector for the first input together with an explicitly given key type. */
public <KEY> Where<KEY> where(KeySelector<T1, KEY> keySelector, TypeInformation<KEY> keyType) {
requireNonNull(keySelector);
requireNonNull(keyType);
return new Where<>(input1.clean(keySelector), keyType);
}
/** Intermediate step holding the first input's key selector and the key type. */
@Public
public class Where<KEY> {
private final KeySelector<T1, KEY> keySelector1;
private final TypeInformation<KEY> keyType;
Where(KeySelector<T1, KEY> keySelector1, TypeInformation<KEY> keyType) {
this.keySelector1 = keySelector1;
this.keyType = keyType;
}
/**
 * Sets the key selector for the second input, deriving its key type from the selector and
 * the second input's type.
 */
public EqualTo equalTo(KeySelector<T2, KEY> keySelector) {
requireNonNull(keySelector);
final TypeInformation<KEY> otherKey =
TypeExtractor.getKeySelectorTypes(keySelector, input2.getType());
return equalTo(keySelector, otherKey);
}
/**
 * Sets the key selector for the second input.
 *
 * <p>NOTE(review): {@code keyType} is not used in this excerpt; only the cleaned selector is
 * passed on. Confirm against the full source whether a key-type check was elided here.
 */
public EqualTo equalTo(KeySelector<T2, KEY> keySelector, TypeInformation<KEY> keyType) {
return new EqualTo(input2.clean(keySelector));
}
/** Intermediate step holding the second input's key selector. */
@Public
public class EqualTo {
private final KeySelector<T2, KEY> keySelector2;
EqualTo(KeySelector<T2, KEY> keySelector2) {
this.keySelector2 = requireNonNull(keySelector2);
}
/**
 * Sets the window assigner and returns a {@code WithWindow} carrying both inputs, both key
 * selectors and the key type.
 */
public <W extends Window> WithWindow<T1, T2, KEY, W> window(
WindowAssigner<? super TaggedUnion<T1, T2>, W> assigner) {
// The three trailing nulls fill the trigger, evictor and allowed-lateness positions.
return new WithWindow<>(input1,input2,keySelector1,keySelector2,keyType,assigner,null,null,null);
}
}
}
}
public class JoinedStreams<T1, T2> {
public static class WithWindow<T1, T2, KEY, W extends Window> {
public WithWindow<T1, T2, KEY, W> trigger(
Trigger<? super TaggedUnion<T1, T2>, ? super W> newTrigger) {
return new WithWindow<>(input1,input2,keySelector1,keySelector2,keyType,windowAssigner,newTrigger,evictor,allowedLateness);
}
public WithWindow<T1, T2, KEY, W> evictor(
Evictor<? super TaggedUnion<T1, T2>, ? super W> newEvictor) {
return new WithWindow<>(input1,input2,keySelector1,keySelector2,keyType,windowAssigner,newTrigger,evictor,allowedLateness);
}
public WithWindow<T1, T2, KEY, W> allowedLateness(Time newLateness) {
return new WithWindow<>(input1,input2,keySelector1,keySelector2,keyType,windowAssigner,newTrigger,evictor,allowedLateness);
}
public <T> DataStream<T> apply(JoinFunction<T1, T2, T> function) {
return apply(function, resultType);
}
public <T> DataStream<T> apply(
FlatJoinFunction<T1, T2, T> function, TypeInformation<T> resultType) {
coGroupedWindowedStream =
input1.coGroup(input2)
.where(keySelector1)
.equalTo(keySelector2)
.window(windowAssigner)
.trigger(trigger)
.evictor(evictor)
.allowedLateness(allowedLateness);
return coGroupedWindowedStream.apply(new FlatJoinCoGroupFunction<>(function), resultType);
}
}
}
2.2.2 Transformation 对象演变
UnionTransformation --> PartitionTransformation --> OneInputTransformation(SimpleOperatorFactory(WindowOperator(
InternalIterableWindowFunction(
WindowFunction(CoGroupFunction(JoinFunction))))/EvictingWindowOperator))
-
Connect
4.1 使用
stream
.connect(otherStream)
.process(<CoProcessFunction>)
4.2 原理
4.2.1 DataStream 对象演变
DataStream 调用 connect 方法生成 ConnectedStreams。
/**
 * Connects this stream with {@code dataStream}; the two streams may have different types.
 *
 * @param dataStream the stream to connect with this one
 * @return a {@code ConnectedStreams} built from the environment, this stream and
 *     {@code dataStream}
 */
public <R> ConnectedStreams<T, R> connect(DataStream<R> dataStream) {
return new ConnectedStreams<>(environment, this, dataStream);
}
public class ConnectedStreams<IN1, IN2> {
/**
 * Applies {@code coProcessFunction} to the two connected streams.
 *
 * @param coProcessFunction the user function invoked for elements of both inputs
 * @param outputType type information of the results
 * @return the stream produced by the "Co-Process" transformation
 */
public <R> SingleOutputStreamOperator<R> process(
CoProcessFunction<IN1, IN2, R> coProcessFunction, TypeInformation<R> outputType) {
TwoInputStreamOperator<IN1, IN2, R> operator;
// The keyed operator variant is chosen only when both inputs are KeyedStreams.
if ((inputStream1 instanceof KeyedStream) && (inputStream2 instanceof KeyedStream)) {
operator = new LegacyKeyedCoProcessOperator<>(inputStream1.clean(coProcessFunction));
} else {
operator = new CoProcessOperator<>(inputStream1.clean(coProcessFunction));
}
return transform("Co-Process", outputType, operator);
}
}
4.2.2 Transformation 对象演变
TwoInputTransformation(SimpleOperatorFactory(CoProcessOperator(CoProcessFunction)))
4.2.3 CoProcessOperator
/**
 * Two-input operator that forwards the elements of each input to the matching method of the
 * user's {@code CoProcessFunction}. The fields {@code collector} and {@code context} are not
 * shown in this excerpt.
 */
public class CoProcessOperator<IN1, IN2, OUT>
        extends AbstractUdfStreamOperator<OUT, CoProcessFunction<IN1, IN2, OUT>>
        implements TwoInputStreamOperator<IN1, IN2, OUT> {

    /**
     * Handles an element of the first input.
     *
     * @param element the incoming record of the first input
     * @throws Exception if the user function fails
     */
    @Override
    public void processElement1(StreamRecord<IN1> element) throws Exception {
        collector.setTimestamp(element);
        // Expose the current record through the context only for the duration of the call.
        context.element = element;
        userFunction.processElement1(element.getValue(), context, collector);
        context.element = null;
    }

    /**
     * Handles an element of the second input.
     *
     * @param element the incoming record of the second input
     * @throws Exception if the user function fails
     */
    @Override
    public void processElement2(StreamRecord<IN2> element) throws Exception {
        collector.setTimestamp(element);
        // Expose the current record through the context only for the duration of the call.
        context.element = element;
        userFunction.processElement2(element.getValue(), context, collector);
        context.element = null;
    }
}
-
IntervalJoin
5.1 使用
stream
.keyBy(<KeySelector>)
.intervalJoin(otherStream.keyBy(<KeySelector>))
.between(<Time>,<Time>)
.process(<ProcessJoinFunction>)
5.2 原理
5.2.1 KeyedStream 对象演变
KeyedStream 调用 intervalJoin 方法生成 IntervalJoin。
public class KeyedStream<T, KEY> extends DataStream<T> {
/**
 * Starts an interval join between this keyed stream and {@code otherStream}; both streams
 * share the key type {@code KEY}.
 *
 * @param otherStream the keyed stream to join with
 * @return an {@code IntervalJoin} wrapping this stream and {@code otherStream}
 */
public <T1> IntervalJoin<T, T1, KEY> intervalJoin(KeyedStream<T1, KEY> otherStream) {
return new IntervalJoin<>(this, otherStream);
}
}
/**
 * Builder step of an interval join that still needs its time bounds. The fields
 * {@code streamOne}, {@code streamTwo} and {@code timeBehaviour} are not shown in this excerpt.
 */
public static class IntervalJoin<T1, T2, KEY> {
/**
 * Sets the lower and upper time bound of the join.
 *
 * @param lowerBound the lower bound, converted to milliseconds
 * @param upperBound the upper bound, converted to milliseconds
 * @return an {@code IntervalJoined} carrying both streams and the bounds
 * @throws UnsupportedTimeCharacteristicException if the time behaviour is not event time
 */
public IntervalJoined<T1, T2, KEY> between(Time lowerBound, Time upperBound) {
if (timeBehaviour != TimeBehaviour.EventTime) {
throw new UnsupportedTimeCharacteristicException(
"Time-bounded stream joins are only supported in event time");
}
checkNotNull(lowerBound, "A lower bound needs to be provided for a time-bounded join");
checkNotNull(upperBound, "An upper bound needs to be provided for a time-bounded join");
// The two trailing booleans presumably set lowerBoundInclusive and upperBoundInclusive;
// the IntervalJoined constructor is not shown here — TODO confirm.
return new IntervalJoined<>(
streamOne,
streamTwo,
lowerBound.toMilliseconds(),
upperBound.toMilliseconds(),
true,
true);
}
/**
 * Interval join with its bounds defined. The fields {@code left}, {@code right}, the bounds
 * and the key selectors are not shown in this excerpt.
 */
public static class IntervalJoined<IN1, IN2, KEY> {
/**
 * Builds the join: creates an {@code IntervalJoinOperator} from the bounds, the serializers
 * of both inputs and the cleaned user function, then connects both streams, keys them and
 * applies the operator in a transformation named "Interval Join".
 *
 * @param processJoinFunction the user function called for every joined pair
 * @param outputType type information of the results
 * @return the stream of join results
 */
public <OUT> SingleOutputStreamOperator<OUT> process(
ProcessJoinFunction<IN1, IN2, OUT> processJoinFunction,
TypeInformation<OUT> outputType) {
Preconditions.checkNotNull(processJoinFunction);
Preconditions.checkNotNull(outputType);
final ProcessJoinFunction<IN1, IN2, OUT> cleanedUdf =
left.getExecutionEnvironment().clean(processJoinFunction);
final IntervalJoinOperator<KEY, IN1, IN2, OUT> operator =
new IntervalJoinOperator<>(
lowerBound,
upperBound,
lowerBoundInclusive,
upperBoundInclusive,
left.getType().createSerializer(left.getExecutionConfig()),
right.getType().createSerializer(right.getExecutionConfig()),
cleanedUdf);
return left.connect(right)
.keyBy(keySelector1, keySelector2)
.transform("Interval Join", outputType, operator);
}
}
}
结论:intervalJoin 基于 connect 实现,使用 relativeLowerBound 和 relativeUpperBound 对另一条流缓存数据的时间戳进行过滤。
5.2.2 Transformation 对象演变
PartitionTransformation --> TwoInputTransformation(SimpleOperatorFactory(IntervalJoinOperator(ProcessJoinFunction)))
5.2.3 IntervalJoinOperator
public class IntervalJoinOperator<K, T1, T2, OUT>
extends AbstractUdfStreamOperator<OUT, ProcessJoinFunction<T1, T2, OUT>>
implements TwoInputStreamOperator<T1, T2, OUT>, Triggerable<K, String> {
public void processElement1(StreamRecord<T1> record) throws Exception {
processElement(record, leftBuffer, rightBuffer, lowerBound, upperBound, true);
}
public void processElement2(StreamRecord<T2> record) throws Exception {
processElement(record, rightBuffer, leftBuffer, -upperBound, -lowerBound, false);
}
private <THIS, OTHER> void processElement(
final StreamRecord<THIS> record,
final MapState<Long, List<IntervalJoinOperator.BufferEntry<THIS>>> ourBuffer,
final MapState<Long, List<IntervalJoinOperator.BufferEntry<OTHER>>> otherBuffer,
final long relativeLowerBound,
final long relativeUpperBound,
final boolean isLeft)
throws Exception {
if (isLate(ourTimestamp)) {
return;
}
addToBuffer(ourBuffer, ourValue, ourTimestamp);
for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket : otherBuffer.entries()) {
final long timestamp = bucket.getKey();
if (timestamp < ourTimestamp + relativeLowerBound
|| timestamp > ourTimestamp + relativeUpperBound) {
continue;
}
for (BufferEntry<OTHER> entry : bucket.getValue()) {
if (isLeft) {
collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp);
} else {
collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp);
}
}
}
long cleanupTime =
(relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;
if (isLeft) {
internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime);
} else {
internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime);
}
}
}