原文地址:https://www.ververica.com/blog/how-apache-flink-manages-kafka-consumer-offsets
一、关于Checkpoints
Checkpoints是Flink从故障中恢复的一种内部机制。
Checkpoints是Flink应用程序状态的一致性副本,包括输入的读取位置点。
如果出现故障,Flink通过从检查点加载应用程序状态并从恢复的读取位置点继续执行,就好像什么都没有发生一样,从而恢复应用程序。
Flink的Checkpoints是基于Chandy-Lamport算法的分布式一致性快照
Checkpoints使Flink具有容错性,并确保流式应用程序的语义在发生故障时得到保留。检查点按应用程序可以配置的定期触发。
Flink中的Kafka Consumers与Flink的Checkpoints机制集成为一个有状态operator,其状态是所有Kafka分区中的读取偏移量 offsets。
当一个检查点被触发时,每个分区的偏移量都存储在检查点中。
Flink的检查点机制确保所有操作任务的存储状态是一致的,即它们基于相同的输入数据。当所有操作任务成功存储其状态时,检查点即完成。
因此,当从潜在的系统故障中重新启动恢复时,系统提供exactly-once状态更新保证。
二、Flink 中的 Kafka Consumers offsets是如何做检查点的
栗子
(数据被存在了 Flink 的 JobMaster 中,在 POC 或生产用例下,这些数据最好是能存到一个外部文件系统(如HDFS或S3)中)
Step 1:
如下所示,从一个 Kafka topic读取,有两个partition,每个partition都含有 “A”, “B”, “C”, ”D”, “E” 5条消息。
我们将两个partition的offset都设置为0.
Step 2:
Kafka comsumer开始从 partition 0 读取消息。
消息“A”正在被处理,
第一个 consumer 的 offset 变成了1。
Step 3:
消息“A”到达了 Flink Map Task。
两个 consumer 都开始读取下一条消息(partition 0 读取“B”,partition 1 读取“A”)。
各自将 offset 更新成 2 和 1 。
同时,Flink 的 JobMaster 开始在 source 触发了一个检查点。
Step 4:
接下来,由于 source 触发了检查点,Kafka consumer tasks创建了它们状态的第一个快照(”offset = 2, 1”),并将快照存到了 Flink 的 JobMaster 中。
Source 在消息“B”和“A”从partition 0 和 1 发出之后,发出一个 checkpoint barrier。
Checkopint barrier 用于对齐所有 operator task 的检查点,保证了整个检查点的一致性。
消息“A”到达了 Flink Map Task,而上面的 consumer 继续读取下一条消息(消息“C”)。
Step 5:
Flink Map Task 从source和检查点接收 checkpoint barrier 后,并将其状态发送给 JobMaster。
同时,consumer 会继续从 Kafka 读取更多的事件。
Step 6:
Flink Map Task 完成了它自己状态的快照流程后,就会和Flink JobMaster进行通信, 汇报它已经完成了这个 checkpoint。
当所有的 task 都确认其状态 checkpoint 后,JobMaster 就会将这个 checkpoint 标记为成功。
从此刻开始,这个 checkpoint 就可以用于故障恢复了。
值得一提的是,Flink 并不依赖 Kafka offset 从系统故障中恢复。
三、故障恢复
在发生故障时(比如,某个 worker 挂了),所有的 operator task 会被重启,而他们的状态会被重置到最近一次完成的checkpoint。
Kafka source 分别从 offset 2 和 1 重新开始读取消息(因为这是完成的 checkpoint 中存的 offset)。
当作业重启后,我们可以期待正常的系统操作,就好像之前没有发生故障一样。如下图所示:
官网中关于exactly-once的解释:
With Flink’s checkpointing enabled, the FlinkKafkaProducer011 (FlinkKafkaProducer for Kafka >= 1.0.0 versions) can provide exactly-once delivery guarantees.
Besides enabling Flink’s checkpointing, you can also choose three different modes of operating chosen by passing appropriate semantic parameter to the FlinkKafkaProducer011 (FlinkKafkaProducer for Kafka >= 1.0.0 versions):
启用flink的检查点之外,可以通过将适当的语义参数传递给flinkkafkaproducer011,有三种不同的操作模式:
- Semantic.NONE:
- Semantic.AT_LEAST_ONCE (default setting)
- Semantic.EXACTLY_ONCE: uses Kafka transactions to provide exactly-once semantic. Whenever you write to Kafka using transactions, do not forget about setting desired isolation.level (read_committed or read_uncommitted - the latter one is the default value) for any application consuming records from Kafka.
使用Kafka事务提供一次语义。无论何时使用事务写入Kafka,都不要忘记为任何使用Kafka记录的应用程序设置所需的isolation.level(read_committed或read_uncommitted-后者是默认值)。
Semantic.EXACTLY_ONCE mode relies on the ability to commit transactions that were started before taking a checkpoint, after recovering from the said checkpoint. If the time between Flink application crash and completed restart is larger than Kafka’s transaction timeout there will be data loss (Kafka will automatically abort transactions that exceeded timeout time). Having this in mind, please configure your transaction timeout appropriately to your expected down times.
Semantic.EXACTLY_ONCE模式依赖于在从所述检查点恢复后,提交在接受检查点之前启动的事务的能力。如果Flink崩溃和完成重新启动之间的时间大于Kafka的事务超时,则将丢失数据(Kafka将自动中止超过超时时间的事务)。考虑到这一点,根据预期的停机时间适当配置事务超时。
Kafka Consumers and Fault Tolerance
With Flink’s checkpointing enabled, the Flink Kafka Consumer will consume records from a topic and periodically checkpoint all its Kafka offsets, together with the state of other operations, in a consistent manner. In case of a job failure, Flink will restore the streaming program to the state of the latest checkpoint and re-consume the records from Kafka, starting from the offsets that were stored in the checkpoint.
The interval of drawing checkpoints therefore defines how much the program may have to go back at most, in case of a failure.
To use fault tolerant Kafka Consumers, checkpointing of the topology needs to be enabled at the execution environment:
启用flink的检查点后,flink-kafka Consumers将使用某个topic的记录,并以一致的方式定期检查其所有kafka偏移量以及其他操作的状态。
在作业失败的情况下,Flink将把流媒体程序恢复到最新检查点的状态,并从存储在检查点中的偏移量开始重新使用来自Kafka的记录。
因此,绘制检查点的间隔定义了程序在失败时最多需要返回多少。
要使用容错kafka Consumers,需要在执行环境中启用拓扑检查点:val env = >StreamExecutionEnvironment.getExecutionEnvironment() env.enableCheckpointing(5000) // checkpoint every 5000 >msecs
Also note that Flink can only restart the topology if enough processing slots are available to restart the topology. So if the topology fails due to loss of a TaskManager, there must still be enough slots available afterwards. Flink on YARN supports automatic restart of lost YARN containers.
If checkpointing is not enabled, the Kafka consumer will periodically commit the offsets to Zookeeper.
还要注意,只有在有足够的processing slots来重新启动topology时,Flink才能重新启动topology。因此,如果topology由于TaskManager丢失而失败,那么之后必须有足够的可用slots。 Flink on YARN支持自动重启丢失的YARN。
如果未启用检查点,Kafka使用者将定期将偏移提交给Zookeeper。// checkpoint常用设置参数 env.enableCheckpointing(4000) env.getCheckpointConfig.setCheckpointingMode(Checkpoin>tingMode.EXACTLY_ONCE) env.getCheckpointConfig.setCheckpointTimeout(10000) env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)