介绍
Flink具体如何保证exactly-once呢? 它使用一种被称为"检查点"(checkpoint
)的特性,在出现故障时将系统重置回正确状态
。下面通过简单的类比来解释检查点的作用。
案例
假设你和两位朋友正在数项链上有多少颗珠子,如下图所示。你捏住珠子,边数边拨,每拨过一颗珠子就给总数加一。你的朋友也这样数他们手中的珠子。当你分神忘记数到哪里时,怎么办呢? 如果项链上有很多珠子,你显然不想从头再数一遍,尤其是当三人的速度不一样却又试图合作的时候,更是如此(比如想记录前一分钟三人一共数了多少颗珠子,回想一下一分钟滚动窗口)。
于是,你想了一个更好的办法: 在项链上每隔一段就松松地系上一根有色皮筋,将珠子分隔开; 当珠子被拨动的时候,皮筋也可以被拨动; 然后,你安排一个助手,让他在你和朋友拨到皮筋时记录总数。用这种方法,当有人数错时,就不必从头开始数。相反,你向其他人发出错误警示,然后你们都从上一根皮筋处开始重数,助手则会告诉每个人重数时的起始数值,例如在粉色皮筋处的数值是多少。
Flink检查点的作用就类似于皮筋标记。数珠子这个类比的关键点是: 对于指定的皮筋而言,珠子的相对位置是确定的; 这让皮筋成为重新计数的参考点。总状态(珠子的总数)在每颗珠子被拨动之后更新一次,助手则会保存与每根皮筋对应的检查点状态,如当遇到粉色皮筋时一共数了多少珠子,当遇到橙色皮筋时又是多少。当问题出现时,这种方法使得重新计数变得简单。
Flink的检查点算法
checkpoint机制是Flink可靠性的基石,可以保证Flink集群在某个算子因为某些原因(如 异常退出)出现故障时,能够将整个应用流图的状态恢复到故障之前的某一状态,保证应用流图状态的一致性.
快照 的实现算法:
简单算法
--暂停应用, 然后开始做检查点, 再重新恢复应用
效率低,需要暂停应用Flink的改进Checkpoint算法. Flink的checkpoint机制原理来自"Chandy-Lamport algorithm"算法(分布式快照算)的一种变体:
异步 barrier 快照
(asynchronous barrier snapshotting)
. 每个需要checkpoint的应用在启动时,Flink的JobManager为其创建一个 CheckpointCoordinator
,CheckpointCoordinator全权负责本应用的快照制作。
理解Barrier
流的barrier是Flink的Checkpoint中的一个核心概念. 多个barrier被插入到数据流中, 然后作为数据流的一部分随着数据流动(有点类似于Watermark).这些barrier不会跨越流中的数据.
每个barrier会把数据流分成两部分: 一部分数据进入当前的快照
, 另一部分数据进入下一个快照
. 每个barrier携带着快照的id. barrier 不会暂停数据
的流动, 所以非常轻量级. 在流中, 同一时间可以有来源于多个不同快照的多个barrier, 这个意味着可以并发的出现不同的快照.
Flink的检查点制作过程
- 第一步:
Checkpoint Coordinator 向所有 source 节点 trigger Checkpoint. 然后Source Task会在数据流中安插CheckPoint barrier
Job Manager 对每一个job都会产生一个Checkpoint Coordinator
向所有 source 节点 触发 trigger Checkpoint
节点, 并行度是几,就会触发多少个。
source 会向流中触发Barrier
,接收到Barrier
的节点就会保存快照(包括source)。
- 第二步:
source 节点向下游广播barrier
,这个 barrier 就是实现Chandy-Lamport 分布式快照算法
的核心,下游的 task 只有收到所有进来的 barrier 才会执行相应的 Checkpoint(barrier对齐, 但是新版本有一种新的: barrier)
表示两秒钟 source 向流中触发一次Barrier
env.enableCheckpointing(2000);
source先收到barrier
,然后往后传递,若是多并行度,相当于多组接力赛跑比赛,所以顺序是不一致的,并不是同步。
- 第三步:
source 节点向下游广播 barrier,这个 barrier 就是实现 Chandy-Lamport 分布式快照算法的核心,下游的 task 只有收到所有进来的 barrier 才会执行相应的 Checkpoint(barrier对齐, 但是新版本有一种新的: barrier)
- 第四步:
下游的 sink 节点收集齐上游两个 input 的 barrier 之后,会执行本地快照,这里特地展示了 RocksDB incremental Checkpoint 的流程,首先 RocksDB 会全量刷数据到磁盘上(红色大三角表示),然后 Flink 框架会从中选择没有上传的文件进行持久化备份(紫色小三角)。
-
第五步:
同样的,sink 节点在完成自己的 Checkpoint 之后,会将 state handle 返回通知 Coordinator。
第六步:
最后,当 Checkpoint coordinator 收集齐所有 task 的 state handle,就认为这一次的 Checkpoint 全局完成了,向持久化存储中再备份一个 Checkpoint meta 文件。
严格一次语义: barrier对齐
在多并行度下, 如果要实现严格一次
, 则要执行barrier对齐
.
当 job graph 中的每个 operator 接收到 barriers 时,它就会记录下其状态。拥有两个输入流的 Operators(例如 CoProcessFunction)会执行 barrier 对齐(barrier alignment) 以便当前快照能够包含消费两个输入流 barrier 之前(但不超过)的所有 events 而产生的状态。
至少一次语义: barrier不对齐
会重复消费
, 就是至少一次语义.