序
本文主要研究一下flink KeyedStream的intervalJoin操作
实例
DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...
orangeStream
.keyBy(<KeySelector>)
.intervalJoin(greenStream.keyBy(<KeySelector>))
.between(Time.milliseconds(-2), Time.milliseconds(1))
.process (new ProcessJoinFunction<Integer, Integer, String(){
@Override
public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
out.collect(first + "," + second);
}
});
KeyedStream.intervalJoin
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/KeyedStream.java
@Public
public class KeyedStream<T, KEY> extends DataStream<T> {
//......
@PublicEvolving
public <T1> IntervalJoin<T, T1, KEY> intervalJoin(KeyedStream<T1, KEY> otherStream) {
return new IntervalJoin<>(this, otherStream);
}
//......
}
- KeyedStream的intervalJoin创建并返回IntervalJoin
IntervalJoin
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/KeyedStream.java
@PublicEvolving
public static class IntervalJoin<T1, T2, KEY> {
private final KeyedStream<T1, KEY> streamOne;
private final KeyedStream<T2, KEY> streamTwo;
IntervalJoin(
KeyedStream<T1, KEY> streamOne,
KeyedStream<T2, KEY> streamTwo
) {
this.streamOne = checkNotNull(streamOne);
this.streamTwo = checkNotNull(streamTwo);
}
@PublicEvolving
public IntervalJoined<T1, T2, KEY> between(Time lowerBound, Time upperBound) {
TimeCharacteristic timeCharacteristic =
streamOne.getExecutionEnvironment().getStreamTimeCharacteristic();
if (timeCharacteristic != TimeCharacteristic.EventTime) {
throw new UnsupportedTimeCharacteristicException("Time-bounded stream joins are only supported in event time");
}
checkNotNull(lowerBound, "A lower bound needs to be provided for a time-bounded join");
checkNotNull(upperBound, "An upper bound needs to be provided for a time-bounded join");
return new IntervalJoined<>(
streamOne,
streamTwo,
lowerBound.toMilliseconds(),
upperBound.toMilliseconds(),
true,
true
);
}
}
- IntervalJoin提供了between操作,用于设置interval的lowerBound及upperBound,这里可以看到between方法里头对非TimeCharacteristic.EventTime的直接抛出UnsupportedTimeCharacteristicException;between操作创建并返回IntervalJoined
IntervalJoined
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/KeyedStream.java
@PublicEvolving
public static class IntervalJoined<IN1, IN2, KEY> {
private final KeyedStream<IN1, KEY> left;
private final KeyedStream<IN2, KEY> right;
private final long lowerBound;
private final long upperBound;
private final KeySelector<IN1, KEY> keySelector1;
private final KeySelector<IN2, KEY> keySelector2;
private boolean lowerBoundInclusive;
private boolean upperBoundInclusive;
public IntervalJoined(
KeyedStream<IN1, KEY> left,
KeyedStream<IN2, KEY> right,
long lowerBound,
long upperBound,
boolean lowerBoundInclusive,
boolean upperBoundInclusive) {
this.left = checkNotNull(left);
this.right = checkNotNull(right);
this.lowerBound = lowerBound;
this.upperBound = upperBound;
this.lowerBoundInclusive = lowerBoundInclusive;
this.upperBoundInclusive = upperBoundInclusive;
this.keySelector1 = left.getKeySelector();
this.keySelector2 = right.getKeySelector();
}
@PublicEvolving
public IntervalJoined<IN1, IN2, KEY> upperBoundExclusive() {
this.upperBoundInclusive = false;
return this;
}
@PublicEvolving
public IntervalJoined<IN1, IN2, KEY> lowerBoundExclusive() {
this.lowerBoundInclusive = false;
return this;
}
@PublicEvolving
public <OUT> SingleOutputStreamOperator<OUT> process(ProcessJoinFunction<IN1, IN2, OUT> processJoinFunction) {
Preconditions.checkNotNull(processJoinFunction);
final TypeInformation<OUT> outputType = TypeExtractor.getBinaryOperatorReturnType(
processJoinFunction,
ProcessJoinFunction.class,
0,
1,
2,
TypeExtractor.NO_INDEX,
left.getType(),
right.getType(),
Utils.getCallLocationName(),
true
);
return process(processJoinFunction, outputType);
}
@PublicEvolving
public <OUT> SingleOutputStreamOperator<OUT> process(
ProcessJoinFunction<IN1, IN2, OUT> processJoinFunction,
TypeInformation<OUT> outputType) {
Preconditions.checkNotNull(processJoinFunction);
Preconditions.checkNotNull(outputType);
final ProcessJoinFunction<IN1, IN2, OUT> cleanedUdf = left.getExecutionEnvironment().clean(processJoinFunction);
final IntervalJoinOperator<KEY, IN1, IN2, OUT> operator =
new IntervalJoinOperator<>(
lowerBound,
upperBound,
lowerBoundInclusive,
upperBoundInclusive,
left.getType().createSerializer(left.getExecutionConfig()),
right.getType().createSerializer(right.getExecutionConfig()),
cleanedUdf
);
return left
.connect(right)
.keyBy(keySelector1, keySelector2)
.transform("Interval Join", outputType, operator);
}
}
- IntervalJoined默认对lowerBound及upperBound是inclusive的,它也提供了lowerBoundExclusive、upperBoundExclusive来单独设置为exclusive;IntervalJoined提供了process操作,接收的是ProcessJoinFunction;process操作里头创建了IntervalJoinOperator,然后执行left.connect(right).keyBy(keySelector1, keySelector2).transform("Interval Join", outputType, operator),返回的是SingleOutputStreamOperator(
本实例left为orangeStream,right为greenStream
)
ProcessJoinFunction
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/co/ProcessJoinFunction.java
@PublicEvolving
public abstract class ProcessJoinFunction<IN1, IN2, OUT> extends AbstractRichFunction {
private static final long serialVersionUID = -2444626938039012398L;
public abstract void processElement(IN1 left, IN2 right, Context ctx, Collector<OUT> out) throws Exception;
public abstract class Context {
public abstract long getLeftTimestamp();
public abstract long getRightTimestamp();
public abstract long getTimestamp();
public abstract <X> void output(OutputTag<X> outputTag, X value);
}
}
- ProcessJoinFunction继承了AbstractRichFunction,它定义了processElement抽象方法,同时也定义了自身的Context对象,该对象定义了getLeftTimestamp、getRightTimestamp、getTimestamp、output四个抽象方法
IntervalJoinOperator
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java
@Internal
public class IntervalJoinOperator<K, T1, T2, OUT>
extends AbstractUdfStreamOperator<OUT, ProcessJoinFunction<T1, T2, OUT>>
implements TwoInputStreamOperator<T1, T2, OUT>, Triggerable<K, String> {
private static final long serialVersionUID = -5380774605111543454L;
private static final Logger logger = LoggerFactory.getLogger(IntervalJoinOperator.class);
private static final String LEFT_BUFFER = "LEFT_BUFFER";
private static final String RIGHT_BUFFER = "RIGHT_BUFFER";
private static final String CLEANUP_TIMER_NAME = "CLEANUP_TIMER";
private static final String CLEANUP_NAMESPACE_LEFT = "CLEANUP_LEFT";
private static final String CLEANUP_NAMESPACE_RIGHT = "CLEANUP_RIGHT";
private final long lowerBound;
private final long upperBound;
private final TypeSerializer<T1> leftTypeSerializer;
private final TypeSerializer<T2> rightTypeSerializer;
private transient MapState<Long, List<BufferEntry<T1>>> leftBuffer;
private transient MapState<Long, List<BufferEntry<T2>>> rightBuffer;
private transient TimestampedCollector<OUT> collector;
private transient ContextImpl context;
private transient InternalTimerService<String> internalTimerService;
public IntervalJoinOperator(
long lowerBound,
long upperBound,
boolean lowerBoundInclusive,
boolean upperBoundInclusive,
TypeSerializer<T1> leftTypeSerializer,
TypeSerializer<T2> rightTypeSerializer,
ProcessJoinFunction<T1, T2, OUT> udf) {
super(Preconditions.checkNotNull(udf));
Preconditions.checkArgument(lowerBound <= upperBound,
"lowerBound <= upperBound must be fulfilled");
// Move buffer by +1 / -1 depending on inclusiveness in order not needing
// to check for inclusiveness later on
this.lowerBound = (lowerBoundInclusive) ? lowerBound : lowerBound + 1L;
this.upperBound = (upperBoundInclusive) ? upperBound : upperBound - 1L;
this.leftTypeSerializer = Preconditions.checkNotNull(leftTypeSerializer);
this.rightTypeSerializer = Preconditions.checkNotNull(rightTypeSerializer);
}
@Override
public void open() throws Exception {
super.open();
collector = new TimestampedCollector<>(output);
context = new ContextImpl(userFunction);
internalTimerService =
getInternalTimerService(CLEANUP_TIMER_NAME, StringSerializer.INSTANCE, this);
}
@Override
public void initializeState(StateInitializationContext context) throws Exception {
super.initializeState(context);
this.leftBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(
LEFT_BUFFER,
LongSerializer.INSTANCE,
new ListSeriawelizer<>(new BufferEntrySerializer<>(leftTypeSerializer))
));
this.rightBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(
RIGHT_BUFFER,
LongSerializer.INSTANCE,
new ListSerializer<>(new BufferEntrySerializer<>(rightTypeSerializer))
));
}
@Override
public void processElement1(StreamRecord<T1> record) throws Exception {
processElement(record, leftBuffer, rightBuffer, lowerBound, upperBound, true);
}
@Override
public void processElement2(StreamRecord<T2> record) throws Exception {
processElement(record, rightBuffer, leftBuffer, -upperBound, -lowerBound, false);
}
@SuppressWarnings("unchecked")
private <THIS, OTHER> void processElement(
final StreamRecord<THIS> record,
final MapState<Long, List<IntervalJoinOperator.BufferEntry<THIS>>> ourBuffer,
final MapState<Long, List<IntervalJoinOperator.BufferEntry<OTHER>>> otherBuffer,
final long relativeLowerBound,
final long relativeUpperBound,
final boolean isLeft) throws Exception {
final THIS ourValue = record.getValue();
final long ourTimestamp = record.getTimestamp();
if (ourTimestamp == Long.MIN_VALUE) {
throw new FlinkException("Long.MIN_VALUE timestamp: Elements used in " +
"interval stream joins need to have timestamps meaningful timestamps.");
}
if (isLate(ourTimestamp)) {
return;
}
addToBuffer(ourBuffer, ourValue, ourTimestamp);
for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket: otherBuffer.entries()) {
final long timestamp = bucket.getKey();
if (timestamp < ourTimestamp + relativeLowerBound ||
timestamp > ourTimestamp + relativeUpperBound) {
continue;
}
for (BufferEntry<OTHER> entry: bucket.getValue()) {
if (isLeft) {
collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp);
} else {
collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp);
}
}
}
long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;
if (isLeft) {
internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime);
} else {
internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime);
}
}
private boolean isLate(long timestamp) {
long currentWatermark = internalTimerService.currentWatermark();
return currentWatermark != Long.MIN_VALUE && timestamp < currentWatermark;
}
private void collect(T1 left, T2 right, long leftTimestamp, long rightTimestamp) throws Exception {
final long resultTimestamp = Math.max(leftTimestamp, rightTimestamp);
collector.setAbsoluteTimestamp(resultTimestamp);
context.updateTimestamps(leftTimestamp, rightTimestamp, resultTimestamp);
userFunction.processElement(left, right, context, collector);
}
@Override
public void onEventTime(InternalTimer<K, String> timer) throws Exception {
long timerTimestamp = timer.getTimestamp();
String namespace = timer.getNamespace();
logger.trace("onEventTime @ {}", timerTimestamp);
switch (namespace) {
case CLEANUP_NAMESPACE_LEFT: {
long timestamp = (upperBound <= 0L) ? timerTimestamp : timerTimestamp - upperBound;
logger.trace("Removing from left buffer @ {}", timestamp);
leftBuffer.remove(timestamp);
break;
}
case CLEANUP_NAMESPACE_RIGHT: {
long timestamp = (lowerBound <= 0L) ? timerTimestamp + lowerBound : timerTimestamp;
logger.trace("Removing from right buffer @ {}", timestamp);
rightBuffer.remove(timestamp);
break;
}
default:
throw new RuntimeException("Invalid namespace " + namespace);
}
}
@Override
public void onProcessingTime(InternalTimer<K, String> timer) throws Exception {
// do nothing.
}
//......
}
- IntervalJoinOperator继承了AbstractUdfStreamOperator抽象类,实现了TwoInputStreamOperator及Triggerable接口
- IntervalJoinOperator覆盖了AbstractUdfStreamOperator(
StreamOperator定义
)的open、initializeState方法,它在open方法里头创建了InternalTimerService,传递的Triggerable参数为this,即自身实现的Triggerable接口;在initializeState方法里头创建了leftBuffer和rightBuffer两个MapState - IntervalJoinOperator实现了TwoInputStreamOperator接口定义的processElement1、processElement2方法(
TwoInputStreamOperator接口定义的其他一些方法在AbstractUdfStreamOperator的父类AbstractStreamOperator中有实现
);processElement1、processElement2方法内部都调用了processElement方法,只是传递的relativeLowerBound、relativeUpperBound、isLeft参数不同以及leftBuffer和rightBuffer的传参顺序不同 - processElement方法里头实现了intervalJoin的时间匹配逻辑,它会从internalTimerService获取currentWatermark,然后判断element是否late,如果late直接返回,否则继续往下执行;之后就是把element的value添加到ourBuffer中(
对于processElement1来说ourBuffer为leftBuffer,对于processElement2来说ourBuffer为rightBuffer
);之后就是遍历otherBuffer中的每个元素,挨个判断时间是否满足要求(即ourTimestamp + relativeLowerBound <= timestamp <= ourTimestamp + relativeUpperBound
),不满足要求的直接跳过,满足要求的就调用collect方法(collect方法里头执行的是userFunction.processElement,即调用用户定义的ProcessJoinFunction的processElement方法
);之后就是计算cleanupTime,调用internalTimerService.registerEventTimeTimer注册清理该element的timer - IntervalJoinOperator实现了Triggerable接口定义的onEventTime及onProcessingTime方法,其中onProcessingTime不做任何操作,而onEventTime则会根据timestamp清理leftBuffer或者rightBuffer中的element
小结
- flink的intervalJoin操作要求是KeyedStream,而且必须是TimeCharacteristic.EventTime;KeyedStream的intervalJoin创建并返回IntervalJoin;IntervalJoin提供了between操作,用于设置interval的lowerBound及upperBound,该操作创建并返回IntervalJoined
- IntervalJoined提供了process操作,接收的是ProcessJoinFunction;process操作里头创建了IntervalJoinOperator,然后执行left.connect(right).keyBy(keySelector1, keySelector2).transform("Interval Join", outputType, operator),返回的是SingleOutputStreamOperator
- IntervalJoinOperator继承了AbstractUdfStreamOperator抽象类,实现了TwoInputStreamOperator及Triggerable接口;它覆盖了AbstractUdfStreamOperator(
StreamOperator定义
)的open、initializeState方法,它在open方法里头创建了InternalTimerService,传递的Triggerable参数为this,即自身实现的Triggerable接口;在initializeState方法里头创建了leftBuffer和rightBuffer两个MapState;它实现了TwoInputStreamOperator接口定义的processElement1、processElement2方法,processElement1、processElement2方法内部都调用了processElement方法,只是传递的relativeLowerBound、relativeUpperBound、isLeft参数不同以及leftBuffer和rightBuffer的传参顺序不同 - IntervalJoinOperator的processElement方法里头实现了intervalJoin的时间匹配逻辑,它首先判断element是否late,如果late直接返回,之后将element添加到buffer中,然后对之后就是遍历otherBuffer中的每个元素,挨个判断时间是否满足要求(
即ourTimestamp + relativeLowerBound <= timestamp <= ourTimestamp + relativeUpperBound
),不满足要求的直接跳过,满足要求的就调用collect方法(collect方法里头执行的是userFunction.processElement,即调用用户定义的ProcessJoinFunction的processElement方法
);之后就是计算cleanupTime,调用internalTimerService.registerEventTimeTimer注册清理该element的timer - IntervalJoinOperator实现了Triggerable接口定义的onEventTime及onProcessingTime方法,其中onProcessingTime不做任何操作,而onEventTime则会根据timestamp清理leftBuffer或者rightBuffer中的element