所有的functions和operator在flink中,都是可以stateful。stateful functions通过处理单独的元素/事件
为了使得 state有容错性,flink需要使用checkpoint状态.Checkpoint允许flink恢复状态和位置在流中,使得应用有相同的预付达到任意失败的运行。
- keyed状态.Keyed State is always relative to keys and can only be used in functions and operators on a KeyedStream.
- 操作的状态.With Operator State (or non-keyed state), each operator state is bound to one parallel operator instance. The Kafka Connector is a good motivating example for the use of Operator State in Flink. Each parallel instance of the Kafka consumer maintains a map of topic partitions and offsets as its Operator State.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// start a checkpoint every 1000 ms
// advanced options:
// set mode to exactly-once (this is the default)
// make sure 500 ms of progress happen between checkpoints
// checkpoints have to complete within one minute, or are discarded
// allow only one checkpoint to be in progress at the same time
// enable externalized checkpoints which are retained after job cancellation
- state.backend。如果打开,可以用以存储operator的状态的checkpoint。支持的后端有:
- jobmanager In-memory state, backup to JobManager’s/ZooKeeper’s memory. Should be used only for minimal state (Kafka offsets) or testing and local debugging.
- filesystem: State is in-memory on the TaskManagers, and state snapshots are stored in a file system. Supported are all filesystems supported by Flink, for example HDFS, S3, …
- state.backend.fs.checkpointdir:存储checkpoint的目录,文件系统是flink支持的文件系统。注意:State backend必须从jobmanager可访问,使用flie:// 只能在local搭建的情况下。
- state.backend.rocksdb.checkpointdir
- state.checkpoints.dir
- state.checkpoints.num-retained
Resuming from an externalized checkpoint
A job may be resumed from an externalized checkpoint just as from a savepoint by using the checkpoint’s meta data file instead (see the savepoint restore guide). Note that if the meta data file is not self-contained, the jobmanager needs to have access to the data files it refers to (see Directory Structure above).
需要从checkpoint的meta数据恢复程序。注意:如果meta data文件不是自包含的,jobmanager就需要访问关联的数据文件
$ bin/flink run -s :checkpointMetaDataPath [:runArgs]
This directory will then contain the checkpoint meta data required to restore the checkpoint. For the MemoryStateBackend, this meta data file will be self-contained and no further files are needed.
FsStateBackend and RocksDBStateBackend write separate data files and only write the paths to these files into the meta data file. These data files are stored at the path given to the state back-end during construction.
在使用FsStateBackend and RocksDBStateBackend 情况下,会把文件分开存储,只需要填写这些meta文件保存的路径即可。
Directory Structure
state.checkpoints.dir: hdfs:///checkpoints/
env.setStateBackend(new RocksDBStateBackend("hdfs:///checkpoints-data/");
- use a state backend specific (low-level) data format,
- may be incremental,
- do not support Flink specific features like rescaling.
Triggering Savepoints
When triggering a savepoint, a new savepoint directory beneath the target directory is created. In there, the data as well as the meta data will be stored. For example with a FsStateBackend or RocksDBStateBackend:
# Savepoint target directory
# Savepoint directory
# Savepoint file contains the checkpoint meta data
# Savepoint state