Flink源码分析系列文档目录
请点击:Flink 源码分析系列文档目录
Flink 批处理问题
问题描述
Flink 1.14.x版本BATCH模式执行会丢失数据。
演示程序:
//create env and tableEnv
val env = StreamExecutionEnvironment.getExecutionEnvironment
// env.setRuntimeMode(RuntimeExecutionMode.STREAMING)
env.setRuntimeMode(RuntimeExecutionMode.BATCH)
env.setParallelism(3)
val tableEnv = StreamTableEnvironment.create(env)
// make data ,6 line
val resultDS2 = env.fromElements(
Row.of("Alice"),
Row.of("alice"),
Row.of("Bob"),
Row.of("lily"),
Row.of("lily"),
Row.of("lily")
)(Types.ROW(Types.STRING))
// dataStream[Row] --> Table --> sql to upper transform table
val table = tableEnv.fromDataStream(resultDS2).as("word")
tableEnv.createTemporaryView(s"tmp_table",table)
val resultTable = tableEnv.sqlQuery(s" select UPPER(word) as word from tmp_table ")
// sql transformed table --> DataStream[String]
val resultDs = tableEnv.toDataStream(resultTable).map(row => {
row.getField("word").asInstanceOf[String]
})
// keyby reduce
val counts: DataStream[(String, Int)] = resultDs
.map((_, 1))
.keyBy(_._1)
.sum(1)
// print result
counts.print()
env.execute("WordCount")
期待的输出为:
(BOB,1)
(ALICE,2)
(LILY,3)
输出只有
(BOB,1)
Alice和Lily不会被输出。
如果修改并行度为1,那么输出会漏掉Lily。也就是说丢失的数据会随着并行度的不同而变化。
问题调查
将算子链一步步断开debug,发现直到keyBy
算子的输出都正常,到了sum
算子之后才出现数据丢失问题。接下来重点对sum
算子背后逻辑进行调查。
Flink 1.13.3 中调试
Flink 1.13.3 中即便配置成env.setRuntimeMode(RuntimeExecutionMode.BATCH)
,实际执行sum
算子的仍然是
StreamGroupedReduceOperator
。可见batch模式并没有启用,这是一个明显的bug。我们顺便看一下它的processElement
方法代码:
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
IN value = element.getValue();
IN currentValue = values.value();
if (currentValue != null) {
// 如果当前元素不为null,说明这是reduce执行过程中
// 执行reduce方法,上面的sum算子生成的reduce函数逻辑为求两个字段值之和
IN reduced = userFunction.reduce(currentValue, value);
// 更新保存状态
values.update(reduced);
// 将数据发往下游
output.collect(element.replace(reduced));
} else {
// 这个分支是sum第一个元素的时候执行,第一个元素到来之前的currentValue为null
// 直接将第一个元素的值作为初始值,更新到状态中
values.update(value);
// 将数据发往下游
output.collect(element.replace(value));
}
}
这种流式计算operator的特点是会输出中间结果,数据到来一次计算一次。如果将问题描述中的例子并行度设置为1
env.setParallelism(1)
输出结果可能为:
(BOB,1)
(ALICE,1)
(ALICE,2)
(LILY,1)
(LILY,2)
(LILY,3)
Flink 1.14.2 中调试
Flink 1.14.2 中执行sum
算子的是BatchGroupedReduceOperator
。和StreamGroupedReduceOperator
设计上不同的地方是,BatchGroupedReduceOperator
只会输出最终的结果,不输出中间结果。
我们查看它的processElement
方法:
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
// 获取当前元素的值
IN value = element.getValue();
// 获取当前缓存的值(上次计算的结果)
// 注意,这个缓存的值的key和当前元素的key是一致的
IN currentValue = values.value();
// 如果缓存值为null,说明之前没有处理过和当前元素对应key相同的其他元素
// 注册一个EventTime定时器,触发时刻为Long.MAX_VALUE(后面解释)
// 该定时器负责触发和当前元素对应key相同的一系列元素
// 每个不同的key对应一个timer
// 这样key相同的数据计算结果只输出一个,这是和stream模式不同的地方
if (currentValue == null) {
// register a timer for emitting the result at the end when this is the
// first input for this key
timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, Long.MAX_VALUE);
} else {
// otherwise, reduce things
// 否则,只进行reduce计算(例子中为求和)
value = userFunction.reduce(currentValue, value);
}
// 更新缓存的值为当前计算结果
values.update(value);
}
通过上面分析可知,只在第一次接收到key为某个值的元素的时候,才会注册一个event time timer,触发时间为Long.MAX_VALUE
。确保了key相同的元素只在最后输出一次(不像是Streaming模式,每次接收到数据都会输出)。
接下来需要分析这个timer注册之后是怎么触发的。我们从OneInputStreamTask
开始分析。
public void emitRecord(StreamRecord<IN> record) throws Exception {
// 流入数据计数器加一
this.numRecordsIn.inc();
// 调用operator的setKeyContextElement方法
// 需要从record中抽取出key,然后传给state backend
// 对于keyedStateBackend,每个key对应的缓存值是不同的
this.operator.setKeyContextElement(record);
// 处理这个record
this.operator.processElement(record);
}
接着调用的是OneInputStreamOperator
的setKeyContextElement
方法:
@Override
default void setKeyContextElement(StreamRecord<IN> record) throws Exception {
setKeyContextElement1(record);
}
继续跟踪到AbstractStreamOperator
:
public void setKeyContextElement1(StreamRecord record) throws Exception {
setKeyContextElement(record, stateKeySelector1);
}
private <T> void setKeyContextElement(StreamRecord<T> record, KeySelector<T, ?> selector)
throws Exception {
if (selector != null) {
// 从record中抽取出key
Object key = selector.getKey(record.getValue());
// 调用setCurrentKey
setCurrentKey(key);
}
}
public void setCurrentKey(Object key) {
// 调用stateHandler
// StreamOperatorStateHandler是各种各样state backend的封装
stateHandler.setCurrentKey(key);
}
接下来是StreamOperatorStateHandler
的setCurrentKey
方法:
public void setCurrentKey(Object key) {
if (keyedStateBackend != null) {
try {
// need to work around type restrictions
@SuppressWarnings("rawtypes")
CheckpointableKeyedStateBackend rawBackend = keyedStateBackend;
// 调用原始状态后端的setCurrentKey方法
rawBackend.setCurrentKey(key);
} catch (Exception e) {
throw new RuntimeException(
"Exception occurred while setting the current key context.", e);
}
}
}
经过debug我们发现这里rawBackend
实际上是BatchExecutionKeyedStateBackend
。我们查看它的setCurrentKey
代码:
@Override
public void setCurrentKey(K newKey) {
// 如果新的key值和之前处理的key值不同,才进行后面的逻辑
if (!Objects.equals(newKey, currentKey)) {
// 通知key变更
notifyKeySelected(newKey);
// 清除状态后端的值,因为之前的已经计算过了
for (State value : states.values()) {
((AbstractBatchExecutionKeyState<?, ?, ?>) value).clearAllNamespaces();
}
// 清除key对应的所有内容
for (KeyGroupedInternalPriorityQueue<?> value : priorityQueues.values()) {
while (value.poll() != null) {
// remove everything for the key
}
}
// 设置当前key为新的key
this.currentKey = newKey;
}
}
通过分析可知,只有key改变的时候才会触发notifyKeySelected
,这是一个关键点。接下来我们探究notifyKeySelected
方法做了什么事。
private void notifyKeySelected(K newKey) {
// we prefer a for-loop over other iteration schemes for performance reasons here.
for (KeySelectionListener<K> keySelectionListener : keySelectionListeners) {
keySelectionListener.keySelected(newKey);
}
}
这个方法通知所有的keySelectionListener
。通过debug,keySelectionListeners
只有一个,为BatchExecutionInternalTimeServiceManager
。我们查看它的keySelected
方法:
@Override
public void keySelected(K newKey) {
try {
for (BatchExecutionInternalTimeService<K, ?> value : timerServices.values()) {
value.setCurrentKey(newKey);
}
} catch (Exception e) {
throw new WrappingRuntimeException(e);
}
}
它挨个调用所有BatchExecutionInternalTimeService
的setCurrentKey
方法。debug时候timerServices
只有一个对象。接下来需要分析的目标就明确了。查看BatchExecutionInternalTimeService
的setCurrentKey
方法,代码如下:
public void setCurrentKey(K currentKey) throws Exception {
// 如果当前key(上次处理的key)为null,继续下面逻辑
// 当前key不为null并且参数中的key和当前key不相同,继续下面逻辑
// 其他情况,直接返回
if (currentKey != null && currentKey.equals(this.currentKey)) {
return;
}
// 设置当前watermark为Long.MAX_VALUE
currentWatermark = Long.MAX_VALUE;
InternalTimer<K, N> timer;
// 调用所有eventTime定时器
while ((timer = eventTimeTimersQueue.poll()) != null) {
// 触发定时器目标的onEventTime方法
triggerTarget.onEventTime(timer);
}
// 调用所有processingTime定时器
// 这里并没有检查timer的触发时间(通过前面分析可知timer的触发时间为Long.MAX_VALUE)
while ((timer = processingTimeTimersQueue.poll()) != null) {
// 触发定时器目标的onProcessingTime方法
triggerTarget.onProcessingTime(timer);
}
// 设置当前watermark为Long.MIN_VALUE
currentWatermark = Long.MIN_VALUE;
// 更新当前key
this.currentKey = currentKey;
}
这里的triggerTarget
为最早提到的BatchGroupedReduceOperator
。我们看看onEventTime
方法做了什么事情。
@Override
public void onEventTime(InternalTimer<KEY, VoidNamespace> timer) throws Exception {
// 获取缓存的value
IN currentValue = values.value();
if (currentValue != null) {
// 如果缓存的value不为null,输出这个value到下游
output.collect(new StreamRecord<>(currentValue, Long.MAX_VALUE));
}
}
流程走到这里,下游才能够接收到数据。但条件是接收到一个key不同的数据。分析到这里问题就显而易见了:最后一种不同key的数据永远没有机会输出。这就是数据丢失的原因。
修复方式
修复的方式其实很明确,在数据输入结束的时候,强制将缓存的value输出就可以了。我们需要在数据输入结束的时候通知BatchGroupedReduceOperator
,然后调用onEventTime
方法就可以了。问题来了,如何在输入结束的时候得到通知?
我们需要用到BoundedOneInput
接口。它用于单个输入的operator(例如OneInputStreamOperator
),在数据输入结束的时候endInput
方法会被调用。
@PublicEvolving
public interface BoundedOneInput {
void endInput() throws Exception;
}
我们修改BatchGroupedReduceOperator
类定义,实现BoundedOneInput
接口,然后在endInput
中调用onEventTime
方法,即可完成修复。
@Internal
public class BatchGroupedReduceOperator<IN, KEY>
extends AbstractUdfStreamOperator<IN, ReduceFunction<IN>>
implements OneInputStreamOperator<IN, IN>,
Triggerable<KEY, VoidNamespace>,
BoundedOneInput {
// ...
@Override
public void endInput() throws Exception {
onEventTime(null);
}
}
我们可以将修改完的代码重新编译,用输出的jar替换项目中的之后重新运行,无论怎么改变并行度,发现输出均正常。
问题是解决了,我们顺便分析下为什么BoundedOntInput
的endInput
方法会在数据输入结束的时候得到调用。
我们从StreamTask
的processInput
方法开始分析。processInput
方法负责处理输入的数据。
protected void processInput(Controller controller) throws Exception {
DataInputStatus status = this.inputProcessor.processInput();
// ...
}
这里调用的inputProcessor
的processInput
方法,代码如下:
public DataInputStatus processInput() throws Exception {
// input发送一个数据到output,获取数据发送状态
DataInputStatus status = this.input.emitNext(this.output);
// 如果发送状态为END_OF_DATA,表示数据输入结束
if (status == DataInputStatus.END_OF_DATA) {
// 调用endOfInputAware的endInput方法
this.endOfInputAware.endInput(this.input.getInputIndex() + 1);
this.output = new FinishedDataOutput();
} else if (status == DataInputStatus.END_OF_RECOVERY) {
if (this.input instanceof RecoverableStreamTaskInput) {
this.input = ((RecoverableStreamTaskInput)this.input).finishRecovery();
}
return DataInputStatus.MORE_AVAILABLE;
}
return status;
}
经过debug我们知道endOfInputAware
为RegularOperatorChain
。我们查看它的endInput
方法:
public void endInput(int inputId) throws Exception {
if (this.mainOperatorWrapper != null) {
this.mainOperatorWrapper.endOperatorInput(inputId);
}
}
它调用了operator包装器的endOperatorInput
方法。继续跟踪。
public void endOperatorInput(int inputId) throws Exception {
if (this.wrapped instanceof BoundedOneInput) {
((BoundedOneInput)this.wrapped).endInput();
} else if (this.wrapped instanceof BoundedMultiInput) {
((BoundedMultiInput)this.wrapped).endInput(inputId);
}
}
上面方法中的wrapped
正是BatchGroupedReduceOperator
。这个方法会检测被包装的operator是否实现了BoundedOneInput
接口,如果实现了就调用endInput
方法。
上面的分析为数据输入结束后operator得到通知的调用逻辑。
即便问题已经解决,然而问题的根因真的是这样吗?下一章我们继续分析。
进一步分析
这一节进行进一步调查。经过社区大佬的提醒,我们使用纯DataStream API编写同样的程序,如下所示:
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setRuntimeMode(RuntimeExecutionMode.BATCH)
env.setParallelism(3)
val tableEnv = StreamTableEnvironment.create(env)
val sourceStream = env.fromElements(
"Alice",
"Alice",
"lily",
"Bob",
"lily",
"lily"
)
val counts: DataStream[(String, Int)] = sourceStream
.map((_, 1))
.keyBy(_._1)
.sum(1)
counts.print()
env.execute("WordCount")
执行这个程序,我们惊奇的发现,结果是正常的。问题真实触发的原因是数据流从Table API转换为DataStream API的时候才会有问题。从这里我们推断,问题也许出现在上游Table处理相关的operator中。
使用纯DataStream API的时候结果是正常的,说明keySelected
方法在程序结束的时候的确得到了调用。在继续分析Table相关operator前我们温热下BatchExecutionInternalTimeServiceManager
,看看它哪里还会调用到keySelected
方法。果然我们找到了advanceWatermark
方法,如下所示:
@Override
public void advanceWatermark(Watermark watermark) {
if (watermark.getTimestamp() == Long.MAX_VALUE) {
keySelected(null);
}
}
它检查watermark
的timestamp
,如果为Long.MAX_VALUE
调用keySelected(null)
。通过前面的分析我们已经知道:只在第一次接收到key为某个值的元素的时候,才会注册一个event time timer,触发时间为Long.MAX_VALUE
。所以我们猜测,纯DataStream API正常情况下在程序结束的时候会调用advanceWatermark
方法。
我们找到StreamTask
的endData
方法,这个方法在输入数据结束的时候调用,它又间接调用了advanceToEndOfEventTime
,如下所示。
protected void endData(StopMode mode) throws Exception {
if (mode == StopMode.DRAIN) {
advanceToEndOfEventTime();
}
// finish all operators in a chain effect way
operatorChain.finishOperators(actionExecutor, mode);
this.finishedOperators = true;
for (ResultPartitionWriter partitionWriter : getEnvironment().getAllWriters()) {
partitionWriter.notifyEndOfData(mode);
}
this.endOfDataReceived = true;
}
我们继续查看advanceToEndOfEventTime
方法。这个方法的实现在它的子类中。我们查看下子类SourceStreamTask
的advanceToEndOfEventTime
方法:
@Override
protected void advanceToEndOfEventTime() throws Exception {
operatorChain.getMainOperatorOutput().emitWatermark(Watermark.MAX_WATERMARK);
}
发现它为下游算子发送了一个时间戳为Long.MAX_VALUE
的watermark
。这点就和上面为什么纯DataStream API下能够正常运行对应上了。正常情况下Flink作业结束的时候,上游会发送一个时间戳为Long.MAX_VALUE
的watermark
到下游。
接下来我们开始debug Table API 向DataStream API转换的这种异常情形。我们发现,它的上游operator为InputConversionOperator
。它应该也有一个processWatermark
方法。查看代码如下所示:
@Override
public void processWatermark(Watermark mark) throws Exception {
if (propagateWatermark) {
super.processWatermark(mark);
}
}
发现如果propagateWatermark
变量为false,watermark会被忽略。经过debug发现propagateWatermark真的为false。这是才是数据丢失的根本原因。
综上分析,我们修改代码如下:
@Override
public void processWatermark(Watermark mark) throws Exception {
if (propagateWatermark || Watermark.MAX_WATERMARK.equals(mark)) {
super.processWatermark(mark);
}
}
确保它一定会将时间戳为Long.MAX_VALUE
的watermark
发往下游。修改完毕后重新编译测试,问题成功解决。目前该问题修复已被社区采纳。
本博客为作者原创,欢迎大家参与讨论和批评指正。如需转载请注明出处。