1. CheckpointedFunction
Checkpointing is how Flink persists state, and its weight within Flink goes without saying. When we implement one of Flink's user-defined function interfaces, also implementing the CheckpointedFunction interface alongside the business logic brings the implementation much closer to production quality, and it puts Flink's stateful computation capabilities to effective use.
public interface CheckpointedFunction {

    /**
     * This method is called when a snapshot for a checkpoint is requested. This acts as a hook to the function to
     * ensure that all state is exposed by means previously offered through {@link FunctionInitializationContext} when
     * the Function was initialized, or offered now by {@link FunctionSnapshotContext} itself.
     *
     * @param context the context for drawing a snapshot of the operator
     * @throws Exception
     */
    void snapshotState(FunctionSnapshotContext context) throws Exception;

    /**
     * This method is called when the parallel function instance is created during distributed
     * execution. Functions typically set up their state storing data structures in this method.
     *
     * @param context the context for initializing the operator
     * @throws Exception
     */
    void initializeState(FunctionInitializationContext context) throws Exception;
}
#snapshotState: called every time the task triggers a checkpoint; it refreshes the state data to be saved
#initializeState: initializes the checkpoint storage structure. Two pieces of logic usually live here (a minimal example follows this list):
- 1. Determine whether this start is a restore from a checkpoint, and if so, run the state-recovery logic.
- 2. Initialize the rules (the state descriptor) for how state is stored.
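To make those two callbacks concrete, here is a minimal sketch of a user-defined function implementing CheckpointedFunction, in the spirit of the buffering-sink example from the Flink documentation. The class name BufferingSink, the state name "buffered-elements", and the threshold are illustrative, not taken from the source analyzed below; it assumes a Flink 1.x DataStream job.

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class BufferingSink implements SinkFunction<String>, CheckpointedFunction {

    private final int threshold;

    private transient ListState<String> checkpointedState;
    private final List<String> bufferedElements = new ArrayList<>();

    public BufferingSink(int threshold) {
        this.threshold = threshold;
    }

    @Override
    public void invoke(String value, Context context) {
        bufferedElements.add(value);
        if (bufferedElements.size() >= threshold) {
            // flush bufferedElements to the external system here, then forget them
            bufferedElements.clear();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // called on every checkpoint: replace the previous snapshot with the live buffer
        checkpointedState.clear();
        for (String element : bufferedElements) {
            checkpointedState.add(element);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // 1. register the state (name + type), whether starting fresh or restoring
        ListStateDescriptor<String> descriptor = new ListStateDescriptor<>(
                "buffered-elements",
                TypeInformation.of(new TypeHint<String>() {}));
        checkpointedState = context.getOperatorStateStore().getListState(descriptor);

        // 2. only when restarting from a checkpoint/savepoint: reload the buffer
        if (context.isRestored()) {
            for (String element : checkpointedState.get()) {
                bufferedElements.add(element);
            }
        }
    }
}

FlinkKafkaConsumerBase, analyzed next, follows exactly this shape: a descriptor registered in initializeState, a restore branch guarded by context.isRestored(), and a snapshotState that rewrites the list state on every checkpoint.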
2. FlinkKafkaConsumerBase Source Code Walkthrough
Below, the implementation is reproduced with added comments to make this part of the code easier to understand.
/** Data for pending but uncommitted offsets. Temporary holding area for new state snapshots. */
private final LinkedMap pendingOffsetsToCommit = new LinkedMap();

/** Accessor for state in the operator state backend. */
// The state storage handle; put simply, it defines the format in which FlinkKafkaConsumer stores its state
private transient ListState<Tuple2<KafkaTopicPartition, Long>> unionOffsetStates;

/** State name of the consumer's partition offset states. */
private static final String OFFSETS_STATE_NAME = "topic-partition-offset-states";
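A quick aside on LinkedMap: it comes from Apache Commons Collections and keeps entries in insertion order while also allowing index-based access, which is exactly what the truncation logic in snapshotState() below relies on. A tiny standalone demo (keys and values are illustrative):

import org.apache.commons.collections.map.LinkedMap;

public class LinkedMapDemo {
    public static void main(String[] args) {
        // LinkedMap preserves insertion order and supports removal by index,
        // which is how snapshotState() evicts the oldest pending checkpoints
        LinkedMap pending = new LinkedMap();
        pending.put(1L, "offsets-for-checkpoint-1");
        pending.put(2L, "offsets-for-checkpoint-2");
        pending.put(3L, "offsets-for-checkpoint-3");

        pending.remove(0); // drops the OLDEST entry (checkpoint 1)
        System.out.println(pending.firstKey()); // prints 2
    }
}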
@Override
public final void initializeState(FunctionInitializationContext context) throws Exception {
    // Get the operator state store
    OperatorStateStore stateStore = context.getOperatorStateStore();

    // Look up the list state registered under the default state name. On a fresh start this
    // is necessarily empty; it exists solely to detect a restore from a legacy (Flink 1.2.x) savepoint.
    ListState<Tuple2<KafkaTopicPartition, Long>> oldRoundRobinListState =
            stateStore.getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);

    //***************************************************************************************************
    // Ignoring state restore for a moment, this single call completes ALL of the
    // initialization logic for state storage
    //***************************************************************************************************
    // Initialize the state collection
    this.unionOffsetStates = stateStore.getUnionListState(new ListStateDescriptor<>(
            OFFSETS_STATE_NAME, // the state's registered name
            // The type information for Tuple2<KafkaTopicPartition, Long>; this TypeHint idiom is
            // the standard way to capture a generic type (see the TypeHint javadoc)
            TypeInformation.of(new TypeHint<Tuple2<KafkaTopicPartition, Long>>() {})));

    //***************************************************************************************************
    // Restore check: if this is a restore, copy the legacy collection #oldRoundRobinListState
    // into #unionOffsetStates, then clear #oldRoundRobinListState
    //***************************************************************************************************
    // Is this a restore from a previous state?
    if (context.isRestored() && !restoredFromOldState) {
        restoredState = new TreeMap<>(new KafkaTopicPartition.Comparator());

        // Migrate the legacy state collection into the current state collection
        for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : oldRoundRobinListState.get()) {
            restoredFromOldState = true;
            unionOffsetStates.add(kafkaOffset);
        }
        // Clear the legacy state collection
        oldRoundRobinListState.clear();

        // Partition discovery cannot be combined with a legacy-state restore
        if (restoredFromOldState && discoveryIntervalMillis != PARTITION_DISCOVERY_DISABLED) {
            throw new IllegalArgumentException(
                "Topic / partition discovery cannot be enabled if the job is restored from a savepoint from Flink 1.2.x.");
        }

        // Keep a copy of the restored offsets in #restoredState; the partition-discovery logic
        // consults it later (see the original javadoc on the restoredState field)
        for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : unionOffsetStates.get()) {
            restoredState.put(kafkaOffset.f0, kafkaOffset.f1);
        }

        LOG.info("Setting restore state in the FlinkKafkaConsumer: {}", restoredState);
    } else {
        // Not a restore; just log it
        LOG.info("No restore state for FlinkKafkaConsumer.");
    }
}
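One detail worth calling out: the method uses getUnionListState rather than getListState. With plain list state, Flink splits the list entries across subtasks on restore; with union state, every parallel subtask receives the complete list of offsets and must pick out the partitions it owns. The helper below is a hypothetical, self-contained sketch of that filtering pattern; filterForSubtask is not a method in the Flink source, though KafkaTopicPartitionAssigner.assign is the deterministic partition-to-subtask mapping the consumer itself applies in open().

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionAssigner;

public class UnionStateFilterDemo {

    // Hypothetical helper: on restore, union list state hands EVERY subtask the
    // full set of offsets, so each subtask keeps only the partitions it owns
    static Map<KafkaTopicPartition, Long> filterForSubtask(
            Iterable<Tuple2<KafkaTopicPartition, Long>> allRestoredOffsets,
            int subtaskIndex,
            int parallelism) {

        Map<KafkaTopicPartition, Long> myOffsets = new HashMap<>();
        for (Tuple2<KafkaTopicPartition, Long> offset : allRestoredOffsets) {
            // the same deterministic partition -> subtask mapping the consumer uses
            if (KafkaTopicPartitionAssigner.assign(offset.f0, parallelism) == subtaskIndex) {
                myOffsets.put(offset.f0, offset.f1);
            }
        }
        return myOffsets;
    }
}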
@Override // triggered on every checkpoint
public final void snapshotState(FunctionSnapshotContext context) throws Exception {
    // Is the consumer still running?
    if (!running) {
        LOG.debug("snapshotState() called on closed source");
    } else {
        // Clear the previous snapshot's state
        unionOffsetStates.clear();

        // kafkaFetcher is the Kafka access object; it is first initialized in
        // FlinkKafkaConsumerBase#run and is updated on every data exchange
        final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;

        //***********************************************************************************************
        // If the fetcher is null, run() has not executed yet, so the subscribed partitions
        // (with their start offsets) are stored as the state instead
        //***********************************************************************************************
        if (fetcher == null) {
            // the fetcher has not yet been initialized, which means we need to return the
            // originally restored offsets or the assigned partitions
            for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) {
                unionOffsetStates.add(Tuple2.of(subscribedPartition.getKey(), subscribedPartition.getValue()));
            }

            if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
                // the map cannot be asynchronously updated, because only one checkpoint call can happen
                // on this function at a time: either snapshotState() or notifyCheckpointComplete()
                pendingOffsetsToCommit.put(context.getCheckpointId(), restoredState);
            }

        //***********************************************************************************************
        // Otherwise, read the current offsets out of the fetcher into #currentOffsets
        // and store them as the latest state
        //***********************************************************************************************
        } else {
            HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState();

            if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
                // the map cannot be asynchronously updated, because only one checkpoint call can happen
                // on this function at a time: either snapshotState() or notifyCheckpointComplete()
                pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
            }

            for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry : currentOffsets.entrySet()) {
                unionOffsetStates.add(
                        Tuple2.of(kafkaTopicPartitionLongEntry.getKey(), kafkaTopicPartitionLongEntry.getValue()));
            }
        }

        //***********************************************************************************************
        // Cap the number of pending checkpoints: evict the oldest entries so the map
        // never grows past MAX_NUM_PENDING_CHECKPOINTS
        //***********************************************************************************************
        if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
            // truncate the map of pending offsets to commit, to prevent infinite growth
            while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {
                pendingOffsetsToCommit.remove(0);
            }
        }
    }
}
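For completeness: the offsets parked in pendingOffsetsToCommit are not committed inside snapshotState. When the checkpoint is confirmed, Flink calls notifyCheckpointComplete (from the CheckpointListener interface), which looks the checkpoint up by id, drops any older pending entries, and hands the offsets to the fetcher for an asynchronous commit back to Kafka. Below is an abridged sketch of that flow, with the running/fetcher null checks and error handling elided; consult the Flink source for the full version.

@Override
public final void notifyCheckpointComplete(long checkpointId) throws Exception {
    // find the offsets snapshotted for exactly this checkpoint
    final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId);
    if (posInMap == -1) {
        LOG.warn("Received confirmation for unknown checkpoint id {}", checkpointId);
        return;
    }

    @SuppressWarnings("unchecked")
    Map<KafkaTopicPartition, Long> offsets =
            (Map<KafkaTopicPartition, Long>) pendingOffsetsToCommit.remove(posInMap);

    // older checkpoints can no longer complete once a newer one has; drop them
    for (int i = 0; i < posInMap; i++) {
        pendingOffsetsToCommit.remove(0);
    }

    // hand the offsets to the fetcher, which commits them to Kafka asynchronously
    kafkaFetcher.commitInternalOffsetsToKafka(offsets, offsetCommitCallback);
}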