[TOC]
Exactly-once语义
Flink自1.4.0开始实现exactly-once的数据保证,即在任何情况下都能保证数据对应用产生的效果只有一次,不会多也不会少。具体实现依赖于抽象类TwoPhaseCommitSinkFunction,用户只需要实现类接口就可以自己定义对外部系统的exactly-once。
Flink的checkpoint可以保证作业失败的情况下从最近一次快照进行恢复,也就是可以保证系统内部的exactly-once。但是,flink有很多外接系统,比如将数据写到kafka,一旦作业失败重启,offset重置,会消费旧数据,从而将重复的结果写到kafka。如下图:
这个时候,仅靠系统本身是无法保证exactly-once的。系统之间的数据一致性一般要靠2PC协议来保证,flink的TwoPhaseCommitSinkFunction也是基于此实现的。exactly-once的语义如下图:
Exactly-once VS At-least-once
算子做快照时,如果等所有输入端的barrier都到了才开始做快照,那么就可以保证算子的exactly-once;如果为了降低延时而跳过对其,从而继续处理数据,那么等barrier都到齐后做快照就是at-least-once了,因为这次的快照掺杂了下一次快照的数据,当作业失败恢复的时候,这些数据会重复作用系统,就好像这些数据被消费了两遍。
注:对齐只会发生在算子的上端是join操作以及上游存在partition或者shuffle的情况,对于直连操作类似map、flatMap、filter等还是会保证exactly-once的语义。
端到端的Exactly once实现
下面以一个简单的flink读写kafka作为例子来说明(kafka0.11版本开始支持exactly-once语义)。如图所示:
在我们今天要讨论的 Flink 应用程序示例中,我们有:
- 从 Kafka 读取数据的数据源(在 Flink 为 KafkaConsumer)
- 窗口聚合
- 将数据写回 Kafka 的数据接收器(在 Flink 为 KafkaProducer)
要使数据接收器提供 Exactly-Once 语义保证,必须在一个事务中将所有数据写入 Kafka。提交捆绑了两个检查点之间的所有写入数据。这可确保在发生故障时能回滚所有写入的数据。
但是,在具有多个并发运行的接收器任务的分布式系统中,简单的提交或回滚是远远不够的,因为必须确保所有组件在提交或回滚时一致才能确保一致的结果。Flink 使用两阶段提交协议及预提交阶段来解决这一问题。
检查点的启动表示我们的两阶段提交协议的预提交阶段。当检查点启动时,Flink JobManager 会将检查点 Barrier 注入数据流中(将数据流中的记录分为进入当前检查点的集合与进入下一个检查点的集合)。
Barrier 在算子之间传递。对于每个算子,它会触发算子状态后端生成状态的快照。
数据源存储 Kafka 的偏移量,完成此操作后将检查点 Barrier 传递给下一个算子。
这种方法只适用于算子只有内部状态(Internal state)的情况。内部状态是 Flink 状态可以存储和管理的所有内容 - 例如,第二个算子中的窗口总和。当一个进程只有内部状态时,除了写入到已定义的状态变量之外,不需要在预提交阶段执行任何其他操作。Flink 负责在检查点成功的情况下正确提交这些写入,或者在出现故障时中止这些写入。
但是,当一个进程具有外部状态(External state)时,状态处理会有所不同。外部状态通常以写入外部系统(如Kafka)的形式出现。在这种情况下,为了提供 Exactly-Once 语义保证,外部系统必须支持事务,这样才能和两阶段提交协议集成。
我们示例中的数据接收器具有外部状态,因为它正在向 Kafka 写入数据。在这种情况下,在预提交阶段,除了将其状态写入状态后端之外,数据接收器还必须预先提交其外部事务。
当检查点 Barrier 通过所有算子并且触发的快照回调成功完成时,预提交阶段结束。所有触发的状态快照都被视为该检查点的一部分。检查点是整个应用程序状态的快照,包括预先提交的外部状态。如果发生故障,我们可以回滚到上次成功完成快照的时间点。
下一步是通知所有算子检查点已成功完成。这是两阶段提交协议的提交阶段,JobManager 为应用程序中的每个算子发出检查点完成的回调。
数据源和窗口算子没有外部状态,因此在提交阶段,这些算子不用执行任何操作。但是,数据接收器有外部状态,因此此时应该提交外部事务:
我们总结一下:
- 一旦所有算子完成预提交,就会发出一个提交。
- 如果至少有一个预提交失败,那么所有其他的提交也都会中止,并将回滚到上一个成功完成的检查点。
- 在预提交成功之后,必须保证提交最终成功 - 我们的算子和外部系统都需要保证这点。如果一个提交失败(例如,由于间歇性网络问题),整个 Flink 应用程序将会失败,应用程序将根据用户的重启策略重新启动,并且还会尝试一次提交。这个过程至关重要,因为如果提交最终失败,将会发生数据丢失。
因此,我们要确定所有算子都同意检查点的最终结果:所有算子都同意数据提交或中止提交并回滚。
小结
- Flink 检查点是支持两阶段提交协议并提供端到端的 Exactly-Once 语义的基础。
- 这个方案的一个优点是: Flink 不像其他一些系统那样,通过网络传输存储(materialize)数据 - 不需要像大多数批处理程序那样将计算的每个阶段写入磁盘。
- Flink 新的 TwoPhaseCommitSinkFunction 提取了两阶段提交协议的通用逻辑,并使构建端到端的 Exactly-Once 语义的应用程序(使用 Flink 和支持事务的外部系统)成为可能。
- 从 Flink 1.4.0 开始,Pravega 和 Kafka 0.11 producer 都提供了 Exactly-Once 语义;在 Kafka 0.11 中首次引入了事务,这使得 Kafka 在 Flink 实现 Exactly-Once producer 成为可能。
- Kafka 0.11 producer 是在 TwoPhaseCommitSinkFunction 基础之上实现的,与 At-Least-Once 语义的 Kafka producer 相比,它的开销非常低。