Implementing Flink's CheckpointedFunction Interface

1. CheckpointedFunction

Checkpointing is how Flink persists state, and its weight within Flink goes without saying. When we implement one of Flink's custom function interfaces, implementing the CheckpointedFunction interface alongside the business logic makes the custom implementation far more robust, and it is also an effective way to exploit the power of Flink's stateful computation.

public interface CheckpointedFunction {

    /**
     * This method is called when a snapshot for a checkpoint is requested. This acts as a hook to the function to
     * ensure that all state is exposed by means previously offered through {@link FunctionInitializationContext} when
     * the Function was initialized, or offered now by {@link FunctionSnapshotContext} itself.
     *
     * @param context the context for drawing a snapshot of the operator
     * @throws Exception
     */
    void snapshotState(FunctionSnapshotContext context) throws Exception;

    /**
     * This method is called when the parallel function instance is created during distributed
     * execution. Functions typically set up their state storing data structures in this method.
     *
     * @param context the context for initializing the operator
     * @throws Exception
     */
    void initializeState(FunctionInitializationContext context) throws Exception;
}

# snapshotState: invoked each time the task triggers a checkpoint; it refreshes and persists the state data.
# initializeState: initializes the checkpoint storage structures. Two pieces of logic are usually implemented here (see the minimal sketch after this list):

  • 1. Determine whether this startup is a restore from a previous checkpoint, and implement the state-recovery logic.
  • 2. Define the storage rules (state name and type) for the checkpoint.
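Before turning to the Kafka consumer, here is a minimal sketch of that two-step pattern, modeled on the well-known BufferingSink example from the Flink documentation. The class name, state name, and buffer field are illustrative choices, not part of FlinkKafkaConsumerBase:

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class BufferingSink implements SinkFunction<String>, CheckpointedFunction {

    // Operator state handle, (re-)created in initializeState().
    private transient ListState<String> checkpointedState;

    // Elements received but not yet flushed downstream.
    private final List<String> buffer = new ArrayList<>();

    @Override
    public void invoke(String value, Context context) throws Exception {
        buffer.add(value); // real flushing logic omitted for brevity
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // On every checkpoint: replace the stored state with the current buffer.
        checkpointedState.clear();
        for (String element : buffer) {
            checkpointedState.add(element);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // Logic 2 from the list above: define the storage rule (state name + type).
        ListStateDescriptor<String> descriptor =
                new ListStateDescriptor<>("buffered-elements", Types.STRING);
        checkpointedState = context.getOperatorStateStore().getListState(descriptor);

        // Logic 1 from the list above: detect a restore and recover the buffer.
        if (context.isRestored()) {
            for (String element : checkpointedState.get()) {
                buffer.add(element);
            }
        }
    }
}

Note how initializeState both registers the state descriptor (logic 2) and, when context.isRestored() is true, reloads the buffer (logic 1), while snapshotState simply rewrites the list on every checkpoint. FlinkKafkaConsumerBase follows exactly this shape, just with more moving parts.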

2. FlinkKafkaConsumerBase Source Walkthrough

Below, the implementation is annotated with comments to make this code easier to understand.

    /** Data for pending but uncommitted offsets: temporarily holds the new state snapshots. */
    private final LinkedMap pendingOffsetsToCommit = new LinkedMap();

    /** Accessor for state in the operator state backend. */
    // The storage rule for the state (in short: it defines the format in which FlinkKafkaConsumer stores its state).
    private transient ListState<Tuple2<KafkaTopicPartition, Long>> unionOffsetStates;

    /** State name of the consumer's partition offset states. */
    private static final String OFFSETS_STATE_NAME = "topic-partition-offset-states";

    @Override
    public final void initializeState(FunctionInitializationContext context) throws Exception {
        // Obtain the operator state store.
        OperatorStateStore stateStore = context.getOperatorStateStore();

        // Look up the state registered under the legacy default name. On a fresh start this is
        // necessarily empty; the lookup exists only to detect a restore from an old savepoint.
        ListState<Tuple2<KafkaTopicPartition, Long>> oldRoundRobinListState =
                stateStore.getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);



//***************************************************************************************************************************************
// Setting state restore aside for a moment, this single step already completes the entire initialization of the state storage
//***************************************************************************************************************************************
        // Initialize the state collection.
        this.unionOffsetStates = stateStore.getUnionListState(new ListStateDescriptor<>(
                OFFSETS_STATE_NAME, // the string name under which the state is registered
                // Fixed idiom: an anonymous TypeHint subclass captures the full generic type,
                // yielding a TypeInformation<Tuple2<KafkaTopicPartition, Long>>.
                TypeInformation.of(new TypeHint<Tuple2<KafkaTopicPartition, Long>>() {})));
          
//****************************************************************************************************************************************
//****************************************************************************************************************************************



//****************************************************************************************************************************************
// Check whether this is a state restore: if so, copy the old collection #oldRoundRobinListState into #unionOffsetStates,
// then clear #oldRoundRobinListState
//****************************************************************************************************************************************

        // Is this a restore from previous state?
        if (context.isRestored() && !restoredFromOldState) {
            restoredState = new TreeMap<>(new KafkaTopicPartition.Comparator());

            // Copy the old state entries into the state storage collection.
            for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : oldRoundRobinListState.get()) {
                restoredFromOldState = true;
                unionOffsetStates.add(kafkaOffset);
            }
            // Clear the old state collection.
            oldRoundRobinListState.clear();
            // Partition discovery cannot be used when restoring from an old (Flink 1.2.x) savepoint.
            if (restoredFromOldState && discoveryIntervalMillis != PARTITION_DISCOVERY_DISABLED) {
                throw new IllegalArgumentException(
                    "Topic / partition discovery cannot be enabled if the job is restored from a savepoint from Flink 1.2.x.");
            }

            // Keep a copy in restoredState; it is used later by the partition-discovery logic
            // (see the original English comment on the restoredState field).
            for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : unionOffsetStates.get()) {
                restoredState.put(kafkaOffset.f0, kafkaOffset.f1);
            }

            LOG.info("Setting restore state in the FlinkKafkaConsumer: {}", restoredState);
        } else {
            // Not a restore; log a notice.
            LOG.info("No restore state for FlinkKafkaConsumer.");
        }
//****************************************************************************************************************************************
//****************************************************************************************************************************************

    }
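
    //*******************************************************************************************************************************
    // Side note (a sketch, not part of the Flink source): getUnionListState uses union redistribution, so on restore EVERY
    // parallel subtask receives the complete offset list and each consumer picks out the partitions assigned to it; plain
    // getListState would instead split the entries across subtasks, which does not match Kafka partition assignment.
    // A standalone illustration of the two calls, using the same descriptor idiom as above:
    //
    //     ListStateDescriptor<Tuple2<KafkaTopicPartition, Long>> descriptor =
    //             new ListStateDescriptor<>(
    //                     OFFSETS_STATE_NAME,
    //                     TypeInformation.of(new TypeHint<Tuple2<KafkaTopicPartition, Long>>() {}));
    //
    //     ListState<Tuple2<KafkaTopicPartition, Long>> union = stateStore.getUnionListState(descriptor); // full list on restore
    //     ListState<Tuple2<KafkaTopicPartition, Long>> split = stateStore.getListState(descriptor);      // split across subtasks
    //*******************************************************************************************************************************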

    @Override // invoked when a snapshot is triggered
    public final void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Is the consumer still running?
        if (!running) {
            LOG.debug("snapshotState() called on closed source");
        } else {
            // Clear the previously stored state data.
            unionOffsetStates.clear();
            // kafkaFetcher is the Kafka access object; it is first created in
            // FlinkKafkaConsumerBase#run and takes part in every data exchange.
            final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
  //*******************************************************************************************************************************
  // In short: if the fetcher is still null, run() has not executed yet, so the subscribed
  // partitions (with their start offsets) are written out as the state instead
  //*******************************************************************************************************************************
            if (fetcher == null) {
                // the fetcher has not yet been initialized, which means we need to return the
                // originally restored offsets or the assigned partitions
                for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) {
                    unionOffsetStates.add(Tuple2.of(subscribedPartition.getKey(), subscribedPartition.getValue()));
                }

                if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
                    // the map cannot be asynchronously updated, because only one checkpoint call can happen
                    // on this function at a time: either snapshotState() or notifyCheckpointComplete()
                    pendingOffsetsToCommit.put(context.getCheckpointId(), restoredState);
                }
  //******************************************************************************************************************************* 
  //*******************************************************************************************************************************




  //*******************************************************************************************************************************
  // Fetch the fetcher's current offsets into #currentOffsets and store them as the latest state
  //*******************************************************************************************************************************
            } else {
                HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState();

                if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
                    // the map cannot be asynchronously updated, because only one checkpoint call can happen
                    // on this function at a time: either snapshotState() or notifyCheckpointComplete()
                    pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
                }

                for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry : currentOffsets.entrySet()) {
                    unionOffsetStates.add(
                            Tuple2.of(kafkaTopicPartitionLongEntry.getKey(), kafkaTopicPartitionLongEntry.getValue()));
                }
            }
  //******************************************************************************************************************************* 
  //*******************************************************************************************************************************



  //*******************************************************************************************************************************
  // Bound the number of pending checkpoints: drop the oldest entries so the map never grows
  // beyond the fixed maximum (MAX_NUM_PENDING_CHECKPOINTS)
  //*******************************************************************************************************************************
            if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
                // truncate the map of pending offsets to commit, to prevent infinite growth
                while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {
                    pendingOffsetsToCommit.remove(0);
                }
            }
  //******************************************************************************************************************************* 
  //*******************************************************************************************************************************
        }
    }
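The pendingOffsetsToCommit entries written above are consumed when the checkpoint actually completes: FlinkKafkaConsumerBase also implements CheckpointListener. The following is a simplified sketch of that other half of the ON_CHECKPOINTS flow; the real notifyCheckpointComplete additionally guards against a closed source, a still-null fetcher, and empty offset maps, and the exact commitInternalOffsetsToKafka signature varies slightly across Flink versions, so treat this as an outline rather than the exact source:

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        // Locate the completed checkpoint among the pending snapshots.
        final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId);
        if (posInMap == -1) {
            return; // unknown, expired, or already-committed checkpoint
        }

        @SuppressWarnings("unchecked")
        Map<KafkaTopicPartition, Long> offsets =
                (Map<KafkaTopicPartition, Long>) pendingOffsetsToCommit.remove(posInMap);

        // Older pending checkpoints are subsumed by this one; drop them.
        for (int i = 0; i < posInMap; i++) {
            pendingOffsetsToCommit.remove(0);
        }

        // Hand the offsets to the fetcher, which performs the actual Kafka commit.
        kafkaFetcher.commitInternalOffsetsToKafka(offsets, offsetCommitCallback);
    }

This is also why snapshotState() only parks offsets in pendingOffsetsToCommit instead of committing them immediately: committing only after the checkpoint completes is what ties the Kafka offsets to Flink's exactly-once checkpoint barrier.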

Through the comments above, this article has walked through how FlinkKafkaConsumerBase implements CheckpointedFunction and what each step accomplishes. A follow-up post will demonstrate concrete application scenarios.