flink exactly-once系列之两阶段提交实现分析

flink exactly-once系列目录:

 一、[两阶段提交概述](http://mp.weixin.qq.com/s?__biz=MzU5MTc1NDUyOA==&mid=2247483818&idx=1&sn=8c7bf4d00e81d7635bfa26b78a78ebba&chksm=fe2b65e5c95cecf349d9819fe6c998359cb9a8695abbdf74e326adc0503c8f330a2d47182b63&scene=21#wechat_redirect)

 二、两阶段提交实现分析

 三、StreamingFileSink源码分析

 四、事务性输出实现

 五、最终一致性实现

   在【[两阶段提交概述](http://mp.weixin.qq.com/s?__biz=MzU5MTc1NDUyOA==&mid=2247483818&idx=1&sn=8c7bf4d00e81d7635bfa26b78a78ebba&chksm=fe2b65e5c95cecf349d9819fe6c998359cb9a8695abbdf74e326adc0503c8f330a2d47182b63&scene=21#wechat_redirect)】中介绍了两阶段提交的基本思路以及如何根据checkpoint机制来实现两阶段提交思路,flink给出来两阶段提交抽象实现TwoPhaseCommitSinkFunction与具体实现FlinkKafkaProducer011。

一、TwoPhaseCommitSinkFunction

   TwoPhaseCommitSinkFunction是一个抽象类,继承RichSinkFunction,实现CheckpointedFunction与CheckpointListener接口。抽象出了以下四个方法:
  1. beginTransaction, 开启一个事务,获得一个句柄

  2. preCommit,执行预提交

  3. commit ,执行提交

  4. abort,放弃一个事务

使用这四个方法然后结合checkpoint 过程提供的hook,来实现两阶段提交过程,看下其具体调用流程:

a. initializeState 状态初始化方法,只会被调用一次,第一件事情是用来恢复上次checkpoint完成预提交的事务与下一次checkpoint开始的事务,对于上次checkpoint完成预提交说明该checkpoint已经完成,那么执行commit操作,下一次checkpoint开始的事务说明该checkpoint,那么执行abort操作,第二件事情是开启一个新的事务,给新的checkpoint使用;

b. snapshotState 与checkpoint同步周期性执行的方法,首先执行preCommit对本次checkpoint事务执行预提交操作,并且开启一个新的事务提供给下一次checkpoint使用,然后将这两个事务句柄存放在state中进行容错,preCommit提交的事务就是在失败后重启需要commit的事务,而新开启的事务就是在失败后重启需要放弃的事务;

c. notifyCheckpointComplete checkpoint完成之后的回调方法,负责对预提交的事务执行commit操作。

   在上面的流程中,任何一个步骤都有可能会失败,如果在预提交阶段失败,任务会失败重启回到最近一次的checkpoint成功状态,预提交的事务自然会因为事务超时而放弃;如果在预提交之后提交之前也就是完成checkpoint 但是还没触发notifyCheckpointComplete动作,这个这个过程中失败,那么就会从这次成功的checkpoint中恢复,会执行initializeState中的逻辑保证数据的一致性;如果在commit之后下次checkpoint之前失败,也就是在执行notifyCheckpointComplete之后失败,那么任务重启会继续提交之前已经提交过的事务,因此事务的提交需要保证重复提交不会影响数据的一致性。整个流程分析下来,除了需要保证事务重复提交保证数据的一致性外,还需要保证事务句柄能够被持久化容错,以便失败后重启恢复,接下来看下输出kafka 是如何保证数据一致性的。

二、FlinkKafkaProducer011

     kafka从0.11版本开始提供了幂等与事务的特性,保证了数据的一致性,具体可以参考https://www.infoq.cn/article/kafka-analysis-part-8这篇文章,幂等通过producerId与SequenceNumber 来保证,但是幂等只能保证对单个分区操作的数据一致性,事务通过transactionId、producerId、epoch三个元素来保证,transactionId由客户端指定,producerId内部实现但是对用户透明、epoch表示对相同transactionId 不同producer的区分。FlinkKafkaProducer011继承TwoPhaseCommitSinkFunction抽象类,将kafka事务机制与checkpoint结合,如下图:
image
   kafka的事务机制基本流程是先开启一个事务,然后发送数据,最后提交,将开启事务过程放在initializeState与snapshotState中,发送数据放在invoke中,flush 将所有缓存数据刷新到kafka ,相当于预提交操作,在snapshotState中执行,commitTransaction 提交操作放在notifyCheckpointComplete中执行。上面任何一个流程都有可能出现异常导致任务失败,对于kafka事务提交机制也是使用两阶段提交的模式,根据上一篇的分析,那么可能出现的问题就是在第二阶段,可能会出现部分提交成功部分提交失败导致数据不一致,如果能获取之前提交失败kafka 的transactionId、producerId、epoch这三个元素那么就可以在任务重启继续提交之前失败的事务,在flink 正好可以使用状态将这个三个元素进行容错,使重启之后可恢复。 在FlinkKafkaProducer011中使用KafkaTransactionState对象作为事务的句柄,保存着transactionId、producerId、epoch容错元素与FlinkKafkaProducer对象,FlinkKafkaProducer是transient类型的,不需要进行持久化,通过t-p-e就可以确定一个FlinkKafkaProducer。

理解以上流程就很好理解代码实现了,下面看几个重要的方法:

1. 开始事务,获得一个新的事务句柄

image.gif

2. 预提交,执行flush操作

image.gif

3. 提交,执行commitTransaction操作

image.gif

4. 出现异常,任务重启放弃事务

image.gif

三、两阶段提交实现总结

  1.  外部存储需要满足事务特性

  2.  外部存储需提供事务句柄,可持久化、可重新提交

  3. 由于这种两阶段提交模式与checkpoint绑定在一起,checkpoint是周期性的执行,那么checkpoint周期的长短则会影响下游数据的延时性,需要根据实际使用情况来调整。
企业微信截图_552a5b2d-e64f-41e2-be6b-865f5dc7a6dd.png
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,752评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,100评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,244评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,099评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,210评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,307评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,346评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,133评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,546评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,849评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,019评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,702评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,331评论 3 319
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,030评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,260评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,871评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,898评论 2 351

推荐阅读更多精彩内容

  • 前言 简书快正式从小黑屋里出来了,所以是时候重启更新了。这段时间积攒了不少要写的东西,逐个击破吧。 两阶段提交(t...
    LittleMagic阅读 8,581评论 7 21
  • 随着大型网站的各种高并发访问、海量数据处理等场景越来越多,如何实现网站的高可用、易伸缩、可扩展、安全等目标就显得越...
    阿斯蒂芬2阅读 529评论 0 0
  • 享受孤独给你带来的快乐,你有没有一个人去吃过火锅,下雨的时候去看场电影,过后却发现没有人给你撑伞,只是你习惯罢了。...
    话里人_bc31阅读 531评论 0 1
  • 很久很久很久已经不记得有多久没有这么坐着喝过茶了。我不是会品茶的人,再好的茶到我这都是像喝普通的水,说不出个所以然...
    鱼戏荷欢阅读 178评论 0 1
  • 时间飞逝,一天又即将告幕。 当空的月亮较几天前已经圆了许多,因为今天是农历十五了。即使知道月球绕着地球转,就会有月...
    小正月的大树阅读 305评论 0 0