一、Checkpoints的算法原理
Checkpoints是flink自动存储快照
//1. 启用Checkpoint
env.enableCheckpointing(200);
//2. 高级选项
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(60000);//checkpoint超时时间
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); //最大并行checkpoint数
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(100); //最小checkpoint的间隔时间,让两个checkpoint之间留出一定时间
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2); //容忍checkpoint失败多少次,默认0,不容忍
//3. 重启策略配置
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,10000));//每隔10s重启1次,固定延迟重启
env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.minutes(10),Time.minutes(1)));//失败率重启,在10分钟内重启,每隔1分钟重启一次,重启3次
1. Barrier(checkpoint分割线)
二、Savepoints(保存点)
Savepoints是手动存储快照,多出了一些额外元数据;
Savepoints一般是手动使用命令保存当前flink任务快照到HDFS上指定目录中,重启或恢复故障时可以使用指定的Savepoints进行启动
$ ./bin/flink savepoint \
$JOB_ID \
/tmp/flink-savepoints
使用 YARN 触发 Savepoint
#$ bin/flink savepoint :jobId [:targetDirectory] -yid :yarnAppId
使用 Savepoint 取消作业
$ bin/flink cancel -s [:targetDirectory] :jobId
从 Savepoint 恢复
$ bin/flink run -s :savepointPath [:runArgs]
跳过无法映射的状态恢复
$ bin/flink run -s :savepointPath -n [:runArgs]
- 保存点除了故障恢复外,保存点还可以用于:有计划的手动备份,更新应用程序,版本迁移,暂停和重启应用等等
三、状态一致性
- 有状态的流处理,内部每个算子任务都可以有自己的状态
- 对于流处理器内部来说,所谓的状态一致性,就是所说的计算结果要保证准确
- 一条数据不应该丢失,也不应该重复计算
- 在遇到故障时可以恢复状态,恢复以后的重新计算,结果应该也是完全正确的。
状态一致性分类
- AT-MOST-ONCE(最多一次)
- AT-LEAST-ONCE (至少一次)
- EXACTLY-ONCE (精确一次)
一致性检查点(checkpoints)
- Flink使用了一种轻量级快照机制---checkpoint保证exactly-once
- 有状态流应用的一致性检查点,其实就是:所有任务的状态,在某个时间点的一份拷贝(一份快照)。而这个时间点,应该是所有任务都恰好处理完一个相同的输入数据的时候。(使用checkpoint的Barrier实现,Barrier对下游分发是广播出去的,下游需要所有的Barrier都到了才checkpoint,这也叫Barrier对齐)
- 应用状态的一致检查点,是Flink故障恢复机制的核心
端到端(end-to-end)状态一致性
目前我们看到的一致性保证都是由流处理器实现的,也就是说都是在Flink流处理器内部保证的;而在真实应用中,流处理应用除了流处理器以外还包含了数据源(例如Kafka)和输出到持久化系统
端到端的一致性保证,意味着结果的正确性贯穿了整个流处理应用的始终;每一个组件都保证了它自己的一致性
整个端到端的一致性级别取决于所有组件中一致性最弱的组件
exactly-once怎么保证,状态一致就行,不是操作一致内部保证 ---checkpoint
source端---可重设数据的读取位置
sink端---从故障恢复时,数据不会重复写入外部系统
- 幂等写入
- 事务写入
幂等写入(Idempotent Writes)
- 所谓幂等操作,是说一个操作,可以重复执行很多次,但只导致一次结果更改,也就是说,后面再重复执行就不起作用了
典型应用:hashmap,key-value Pairs, Redis的hash表,ES指定_id都满足幂等操作
幂等写入只保证最终结果不变,中间过程会有重复写入
事务写入(Transactional Writes)
- 事务(Transaction)
1.应用程序中一系列严密的操作,所有操作必须成功完成,否则在每个操作中所作的所有更改都会被撤销- 具有原子性:一个事务中的一系列的操作要么全部成功,要么一个都不做
- 实现思想:构建的事务对应着checkpoint,等到checkpoint真正完成的时候,才把所有对应的结果写入sink系统中
- 实现方式
- 预写日志
- 两阶段提交
预写日志(Write-Ahead-Log, WAL)
- 把结果数据先当成状态保存,然后在收到checkpoint完成的通知时,一次性写入sink系统
- 简单易于实现,由于数据提前在状态后端中做了缓存,所以无论什么sink系统,都能用这种方式一批搞定
- DataStream API提供了一个模板类:GenericWriteAheadSink,来实现这种事务性sink
缺点:会加大延迟,不能严格意义实现Exactly once
两阶段提交(Two-Phase-Commit, 2PC)
- 对于每个checkpoint, sink任务会启动一个事务,并将接下来所有接收的数据添加到事务里
- 然后将这些数据写入外部sink系统,但不提交他们 ---这时只是“预提交”
- 当它收到checkpoint完成的通知时,它才正式提交事务,实现结果的真正写入
- 这种方式真正实现了exactly-once,他需要一个提供事务支持的外部sink系统。flink提供了TwoPhaseCommitSinkFunction接口(关系型数据库,mysql,SQL server,Oracle,Posteglsql, kafka)
2PC对外部sink系统的要求
- 外部sink系统必须提供事务支持,或者sink任务必须能够模拟外部系统上的事务
- 在checkpoint的间隔期间里,必须能够开启一个事务并接受数据写入
- 在收到checkpoint完成的通知之前,事务必须是“等待提交”的状态,在故障恢复的情况下,这可能需要一些时间。如果这个时候sink系统关闭事务(例如超时了),那么未提交的数据就会丢失。
- sink任务必须能够在进程失败后恢复事务
- 提交事务必须是幂等操作
优点:对性能影响比较低
四、kafka的Exactly-once两阶段提交步骤
- 第一条数据来了之后,开启一个kafka的事务(tranasaction),正常写入kafka分区日志但标记为未提交,这就是“预提交”
- jobmanager触发checkpoint操作,barrier从source开始向下传递,遇到barrier的算子讲状态存入状态后端,并通知jobmanager
- sink连接器收到barrier,保存当前状态,存入checkpoint,通知jobmanager,并开启下一个阶段的事务,用于提交下个检查点的数据
- jobmanager收到所有任务的通知,发出确认信息,表示checkpoint完成
- sink任务收到jobmanager的确认信息,正式提交这段时间数据
- 外部kafka关闭事务,提交的数据可以正常消费了