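Flink's Kafka consumers are integrated with Flink's checkpointing mechanism as a stateful operator whose state is the set of read offsets across all Kafka partitions.
2. How Flink checkpoints Kafka consumer offsets
(In this example the data is stored in Flink's JobMaster; in POC or production use cases it is better stored in an external file system such as HDFS or S3.)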
Step 1:
The example reads from a Kafka topic with two partitions, each containing five messages: "A", "B", "C", "D", "E".
Step 2:
The Kafka consumer starts reading messages from partition 0.
The offset of the first consumer becomes 1.
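Step 3:
Message "A" arrives at the Flink Map Task.
Both consumers read their next message ("B" from partition 0, "A" from partition 1).
Their offsets are updated to 2 and 1, respectively.
At the same time, Flink's JobMaster triggers a checkpoint at the source.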
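Step 4:
Next, because a checkpoint was triggered at the source, the Kafka consumer tasks create the first snapshot of their state ("offset = 2, 1") and store it in Flink's JobMaster.
The sources emit a checkpoint barrier after messages "B" and "A" from partitions 0 and 1, respectively.
Checkpoint barriers are used to align the checkpoints of all operator tasks, which guarantees the consistency of the overall checkpoint.
Message "A" (from partition 1) arrives at the Flink Map Task, while the top consumer (partition 0) goes on to read its next message ("C").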
Step 5:
Once the Flink Map Task has received the checkpoint barriers from both sources, it checkpoints its own state to the JobMaster.
Meanwhile, the consumers continue reading more events from Kafka.
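The barrier alignment described in Steps 4 and 5 can be illustrated with a small toy model. This is only a sketch, not Flink's implementation: `AligningTask`, `Record`, `Barrier` and `AlignmentDemo` are hypothetical names, and the task's "state" is simply the list of records it has processed. A channel that has already delivered its barrier is held back until the barriers from all channels have arrived, so the snapshot contains exactly the records that precede the barriers.

```scala
// Toy model of barrier alignment in a task with several input channels.
// Not Flink code: every name in this block is hypothetical.
sealed trait Event
final case class Record(value: String) extends Event
case object Barrier extends Event

final class AligningTask(numChannels: Int) {
  private var processed = Vector.empty[String]         // the task's state
  private var blocked = Set.empty[Int]                 // channels whose barrier has arrived
  private var buffered = Vector.empty[String]          // records held back during alignment
  private var snapshots = Vector.empty[Vector[String]] // state captured per checkpoint

  def receive(channel: Int, event: Event): Unit = event match {
    case Record(v) if blocked.contains(channel) =>
      buffered :+= v // arrived after this channel's barrier: hold it back
    case Record(v) =>
      processed :+= v
    case Barrier =>
      blocked += channel
      if (blocked.size == numChannels) {
        snapshots :+= processed // all barriers arrived: snapshot the state
        processed ++= buffered  // then release the held-back records
        buffered = Vector.empty
        blocked = Set.empty
      }
  }

  def state: Vector[String] = processed
  def takenSnapshots: Vector[Vector[String]] = snapshots
}

object AlignmentDemo {
  def run(): (Vector[Vector[String]], Vector[String]) = {
    val task = new AligningTask(2)
    task.receive(0, Record("A")) // partition 0: "A"
    task.receive(0, Record("B")) // partition 0: "B"
    task.receive(0, Barrier)     // barrier from source 0: channel 0 is now blocked
    task.receive(0, Record("C")) // arrives after the barrier: buffered
    task.receive(1, Record("A")) // partition 1: "A", still ahead of its barrier
    task.receive(1, Barrier)     // last barrier: snapshot, then release "C"
    (task.takenSnapshots, task.state)
  }
}
```

In this run the single snapshot holds "A", "B" from partition 0 and "A" from partition 1, which matches the source offsets (2, 1) recorded in the same checkpoint; "C" is processed only after the snapshot.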
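Step 6:
Once the Flink Map Task has finished snapshotting its own state, it reports to the Flink JobMaster that it has completed this checkpoint.
When all tasks of the job have acknowledged that their state is checkpointed, the JobMaster marks the checkpoint as completed.
From this moment on, the checkpoint can be used for failure recovery.
It is worth noting that Flink does not rely on the offsets committed to Kafka to recover from system failures; it uses the offsets stored in its own checkpoint.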
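The completion rule in Step 6 (a checkpoint counts only after every task has acknowledged it) can be sketched as follows. This is a toy model under assumed names, not Flink's actual coordinator: `PendingCheckpoint`, `acknowledge`, `isComplete` and the task names are all hypothetical, and state handles are plain strings.

```scala
// Toy model of checkpoint acknowledgement tracking; not Flink's coordinator.
// PendingCheckpoint, acknowledge and isComplete are hypothetical names.
final case class PendingCheckpoint(
    id: Long,
    expected: Set[String],                  // tasks that must acknowledge
    acked: Map[String, String] = Map.empty  // task -> reported state
) {
  // Record the state reported by one task; unknown tasks are ignored.
  def acknowledge(task: String, state: String): PendingCheckpoint =
    if (expected.contains(task)) copy(acked = acked + (task -> state)) else this

  // The checkpoint is usable for recovery only once every task has reported.
  def isComplete: Boolean = acked.keySet == expected
}

object AckDemo {
  def run(): (Boolean, Boolean) = {
    val pending = PendingCheckpoint(1L, Set("source-0", "source-1", "map"))
    val afterSources = pending
      .acknowledge("source-0", "offset = 2")
      .acknowledge("source-1", "offset = 1")
    val afterMap = afterSources.acknowledge("map", "map state")
    (afterSources.isComplete, afterMap.isComplete)
  }
}
```

After only the two sources have reported, the checkpoint is still pending; it becomes complete once the map task acknowledges as well.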
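In case of a failure (for example, a worker goes down), all operator tasks are restarted and their state is reset to the most recently completed checkpoint.
The Kafka sources resume reading from offsets 2 and 1, respectively, because those are the offsets stored in the completed checkpoint.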
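The recovery behaviour can be replayed with a minimal model of a source whose only state is one read offset per partition. This is a sketch rather than Flink code: `ToySource`, `poll`, `snapshot`, `restore` and `OffsetRecoveryDemo` are hypothetical names, and the two partitions are in-memory vectors standing in for the Kafka topic from the walkthrough.

```scala
// Toy model of a Kafka source whose state is one read offset per partition.
// Not Flink code: ToySource, poll, snapshot and restore are hypothetical names.
final class ToySource(partitions: Vector[Vector[String]]) {
  private var offsets: Vector[Int] = Vector.fill(partitions.size)(0)

  // Read the next record of a partition and advance that partition's offset.
  def poll(partition: Int): Option[String] = {
    val offset = offsets(partition)
    if (offset < partitions(partition).size) {
      offsets = offsets.updated(partition, offset + 1)
      Some(partitions(partition)(offset))
    } else None
  }

  // Checkpoint: hand out the current offsets (an immutable Vector).
  def snapshot(): Vector[Int] = offsets

  // Recovery: rewind to the offsets stored in the last completed checkpoint.
  def restore(checkpoint: Vector[Int]): Unit = {
    offsets = checkpoint
  }
}

object OffsetRecoveryDemo {
  def run(): (Vector[Int], Option[String], Option[String]) = {
    val messages = Vector("A", "B", "C", "D", "E")
    val source = new ToySource(Vector(messages, messages))

    source.poll(0); source.poll(0)     // partition 0 reads "A", "B"
    source.poll(1)                     // partition 1 reads "A"
    val checkpoint = source.snapshot() // offsets (2, 1)

    source.poll(0); source.poll(1)     // progress made after the checkpoint
    source.restore(checkpoint)         // failure: rewind to the checkpoint

    (checkpoint, source.poll(0), source.poll(1))
  }
}
```

After the restore, partition 0 resumes at "C" and partition 1 at "B", so the records read between the checkpoint and the failure are read again.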
With Flink’s checkpointing enabled, the FlinkKafkaProducer011 (FlinkKafkaProducer for Kafka >= 1.0.0 versions) can provide exactly-once delivery guarantees.
Besides enabling Flink's checkpointing, you can choose among three modes of operation by passing the appropriate semantic parameter to the FlinkKafkaProducer011 (FlinkKafkaProducer for Kafka >= 1.0.0 versions):
- Semantic.NONE: Flink guarantees nothing; produced records can be lost or duplicated.
- Semantic.AT_LEAST_ONCE (default setting): no records are lost, although they can be duplicated.
- Semantic.EXACTLY_ONCE: uses Kafka transactions to provide exactly-once semantics. Whenever you write to Kafka using transactions, do not forget to set the desired isolation.level (read_committed or read_uncommitted; the latter is the default value) for any application consuming records from Kafka.
Semantic.EXACTLY_ONCE mode relies on the ability to commit transactions that were started before taking a checkpoint, after recovering from that checkpoint. If the time between a Flink application crash and the completed restart is longer than Kafka's transaction timeout, there will be data loss (Kafka automatically aborts transactions that exceed the timeout). With this in mind, configure the transaction timeout to fit your expected downtime.
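The two settings mentioned above can be written down as plain Kafka client properties. The sketch below uses only `java.util.Properties`; the object name, the helper methods, the five-minute downtime and the factor of two are assumptions chosen for illustration. `transaction.timeout.ms` and `isolation.level` are the standard Kafka producer and consumer configuration keys. In a real job the producer properties would be handed to the FlinkKafkaProducer011 constructor together with `Semantic.EXACTLY_ONCE`.

```scala
import java.util.Properties

// Illustrative settings only; the object and method names are hypothetical.
object ExactlyOnceKafkaSettings {
  // Assumed worst-case time between a crash and a completed restart.
  val expectedMaxDowntimeMs: Long = 5 * 60 * 1000L // hypothetical: 5 minutes

  // Producer side: keep the transaction timeout above the expected downtime.
  // The broker setting transaction.max.timeout.ms caps this value, so it may
  // need to be raised on the brokers for larger timeouts.
  def producerProperties(bootstrapServers: String): Properties = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", bootstrapServers)
    props.setProperty("transaction.timeout.ms", (expectedMaxDowntimeMs * 2).toString)
    props
  }

  // Consumer side (any application reading the topic): only see committed records.
  def consumerProperties(bootstrapServers: String, groupId: String): Properties = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", bootstrapServers)
    props.setProperty("group.id", groupId)
    props.setProperty("isolation.level", "read_committed")
    props
  }
}
```

With these assumed numbers the producer's transaction timeout is 600000 ms (10 minutes), twice the expected downtime.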
Kafka Consumers and Fault Tolerance
With Flink’s checkpointing enabled, the Flink Kafka Consumer will consume records from a topic and periodically checkpoint all its Kafka offsets, together with the state of other operations, in a consistent manner. In case of a job failure, Flink will restore the streaming program to the state of the latest checkpoint and re-consume the records from Kafka, starting from the offsets that were stored in the checkpoint.
The interval of drawing checkpoints therefore defines how much the program may have to go back at most, in case of a failure.
To use fault tolerant Kafka Consumers, checkpointing of the topology needs to be enabled at the execution environment:
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(5000) // checkpoint every 5000 msecs
Also note that Flink can only restart the topology if enough processing slots are available to restart the topology. So if the topology fails due to loss of a TaskManager, there must still be enough slots available afterwards. Flink on YARN supports automatic restart of lost YARN containers.
If checkpointing is not enabled, the Kafka consumer periodically commits the offsets itself: to Zookeeper with the Kafka 0.8 consumer, and to the Kafka brokers with the 0.9 and later consumers.
Commonly used checkpoint settings:

    import org.apache.flink.streaming.api.CheckpointingMode

    env.enableCheckpointing(4000) // checkpoint every 4000 msecs
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    env.getCheckpointConfig.setCheckpointTimeout(10000) // give up on a checkpoint after 10000 msecs
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1) // at most one checkpoint in progress