关于Flink的exactly-once语义

原文地址:https://www.ververica.com/blog/how-apache-flink-manages-kafka-consumer-offsets

一、关于Checkpoints

Checkpoints是Flink从故障中恢复的一种内部机制。
Checkpoints是Flink应用程序状态的一致性副本,包括输入的读取位置点。
如果出现故障,Flink通过从检查点加载应用程序状态并从恢复的读取位置点继续执行,就好像什么都没有发生一样,从而恢复应用程序。
Flink的Checkpoints是基于Chandy-Lamport算法的分布式一致性快照

Checkpoints使Flink具有容错性,并确保流式应用程序的语义在发生故障时得到保留。检查点按应用程序可以配置的定期触发。

Flink中的Kafka Consumers与Flink的Checkpoints机制集成为一个有状态operator,其状态是所有Kafka分区中的读取偏移量 offsets。
当一个检查点被触发时,每个分区的偏移量都存储在检查点中。
Flink的检查点机制确保所有操作任务的存储状态是一致的,即它们基于相同的输入数据。当所有操作任务成功存储其状态时,检查点即完成。
因此,当从潜在的系统故障中重新启动恢复时,系统提供exactly-once状态更新保证。

二、Flink 中的 Kafka Consumers offsets是如何做检查点的

栗子
(数据被存在了 Flink 的 JobMaster 中,在 POC 或生产用例下,这些数据最好是能存到一个外部文件系统(如HDFS或S3)中)

Step 1:

如下所示,从一个 Kafka topic读取,有两个partition,每个partition都含有 “A”, “B”, “C”, ”D”, “E” 5条消息。
我们将两个partition的offset都设置为0.


step1.png
Step 2:

Kafka comsumer开始从 partition 0 读取消息。
消息“A”正在被处理,
第一个 consumer 的 offset 变成了1。


step2.png
Step 3:

消息“A”到达了 Flink Map Task。
两个 consumer 都开始读取下一条消息(partition 0 读取“B”,partition 1 读取“A”)。
各自将 offset 更新成 2 和 1 。
同时,Flink 的 JobMaster 开始在 source 触发了一个检查点。


step 3.png
Step 4:

接下来,由于 source 触发了检查点,Kafka consumer tasks创建了它们状态的第一个快照(”offset = 2, 1”),并将快照存到了 Flink 的 JobMaster 中。
Source 在消息“B”和“A”从partition 0 和 1 发出之后,发出一个 checkpoint barrier。
Checkopint barrier 用于对齐所有 operator task 的检查点,保证了整个检查点的一致性。
消息“A”到达了 Flink Map Task,而上面的 consumer 继续读取下一条消息(消息“C”)。


step4.png
Step 5:

Flink Map Task 从source和检查点接收 checkpoint barrier 后,并将其状态发送给 JobMaster。
同时,consumer 会继续从 Kafka 读取更多的事件。

step5.png
Step 6:

Flink Map Task 完成了它自己状态的快照流程后,就会和Flink JobMaster进行通信, 汇报它已经完成了这个 checkpoint。
当所有的 task 都确认其状态 checkpoint 后,JobMaster 就会将这个 checkpoint 标记为成功。

从此刻开始,这个 checkpoint 就可以用于故障恢复了。
值得一提的是,Flink 并不依赖 Kafka offset 从系统故障中恢复。

step6.png

三、故障恢复

在发生故障时(比如,某个 worker 挂了),所有的 operator task 会被重启,而他们的状态会被重置到最近一次完成的checkpoint。
Kafka source 分别从 offset 2 和 1 重新开始读取消息(因为这是完成的 checkpoint 中存的 offset)。
当作业重启后,我们可以期待正常的系统操作,就好像之前没有发生故障一样。如下图所示:


故障恢复.png

官网中关于exactly-once的解释:

With Flink’s checkpointing enabled, the FlinkKafkaProducer011 (FlinkKafkaProducer for Kafka >= 1.0.0 versions) can provide exactly-once delivery guarantees.
Besides enabling Flink’s checkpointing, you can also choose three different modes of operating chosen by passing appropriate semantic parameter to the FlinkKafkaProducer011 (FlinkKafkaProducer for Kafka >= 1.0.0 versions):
启用flink的检查点之外,可以通过将适当的语义参数传递给flinkkafkaproducer011,有三种不同的操作模式:

  • Semantic.NONE:
  • Semantic.AT_LEAST_ONCE (default setting)
  • Semantic.EXACTLY_ONCE: uses Kafka transactions to provide exactly-once semantic. Whenever you write to Kafka using transactions, do not forget about setting desired isolation.level (read_committed or read_uncommitted - the latter one is the default value) for any application consuming records from Kafka.
    使用Kafka事务提供一次语义。无论何时使用事务写入Kafka,都不要忘记为任何使用Kafka记录的应用程序设置所需的isolation.level(read_committed或read_uncommitted-后者是默认值)。

Semantic.EXACTLY_ONCE mode relies on the ability to commit transactions that were started before taking a checkpoint, after recovering from the said checkpoint. If the time between Flink application crash and completed restart is larger than Kafka’s transaction timeout there will be data loss (Kafka will automatically abort transactions that exceeded timeout time). Having this in mind, please configure your transaction timeout appropriately to your expected down times.
Semantic.EXACTLY_ONCE模式依赖于在从所述检查点恢复后,提交在接受检查点之前启动的事务的能力。如果Flink崩溃和完成重新启动之间的时间大于Kafka的事务超时,则将丢失数据(Kafka将自动中止超过超时时间的事务)。考虑到这一点,根据预期的停机时间适当配置事务超时。

Kafka Consumers and Fault Tolerance

With Flink’s checkpointing enabled, the Flink Kafka Consumer will consume records from a topic and periodically checkpoint all its Kafka offsets, together with the state of other operations, in a consistent manner. In case of a job failure, Flink will restore the streaming program to the state of the latest checkpoint and re-consume the records from Kafka, starting from the offsets that were stored in the checkpoint.
The interval of drawing checkpoints therefore defines how much the program may have to go back at most, in case of a failure.
To use fault tolerant Kafka Consumers, checkpointing of the topology needs to be enabled at the execution environment:
启用flink的检查点后,flink-kafka Consumers将使用某个topic的记录,并以一致的方式定期检查其所有kafka偏移量以及其他操作的状态。
在作业失败的情况下,Flink将把流媒体程序恢复到最新检查点的状态,并从存储在检查点中的偏移量开始重新使用来自Kafka的记录。
因此,绘制检查点的间隔定义了程序在失败时最多需要返回多少。
要使用容错kafka Consumers,需要在执行环境中启用拓扑检查点:

val env = >StreamExecutionEnvironment.getExecutionEnvironment()
env.enableCheckpointing(5000) // checkpoint every 5000 >msecs

Also note that Flink can only restart the topology if enough processing slots are available to restart the topology. So if the topology fails due to loss of a TaskManager, there must still be enough slots available afterwards. Flink on YARN supports automatic restart of lost YARN containers.
If checkpointing is not enabled, the Kafka consumer will periodically commit the offsets to Zookeeper.
还要注意,只有在有足够的processing slots来重新启动topology时,Flink才能重新启动topology。因此,如果topology由于TaskManager丢失而失败,那么之后必须有足够的可用slots。 Flink on YARN支持自动重启丢失的YARN。
如果未启用检查点,Kafka使用者将定期将偏移提交给Zookeeper。

// checkpoint常用设置参数
env.enableCheckpointing(4000)
env.getCheckpointConfig.setCheckpointingMode(Checkpoin>tingMode.EXACTLY_ONCE)
env.getCheckpointConfig.setCheckpointTimeout(10000)
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,189评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,577评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,857评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,703评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,705评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,620评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,995评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,656评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,898评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,639评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,720评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,395评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,982评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,953评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,195评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,907评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,472评论 2 342