Flink 任务 failover 之后,可能会重复写出数据到 Sink 中,你们公司是怎么做到端对端 exactly-once 的?
端对端 exactly-once 有 3 个条件:
⭐ Source 引擎可以重新消费,比如 Kafka 可以重置 offset 进行重新消费
⭐ Flink 任务配置 exactly-once,保证 Flink 任务 State 的 exactly-once
⭐ Sink 算子支持两阶段或者可重入,保证产出结果的 exactly-once
其中前两项一般大多数引擎都支持,我们需要关注的就是第 3 项,目前有两种常用方法:
⭐ Sink 两阶段:由于两阶段提交是随着 Checkpoint 进行的,假设 Checkpoint 是 5min 做一次,那么数据对下游消费方的可见性延迟至少也是 5min,所以会有数据延迟等问题,目前用的比较少。
⭐ Sink 支持可重入:举例:
⭐ Sink 为 MySQL:可以按照 key update 数据
⭐ Sink 为 Druid:聚合类型可以选用 longMax
⭐ Sink 为 ClickHouse:查询时使用 longMax 或者使用 ReplacingMergeTree 表引擎将重复写入的数据去重,这里有小伙伴会担心 ReplacingMergeTree 会有性能问题,但是博主认为其实性能影响不会很大,因为 failover 导致的数据重复其实一般情况下是小概率事件,并且重复的数据量也不会很大,也只是一个 Checkpoint 周期内的数据重复,所以使用 ReplacingMergeTree 是可以接受的)
⭐ Sink 为 Redis:按照 key 更新数据
其他解答:Flink状态一致性、端到端的精确一次保证
状态一致性:当在分布式系统中引入状态时,自然也引入了一致性问题。一致性实际上是"正确性级别"的另一种说法,也就是说在成功处理故障并恢复之后得到的结果,与没有发生任何故障时得到的结果相比,前者到底有多正确?举例来说,假设要对最近一小时登录的用户计数。在系统经历故障之后,计数结果是多少?如果有偏差,是有漏掉的计数还是重复计数? 对于流处理内部来说,所谓的状态一致性,其实就是我们所说的计算结果保证准确。在遇到故障时可以恢复状态,恢复以后的重新计算,结果应该也是完全正确的。 一条数据不应该丢失,也不应该重复计算
一致性可以分为 3 个级别:at-most-once(最多一次):计数结果可能丢失
at-least-once (至少一次):计数程序在发生故障后可能多算,但是绝不会少算。
exactly-once (精确一次):系统保证在发生故障后得到的计数结果与正确值一致。
数据流(DataStream)内部保证exactly-once (精确一次)的方法:Flink 使用了一种轻量级快照机制 ---- 检查点(checkpoint)来保证 exactly-once 语义
有状态应用的一致检查点,其实就是:所有任务的状态,在某个时间点的一份拷贝(一份快照)。而这个时间点,应该是所有任务都恰好处理完一个相同的输入数据的时候。
端到端保证一致性:
内部保证 —— 依赖 checkpoint
source 端 —— 需要外部源可重设数据的读取位置
sink 端 —— 需要保证从故障恢复时,数据不会重复写入外部系统
而对于 sink 端,又有两种具体的实现方式:幂等(Idempotent)写入和事务性(Transactional)写入。
幂等操作:是说一个操作,可以重复执行很多次,但只导致一次结果更改,也就是说,后面再重复执行就不起作用了。 例如Hashmap 的写入插入操作是幂等的操作,重复写入,写入的结果还一样。
事务写入:构建的事务对应着 checkpoint,等到 checkpoint 真正完成的时候,才把所有对应的结果写入 sink 系统中
对于事务性写入,具体又有两种实现方式:预写日志(WAL)和两阶段提交(2PC)。DataStream API 提供了 GenericWriteAheadSink 模板类和TwoPhaseCommitSinkFunction 接口,可以方便地实现这两种方式的事务性写入。其中预写日志(WAL)只能保证至少一次精确。
Flink+Kafka 端到端状态一致性的保证
内部 —— 利用 checkpoint 机制,把状态存盘,发生故障的时候可以恢复, 保证内部的状态一致性
source —— kafka consumer 作为 source,可以将偏移量保存下来,如果后 续任务出现了故障,恢复的时候可以由连接器重置偏移量,重新消费数据, 保证一致性
sink —— kafka producer 作为 sink,采用两阶段提交 sink,需要实现一个 TwoPhaseCommitSinkFunction
由于端到端保证一致性需要用到两阶段提交(2PC)TwoPhaseCommitSinkFunction,我们来了解一下两阶段提交的方式:
第一条数据来了之后,开启一个 kafka 的事务(transaction),正常写入 kafka 分区日志但标记为未提交,这就是“预提交”
jobmanager 触发 checkpoint 操作,barrier 从 source 开始向下传递,遇到 barrier 的算子将状态存入状态后端,并通知 jobmanager
sink 连接器收到 barrier,保存当前状态,存入 checkpoint,通知 jobmanager,并开启下一阶段的事务,用于提交下个检查点的数据
jobmanager 收到所有任务的通知,发出确认信息,表示 checkpoint 完成
sink 任务收到 jobmanager 的确认信息,正式提交这段时间的数据
外部 kafka 关闭事务,提交的数据可以正常消费了。
我们也可以看到,如果宕机需要通过 StateBackend 进行恢复,只能恢复所有确认提交的操作,之于有关后端状态的选择,后面再单独聊聊