1 概述
Apache Flink是可以进行有状态的流处理,然而,在流处理中什么是状态呢?状态是有过去事件的在内存中的一些操作需要存储,这些存储的信息会影响未来事件的处理。
状态是基础,可以在流出中处理很多比较复杂的场景,如下:
- 当应用需要搜索一种特定的时间模式,那么就需要保存数据的次序状态;
- 当聚合每分钟的时间,状态需要保存未完成的聚合信息;
- 当在线训练一个机器学习模型是,需要当前模型版本的参数。
然而,有状态的流处理只有在状态可以进行容错的时候才可用于生产环境,容错意味着即使出现软件或者硬件故障,计算结果也要保证准确,没有出现数据丢失或者重复计算等情况。
在flink中通过chekpointing来实现容错,checkpoint是一个全局的,提供异步快照机制,定期的对当前应用进行快照并存储到可靠存储上,当出现异常时,flink重启应用,并使用最近完成的checkpoint作为起点。一些用户实际可能保存的状态很大,占用上GB空间,这种情况下checkpoint的创建会非常慢,而且执行时占用的资源也比较多,从而提出incremental checkpointing
,即增量方式。
在增量方式之前,每次都是进行全量的checkpoint,但是每次快照都是基于上次的更新,不会很大,所以使用增量方式只要保持上一次与当前的差距即可。
2 示例
当前,可以使用RocksDB来作为增量checkpoint的存储,并在其中不是持续增大,可以进行定期合并清楚历史状态。
该例子中,子任务的操作是一个keyed-state,一个checkpoint文件保存周期是可配置的,本例中是2,配置方式state.checkpoints.num-retained
,上面展示了每次checkpoint时RocksDB示例中存储的状态以及文件引用关系等。
- 对于checkpoint CP1,本地RocksDB目录包含两个磁盘文件(sstable),它基于checkpoint的name来创建目录。当完成checkpoint,将在共享注册表(shared state registry)中创建两个实体并将其count置为1.在共享注册表中存储的Key是由操作、子任务以及原始存储名称组成,同时注册表维护了一个Key到实际文件存储路径的Map。
- 对于checkpoint CP2,RocksDB已经创建了两个新的sstable文件,老的两个文件也存在。在CP2阶段,新的两个生成新文件,老的两个引用原来的存储。当checkpoint结束,所有引用文件的count加1。
- 对于checkpoint CP3,RocksDB的compaction将sstable-(1),sstable-(2)以及sstable-(3)合并为sstable-(1,2,3),同时删除了原始文件。合并后的文件包含原始文件的所有信息,并删除了重复的实体。除了该合并文件,sstable-(4)还存在,同时有一个sstable-(5)创建出来。Flink将新的sstable-(1,2,3)和sstable-(5)存储到底层,sstable-(4)引用CP2中的,并对相应引用次数count加1.老的CP1的checkpoint现在可以被删除,由于其retained已达到2,作为删除的一部分,Flink将所有CP1中的引用文件count减1.
- 对于checkpoint CP4,RocksDB合并sstable-(4)、sstable-(5)以及新的sstable-(6)成sstable-(4,5,6)。Flink将该新的sstable存储,并引用sstable-(1,2,3),并将sstable-(1,2,3)的count加1,删除CP2中retained到2的。由于sstable-(1), sstable-(2), 和sstable-(3)降到了0,Flink将其从底层删除。