State Backend Support
Local State
Local state backends maintain all state in local memory or out-of-core, within an embedded key-value database such as RocksDB. Out-of-core access is preferred in production deployments, as in that case state size is limited only by the quota of the local filesystem allocated to each node. When a snapshot operation is triggered, a copy of the current local state is written to a durable storage layer (e.g., a distributed file system directory). Local state backends can support asynchronous and incremental snapshotting (Section 4.2) and yield considerable read/write performance, as they exploit data locality while eliminating the need for distributed or transactional coordination. (Note: writing local state requires no transactional coordination with other tasks, because on failover a globally consistent state is loaded from the remote state store. Since the state is local, reads and writes are efficient. The drawback is that every snapshot has to write a copy of the state to remote storage.)
External State
External state backends are those where state access is internally coordinated with an external system such as a database or key-value store. (Note: state lives in an external database, i.e. compute and storage are separated.)
Non-MVCC databases can be supported by maintaining within each task a write-ahead log (WAL) of pending state changes per epoch. The WAL can be committed to the database once an epoch has been completed. In more detail, each snapshot is one distributed bulk 2-phase commit transaction: during a checkpoint, changes are put into the transaction log (WAL) and pre-committed when triggerSnapshot() is invoked. Once the global snapshot is complete, the pre-committed states are fully committed by the JobManager in one atomic transaction. (Note: each task maintains a WAL that stores its pending state and commits it when the epoch completes.) This approach is feasible even when the database does not expose the necessary program hooks to log, pre-commit, and fully commit.
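The WAL protocol above can be sketched as follows. This is a minimal illustration, not Flink's implementation: `KeyValueStore`, `WalTask`, and `complete_checkpoint` are hypothetical names, the external database is simulated with a dictionary, and the "atomic transaction" is assumed rather than enforced. It also assumes each task owns a disjoint set of keys, so merging the logs is conflict-free.

```python
class KeyValueStore:
    """Hypothetical stand-in for an external non-MVCC database."""

    def __init__(self):
        self.data = {}

    def apply_batch(self, changes):
        # A real database would apply this inside one atomic transaction.
        self.data.update(changes)


class WalTask:
    """A task that buffers its state changes per epoch in a write-ahead log."""

    def __init__(self):
        self.wal = {}            # changes made during the epoch in progress
        self.pre_committed = {}  # epoch -> changes waiting for the global commit

    def put(self, key, value):
        self.wal[key] = value

    def trigger_snapshot(self, epoch):
        # Phase 1: seal the log of this epoch and start a fresh one.
        self.pre_committed[epoch] = self.wal
        self.wal = {}

    def abort(self, epoch):
        # Failed epoch: drop everything that was never committed.
        self.pre_committed.pop(epoch, None)
        self.wal = {}


def complete_checkpoint(store, tasks, epoch):
    """Phase 2, played by the coordinator: commit all pre-committed logs at once."""
    merged = {}
    for task in tasks:
        merged.update(task.pre_committed.pop(epoch, {}))
    store.apply_batch(merged)


store = KeyValueStore()
tasks = [WalTask(), WalTask()]
tasks[0].put("a", 1)
tasks[1].put("b", 2)
for task in tasks:
    task.trigger_snapshot(epoch=1)
before_commit = dict(store.data)   # nothing is visible externally yet
complete_checkpoint(store, tasks, epoch=1)
after_commit = dict(store.data)

tasks[0].put("a", 99)              # epoch 2 fails before it is committed
tasks[0].trigger_snapshot(epoch=2)
tasks[0].abort(epoch=2)
after_abort = dict(store.data)     # the database still holds epoch 1
```

Because nothing reaches the database before phase 2, a failed epoch needs no undo work on the database side; the tasks simply discard their logs.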
MVCC-enabled databases allow for committing state across multiple database versions. This can integrate with Flink's snapshotting mechanism by associating each state update with the ongoing epoch. Once a snapshot is fully committed, the version is atomically incremented. Likewise, a failed snapshot epoch decrements the current version. (Note: the database supports rollback through MVCC, which avoids using a WAL to buffer pre-committed data.)
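As a rough illustration of the MVCC variant, the sketch below tags every write with its epoch and exposes only committed versions to readers. `MvccStore` and its methods are hypothetical, and the rollback shown here (discarding the versions written by the failed epoch) is one possible reading of "decrementing the current version", not a description of any particular database.

```python
class MvccStore:
    """Hypothetical multi-version store: each key keeps one value per epoch."""

    def __init__(self):
        self.rows = {}       # key -> {version: value}
        self.committed = 0   # latest fully committed version

    def put(self, key, value, epoch):
        # Every state update is associated with the epoch in progress.
        self.rows.setdefault(key, {})[epoch] = value

    def get(self, key, version=None):
        # By default, readers only see committed data.
        if version is None:
            version = self.committed
        history = self.rows.get(key, {})
        visible = [v for v in history if v <= version]
        return history[max(visible)] if visible else None

    def commit(self, epoch):
        # Snapshot fully committed: advance the visible version.
        self.committed = epoch

    def rollback(self, epoch):
        # Snapshot failed: forget every value written by that epoch or later.
        for history in self.rows.values():
            for v in [v for v in history if v >= epoch]:
                del history[v]


db = MvccStore()
db.put("count", 1, epoch=1)
uncommitted_read = db.get("count")      # epoch 1 is not committed yet
db.commit(1)
committed_read = db.get("count")

db.put("count", 5, epoch=2)
db.rollback(2)                          # epoch 2 fails
after_rollback = db.get("count", version=2)
```

No write-ahead log is needed on the task side, since uncommitted versions are already isolated inside the database.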
Advantages of External over Local State
A general advantage of external state backends is that rollback recovery does not require any I/O to retrieve and load state from snapshots (contrary to local state backends). This benefit becomes particularly impactful when state is very large: avoiding any network I/O upon reconfiguration makes external backends a suitable choice under low latency requirements. Finally, another benefit that comes out-of-the-box with all external backends is support for incremental snapshotting, since, by definition, only changes are committed externally. (Note: separating compute from storage makes state recovery straightforward, because there is no need to load a state copy from the DFS as with local state. Incremental snapshots are also supported natively, since only the modifications to the state have to be committed; local state achieves this through the LSM tree.)
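Supplementary Notes
At present, all of Flink's state backends follow the local state model; local state can use either the heap state backend or the RocksDBStateBackend. During a snapshot, a copy of the state is backed up to the DFS, which leads to long recovery times for large state.
Using external state, in turn, incurs higher read/write latency. Alibaba has worked on this problem and addressed the read/write latency with an optimized cache; see https://files.alicdn.com/tpsservice/1df9ccb8a7b6b2782a558d3c32d40c19.pdf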
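Asynchronous Incremental Snapshots
Once a task has completed snapshot alignment, it invokes triggerSnapshot() to obtain a copy of the task's current state. This copy does not have to be physical: it can be a logical snapshot that is lazily materialized by a concurrent thread, i.e. produced only when it is actually read. This requires support from copy-on-write data structures.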
Implementation of Flink's Local Backends
- out-of-core state backend based on RocksDB
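This backend uses an LSM tree: updates are not applied in place immediately, but are appended and compacted asynchronously. To take a snapshot, the backend synchronously marks the current version, which prevents compaction from overwriting the data of that version. The operator can then continue processing and make modifications to the state. An asynchronous thread iterates over the marked version, materializes it to the snapshot store, and finally releases the snapshot so that future compactions can overwrite that state.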
Incremental snapshots: furthermore, the LSM-based data structure also lends itself to incremental snapshots, which write to the snapshot store only the parts that changed since the previous snapshot.
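The incremental idea can be sketched with a toy LSM store whose flushed files are immutable, so a snapshot only needs to upload the files the snapshot store has not seen yet. `TinyLsm`, `SnapshotStore`, and `restore` are hypothetical names for illustration; compaction and deletion are left out, and this is not how RocksDB or Flink name or manage their files.

```python
class SnapshotStore:
    """Hypothetical durable store that keeps uploaded files by name."""

    def __init__(self):
        self.files = {}
        self.uploads = 0

    def upload(self, name, contents):
        self.files[name] = contents
        self.uploads += 1


class TinyLsm:
    """Toy LSM store: writes go to a memtable, flushes create immutable files."""

    def __init__(self):
        self.memtable = {}
        self.sstables = {}   # file name -> immutable key/value mapping
        self.next_id = 0

    def put(self, key, value):
        self.memtable[key] = value

    def flush(self):
        if self.memtable:
            name = f"sst-{self.next_id:06d}"   # zero-padded, so names sort by age
            self.next_id += 1
            self.sstables[name] = dict(self.memtable)
            self.memtable = {}

    def snapshot(self, store):
        # Flush pending writes, then upload only files the store lacks.
        self.flush()
        manifest = sorted(self.sstables)
        for name in manifest:
            if name not in store.files:
                store.upload(name, self.sstables[name])
        return manifest


def restore(manifest, store):
    # Replay files from oldest to newest so newer values win.
    state = {}
    for name in manifest:
        state.update(store.files[name])
    return state


lsm = TinyLsm()
snapshots = SnapshotStore()
lsm.put("a", 1)
first = lsm.snapshot(snapshots)
uploads_after_first = snapshots.uploads
lsm.put("b", 2)
lsm.put("a", 3)
second = lsm.snapshot(snapshots)
uploads_after_second = snapshots.uploads   # only the new file was uploaded
```

Each snapshot is just a manifest of file names, so earlier snapshots remain restorable as long as the files they reference are retained.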
- in-memory
Flink's in-memory local state backend implementation is based on hash tables that employ chain hashing. During a snapshot, it copies the current table array synchronously and then starts the external materialization of the snapshot in a background thread. (Note: the snapshot copy of the state is obtained synchronously and then materialized externally in an asynchronous fashion; my understanding is that materialization means serializing the state and writing it to external storage.) The operator's regular stream processing thread lazily copies the state entries and overflow chains upon modification, if the materialization thread still holds onto the snapshot. (Note: while the materialization thread holds the snapshot, copying is lazy: state is deep-copied only when the stream processing thread modifies it.) Incremental snapshots for the in-memory local backend are possible and conceptually trivial (using delta maps), yet not implemented at the current point.
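The copy-on-write behaviour described for the in-memory backend can be sketched as below. `CowHashTable` and `materialize` are hypothetical; the sketch supports a single active snapshot, stores entries as immutable tuples, and runs materialization inline, whereas a real backend would run it on a background thread.

```python
class CowHashTable:
    """Toy chained hash table with a copy-on-write snapshot."""

    def __init__(self, n_buckets=8):
        self.buckets = [[] for _ in range(n_buckets)]  # each chain: list of (key, value)
        self.shared = set()  # indices of chains still shared with the snapshot

    def _index(self, key):
        return hash(key) % len(self.buckets)

    def snapshot(self):
        # Synchronous part: copy only the array of chain references.
        self.shared = set(range(len(self.buckets)))
        return list(self.buckets)

    def release_snapshot(self):
        # Called once materialization is done; chains may be mutated in place again.
        self.shared.clear()

    def put(self, key, value):
        i = self._index(key)
        if i in self.shared:
            # Lazy copy: the snapshot keeps the old chain, the table gets a new one.
            self.buckets[i] = list(self.buckets[i])
            self.shared.discard(i)
        chain = self.buckets[i]
        for pos, (k, _) in enumerate(chain):
            if k == key:
                chain[pos] = (key, value)
                return
        chain.append((key, value))

    def get(self, key):
        for k, v in self.buckets[self._index(key)]:
            if k == key:
                return v
        return None


def materialize(snapshot_buckets):
    # Stand-in for serializing the snapshot to external storage.
    return {k: v for chain in snapshot_buckets for k, v in chain}


table = CowHashTable()
table.put("a", 1)
table.put("b", 2)
snap = table.snapshot()
table.put("a", 10)        # processing continues while the snapshot is held
table.put("c", 3)
written = materialize(snap)
table.release_snapshot()
```

Only the chains that are modified while the snapshot is held get copied; untouched chains are shared between the table and the snapshot for the whole materialization.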
Lazy Copy
A lazy copy can be defined as a combination of shallow copy and deep copy. The mechanism is simple: initially, the shallow copy approach is used, and a counter keeps track of how many objects share the data. When the program wants to modify the original object, it checks whether the object is shared. If it is, the deep copy mechanism is initiated before the modification takes place.
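A minimal sketch of this mechanism, assuming a hypothetical `LazyCopy` wrapper with explicit `read()` and `write()` accessors (the names are illustrative, not taken from any library):

```python
import copy


class _Shared:
    """Payload plus a counter of how many handles point at it."""

    def __init__(self, data):
        self.data = data
        self.refs = 1


class LazyCopy:
    """Handle that shares data until one of the holders wants to modify it."""

    def __init__(self, data=None, _shared=None):
        self._shared = _shared if _shared is not None else _Shared(data)

    def copy(self):
        # Shallow step: share the payload and bump the counter.
        self._shared.refs += 1
        return LazyCopy(_shared=self._shared)

    def read(self):
        return self._shared.data

    def write(self):
        # Deep-copy only if someone else still shares the payload.
        if self._shared.refs > 1:
            self._shared.refs -= 1
            self._shared = _Shared(copy.deepcopy(self._shared.data))
        return self._shared.data


original = LazyCopy({"xs": [1, 2]})
clone = original.copy()
shared_before_write = original.read() is clone.read()
clone.write()["xs"].append(3)        # triggers the deep copy
shared_after_write = original.read() is clone.read()
```

After the clone has detached, the original is the sole owner again, so its next `write()` mutates the data in place without another copy.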
References
1.
Flink incremental snapshots: https://www.slideshare.net/FlinkForward/stephan-ewen-scaling-to-large-state
Introduction to LevelDB: https://www.cnblogs.com/haippy/archive/2011/12/04/2276064.html
2.
Experiments with external state:
http://osdir.com/apache-flink-development/msg10919.html
https://files.alicdn.com/tpsservice/1df9ccb8a7b6b2782a558d3c32d40c19.pdf
3.
Flink state management: Carbone P, Ewen S, Fóra G, et al. State management in Apache Flink®[J]. Proceedings of the VLDB Endowment, 2017, 10(12): 1718-1729.