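Background
Unaligned Checkpoint is a feature added in Flink 1.11. In earlier versions, checkpoint alignment holds back the data that arrives on any input channel which has already received the barrier. That data is cached and is only released to the operator after all input channels have received the checkpoint barrier and the checkpoint operation has finished. This guarantees exactly-once, but it adds noticeable latency and reduces performance, and back pressure makes the situation even worse.
Unaligned Checkpoint addresses a problem of the traditional aligned checkpoint under heavy back pressure: high latency on one data stream affects the other data streams. It changes the earlier checkpoint logic in the following ways: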
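- When any input channel receives a barrier, the checkpoint process starts and the checkpoint id is recorded.
- A new checkpoint barrier is inserted at the head of the operator's output buffer queue (the position that leaves the queue first) so that it can be broadcast downstream.
- Data buffers are read from each input channel and written to the checkpoint until the barrier carrying the recorded checkpoint id is read. The input channel from the first step has already delivered its barrier, so the data that follows it is not recorded in the checkpoint.
- An aligned checkpoint is triggered after all input channels have received the barrier; an unaligned checkpoint is triggered as soon as any input channel receives the first barrier.
- An unaligned checkpoint does not block any input channel.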
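The Flink website illustrates the steps above with a figure.
In that figure, the yellow parts are the data that must be written to the checkpoint: the data buffers queued ahead of the checkpoint barrier in every input channel, the operator's internal state, and the buffers on the output side.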
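To make the difference in trigger timing concrete, here is a minimal sketch. It is a toy model rather than Flink code: the class TriggerTimeSketch, its methods, and the arrival times are all hypothetical, and a channel is reduced to the time at which its barrier arrives.

```java
import java.util.Arrays;

// Toy model, not Flink code: barrierArrival[i] is the (hypothetical) time in ms
// at which the barrier of one checkpoint arrives on input channel i.
final class TriggerTimeSketch {

    /** Aligned: the snapshot can only start once the slowest channel has delivered its barrier. */
    static long alignedTrigger(long[] barrierArrival) {
        return Arrays.stream(barrierArrival).max().getAsLong();
    }

    /** Unaligned: the snapshot starts as soon as the first barrier arrives on any channel. */
    static long unalignedTrigger(long[] barrierArrival) {
        return Arrays.stream(barrierArrival).min().getAsLong();
    }

    /** Aligned only: how long channel i stays blocked while waiting for the other barriers. */
    static long blockedMillis(long[] barrierArrival, int i) {
        return alignedTrigger(barrierArrival) - barrierArrival[i];
    }
}
```

With arrival times of 100, 250 and 900 ms, the aligned checkpoint starts at 900 ms and blocks the first channel for 800 ms, while the unaligned checkpoint starts at 100 ms and blocks nothing.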
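This article focuses on the code and does not go into the characteristics of Unaligned Checkpoint in detail. For a more thorough explanation, see: https://cwiki.apache.org/confluence/display/FLINK/FLIP-76%3A+Unaligned+Checkpoints
An analysis in Chinese is available at: https://developer.aliyun.com/article/768710
For an analysis of the whole checkpoint process, see the blog post: Flink 源码之快照
For an analysis of the checkpoint barrier, barrier alignment, and the non-aligned mode (which cannot guarantee exactly-once) in versions before Flink 1.10, see the blog post: Flink 源码之分布式快照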
Source Code Analysis
The logic of Unaligned Checkpoint lives mainly in CheckpointBarrierUnaligner and SubtaskCheckpointCoordinatorImpl. We start from how the configuration is read and then analyze how Unaligned Checkpoint is implemented.
InputProcessorUtil
CheckpointBarrierUnaligner is created in the createCheckpointBarrierHandler method of InputProcessorUtil. InputProcessorUtil is responsible for creating the CheckpointedInputGate for an InputProcessor.
The code that creates the barrier handler is shown below with annotations:
private static CheckpointBarrierHandler createCheckpointBarrierHandler(
        StreamConfig config,
        InputGate[] inputGates,
        SubtaskCheckpointCoordinator checkpointCoordinator,
        String taskName,
        AbstractInvokable toNotifyOnCheckpoint) {
    // Read the checkpoint mode from the configuration
    switch (config.getCheckpointMode()) {
        // Exactly-once mode
        case EXACTLY_ONCE:
            if (config.isUnalignedCheckpointsEnabled()) {
                // Unaligned checkpoints are enabled: create an AlternatingCheckpointBarrierHandler.
                // It is a composite handler that picks the matching CheckpointBarrierHandler
                // according to the barrier type (checkpoint or savepoint).
                // It is analyzed in detail later.
                return new AlternatingCheckpointBarrierHandler(
                    new CheckpointBarrierAligner(taskName, toNotifyOnCheckpoint, inputGates),
                    new CheckpointBarrierUnaligner(checkpointCoordinator, taskName, toNotifyOnCheckpoint, inputGates),
                    toNotifyOnCheckpoint);
            }
            // Unaligned checkpoints are not enabled: return a CheckpointBarrierAligner
            return new CheckpointBarrierAligner(taskName, toNotifyOnCheckpoint, inputGates);
        case AT_LEAST_ONCE:
            // At-least-once mode combined with unaligned checkpoints is not supported and throws an exception
            if (config.isUnalignedCheckpointsEnabled()) {
                throw new IllegalStateException("Cannot use unaligned checkpoints with AT_LEAST_ONCE " +
                    "checkpointing mode");
            }
            // Sum up the number of input channels across all input gates
            int numInputChannels = Arrays.stream(inputGates).mapToInt(InputGate::getNumberOfInputChannels).sum();
            // Create a CheckpointBarrierTracker: the older mode that only tracks barriers
            // without aligning them, so it cannot guarantee exactly-once
            return new CheckpointBarrierTracker(numInputChannels, toNotifyOnCheckpoint);
        default:
            throw new UnsupportedOperationException("Unrecognized Checkpointing Mode: " + config.getCheckpointMode());
    }
}
AlternatingCheckpointBarrierHandler
The purpose of this class was explained in the comments of the previous code block. Next we look at how it handles a CheckpointBarrier. Its processBarrier method is shown below:
@Override
public void processBarrier(CheckpointBarrier receivedBarrier, InputChannelInfo channelInfo) throws Exception {
    // If the id of the received barrier is smaller than the last seen barrier id, the barrier is late and is ignored
    if (receivedBarrier.getId() < lastSeenBarrierId) {
        return;
    }
    // Update the last seen barrier id
    lastSeenBarrierId = receivedBarrier.getId();
    // Remember the previously active handler
    CheckpointBarrierHandler previousHandler = activeHandler;
    // A barrier of type checkpoint uses the unaligned handler; the other type, savepoint, uses the aligned handler
    activeHandler = receivedBarrier.isCheckpoint() ? unalignedHandler : alignedHandler;
    // If the previous and the current handler differ, a barrier of a different type has arrived,
    // so abort the checkpoint that the previous handler still has pending
    if (previousHandler != activeHandler) {
        previousHandler.abortPendingCheckpoint(
            lastSeenBarrierId,
            new CheckpointException(format("checkpoint subsumed by %d", lastSeenBarrierId), CHECKPOINT_DECLINED_SUBSUMED));
    }
    // Delegate to the processBarrier method of the active handler (unalignedHandler for a checkpoint barrier)
    activeHandler.processBarrier(receivedBarrier, channelInfo);
}
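The switching behaviour can be tried out in isolation with a small sketch. This is a toy model under stated assumptions, not Flink's class: AlternatingHandlerSketch, Kind and log are hypothetical names, the two real handlers are replaced by an enum, and the sketch assumes the aligned handler is the one that is active initially.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the switching logic above; all names are hypothetical.
final class AlternatingHandlerSketch {
    enum Kind { ALIGNED, UNALIGNED }

    private long lastSeenBarrierId = -1;
    // Assumption: the aligned handler is active before the first barrier arrives.
    private Kind active = Kind.ALIGNED;
    final List<String> log = new ArrayList<>();

    void processBarrier(long barrierId, boolean isCheckpoint) {
        if (barrierId < lastSeenBarrierId) {
            log.add("ignore " + barrierId); // late barrier of an older checkpoint
            return;
        }
        lastSeenBarrierId = barrierId;
        Kind previous = active;
        // checkpoint barrier -> unaligned handling, savepoint barrier -> aligned handling
        active = isCheckpoint ? Kind.UNALIGNED : Kind.ALIGNED;
        if (previous != active) {
            log.add("abort pending on " + previous);
        }
        log.add(active + " handles " + barrierId);
    }
}
```

Feeding it a checkpoint barrier with id 1, then a savepoint barrier with id 2, then a late barrier with id 1 shows one handler switch per change of barrier type and the late barrier being dropped.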
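CheckpointBarrierUnaligner
CheckpointBarrierUnaligner records, for each input channel, whether the checkpoint barrier has been received, and it triggers the checkpoint when the first barrier arrives.
The per-channel barrier state is kept in the hasInflightBuffers field, defined as follows:
private final Map<InputChannelInfo, Boolean> hasInflightBuffers;
This field stores whether each input channel still has in-flight buffers.
What is an in-flight buffer? An in-flight buffer is a data buffer in the input channel's receivedBuffers queue that sits ahead of the barrier whose id is the current checkpoint id.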
Every time CheckpointedInputGate receives a barrier, it calls the processBarrier method of CheckpointBarrierUnaligner. The code of that method is:
@Override
public void processBarrier(CheckpointBarrier receivedBarrier, InputChannelInfo channelInfo) throws Exception {
    // Get the id of the received barrier
    long barrierId = receivedBarrier.getId();
    // Ignore barriers of old or cancelled checkpoints
    if (currentConsumedCheckpointId > barrierId || (currentConsumedCheckpointId == barrierId && !isCheckpointPending())) {
        // ignore old and cancelled barriers
        return;
    }
    // If the current checkpoint id is smaller than the barrier id, a new checkpoint has to be started
    if (currentConsumedCheckpointId < barrierId) {
        // Update the current checkpoint id
        currentConsumedCheckpointId = barrierId;
        // Reset the number of consumed barriers to 0
        numBarrierConsumed = 0;
        // hasInflightBuffers is a map that records whether each channel still has in-flight buffers.
        // In-flight buffers are the buffers received before the barrier of the current checkpoint.
        // Mark every input channel as having in-flight buffers.
        hasInflightBuffers.entrySet().forEach(hasInflightBuffer -> hasInflightBuffer.setValue(true));
    }
    // If the current checkpoint id equals the barrier id, the in-flight buffers of this channel have all been processed
    // (the first channel to deliver the barrier of a given checkpoint also gets its flag set to false here)
    if (currentConsumedCheckpointId == barrierId) {
        // This channel no longer has in-flight buffers
        hasInflightBuffers.put(channelInfo, false);
        // Increase the number of consumed barriers
        numBarrierConsumed++;
    }
    // Call threadSafeUnaligner.notifyBarrierReceived
    threadSafeUnaligner.notifyBarrierReceived(receivedBarrier, channelInfo);
}
The main purpose of this code is to keep track, in real time, of whether each input channel still has in-flight buffers.
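The bookkeeping can be reproduced with a few lines of plain Java. The sketch below is a simplified stand-in and not the Flink class: InflightTrackerSketch and its members are hypothetical names, channels are identified by strings, and the check for cancelled checkpoints (isCheckpointPending) is left out.

```java
import java.util.HashMap;
import java.util.Map;

// Toy version of the hasInflightBuffers bookkeeping; all names are hypothetical.
final class InflightTrackerSketch {
    private final Map<String, Boolean> hasInflightBuffers = new HashMap<>();
    private long currentCheckpointId = -1;
    private int numBarriersConsumed;

    InflightTrackerSketch(String... channels) {
        for (String channel : channels) {
            hasInflightBuffers.put(channel, false);
        }
    }

    void processBarrier(long barrierId, String channel) {
        if (barrierId < currentCheckpointId) {
            return; // barrier of an old checkpoint
        }
        if (barrierId > currentCheckpointId) {
            // New checkpoint: every channel may still hold buffers ahead of its barrier.
            currentCheckpointId = barrierId;
            numBarriersConsumed = 0;
            hasInflightBuffers.replaceAll((c, flag) -> true);
        }
        // The barrier was consumed on this channel, so nothing ahead of it is left.
        hasInflightBuffers.put(channel, false);
        numBarriersConsumed++;
    }

    boolean hasInflightData(String channel) {
        return hasInflightBuffers.get(channel);
    }

    int barriersConsumed() {
        return numBarriersConsumed;
    }
}
```

After the first barrier of a checkpoint arrives on channel "a", only the other channels are still reported as having in-flight data.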
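The last statement of the method calls notifyBarrierReceived on threadSafeUnaligner. Before looking at it, we need to get familiar with some important fields of threadSafeUnaligner.
Some of the fields of threadSafeUnaligner are defined and explained below: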
/**
 * Tag the state of which input channel has not received the barrier, such that newly arriving buffers need
 * to be written in the unaligned checkpoint.
 */
// Indicates whether a channel has not yet received the barrier; true means not received
private final Map<InputChannelInfo, Boolean> storeNewBuffers;
/** The number of input channels which has received or processed the barrier. */
// Counts how many barriers have been received
private int numBarriersReceived;
/** A future indicating that all barriers of the a given checkpoint have been read. */
// Completes once all input channels have received the barrier;
// code elsewhere that waits on it with whenComplete is then notified
private CompletableFuture<Void> allBarriersReceivedFuture = FutureUtils.completedVoidFuture();
Now we look at the code of the notifyBarrierReceived method:
@Override
public synchronized void notifyBarrierReceived(CheckpointBarrier barrier, InputChannelInfo channelInfo) throws IOException {
    long barrierId = barrier.getId();
    // A new checkpoint has to be handled
    if (currentReceivedCheckpointId < barrierId) {
        // Handle the new checkpoint (analyzed below)
        handleNewCheckpoint(barrier);
        // In the task thread, call the handler's notifyCheckpoint to announce that the checkpoint starts
        handler.executeInTaskThread(() -> handler.notifyCheckpoint(barrier), "notifyCheckpoint");
    }
    // If the current checkpoint id equals the barrier id
    // and this channel has not received the barrier yet
    if (barrierId == currentReceivedCheckpointId && storeNewBuffers.get(channelInfo)) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("{}: Received barrier from channel {} @ {}.", handler.taskName, channelInfo, barrierId);
        }
        // Mark this channel as having received the barrier
        storeNewBuffers.put(channelInfo, false);
        // Increment the count of received barriers.
        // When it equals the number of channels, complete allBarriersReceivedFuture
        // to signal that all input channels have received the barrier
        if (++numBarriersReceived == numOpenChannels) {
            allBarriersReceivedFuture.complete(null);
        }
    }
}
This method calls handleNewCheckpoint, which we expand on here:
private synchronized void handleNewCheckpoint(CheckpointBarrier barrier) throws IOException {
    long barrierId = barrier.getId();
    // If some input channel has not received the barrier of the previous checkpoint, that checkpoint did not finish normally
    if (!allBarriersReceivedFuture.isDone()) {
        // Create the exception
        CheckpointException exception = new CheckpointException("Barrier id: " + barrierId, CHECKPOINT_DECLINED_SUBSUMED);
        // If a checkpoint is pending, abort it and notify the handler about the abort
        if (isCheckpointPending()) {
            // we did not complete the current checkpoint, another started before
            LOG.warn("{}: Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. " +
                    "Skipping current checkpoint.",
                handler.taskName,
                barrierId,
                currentReceivedCheckpointId);
            // let the task know we are not completing this
            final long currentCheckpointId = currentReceivedCheckpointId;
            handler.executeInTaskThread(() -> handler.notifyAbort(currentCheckpointId, exception), "notifyAbort");
        }
        // Complete allBarriersReceivedFuture exceptionally
        allBarriersReceivedFuture.completeExceptionally(exception);
    }
    // Mark the start of the checkpoint
    handler.markCheckpointStart(barrier.getTimestamp());
    // Update the current checkpoint id
    currentReceivedCheckpointId = barrierId;
    // Initialize storeNewBuffers: no input channel has received the barrier yet
    storeNewBuffers.entrySet().forEach(storeNewBuffer -> storeNewBuffer.setValue(true));
    // Reset the counter of received barriers
    numBarriersReceived = 0;
    // Create a fresh allBarriersReceivedFuture
    allBarriersReceivedFuture = new CompletableFuture<>();
    // Tell the checkpoint coordinator to initialize a checkpoint; the checkpoint process starts
    checkpointCoordinator.initCheckpoint(barrierId, barrier.getCheckpointOptions());
}
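The life cycle of allBarriersReceivedFuture across two checkpoints is easier to see in a stripped-down form. The following sketch is only an approximation under stated assumptions: BarrierFutureSketch is a hypothetical class, the per-channel storeNewBuffers check is omitted (each call is assumed to come from a different channel), and a plain IllegalStateException stands in for CheckpointException.

```java
import java.util.concurrent.CompletableFuture;

// Toy version of the future handling in notifyBarrierReceived/handleNewCheckpoint; names are hypothetical.
final class BarrierFutureSketch {
    private final int numChannels;
    private long currentCheckpointId = -1;
    private int numBarriersReceived;
    private CompletableFuture<Void> allBarriersReceived = CompletableFuture.completedFuture(null);

    BarrierFutureSketch(int numChannels) {
        this.numChannels = numChannels;
    }

    /** Registers one barrier and returns the future of the checkpoint currently being tracked. */
    synchronized CompletableFuture<Void> onBarrier(long barrierId) {
        if (barrierId > currentCheckpointId) {
            if (!allBarriersReceived.isDone()) {
                // The previous checkpoint never saw all of its barriers: it is subsumed.
                allBarriersReceived.completeExceptionally(
                    new IllegalStateException("subsumed by checkpoint " + barrierId));
            }
            currentCheckpointId = barrierId;
            numBarriersReceived = 0;
            allBarriersReceived = new CompletableFuture<>();
        }
        if (barrierId == currentCheckpointId && ++numBarriersReceived == numChannels) {
            allBarriersReceived.complete(null);
        }
        return allBarriersReceived;
    }
}
```

With two channels, a barrier for checkpoint 2 that arrives before the second barrier of checkpoint 1 fails the first future, and the second future completes normally once both barriers of checkpoint 2 are in.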
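checkpointCoordinator is analyzed in a dedicated section below. Since the initCheckpoint method contains little logic, we look at it here in advance.
The implementation class of checkpointCoordinator is SubtaskCheckpointCoordinatorImpl. Its initCheckpoint method is shown here: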
@Override
public void initCheckpoint(long id, CheckpointOptions checkpointOptions) {
    // If this is an unaligned checkpoint, call the start method of ChannelStateWriter
    if (checkpointOptions.isUnalignedCheckpoint()) {
        channelStateWriter.start(id, checkpointOptions);
    }
}
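ChannelStateWriter writes channel state asynchronously during a checkpoint or savepoint. It is the class that records the in-flight buffers of every input channel and the contents of the operator's output buffers (ResultSubpartition) in the checkpoint.
The start method of ChannelStateWriter begins recording the content of a new checkpoint and initializes the write state. The implementation of ChannelStateWriter is fairly complex and is not analyzed in this article.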
ThreadSafeUnaligner implements the BufferReceivedListener interface and therefore has a notifyBufferReceived method. That method is called from the onBuffer method of RemoteInputChannel:
public void onBuffer(Buffer buffer, int sequenceNumber, int backlog) throws IOException {
    boolean recycleBuffer = true;
    try {
        // ...
        if (notifyReceivedBarrier != null) {
            receivedCheckpointId = notifyReceivedBarrier.getId();
            if (notifyReceivedBarrier.isCheckpoint()) {
                listener.notifyBarrierReceived(notifyReceivedBarrier, channelInfo);
            }
        } else if (notifyReceivedBuffer != null) {
            // notifyBufferReceived is called here
            listener.notifyBufferReceived(notifyReceivedBuffer, channelInfo);
        }
    } finally {
        if (recycleBuffer) {
            buffer.recycleBuffer();
        }
    }
}
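In other words, whenever an input channel receives a data buffer, it calls the listener's notifyBufferReceived method, and ThreadSafeUnaligner is exactly such a listener.
Next we analyze the source code of that method: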
@Override
public synchronized void notifyBufferReceived(Buffer buffer, InputChannelInfo channelInfo) {
    // This channel has not received the checkpoint barrier yet,
    // so data that arrives on it ahead of the barrier must be written to the checkpoint
    if (storeNewBuffers.get(channelInfo)) {
        // Use ChannelStateWriter to write the buffer content.
        // addInputData hands the buffer to ChannelStateWriter, which writes it to the checkpoint later
        checkpointCoordinator.getChannelStateWriter().addInputData(
            currentReceivedCheckpointId,
            channelInfo,
            ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN,
            ofElement(buffer, Buffer::recycleBuffer));
    } else {
        buffer.recycleBuffer();
    }
}
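The effect of the storeNewBuffers flag can be shown with a small stand-alone sketch. It is a toy model, not Flink code: StoreNewBuffersSketch and its methods are hypothetical, buffers are plain strings, and "persisting" simply means appending to a list.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of the storeNewBuffers flag; all names are hypothetical.
final class StoreNewBuffersSketch {
    private final Map<String, Boolean> storeNewBuffers = new HashMap<>();
    final List<String> persisted = new ArrayList<>();

    /** A new checkpoint starts: no channel has delivered its barrier yet. */
    void startCheckpoint(String... channels) {
        for (String channel : channels) {
            storeNewBuffers.put(channel, true);
        }
    }

    /** The barrier arrived on this channel: later buffers belong to the next checkpoint. */
    void onBarrier(String channel) {
        storeNewBuffers.put(channel, false);
    }

    /** A data buffer arrived: persist it only while the channel still waits for its barrier. */
    void onBuffer(String channel, String buffer) {
        if (storeNewBuffers.getOrDefault(channel, false)) {
            persisted.add(channel + ":" + buffer);
        }
    }
}
```

In this model a buffer that arrives on a channel after that channel's barrier is dropped from the snapshot, while a buffer arriving on a channel that is still waiting for its barrier is kept.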
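SubtaskCheckpointCoordinatorImpl
This class coordinates the checkpoint process of a single subtask. Unlike CheckpointCoordinator, which coordinates checkpoints for the whole job, it works at the subtask level, and it is where the Unaligned Checkpoint work covered in this article is coordinated.
The complete checkpoint procedure of SubtaskCheckpointCoordinatorImpl is in the checkpointState method, shown below: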
@Override
public void checkpointState(
        CheckpointMetaData metadata,
        CheckpointOptions options,
        CheckpointMetrics metrics,
        OperatorChain<?, ?> operatorChain,
        Supplier<Boolean> isCanceled) throws Exception {
    checkNotNull(options);
    checkNotNull(metrics);
    // All of the following steps happen as an atomic step from the perspective of barriers and
    // records/watermarks/timers/callbacks.
    // We generally try to emit the checkpoint barrier as soon as possible to not affect downstream
    // checkpoint alignments
    // Check that the id of the last checkpoint is smaller than the checkpoint id in metadata.
    // Otherwise barriers may have arrived out of order, so abort the checkpoint identified by metadata.getCheckpointId()
    if (lastCheckpointId >= metadata.getCheckpointId()) {
        LOG.info("Out of order checkpoint barrier (aborted previously?): {} >= {}", lastCheckpointId, metadata.getCheckpointId());
        channelStateWriter.abort(metadata.getCheckpointId(), new CancellationException(), true);
        checkAndClearAbortedStatus(metadata.getCheckpointId());
        return;
    }
    // The actual checkpoint procedure starts here
    // Step (0): Record the last triggered checkpointId and abort the sync phase of checkpoint if necessary.
    // Step 0: update the lastCheckpointId field.
    // If this checkpoint has been cancelled, broadcast a CancelCheckpointMarker downstream to signal that it was aborted
    lastCheckpointId = metadata.getCheckpointId();
    if (checkAndClearAbortedStatus(metadata.getCheckpointId())) {
        // broadcast cancel checkpoint marker to avoid downstream back-pressure due to checkpoint barrier align.
        operatorChain.broadcastEvent(new CancelCheckpointMarker(metadata.getCheckpointId()));
        LOG.info("Checkpoint {} has been notified as aborted, would not trigger any checkpoint.", metadata.getCheckpointId());
        return;
    }
    // Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.
    //           The pre-barrier work should be nothing or minimal in the common case.
    // Step 1: call prepareSnapshotPreBarrier on operatorChain to run the pre-processing logic before the checkpoint
    operatorChain.prepareSnapshotPreBarrier(metadata.getCheckpointId());
    // Step (2): Send the checkpoint barrier downstream
    // Step 2: broadcast the CheckpointBarrier downstream.
    // Key point: the second argument is isPriorityEvent. Tracing the calls leads to the add method of PipelinedSubpartition;
    // when isPriorityEvent is true, the barrier is inserted at the head of the ResultSubpartition
    operatorChain.broadcastEvent(
        new CheckpointBarrier(metadata.getCheckpointId(), metadata.getTimestamp(), options),
        options.isUnalignedCheckpoint());
    // Step (3): Prepare to spill the in-flight buffers for input and output
    // Step 3 is a key point: for an unaligned checkpoint, the in-flight buffers
    // (input buffers queued ahead of the barrier, and output buffers) are written to the checkpoint.
    // This method is analyzed below
    if (options.isUnalignedCheckpoint()) {
        prepareInflightDataSnapshot(metadata.getCheckpointId());
    }
    // Step (4): Take the state snapshot. This should be largely asynchronous, to not impact progress of the
    // streaming topology
    // Step 4: take the state snapshot; the asynchronous part persists the checkpoint data
    Map<OperatorID, OperatorSnapshotFutures> snapshotFutures = new HashMap<>(operatorChain.getNumberOfOperators());
    try {
        if (takeSnapshotSync(snapshotFutures, metadata, metrics, options, operatorChain, isCanceled)) {
            finishAndReportAsync(snapshotFutures, metadata, metrics, options);
        } else {
            cleanup(snapshotFutures, metadata, metrics, new Exception("Checkpoint declined"));
        }
    } catch (Exception ex) {
        cleanup(snapshotFutures, metadata, metrics, ex);
        throw ex;
    }
}
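The effect of the isPriorityEvent flag in step 2 can be pictured with a double-ended queue. The sketch below is only an illustration under that assumption: PrioritySubpartitionSketch is a hypothetical class and says nothing about how PipelinedSubpartition is actually implemented beyond the head insertion described in the comment above.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy output queue, not Flink code: a priority event jumps ahead of the queued data.
final class PrioritySubpartitionSketch {
    private final Deque<String> buffers = new ArrayDeque<>();

    void add(String item, boolean isPriorityEvent) {
        if (isPriorityEvent) {
            buffers.addFirst(item); // the barrier overtakes data that is already queued
        } else {
            buffers.addLast(item);  // ordinary data keeps FIFO order
        }
    }

    /** Returns the next item to be sent downstream, or null if the queue is empty. */
    String poll() {
        return buffers.pollFirst();
    }
}
```

If two data buffers are queued and a barrier is then added as a priority event, the barrier is the first item polled. The overtaken data buffers are exactly the output-side in-flight data that step 3 has to persist.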
The operator's output buffers are written to the checkpoint from the prepareInflightDataSnapshot method of SubtaskCheckpointCoordinatorImpl. The code is:
private void prepareInflightDataSnapshot(long checkpointId) throws IOException {
    // Get the ResultPartitionWriters
    ResultPartitionWriter[] writers = env.getAllWriters();
    for (ResultPartitionWriter writer : writers) {
        for (int i = 0; i < writer.getNumberOfSubpartitions(); i++) {
            // Visit each ResultSubpartition of the writer
            ResultSubpartition subpartition = writer.getSubpartition(i);
            // Add the data of every subpartition to channelStateWriter, to be written to the checkpoint later
            channelStateWriter.addOutputData(
                checkpointId,
                subpartition.getSubpartitionInfo(),
                ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN,
                subpartition.requestInflightBufferSnapshot().toArray(new Buffer[0]));
        }
    }
    // Call finishOutput to signal that all addOutputData calls are done
    channelStateWriter.finishOutput(checkpointId);
    // This returns a future that completes once all input channels have received the barrier;
    // if it completes exceptionally because some channel never delivered the barrier, ex below is non-null.
    // apply invokes the prepareInputSnapshot method of StreamTask.
    // prepareInputSnapshot is initialized when StreamTask creates the subtaskCheckpointCoordinator
    prepareInputSnapshot.apply(channelStateWriter, checkpointId)
        .whenComplete((unused, ex) -> {
            if (ex != null) {
                // Completed exceptionally: some input channel did not receive the barrier, so call abort
                channelStateWriter.abort(checkpointId, ex, false /* result is needed and cleaned by getWriteResult */);
            } else {
                // Completed normally: all input channels received the barrier, so call finishInput
                channelStateWriter.finishInput(checkpointId);
            }
        });
}
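One detail worth noting is that the callback above runs whenever the future completes, which may be long after prepareInflightDataSnapshot has returned. A minimal sketch of that pattern follows. InputSnapshotCompletionSketch is a hypothetical class, the strings "abort" and "finishInput" merely stand for the two ChannelStateWriter calls, and handle is used instead of whenComplete so that the outcome can be returned and inspected.

```java
import java.util.concurrent.CompletableFuture;

// Toy version of the completion callback; names and return values are hypothetical.
final class InputSnapshotCompletionSketch {

    /** Maps the outcome of the barrier future to the action that would be taken on the writer. */
    static CompletableFuture<String> track(CompletableFuture<Void> allBarriersReceived) {
        return allBarriersReceived.handle((unused, ex) -> ex != null ? "abort" : "finishInput");
    }
}
```

The returned future stays pending until the barrier future is completed, and it resolves to "abort" only if the barrier future failed.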
Let us look more closely at the prepareInputSnapshot.apply(channelStateWriter, checkpointId) call. What actually runs is the prepareInputSnapshot method of StreamTask, shown below:
private CompletableFuture<Void> prepareInputSnapshot(ChannelStateWriter channelStateWriter, long checkpointId) throws IOException {
    if (inputProcessor == null) {
        return FutureUtils.completedVoidFuture();
    }
    // Let the InputProcessor prepare its state snapshot
    return inputProcessor.prepareSnapshot(channelStateWriter, checkpointId);
}
This calls the prepareSnapshot method of inputProcessor. inputProcessor has several implementation classes; for example, the prepareSnapshot method of StreamOneInputProcessor is:
@Override
public CompletableFuture<Void> prepareSnapshot(
        ChannelStateWriter channelStateWriter,
        long checkpointId) throws IOException {
    // This in turn calls the prepareSnapshot method of StreamTaskInput
    return input.prepareSnapshot(channelStateWriter, checkpointId);
}
Multiple input streams are the more representative case for unaligned checkpoints, so we also look at the prepareSnapshot method of StreamTwoInputProcessor:
@Override
public CompletableFuture<Void> prepareSnapshot(
        ChannelStateWriter channelStateWriter,
        long checkpointId) throws IOException {
    return CompletableFuture.allOf(
        input1.prepareSnapshot(channelStateWriter, checkpointId),
        input2.prepareSnapshot(channelStateWriter, checkpointId));
}
It returns a CompletableFuture that completes only after the futures returned by the prepareSnapshot methods of both inputs have completed.
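The behaviour of CompletableFuture.allOf used here can be checked with plain JDK code. In this small sketch, TwoInputSnapshotSketch is a hypothetical class and the two parameters simply stand in for the futures returned by the two inputs.

```java
import java.util.concurrent.CompletableFuture;

// Toy stand-in for the two-input case; the class name is hypothetical.
final class TwoInputSnapshotSketch {

    /** Completes only when both input futures are complete; fails if either of them failed. */
    static CompletableFuture<Void> prepareSnapshot(
            CompletableFuture<Void> input1,
            CompletableFuture<Void> input2) {
        return CompletableFuture.allOf(input1, input2);
    }
}
```

Completing only one of the two inputs leaves the combined future pending, which is why a slow second input keeps the input side of the snapshot open.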
For the prepareSnapshot of an input, we look at the prepareSnapshot method of StreamTaskNetworkInput. The code is:
@Override
public CompletableFuture<Void> prepareSnapshot(
        ChannelStateWriter channelStateWriter,
        long checkpointId) throws IOException {
    // Iterate over all record deserializers
    // and store the buffers they have not fully consumed in the checkpoint
    for (int channelIndex = 0; channelIndex < recordDeserializers.length; channelIndex++) {
        final InputChannel channel = checkpointedInputGate.getChannel(channelIndex);
        // Assumption for retrieving buffers = one concurrent checkpoint
        RecordDeserializer<?> deserializer = recordDeserializers[channelIndex];
        if (deserializer != null) {
            // Write the deserializer's unconsumed buffer to channelStateWriter
            channelStateWriter.addInputData(
                checkpointId,
                channel.getChannelInfo(),
                ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN,
                deserializer.getUnconsumedBuffer());
        }
        // Write the in-flight buffers of this channel to the checkpoint
        checkpointedInputGate.spillInflightBuffers(checkpointId, channelIndex, channelStateWriter);
    }
    // Return a CompletableFuture that completes once all input channels have received the barrier
    return checkpointedInputGate.getAllBarriersReceivedFuture(checkpointId);
}
We continue with the spillInflightBuffers method of CheckpointedInputGate, shown below:
public void spillInflightBuffers(
        long checkpointId,
        int channelIndex,
        ChannelStateWriter channelStateWriter) throws IOException {
    InputChannel channel = inputGate.getChannel(channelIndex);
    // Check whether this channel has in-flight buffers; if so, call the channel's spillInflightBuffers method
    if (barrierHandler.hasInflightData(checkpointId, channel.getChannelInfo())) {
        channel.spillInflightBuffers(checkpointId, channelStateWriter);
    }
}
Next we analyze the spillInflightBuffers method of RemoteInputChannel:
@Override
public void spillInflightBuffers(long checkpointId, ChannelStateWriter channelStateWriter) throws IOException {
    synchronized (receivedBuffers) {
        checkState(checkpointId > lastRequestedCheckpointId, "Need to request the next checkpointId");
        final List<Buffer> inflightBuffers = new ArrayList<>(receivedBuffers.size());
        // Iterate over all received buffers
        for (Buffer buffer : receivedBuffers) {
            // Parse whether the buffer carries data or an event. If the event is a CheckpointBarrier it is returned;
            // in every other case the result is null
            CheckpointBarrier checkpointBarrier = parseCheckpointBarrierOrNull(buffer);
            // If this condition holds, the barrier of the current or a later checkpoint was found,
            // so stop collecting in-flight buffers
            if (checkpointBarrier != null && checkpointBarrier.getId() >= checkpointId) {
                break;
            }
            // If the buffer carries data, add it to the inflightBuffers list
            if (buffer.isBuffer()) {
                inflightBuffers.add(buffer.retainBuffer());
            }
        }
        // Update lastRequestedCheckpointId to prevent the same checkpoint from being spilled twice
        lastRequestedCheckpointId = checkpointId;
        // Hand the data in inflightBuffers to the ChannelStateWriter
        channelStateWriter.addInputData(
            checkpointId,
            channelInfo,
            ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN,
            CloseableIterator.fromList(inflightBuffers, Buffer::recycleBuffer));
    }
}
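The scan over receivedBuffers can be reproduced on a simplified queue. The sketch below is a toy model, not Flink code: SpillInflightSketch and Item are hypothetical, an item is either a data payload or a barrier with an id, and reference counting of buffers is ignored.

```java
import java.util.ArrayList;
import java.util.List;

// Toy version of the scan in spillInflightBuffers; all names are hypothetical.
final class SpillInflightSketch {

    /** Either a data buffer (barrierId < 0) or a checkpoint barrier (data == null). */
    static final class Item {
        final String data;
        final long barrierId;

        private Item(String data, long barrierId) {
            this.data = data;
            this.barrierId = barrierId;
        }

        static Item data(String payload) {
            return new Item(payload, -1);
        }

        static Item barrier(long id) {
            return new Item(null, id);
        }

        boolean isBarrier() {
            return data == null;
        }
    }

    /** Collects the data queued ahead of the first barrier whose id is at least checkpointId. */
    static List<String> spill(List<Item> receivedBuffers, long checkpointId) {
        List<String> inflight = new ArrayList<>();
        for (Item item : receivedBuffers) {
            if (item.isBarrier() && item.barrierId >= checkpointId) {
                break; // data behind this barrier belongs to a later checkpoint
            }
            if (!item.isBarrier()) {
                inflight.add(item.data); // barriers of older checkpoints are skipped, data is kept
            }
        }
        return inflight;
    }
}
```

For a queue holding a, barrier 1, b, barrier 2, c and checkpoint id 2, the scan keeps a and b: the stale barrier 1 is skipped and c stays out because it sits behind barrier 2.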
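At this point I had a question: RemoteInputChannel has the spillInflightBuffers method, which can write the in-flight buffers of an input channel to the checkpoint, so why is the notifyBufferReceived method of ThreadSafeUnaligner also needed, writing every buffer to the checkpoint as soon as it arrives?
After some thought I believe the reason is the following. If this is wrong, I hope readers will correct me:
If the barrier has not yet entered the buffer queue of some input channel when the checkpoint is taken, spillInflightBuffers is called first and writes all buffers currently in that channel to the checkpoint. Data buffers that arrive afterwards are then handed to the notifyBufferReceived method and written to the checkpoint, until the barrier enters that channel's buffer queue.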
The takeSnapshotSync method is responsible for writing the checkpoint data. Its content is:
private boolean takeSnapshotSync(
        Map<OperatorID, OperatorSnapshotFutures> operatorSnapshotsInProgress,
        CheckpointMetaData checkpointMetaData,
        CheckpointMetrics checkpointMetrics,
        CheckpointOptions checkpointOptions,
        OperatorChain<?, ?> operatorChain,
        Supplier<Boolean> isCanceled) throws Exception {
    // Iterate over all operators; if any operator is already closed, decline the checkpoint
    for (final StreamOperatorWrapper<?, ?> operatorWrapper : operatorChain.getAllOperators(true)) {
        if (operatorWrapper.isClosed()) {
            env.declineCheckpoint(checkpointMetaData.getCheckpointId(),
                new CheckpointException("Task Name" + taskName, CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_CLOSING));
            return false;
        }
    }
    long checkpointId = checkpointMetaData.getCheckpointId();
    long started = System.nanoTime();
    // Obtain the ChannelStateWriteResult from channelStateWriter
    ChannelStateWriteResult channelStateWriteResult = checkpointOptions.isUnalignedCheckpoint() ?
        channelStateWriter.getAndRemoveWriteResult(checkpointId) :
        ChannelStateWriteResult.EMPTY;
    // Create the CheckpointStreamFactory used to persist checkpoint data, based on the configured checkpoint target location
    CheckpointStreamFactory storage = checkpointStorage.resolveCheckpointStorageLocation(checkpointId, checkpointOptions.getTargetLocation());
    // Trigger the snapshot of every operator
    try {
        for (StreamOperatorWrapper<?, ?> operatorWrapper : operatorChain.getAllOperators(true)) {
            if (!operatorWrapper.isClosed()) {
                operatorSnapshotsInProgress.put(
                    operatorWrapper.getStreamOperator().getOperatorID(),
                    buildOperatorSnapshotFutures(
                        checkpointMetaData,
                        checkpointOptions,
                        operatorChain,
                        operatorWrapper.getStreamOperator(),
                        isCanceled,
                        channelStateWriteResult,
                        storage));
            }
        }
    } finally {
        checkpointStorage.clearCacheFor(checkpointId);
    }
    LOG.debug(
        "{} - finished synchronous part of checkpoint {}. Alignment duration: {} ms, snapshot duration {} ms",
        taskName,
        checkpointId,
        checkpointMetrics.getAlignmentDurationNanos() / 1_000_000,
        checkpointMetrics.getSyncDurationMillis());
    checkpointMetrics.setSyncDurationMillis((System.nanoTime() - started) / 1_000_000);
    return true;
}
This blog post is the author's original work. Discussion and corrections are welcome. Please credit the source when reposting.