# Flink Source Code: Distributed Snapshots
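The barrier is one of the core concepts of Flink's distributed snapshots. Barriers are injected into the data stream and flow downstream as part of it. A barrier never overtakes records and is never overtaken by them, so the stream stays strictly ordered. Each barrier carries the ID of the snapshot it belongs to. Records before the barrier belong to that snapshot, and all changes caused by records after the barrier are included in a later checkpoint. Within the stream, a `CheckpointBarrier` differs from ordinary records: it is one of Flink's internal events. When checkpointing is configured with `env.enableCheckpointing(intervalInMilliseconds)`, the JobManager periodically tells the data sources to insert a `CheckpointBarrier` into the stream, and the barrier then travels with the data through every downstream node.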
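Because barriers never overtake records, on a single channel the next barrier after a record decides which checkpoint first reflects that record. A minimal plain-Java sketch of that rule, with no Flink dependency; `ChannelElement` and `BarrierEpochs` are hypothetical illustration types, not Flink classes:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stream element: either a data record or a checkpoint barrier. */
final class ChannelElement {
    final boolean isBarrier;
    final long checkpointId; // only meaningful for barriers
    final String value;      // only meaningful for records

    private ChannelElement(boolean isBarrier, long checkpointId, String value) {
        this.isBarrier = isBarrier;
        this.checkpointId = checkpointId;
        this.value = value;
    }

    static ChannelElement data(String value) { return new ChannelElement(false, -1, value); }

    static ChannelElement barrier(long checkpointId) { return new ChannelElement(true, checkpointId, null); }
}

final class BarrierEpochs {
    /**
     * For each record (in stream order), returns the id of the first checkpoint whose snapshot
     * includes the record's effect: the id of the next barrier that follows it on the channel.
     * Records with no barrier after them yet map to -1.
     */
    static List<Long> assignToCheckpoints(List<ChannelElement> channel) {
        List<Long> result = new ArrayList<>();
        long nextBarrierId = -1; // walking backwards: nearest barrier after the current position
        for (int i = channel.size() - 1; i >= 0; i--) {
            ChannelElement e = channel.get(i);
            if (e.isBarrier) {
                nextBarrierId = e.checkpointId;
            } else {
                result.add(0, nextBarrierId); // prepend so the result keeps stream order
            }
        }
        return result;
    }
}
```

For the channel `a, b, barrier(1), c, barrier(2), d`, records `a` and `b` first appear in checkpoint 1, `c` in checkpoint 2, and `d` is not covered by any barrier yet.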
1. The checkpoint coordinator triggers a checkpoint on all source nodes.
![1589876675197.png](https://upload-images.jianshu.io/upload_images/20250277-95f75c1139a2a3b7.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
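2. Each source node broadcasts a barrier downstream. This barrier is the core of Flink's implementation of the Chandy-Lamport distributed snapshot algorithm: a downstream task performs its checkpoint only after it has received the barrier from all of its inputs.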
![1589876827804.png](https://upload-images.jianshu.io/upload_images/20250277-f95d58ad07a65aa9.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
3. When a task has finished backing up its state, it reports the location of the backup (the state handle) to the checkpoint coordinator.
![1589876897269.png](https://upload-images.jianshu.io/upload_images/20250277-b5a7d4c819564b95.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
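4. After the downstream sink node has received the barriers from both of its upstream inputs, it takes a local snapshot. The figure illustrates RocksDB incremental checkpoints: RocksDB first flushes all of its data to disk (the large red triangle), and the Flink framework then selects the files that have not been uploaded yet and persists them as a backup (the small purple triangles).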
![1589877665084.png](https://upload-images.jianshu.io/upload_images/20250277-3d9cc080ea73d5c8.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
5. Likewise, when the sink node has finished its own checkpoint, it returns its state handle to the coordinator.
![1589878581549.png](https://upload-images.jianshu.io/upload_images/20250277-4ef2eb56fdbbd8ea.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
6. Once the checkpoint coordinator has collected the state handles of all tasks, it considers the checkpoint globally complete and additionally writes a checkpoint metadata file to durable storage.
![1589878708700.png](https://upload-images.jianshu.io/upload_images/20250277-a94a8d2857863ca6.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
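Steps 3, 5 and 6 amount to an acknowledgement protocol: the coordinator treats a checkpoint as complete only once every task has reported a state handle. A minimal sketch of that bookkeeping, assuming tasks are identified by name and state handles are plain strings; `PendingCheckpointSketch` is a hypothetical simplification, not Flink's `PendingCheckpoint` class:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical, heavily simplified coordinator-side view of one in-progress checkpoint. */
final class PendingCheckpointSketch {
    final long checkpointId;
    private final Set<String> notYetAcknowledged;                     // tasks that still have to report
    private final Map<String, String> stateHandles = new HashMap<>(); // task -> state handle (e.g. a file path)

    PendingCheckpointSketch(long checkpointId, Set<String> allTasks) {
        this.checkpointId = checkpointId;
        this.notYetAcknowledged = new HashSet<>(allTasks);
    }

    /**
     * A task reports its state handle (steps 3 and 5). Returns true only for the
     * acknowledgement that completes the checkpoint (step 6).
     */
    boolean acknowledge(String taskId, String stateHandle) {
        if (!notYetAcknowledged.remove(taskId)) {
            return false; // unknown task or duplicate acknowledgement: ignored
        }
        stateHandles.put(taskId, stateHandle);
        return notYetAcknowledged.isEmpty();
    }

    boolean isFullyAcknowledged() {
        return notYetAcknowledged.isEmpty();
    }

    /** What step 6 would persist as checkpoint metadata: every task's state handle. */
    Map<String, String> metadata() {
        return new HashMap<>(stateHandles);
    }
}
```

Duplicate or unknown acknowledgements are ignored, so `acknowledge` reports completion exactly once; the map returned by `metadata()` plays the role of the metadata file from step 6.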
## EXACTLY_ONCE semantics of checkpoints
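To achieve exactly-once semantics, Flink holds back (buffers) the data that arrives on already-aligned inputs during the alignment phase and processes it only after alignment completes. For at-least-once semantics, the received data does not need to be buffered and is processed immediately, so after a restore some records may be processed more than once.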
![1589879876575.png](https://upload-images.jianshu.io/upload_images/20250277-29a8783f871474f9.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
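The difference between the two modes can be reproduced with a tiny simulation. The sketch below (plain Java; `Arrival` and `AlignmentDemo` are hypothetical names) takes one arrival order over two channels and reports which records are already reflected in the operator state when the checkpoint is taken, with alignment (exactly-once) and without it (at-least-once), plus which records a replayable source would send again after a restore (those behind the barrier on their channel):

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical arrival at the operator: a record or the barrier, tagged with its input channel. */
final class Arrival {
    final int channel;
    final String record; // null means this arrival is the checkpoint barrier

    private Arrival(int channel, String record) {
        this.channel = channel;
        this.record = record;
    }

    static Arrival data(int channel, String record) { return new Arrival(channel, record); }

    static Arrival barrier(int channel) { return new Arrival(channel, null); }
}

final class AlignmentDemo {
    /**
     * Records already reflected in operator state when the (single) checkpoint is taken.
     * align = true : exactly-once; after its barrier a channel is held back until all barriers are in.
     * align = false: at-least-once; every record is processed as soon as it arrives.
     */
    static List<String> stateAtSnapshot(List<Arrival> arrivals, int numChannels, boolean align) {
        List<String> processed = new ArrayList<>();
        boolean[] barrierSeen = new boolean[numChannels];
        int barriers = 0;
        for (Arrival a : arrivals) {
            if (a.record == null) {
                barrierSeen[a.channel] = true;
                if (++barriers == numChannels) {
                    return processed; // last barrier: the snapshot is taken now
                }
            } else if (align && barrierSeen[a.channel]) {
                continue; // held back until alignment completes, i.e. after the snapshot
            } else {
                processed.add(a.record);
            }
        }
        return processed;
    }

    /** Records replayed after restoring from this checkpoint: those behind the barrier on their channel. */
    static List<String> replayedAfterRestore(List<Arrival> arrivals, int numChannels) {
        List<String> replay = new ArrayList<>();
        boolean[] barrierSeen = new boolean[numChannels];
        for (Arrival a : arrivals) {
            if (a.record == null) {
                barrierSeen[a.channel] = true;
            } else if (barrierSeen[a.channel]) {
                replay.add(a.record);
            }
        }
        return replay;
    }
}
```

For the arrival order `a1, barrier(ch0), a2, b1, barrier(ch1)`, the aligned run snapshots `[a1, b1]`, while the at-least-once run snapshots `[a1, a2, b1]`. Since `a2` is also replayed after a restore, the at-least-once run processes it twice.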
Note that Flink's checkpoint mechanism only guarantees EXACTLY ONCE for Flink's own computation; end-to-end EXACTLY ONCE additionally requires support from the source and the sink.
## CheckpointBarrier
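- `id`: corresponds to the checkpoint id and is strictly monotonically increasing. Later code compares ids to decide which checkpoint is newer: the larger the id, the newer the checkpoint.
- `timestamp`: records when the checkpoint barrier was created. `ScheduledTrigger.run` passes the current system time in as the checkpoint's timestamp.
- `checkpointOptions`: options for the checkpoint operation, including the checkpoint type and the preferred checkpoint storage location.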
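Since ids only ever grow, each barrier handler reduces an incoming barrier to one of three cases by comparing its id with the checkpoint id it is currently tracking. A minimal sketch of that comparison; `BarrierOrder` is a hypothetical helper, not a Flink class:

```java
/** Hypothetical helper mirroring how the barrier handlers compare checkpoint ids. */
final class BarrierOrder {
    enum Relation { NEWER, CURRENT, STALE }

    /** Ids are strictly increasing, so a larger id always means a newer checkpoint. */
    static Relation classify(long incomingBarrierId, long currentCheckpointId) {
        if (incomingBarrierId > currentCheckpointId) {
            return Relation.NEWER;   // start (or switch to) a new checkpoint
        } else if (incomingBarrierId == currentCheckpointId) {
            return Relation.CURRENT; // another channel reporting the same checkpoint
        } else {
            return Relation.STALE;   // left over from an older, already subsumed checkpoint
        }
    }
}
```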
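```java
public class CheckpointBarrier extends RuntimeEvent {
    private final long id;
    private final long timestamp;
    private final CheckpointOptions checkpointOptions;

    public CheckpointBarrier(long id, long timestamp, CheckpointOptions checkpointOptions) {
        this.id = id;
        this.timestamp = timestamp;
        this.checkpointOptions = checkNotNull(checkpointOptions);
    }

    public long getId() { return id; }
    public long getTimestamp() { return timestamp; }
    public CheckpointOptions getCheckpointOptions() { return checkpointOptions; }
    // ... remaining methods omitted
}
```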
## When is the barrier propagated downstream?
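The `broadcastCheckpointBarrier` method of `OperatorChain` propagates the `CheckpointBarrier` downstream; it is called from `performCheckpoint`, which takes the snapshot.

In the code below this role is played by `OperatorChain.broadcastEvent`: `StreamTask.performCheckpoint` calls `subtaskCheckpointCoordinator.checkpointState`, which in turn calls `operatorChain.broadcastEvent` to broadcast the barrier.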
```java
// In StreamTask.performCheckpoint
subtaskCheckpointCoordinator.checkpointState(
        checkpointMetaData,
        checkpointOptions,
        checkpointMetrics,
        operatorChain,
        this::isCanceled);

// In subtaskCheckpointCoordinator.checkpointState
operatorChain.broadcastEvent(
        new CheckpointBarrier(metadata.getCheckpointId(), metadata.getTimestamp(), options),
        unalignedCheckpointEnabled);

// OperatorChain.broadcastEvent
public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException {
    for (RecordWriterOutput<?> streamOutput : streamOutputs) {
        streamOutput.broadcastEvent(event, isPriorityEvent);
    }
}
```
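`broadcastEvent` hands the same event to every output, and `isPriorityEvent` (set from `unalignedCheckpointEnabled` above) marks the barrier as a priority event. As I understand unaligned checkpoints, a priority barrier is allowed to overtake data already queued in the output buffers. A minimal sketch of that ordering idea using plain deques; `OutputQueue` and `ChainOutputs` are hypothetical stand-ins, not Flink's `RecordWriterOutput` or result subpartitions:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical stand-in for one output channel's queue of pending buffers and events. */
final class OutputQueue {
    final Deque<String> pending = new ArrayDeque<>();

    void add(String element, boolean isPriority) {
        if (isPriority) {
            pending.addFirst(element); // overtakes everything already queued
        } else {
            pending.addLast(element);  // keeps FIFO order with the data
        }
    }
}

final class ChainOutputs {
    final List<OutputQueue> outputs = new ArrayList<>();

    /** Same shape as OperatorChain.broadcastEvent: every output receives the event. */
    void broadcastEvent(String event, boolean isPriorityEvent) {
        for (OutputQueue out : outputs) {
            out.add(event, isPriorityEvent);
        }
    }
}
```

In the aligned case the barrier lands behind the queued records on every output; in the priority case it lands in front of them.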
## The `InputProcessorUtil` class
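This class contains an important method, `createCheckpointBarrierHandler`, which creates a different barrier handler depending on the checkpointing mode configured by the user.

`createCheckpointBarrierHandler` checks the mode: for EXACTLY_ONCE with unaligned checkpoints enabled it creates a `CheckpointBarrierUnaligner`, otherwise a `CheckpointBarrierAligner`; for AT_LEAST_ONCE it uses a `CheckpointBarrierTracker`. All three classes are `CheckpointBarrierHandler` implementations that define what happens when a barrier arrives, and they exist mainly to deal with barrier alignment (or, in the tracker's case, to avoid it).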
### Call chain
`StreamTask.beforeInvoke`:

```java
protected void beforeInvoke() throws Exception {
    ...
    // task specific initialization
    init();
    ...
}
```
`OneInputStreamTask.init`:

```java
public void init() throws Exception {
    ...
    if (numberOfInputs > 0) {
        // the CheckpointedInputGate (and with it the barrier handler) is created here
        CheckpointedInputGate inputGate = createCheckpointedInputGate();
        DataOutput<IN> output = createDataOutput();
        StreamTaskInput<IN> input = createTaskInput(inputGate, output);
        inputProcessor = new StreamOneInputProcessor<>(
                input,
                output,
                operatorChain);
    }
    ...
}
```
`OneInputStreamTask.createCheckpointedInputGate`:

```java
private CheckpointedInputGate createCheckpointedInputGate() {
    IndexedInputGate[] inputGates = getEnvironment().getAllInputGates();
    InputGate inputGate = InputGateUtil.createInputGate(inputGates);

    return InputProcessorUtil.createCheckpointedInputGate(
            this,
            configuration,
            getChannelStateWriter(),
            inputGate,
            getEnvironment().getMetricGroup().getIOMetricGroup(),
            getTaskNameWithSubtaskAndId());
}
```
`InputProcessorUtil.createCheckpointedInputGate`:

```java
public static CheckpointedInputGate createCheckpointedInputGate(
        AbstractInvokable toNotifyOnCheckpoint,
        StreamConfig config,
        ChannelStateWriter channelStateWriter,
        InputGate inputGate,
        TaskIOMetricGroup taskIOMetricGroup,
        String taskName) {
    // the barrier handler is created here
    CheckpointBarrierHandler barrierHandler = createCheckpointBarrierHandler(
            config,
            IntStream.of(inputGate.getNumberOfInputChannels()),
            channelStateWriter,
            taskName,
            generateChannelIndexToInputGateMap(inputGate),
            generateInputGateToChannelIndexOffsetMap(inputGate),
            toNotifyOnCheckpoint);
    registerCheckpointMetrics(taskIOMetricGroup, barrierHandler);
    ...
    return new CheckpointedInputGate(inputGate, barrierHandler);
}
```
`createCheckpointBarrierHandler`, which creates a different `CheckpointBarrierHandler` for each checkpointing mode:

```java
// creates a different CheckpointBarrierHandler depending on the checkpointing mode
private static CheckpointBarrierHandler createCheckpointBarrierHandler(
        StreamConfig config,
        IntStream numberOfInputChannelsPerGate,
        ChannelStateWriter channelStateWriter,
        String taskName,
        InputGate[] channelIndexToInputGate,
        Map<InputGate, Integer> inputGateToChannelIndexOffset,
        AbstractInvokable toNotifyOnCheckpoint) {
    switch (config.getCheckpointMode()) {
        case EXACTLY_ONCE:
            if (config.isUnalignedCheckpointsEnabled()) {
                return new CheckpointBarrierUnaligner(
                        numberOfInputChannelsPerGate.toArray(),
                        channelStateWriter,
                        taskName,
                        toNotifyOnCheckpoint);
            }
            return new CheckpointBarrierAligner(
                    taskName,
                    channelIndexToInputGate,
                    inputGateToChannelIndexOffset,
                    toNotifyOnCheckpoint);
        case AT_LEAST_ONCE:
            return new CheckpointBarrierTracker(numberOfInputChannelsPerGate.sum(), toNotifyOnCheckpoint);
        default:
            throw new UnsupportedOperationException("Unrecognized Checkpointing Mode: " + config.getCheckpointMode());
    }
}
```
## CheckpointBarrierAligner: EXACTLY_ONCE with aligned checkpoints
`CheckpointBarrierAligner` implements the barrier alignment logic in its `processBarrier` method:
```java
@Override
public void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws Exception {
    final long barrierId = receivedBarrier.getId();

    // fast path for single channel cases
    // with a single input channel there is nothing to align: checkpoint right away
    if (totalNumberOfInputChannels == 1) {
        // resume consumption of the channel
        resumeConsumption(channelIndex);
        // a barrier id greater than currentCheckpointId means a new checkpoint
        if (barrierId > currentCheckpointId) {
            // new checkpoint
            currentCheckpointId = barrierId;
            notifyCheckpoint(receivedBarrier, latestAlignmentDurationNanos);
        }
        return;
    }

    // -- general code path for multiple input channels --

    // numBarriersReceived = number of barriers received (= number of blocked/buffered channels).
    // Important: a cancelled checkpoint must always have 0 barriers.
    if (numBarriersReceived > 0) {
        // this is only true if some alignment is already in progress and was not canceled

        if (barrierId == currentCheckpointId) {
            // regular case: a barrier of the current checkpoint arrived on another channel
            // block this channel and increment numBarriersReceived
            onBarrier(channelIndex);
        }
        else if (barrierId > currentCheckpointId) {
            // A newer barrier arrived before the current checkpoint completed,
            // so the current checkpoint has to be aborted in favour of the new one.
            // we did not complete the current checkpoint, another started before
            LOG.warn("{}: Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. " +
                    "Skipping current checkpoint.",
                taskName,
                barrierId,
                currentCheckpointId);

            // let the task know we are not completing this
            // (notifyAbort ends up in StreamTask.abortCheckpointOnBarrier for currentCheckpointId)
            notifyAbort(currentCheckpointId,
                new CheckpointException(
                    "Barrier id: " + barrierId,
                    CheckpointFailureReason.CHECKPOINT_DECLINED_SUBSUMED));

            // abort the current checkpoint:
            // unblock the channels, reset the barrier count and the alignment start time
            releaseBlocksAndResetBarriers();

            // begin a new checkpoint: start aligning for the new barrier
            beginNewAlignment(barrierId, channelIndex, receivedBarrier.getTimestamp());
        }
        else {
            // ignore trailing barrier from an earlier checkpoint (obsolete now)
            // and resume consumption of the channel
            resumeConsumption(channelIndex);
        }
    }
    else if (barrierId > currentCheckpointId) {
        // first barrier of a new checkpoint:
        // no alignment in progress and the barrier is newer, so start a new alignment
        beginNewAlignment(barrierId, channelIndex, receivedBarrier.getTimestamp());
    }
    else {
        // either the current checkpoint was canceled (numBarriers == 0) or
        // this barrier is from an old subsumed checkpoint: ignore it
        resumeConsumption(channelIndex);
    }

    // check if we have all barriers - since canceled checkpoints always have zero barriers
    // this can only happen on a non canceled checkpoint.
    // blocked channels + closed channels == all input channels means the alignment is complete:
    // trigger the checkpoint, unblock the channels and reset the barrier count
    if (numBarriersReceived + numClosedChannels == totalNumberOfInputChannels) {
        // actually trigger checkpoint
        if (LOG.isDebugEnabled()) {
            LOG.debug("{}: Received all barriers, triggering checkpoint {} at {}.",
                taskName,
                receivedBarrier.getId(),
                receivedBarrier.getTimestamp());
        }

        // alignment finished: release the blocked channels
        releaseBlocksAndResetBarriers();
        // notify the task that the checkpoint can be taken
        notifyCheckpoint(receivedBarrier, latestAlignmentDurationNanos);
    }
}
```
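Stripped of metrics, logging, closed channels and the single-channel fast path, the multi-channel branch of `processBarrier` is a small state machine. A self-contained sketch that reproduces its decisions, assuming a hypothetical `AlignerSketch` that tracks blocked channels in a `boolean[]` and records triggers and aborts in lists instead of calling `notifyCheckpoint`/`notifyAbort`:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical, simplified model of the multi-channel path of CheckpointBarrierAligner.processBarrier. */
final class AlignerSketch {
    private final boolean[] blocked;                // channels that already delivered the current barrier
    private long currentCheckpointId = -1;
    private int numBarriersReceived = 0;
    final List<Long> triggered = new ArrayList<>(); // stands in for notifyCheckpoint
    final List<Long> aborted = new ArrayList<>();   // stands in for notifyAbort

    AlignerSketch(int numChannels) {
        this.blocked = new boolean[numChannels];
    }

    boolean isBlocked(int channel) {
        return blocked[channel];
    }

    void processBarrier(long barrierId, int channel) {
        if (numBarriersReceived > 0) {
            if (barrierId == currentCheckpointId) {
                onBarrier(channel);                    // regular case: block this channel too
            } else if (barrierId > currentCheckpointId) {
                aborted.add(currentCheckpointId);      // the newer checkpoint subsumes the current one
                releaseBlocksAndResetBarriers();
                beginNewAlignment(barrierId, channel);
            }
            // else: trailing barrier of an older checkpoint; ignored (Flink resumes the channel here)
        } else if (barrierId > currentCheckpointId) {
            beginNewAlignment(barrierId, channel);     // first barrier of a new checkpoint
        }
        // else: barrier of a cancelled or older checkpoint; ignored

        if (numBarriersReceived == blocked.length) {   // all channels aligned (closed channels not modelled)
            releaseBlocksAndResetBarriers();
            triggered.add(barrierId);
        }
    }

    private void beginNewAlignment(long barrierId, int channel) {
        currentCheckpointId = barrierId;
        onBarrier(channel);
    }

    private void onBarrier(int channel) {
        if (!blocked[channel]) {
            blocked[channel] = true;
            numBarriersReceived++;
        }
    }

    private void releaseBlocksAndResetBarriers() {
        Arrays.fill(blocked, false);
        numBarriersReceived = 0;
    }
}
```

With two channels, barrier 3 arriving on channel 1 while checkpoint 2 is still aligning aborts checkpoint 2 (`CHECKPOINT_DECLINED_SUBSUMED` in the real code), unblocks channel 0 and starts aligning checkpoint 3.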
`notifyAbort`, which aborts a checkpoint:

```java
protected void notifyAbort(long checkpointId, CheckpointException cause) throws IOException {
    // delegates to StreamTask.abortCheckpointOnBarrier
    toNotifyOnCheckpoint.abortCheckpointOnBarrier(checkpointId, cause);
}

// StreamTask.abortCheckpointOnBarrier
@Override
public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) throws IOException {
    subtaskCheckpointCoordinator.abortCheckpointOnBarrier(checkpointId, cause, operatorChain);
}
```
## CheckpointBarrierUnaligner: EXACTLY_ONCE with unaligned checkpoints
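This class defines an inner class, `ThreadSafeUnaligner`, which implements the `BufferReceivedListener` and `Closeable` interfaces. `ThreadSafeUnaligner` encapsulates the state shared between the Netty threads and the task thread.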
```java
private static class ThreadSafeUnaligner implements BufferReceivedListener, Closeable { ... }
```
`CheckpointBarrierUnaligner.processBarrier` updates its own bookkeeping and finally calls `threadSafeUnaligner.notifyBarrierReceived`:
```java
@Override
public void processBarrier(
        CheckpointBarrier receivedBarrier,
        int channelIndex) throws Exception {
    long barrierId = receivedBarrier.getId();

    // A consumed checkpoint id greater than the barrier id, or equal to it while no barrier of
    // that checkpoint has been consumed yet, means the barrier is old or cancelled: ignore it
    if (currentConsumedCheckpointId > barrierId || (currentConsumedCheckpointId == barrierId && numBarrierConsumed == 0)) {
        // ignore old and cancelled barriers
        return;
    }

    // A barrier id greater than the consumed checkpoint id means a newer checkpoint has started,
    // possibly before the current one finished: drop the current one and follow the newest
    if (currentConsumedCheckpointId < barrierId) {
        currentConsumedCheckpointId = barrierId;
        numBarrierConsumed = 0;
        // every channel may still carry in-flight buffers for the new checkpoint
        Arrays.fill(hasInflightBuffers, true);
    }

    // barrier of the current checkpoint
    if (currentConsumedCheckpointId == barrierId) {
        // this channel has delivered its barrier, so it has no more in-flight buffers
        hasInflightBuffers[channelIndex] = false;
        numBarrierConsumed++;
    }

    // processBarrier is called from task thread and can actually happen before notifyBarrierReceived on empty
    // buffer queues
    // to avoid replicating any logic, we simply call notifyBarrierReceived here as well
    threadSafeUnaligner.notifyBarrierReceived(receivedBarrier, channelInfos[channelIndex]);
}
```
The `threadSafeUnaligner` object is initialized when the `CheckpointBarrierUnaligner` is created:

```java
threadSafeUnaligner = new ThreadSafeUnaligner(totalNumChannels, checkNotNull(channelStateWriter), this);
```
Next comes `ThreadSafeUnaligner.notifyBarrierReceived`. When a barrier with a new id arrives, it calls `handleNewCheckpoint`:
```java
@Override
public synchronized void notifyBarrierReceived(CheckpointBarrier barrier, InputChannelInfo channelInfo) throws IOException {
    long barrierId = barrier.getId();

    // A barrier id greater than currentReceivedCheckpointId starts a new checkpoint,
    // possibly before the current one has received all of its barriers
    if (currentReceivedCheckpointId < barrierId) {
        // abort the current checkpoint if necessary and start the one for this barrier
        handleNewCheckpoint(barrier);
        // trigger the checkpoint right away (in the task thread), without waiting for alignment
        handler.executeInTaskThread(() -> handler.notifyCheckpoint(barrier, 0), "notifyCheckpoint");
    }

    // flattened index of the channel the barrier came from
    int channelIndex = handler.getFlattenedChannelIndex(channelInfo);
    // first barrier of the current checkpoint on this channel
    if (barrierId == currentReceivedCheckpointId && storeNewBuffers[channelIndex]) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("{}: Received barrier from channel {} @ {}.", handler.taskName, channelIndex, barrierId);
        }

        // this channel's barrier has arrived: stop storing its new buffers as in-flight data
        storeNewBuffers[channelIndex] = false;

        // barriers from all open channels have been received
        if (++numBarriersReceived == numOpenChannels) {
            allBarriersReceivedFuture.complete(null);
        }
    }
}
```
`handleNewCheckpoint` aborts the checkpoint currently in progress (if it has not received all of its barriers yet) and starts a new one:
```java
private synchronized void handleNewCheckpoint(CheckpointBarrier barrier) throws IOException {
    long barrierId = barrier.getId();
    if (!allBarriersReceivedFuture.isDone()) {
        // we did not complete the current checkpoint, another started before
        LOG.warn("{}: Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. " +
                "Skipping current checkpoint.",
            handler.taskName,
            barrierId,
            currentReceivedCheckpointId);

        // let the task know we are not completing this
        long currentCheckpointId = currentReceivedCheckpointId;
        handler.executeInTaskThread(() ->
            // ask the task (in the task thread) to abort the current checkpoint
            handler.notifyAbort(currentCheckpointId,
                new CheckpointException( // abort reason
                    "Barrier id: " + barrierId,
                    CheckpointFailureReason.CHECKPOINT_DECLINED_SUBSUMED)),
            "notifyAbort");
    }

    // switch to the new checkpoint
    currentReceivedCheckpointId = barrierId;
    // store new buffers of every channel until that channel's barrier arrives
    Arrays.fill(storeNewBuffers, true);
    // no barrier of the new checkpoint has been received yet
    numBarriersReceived = 0;
    allBarriersReceivedFuture = new CompletableFuture<>();
    // start writing channel state for this checkpoint id
    // (ChannelStateWriterImpl implements the ChannelStateWriter interface)
    channelStateWriter.start(barrierId, barrier.getCheckpointOptions());
}
```
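The key difference from alignment is that no channel is ever blocked: the checkpoint is triggered on the first barrier, and until a channel's own barrier shows up, buffers arriving on that channel are in-flight data that belong to the checkpoint as channel state (this is what `storeNewBuffers` and `channelStateWriter` are for). A minimal single-threaded sketch of that bookkeeping; `UnalignerSketch` and its `persisted` list are hypothetical stand-ins for `ThreadSafeUnaligner` and the `ChannelStateWriter`, and closed channels and the Netty/task thread split are not modelled:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical single-threaded model of ThreadSafeUnaligner's bookkeeping. */
final class UnalignerSketch {
    private final boolean[] storeNewBuffers;          // true: this channel's barrier is still missing
    private long currentReceivedCheckpointId = -1;
    private int numBarriersReceived = 0;
    final List<Long> triggered = new ArrayList<>();   // stands in for notifyCheckpoint
    final List<Long> aborted = new ArrayList<>();     // stands in for notifyAbort
    final List<String> persisted = new ArrayList<>(); // stands in for data written via ChannelStateWriter

    UnalignerSketch(int numChannels) {
        this.storeNewBuffers = new boolean[numChannels];
    }

    boolean allBarriersReceived() {
        return numBarriersReceived == storeNewBuffers.length;
    }

    void onBarrier(long barrierId, int channel) {
        if (currentReceivedCheckpointId < barrierId) {
            if (currentReceivedCheckpointId >= 0 && !allBarriersReceived()) {
                aborted.add(currentReceivedCheckpointId); // subsumed before all its barriers arrived
            }
            currentReceivedCheckpointId = barrierId;
            Arrays.fill(storeNewBuffers, true);
            numBarriersReceived = 0;
            triggered.add(barrierId);                     // triggered on the first barrier, no alignment
        }
        if (barrierId == currentReceivedCheckpointId && storeNewBuffers[channel]) {
            storeNewBuffers[channel] = false;
            numBarriersReceived++;
        }
    }

    /** Buffers are always processed normally; those still in flight are also persisted. */
    void onBuffer(String buffer, int channel) {
        if (storeNewBuffers[channel]) {
            persisted.add(buffer);
        }
    }
}
```

Only the buffer that arrives on a channel after the checkpoint started but before that channel's barrier ends up in the channel state; buffers on channels whose barrier already arrived are processed without being persisted.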
## CheckpointBarrierTracker: AT_LEAST_ONCE
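`CheckpointBarrierTracker` defines an inner class, `CheckpointBarrierCount`, which records how many barriers of a checkpoint have arrived and whether that checkpoint has been aborted.

`CheckpointBarrierTracker` also maintains a `pendingCheckpoints` field holding every checkpoint for which barriers have been received but which has not been triggered yet.

The `CheckpointBarrierCount` inner class: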
```java
private static final class CheckpointBarrierCount {

    // checkpoint id
    private final long checkpointId;
    // number of barriers received so far for this checkpoint
    private int barrierCount;
    // whether this checkpoint has been aborted
    private boolean aborted;

    CheckpointBarrierCount(long checkpointId) {
        this.checkpointId = checkpointId;
        this.barrierCount = 1; // created when the first barrier arrives
    }

    public long checkpointId() {
        return checkpointId;
    }

    public int incrementBarrierCount() {
        return ++barrierCount;
    }

    // whether the checkpoint was aborted
    public boolean isAborted() {
        return aborted;
    }

    // marks the checkpoint as aborted; returns true only on the first call
    public boolean markAborted() {
        boolean firstAbort = !this.aborted;
        this.aborted = true;
        return firstAbort;
    }

    @Override
    public String toString() {
        return isAborted() ?
            String.format("checkpointID=%d - ABORTED", checkpointId) :
            String.format("checkpointID=%d, count=%d", checkpointId, barrierCount);
    }
}
```
`CheckpointBarrierTracker.processBarrier` contains the barrier-handling logic:
```java
@Override
public void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws Exception {
    final long barrierId = receivedBarrier.getId();

    // fast path for single channel trackers
    // with a single input channel, trigger the checkpoint immediately
    if (totalNumberOfInputChannels == 1) {
        notifyCheckpoint(receivedBarrier, 0);
        return;
    }

    // general path for multiple input channels
    if (LOG.isDebugEnabled()) {
        LOG.debug("Received barrier for checkpoint {} from channel {}", barrierId, channelIndex);
    }

    // find the checkpoint barrier in the queue of pending barriers:
    // scan pendingCheckpoints (checkpoints that have started but have not been triggered yet)
    // in order for the CheckpointBarrierCount with the received barrier id, and record its position in pos
    CheckpointBarrierCount barrierCount = null;
    int pos = 0;

    for (CheckpointBarrierCount next : pendingCheckpoints) {
        // stop at the first match
        if (next.checkpointId == barrierId) {
            barrierCount = next;
            break;
        }
        pos++;
    }

    // a counter for this checkpoint id already exists
    if (barrierCount != null) {
        // add one to the count to that barrier and check for completion
        int numBarriersNew = barrierCount.incrementBarrierCount();
        // as many barriers as input channels: every channel has delivered its barrier
        if (numBarriersNew == totalNumberOfInputChannels) {
            // checkpoint can be triggered (or is aborted and all barriers have been seen)
            // first, remove this checkpoint and all prior pending
            // checkpoints (which are now subsumed)
            for (int i = 0; i <= pos; i++) {
                // pollFirst removes and returns the head of the deque (null if it is empty)
                pendingCheckpoints.pollFirst();
            }

            // notify the listener, unless the checkpoint was aborted
            if (!barrierCount.isAborted()) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Received all barriers for checkpoint {}", barrierId);
                }
                notifyCheckpoint(receivedBarrier, 0);
            }
        }
    }
    else {
        // first barrier for that checkpoint ID
        // add it only if it is newer than the latest checkpoint.
        // if it is not newer than the latest checkpoint ID, then there cannot be a
        // successful checkpoint for that ID anyways.
        // Barrier ids increase monotonically, so barrierId > latestPendingCheckpointID means this
        // barrier is newer than every pending checkpoint; otherwise it arrived too late and is ignored.
        if (barrierId > latestPendingCheckpointID) {
            // record the start of the checkpoint
            markCheckpointStart(receivedBarrier.getTimestamp());
            // latestPendingCheckpointID: the highest checkpoint id seen so far
            latestPendingCheckpointID = barrierId;
            pendingCheckpoints.addLast(new CheckpointBarrierCount(barrierId));

            // make sure we do not track too many checkpoints:
            // MAX_CHECKPOINTS_TO_TRACK is the maximum number of checkpoints for which some, but not
            // all, barriers have arrived; beyond it, the oldest pending checkpoint is dropped
            if (pendingCheckpoints.size() > MAX_CHECKPOINTS_TO_TRACK) {
                pendingCheckpoints.pollFirst();
            }
        }
    }
}
```
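Because the tracker never blocks channels, all it needs is a bounded queue of counters. A compact sketch of the same algorithm under simplifying assumptions: channels never close, aborts are not modelled, and a hypothetical `maxToTrack` parameter replaces `MAX_CHECKPOINTS_TO_TRACK`. `TrackerSketch` is illustrative only, not Flink code:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of CheckpointBarrierTracker (at-least-once). */
final class TrackerSketch {
    private static final class Count {
        final long checkpointId;
        int barriers = 1; // created on the first barrier, like CheckpointBarrierCount
        Count(long checkpointId) { this.checkpointId = checkpointId; }
    }

    private final int totalChannels;
    private final int maxToTrack;
    private final ArrayDeque<Count> pending = new ArrayDeque<>();
    private long latestPendingCheckpointId = -1;
    final List<Long> triggered = new ArrayList<>(); // stands in for notifyCheckpoint

    TrackerSketch(int totalChannels, int maxToTrack) {
        this.totalChannels = totalChannels;
        this.maxToTrack = maxToTrack;
    }

    void processBarrier(long barrierId) {
        if (totalChannels == 1) {                 // fast path: nothing to count
            triggered.add(barrierId);
            return;
        }
        Count found = null;                       // counter for this checkpoint, and its position
        int pos = 0;
        for (Count c : pending) {
            if (c.checkpointId == barrierId) {
                found = c;
                break;
            }
            pos++;
        }
        if (found != null) {
            if (++found.barriers == totalChannels) {
                for (int i = 0; i <= pos; i++) {
                    pending.pollFirst();          // drop this and all older (subsumed) checkpoints
                }
                triggered.add(barrierId);
            }
        } else if (barrierId > latestPendingCheckpointId) {
            latestPendingCheckpointId = barrierId;
            pending.addLast(new Count(barrierId));
            if (pending.size() > maxToTrack) {
                pending.pollFirst();              // forget the oldest incomplete checkpoint
            }
        }
        // else: late barrier of a checkpoint that can no longer complete; ignored
    }

    int pendingCount() {
        return pending.size();
    }
}
```

Completing checkpoint 2 also discards the still-incomplete checkpoint 1, and a late barrier for checkpoint 1 is then ignored because its id is not newer than the latest pending id.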
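## CheckpointedInputGate

`CheckpointBarrierHandler.processBarrier` is called from `CheckpointedInputGate.pollNext`.

The `CheckpointedInputGate` is created in `StreamTask.beforeInvoke`, which calls `init` (implemented in `OneInputStreamTask`).

`CheckpointedInputGate` reads data from the upstream nodes and reacts to the barriers it receives: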
```java
@Override
public Optional<BufferOrEvent> pollNext() throws Exception {
    while (true) {
        // read the next buffer or event from the input gate
        Optional<BufferOrEvent> next = inputGate.pollNext();

        // nothing available right now
        if (!next.isPresent()) {
            return handleEmptyBuffer();
        }

        BufferOrEvent bufferOrEvent = next.get();
        checkState(!barrierHandler.isBlocked(offsetChannelIndex(bufferOrEvent.getChannelIndex())));

        // data buffers are returned to the caller as-is
        if (bufferOrEvent.isBuffer()) {
            return next;
        }
        // checkpoint barrier: hand it to the barrier handler and keep polling
        else if (bufferOrEvent.getEvent().getClass() == CheckpointBarrier.class) {
            CheckpointBarrier checkpointBarrier = (CheckpointBarrier) bufferOrEvent.getEvent();
            barrierHandler.processBarrier(checkpointBarrier, offsetChannelIndex(bufferOrEvent.getChannelIndex()));
        }
        // a checkpoint was cancelled
        else if (bufferOrEvent.getEvent().getClass() == CancelCheckpointMarker.class) {
            barrierHandler.processCancellationBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent());
        }
        else {
            // end of a partition: let the barrier handler know that an input has finished
            if (bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class) {
                barrierHandler.processEndOfPartition();
            }
            // other events are passed on to the caller
            return next;
        }
    }
}
```
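Reduced to its control flow, `pollNext` is a loop that returns data to the caller and consumes checkpoint-related events itself. A self-contained sketch of that dispatch, where the `Polled` element type, the `Handler` interface and `GateSketch` are hypothetical simplifications of `BufferOrEvent`, `CheckpointBarrierHandler` and `CheckpointedInputGate`:

```java
import java.util.Deque;
import java.util.Optional;

/** Hypothetical simplification of BufferOrEvent. */
final class Polled {
    enum Kind { BUFFER, BARRIER, CANCEL_MARKER, END_OF_PARTITION }

    final Kind kind;
    final long checkpointId; // used by BARRIER and CANCEL_MARKER
    final int channel;

    Polled(Kind kind, long checkpointId, int channel) {
        this.kind = kind;
        this.checkpointId = checkpointId;
        this.channel = channel;
    }
}

/** Hypothetical subset of the barrier handler's callbacks. */
interface Handler {
    void processBarrier(long checkpointId, int channel);
    void processCancellationBarrier(long checkpointId);
    void processEndOfPartition();
}

final class GateSketch {
    private final Deque<Polled> input;
    private final Handler handler;

    GateSketch(Deque<Polled> input, Handler handler) {
        this.input = input;
        this.handler = handler;
    }

    Optional<Polled> pollNext() {
        while (true) {
            Polled next = input.pollFirst();
            if (next == null) {
                return Optional.empty();      // nothing available right now
            }
            switch (next.kind) {
                case BUFFER:
                    return Optional.of(next); // data goes straight to the caller
                case BARRIER:
                    handler.processBarrier(next.checkpointId, next.channel);
                    break;                    // barrier is consumed here; keep polling
                case CANCEL_MARKER:
                    handler.processCancellationBarrier(next.checkpointId);
                    break;
                default:                      // END_OF_PARTITION
                    handler.processEndOfPartition();
                    return Optional.of(next); // the event itself is still passed on
            }
        }
    }
}
```

Barriers and cancellation markers never reach the caller, which is why the operator code downstream of the input gate only sees records and the remaining events.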
If you spot any mistakes, corrections are welcome!