官方文档
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/
所有的functions和operator在flink中,都是可以stateful。stateful functions通过处理单独的元素/事件
为了使得 state有容错性,flink需要使用checkpoint状态.Checkpoint允许flink恢复状态和位置在流中,使得应用有相同的预付达到任意失败的运行。
状态有两种
- keyed状态.Keyed State is always relative to keys and can only be used in functions and operators on a KeyedStream.
- 操作的状态.With Operator State (or non-keyed state), each operator state is bound to one parallel operator instance. The Kafka Connector is a good motivating example for the use of Operator State in Flink. Each parallel instance of the Kafka consumer maintains a map of topic partitions and offsets as its Operator State.
CheckPoints
https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/state/checkpoints.html#overview
打开和配置Checkpoint
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// start a checkpoint every 1000 ms
env.enableCheckpointing(1000);
// advanced options:
// set mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// make sure 500 ms of progress happen between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// checkpoints have to complete within one minute, or are discarded
env.getCheckpointConfig().setCheckpointTimeout(60000);
// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// enable externalized checkpoints which are retained after job cancellation
//当程序关闭的时候,会触发额外的checkpoints
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
相关联的配置项
一些相关联的参数在conf/flink-conf.yaml
- state.backend。如果打开,可以用以存储operator的状态的checkpoint。支持的后端有:
- jobmanager In-memory state, backup to JobManager’s/ZooKeeper’s memory. Should be used only for minimal state (Kafka offsets) or testing and local debugging.
- filesystem: State is in-memory on the TaskManagers, and state snapshots are stored in a file system. Supported are all filesystems supported by Flink, for example HDFS, S3, …
- state.backend.fs.checkpointdir:存储checkpoint的目录,文件系统是flink支持的文件系统。注意:State backend必须从jobmanager可访问,使用flie:// 只能在local搭建的情况下。
- state.backend.rocksdb.checkpointdir
- state.checkpoints.dir
- state.checkpoints.num-retained
Resuming from an externalized checkpoint
A job may be resumed from an externalized checkpoint just as from a savepoint by using the checkpoint’s meta data file instead (see the savepoint restore guide). Note that if the meta data file is not self-contained, the jobmanager needs to have access to the data files it refers to (see Directory Structure above).
需要从checkpoint的meta数据恢复程序。注意:如果meta data文件不是自包含的,jobmanager就需要访问关联的数据文件
$ bin/flink run -s :checkpointMetaDataPath [:runArgs]
注意:
This directory will then contain the checkpoint meta data required to restore the checkpoint. For the MemoryStateBackend, this meta data file will be self-contained and no further files are needed.
FsStateBackend and RocksDBStateBackend write separate data files and only write the paths to these files into the meta data file. These data files are stored at the path given to the state back-end during construction.
在使用FsStateBackend and RocksDBStateBackend 情况下,会把文件分开存储,只需要填写这些meta文件保存的路径即可。
Directory Structure
可以通过配置state.checkpoints.dir
比如:
state.checkpoints.dir: hdfs:///checkpoints/
这些文件是可以在保存在后端的时候通过construction指定的。经验证,是可行的。
env.setStateBackend(new RocksDBStateBackend("hdfs:///checkpoints-data/");
checkpoint与savepoint的区别
- use a state backend specific (low-level) data format,
- may be incremental,
- do not support Flink specific features like rescaling.
Savepoints
Triggering Savepoints
When triggering a savepoint, a new savepoint directory beneath the target directory is created. In there, the data as well as the meta data will be stored. For example with a FsStateBackend or RocksDBStateBackend:
# Savepoint target directory
/savepoints/
# Savepoint directory
/savepoints/savepoint-:shortjobid-:savepointid/
# Savepoint file contains the checkpoint meta data
/savepoints/savepoint-:shortjobid-:savepointid/_metadata
# Savepoint state
/savepoints/savepoint-:shortjobid-:savepointid/...