Flink提供了不同的状态存储方式,并说明了状态如何存和存储在哪里。
状态可以被存储在Jvm的堆和堆外。根据状态存储方式的不同,Flink也能代替应用管理状态,意思是Flink能够进行内存管理(有必要的时候,可能会溢出到硬盘),允许应用保存非常大的状态。默认情况下,在配置文件flink-conf.yaml
中为所有Flink作业配置状态存储方式。
然而,默认的状态存储方式配置可以被单独的作业设置覆盖,就像下面那样。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(...);
使用Data Stream API
写的程序经常需要以多种情况保存状态:
- 在窗口被触发之前, 窗口需要保存或聚合元素
- 转换算子也许会使用key/value状态接口保存数据
- 转换算子也许实现
CheckpointedFunction
接口使本地变量容错。
当checkpointing被激活的时候,一旦发生checkpoint,状态会被保存,这样数据就不会丢失,并且在恢复的时候能够保持数据一致性。状态在内部是怎么表示的,以及当checkpoint的时候,状态怎么样被保存,以及保存到哪里依赖选择的状态存储方式。
Flink提供了三种开箱即用的状态存储方式:
- MemoryStateBackend 内存存储
- FsStateBackend 文件系统存储
- RocksDBStateBackend RocksDB存储
如果没有特殊配置,系统默认使用内存存储方式。
MemoryStateBackend 内存存储
内存存储:在Java
堆中保存状态对象。Key/Value状态和窗口算子都会以Hash表的方式保存状态值,触发器等。
当checkpoint的时候,状态存储将会快照状态,将当checkpoint向JobManager发送回执消息时,作为消息的一部分发给JobManager(master),JobManager会将状态存储到堆内存中。
可以配置内存存储使用异步快照。我们也强烈推荐使用异步快照,避免阻塞流处理通道。请注意默认是打开异步快照的。如果想要关闭这个特性,用户可以在实现化MemoryStateBackend
对象的时候,给构造函数中相应的boolean
参数传false
(这应该仅用于调试目的)。
new MemoryStateBackend(MAX_MEM_STATE_SIZE, false);
内存存储有如下限制:
- 每一个状态大小默认不超过
5M
。这个值可以在实例化MemoryStateBackend
的时候增加 - 不管配置的最大状态大小是多少,状态大小不能超过akka配置的桢(一次RPC传输的数据)大小(参数: akka.framesize,默认:10M)。
- 聚合的状态必须适合
JobMaanger
内存
以下情况推荐使用内存存储
- 本地开发或调用
- 只保存少量状态的作业。例如仅仅包含一次一条记录算子(例如:Map,FlatMap,Fliter,....)的作业。对于这样的作业,
Kafka Consumer
仅仅需要非常少的状态。
FsStateBackend 文件系统存储
通过配置文件系统的URL(类型,地址,路径)使用文件系统存储。例如"hdfs://namenode:40010/flink/checkpoints"
或者"file:///data/flink/checkpoints"
。
FsStateBackend
将状态数据保存在TaskManager’s
内存中。当checkpoint的时候,将状态数据写到配置的文件系统或目录中。最小的元数据会存储到JobManager
内存中(或者在HA模式下,存储到checkpoint元数据中).
FsStateBackend
默认使用异步快照,以避免阻塞流处理。如果想禁止该特性,在实现化FsStateBackend
对象的时候,构造函数中应的参数传入false
即可。
new FsStateBackend(path, false);
以下情况,推荐使用FsStateBackend
- 具有大状态,长窗口,大的key/value状态的作业
- 所有HA模式下
RocksDBStateBackend RocksDB存储
要想使用RocksDB存储,需要配置文件系统的URL(类型,地址,路径)。例如"hdfs://namenode:40010/flink/checkpoints"
或者"file:///data/flink/checkpoints"
。
RocksDBStateBackend
将状态数据保存到RocksDB数据库.RocksDB文件默认会存储到TaskManager的数据目录中。当checkpoint的时候,整个RocksDB数据库将会保存到配置的文件系统或目录中。最小的元数据会存储到JobManager
内存中(或者在HA模式下,存储到checkpoint元数据中).
RocksDBStateBackend
总是执行异步快照。
RocksDBStateBackend
具有如下限制:
- 由于 RocksDB JNI通信使用的API基于byte[],每个key或每个value最大支持2^31字节。
注意: 在以RocksDB作用存储情况下,使用merge操作的状态(例如:ListState)会默默地将值大小累加到大于2^31字节,当再次读取的时候会失败,这是目前RocksDB JNI的限制。
以下情况,推荐使用RocksDBStateBackend
- 具有非常大的状态,长窗口,大的key/value状态的作业
- 所有HA模式下
你可以保存的状态数据量仅仅受限于磁盘剩余空间大小。与将状态保存到内存中的``FsStateBackend `相比,可以保存更大的状态。然而这也意味着能达到的最大吞吐量更小。因为所有从rocksDB读或写入rocksDB都需要经过序列化与反序例化,比那些基于Java堆的存储后端开销更大。
RocksDBStateBackend
是目前唯一提供 增量的checkpoint的存储。
RocksDB的一些指标可以被获取,但是默认没打开,可以在这里找到全部文档说明。
配置状态存储
如果你什么也没配置,默认的状态存储在JobManager
内存中。如果你希望为所有作业默认一个其它的存储,你可以在flink-conf.ymal
中配置其它的存储。当然,每一个作业也能单独设置存储。
每个作业单独设置存储
下面示例显示StreamExecutionEnvironment
的作业如何设置存储。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
如果你想使用RocksDBStateBackend
,你就必须在你的Flink项目中添加如下Maven依赖。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
<version>1.8.0</version>
</dependency>
设置默认的状态存储
默认的状态存储能够在flink-conf.yaml
文件中配置,参数是state.backend
. 值可以选择jobmanager(MemoryStateBackend)
, filesystem(FsStateBackend)
,rocksdb(RocksDBStateBackend)
三者中的一个,也可以配置实现了接口StateBackendFactory
的全类名。例如: RocksDBStateBackend
的实现类org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory
.
state.checkpoints.dir
参数定义了checkpoint数据和元数据文件存储的位置,你可以在这里发现更详细的checkpoint目录结构说明
配置示例:
# 状态存储
state.backend: filesystem
# checkpoints数据存储目录
state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints
翻译自: https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/state/state_backends.html