1. 什么是State
有时候除了一次处理一个event, 我们也想记录处理多个event的信息,这个时候的操作就是stateful有状态的。
2. 按key分组的state
按key分组的state就是一个key-value store, state的分区和分发都是严格和stream在一起的,对keyed state的访问只能在对应的keyed stream上,这保证了keyed state的更新都是在本地发生的, 既保证了一致性又不需要有事务开销。keyed state又进一步被组织成Key Groups. Key Groups是Flink能分发keyed state的最小单元,key groups的数量与最大并行度一致。
3.State Persistence
Flink通过 stream replay 和 checkpointting 实现容错。一个checkpoint记录了在某一点所有的input streams在所有operators的执行状态,通过恢复checkpoint,并将stream重置到与checkpoint对应的event,可恢复flink应用的运行。checkpoint是默认关闭的。为了配合checkpoint实现容错,stream data source需要是能replay的,比如kafka.
3.1 Checkpoingtting
checkpointting 是异步的,checkpoint barriers可以不一致,每个operation可以异步地拍下各自状态的快照。
3.1.1 Barriers
stream barrier 是插入到data stream中的一条记录,用来区分哪些记录应该在当前snapshot中,哪些应该到下一个snapshot中。barrier会携带当前snapshot的ID,是非常轻量的不会影响stream的处理。一个stream里可以同时存在多个barrier, 即多个snapshot可能在同时发生。
stream barrier从data source开始插入到data stream, 一个中间operator从它所有的input streams都接收到snapshot n的barrier后,向它的所有output stream发出一个barrier. 当data sink 从它的所有input stream都接收到barrier后,向jobmanager报告snapshot n已完成,所有的data sink都完成以后,snapshot n结束。一个snapshot n结束之后,job不会再处理Sn之前的记录。
如果一个operator有多个input stream, 它要基于barrier对齐它们:
- operator收到某一个input stream的barrier以后,就不能再处理这个stream之后的event, 而应该把他们放到缓存里
- operator收到最后一个input stream的barrier后,先发出所有要发出的属于当前snapshot的记录,再发出一个barrier给所有output stream
- operator把当前state记录到snapshot, 恢复处理缓存中的记录, 再处理新到的记录
- operator将state异步写回到state backend
3.1.2 Snapshotting Operator State
operator的state也要包含再snapshot中。
operator接收到它的所有input steam的barrier后,开始记录自己的snapshot, 将snapshot存储到state backend后,向output stream发出barrier。
snapshot包含:
- 对每一个stream, snapshot开始时处理的数据位置offset/position
- 对每一个operator, 一个指向snapshot中state的指针
3.1.3 Recovery
一旦程序失败了,Flink选择一个最近的完整的checkpoint恢复operator的状态和input stream的位置。如果state是增量创建的,则选择一个最近的完整的,然后依次将增量修改更新到state上。
3.2 Unaligned Checkpointting
不对齐的checkpointting过程如下:
- operator对接收到的barrier做出响应
- operator立即向output stream发出barrier
- operator将提前到达的记录(在最后一个stream的barrier到达之前处理的其他stream中不属于本snapshot的记录)做上标记并创建一个快照
不对齐的checkpointting能保证尽快处理掉所有记录,降低延迟,对于多个input stream速度不一致的情况很有效。
不对齐的checkpointting的做异常恢复时要先处理那些提前到达的记录,再处理新到达的记录,其他过程与对齐的checkpointting一样。
3.3 State Backends
State可以存储在内存中,也可以存储在RocksDB中,选用的数据结果取决于选用的backend.
3.4 Savepoints
所有支持checkpoint的程序都支持savepoint. savepoint由用户手动触发,且不会自动过期,其他与checkpoints一样。
3.5 Exactly Once & At Least Once
对齐的checkpoint能够保证exactly once, 而不对齐的checkpoints只能保证al least once. 因为不对齐的checkpoints中已经包含了应该在之后的snapshot中的数据,而恢复以后又会再处理一次。
4. 批处理程序中的状态和容错
Flink以流处理的方式对待批处理,即一个DataSet被当作一个有界流处理,所以批处理中的状态和容错与流处理大体一致,仅有以下几个差一点:
- 批处理的容错不使用checkpoints,而只是把所有的数据重新处理一遍
- DataSet API中的状态处理使用内存数据结构,而不是用key-value索引
- DataSet API引入了只能在有界流上使用的特殊的同步迭代方法