这篇文档解释了如何在开发应用时,使用Flink的state。
Keyed State and Operator State
在Flink有两种基本的state:keyed state 与 operator state
Keyed state
keyed State 是与key关联的,只能在应用于keyedStream上的function与operator中使用。
你可以认为keyed state是分区后每个key都有自己的一个state的operator state。每一个keyed state逻辑上都与<parallel-operator-instance, key> 一一对应,又由于每一个key仅存在于某个特定的key operator实例,因此也可以说与<operator, key>一一对应。
keyed state 然后会被组织为key group。Key Group是一个原子性的单位,可以被flink用于重分配key state。key group的个数与job最大并行度相同(注:也就是有可能某个key operator实例持有多个key group)。在执行期间,每一个key operator实例都可以对其持有的一个或多个key group中的多个key state进行操作。
Operator state
对于 operator state(或者 non-keyed state)来说,每个operator state 都与一个operator 实例一一对应。Kafka connector是一个很好的例子。kafka consumer的每一个实例,都维持一个map(map类型的state),包含topic partitions以及offset。
当opetaor的并行度发生改变时,operator state接口支持对其state的重分配。有多种方案实现这种重分配。
(注:他俩的区别可以这么理解:Operator State:每个应用于non-key stream的operator实例都可以有自己的state,operator 与 state对应。而Keyed State:每个应用于keyedStream 的operator实例对应的是key group,key group是由多个key的state组成的,而且key state还可以在function中)
Raw and Managed State
keyed state 与 operator state 都存在两种形式: managed 与 raw。
managed state 代表flink可以在运行时控制的数据结构,数据结构如内部的hash table,RocksDB。managed state的举例,如:ValueState,LIstState等。Flink的运行时会对state进行编码,并且写入checkpoint。
raw state 是那些在operator中使用自己的数据结构的state。当checkpoint时,他们仅会写一个字节序列到checkpoint中。Flink并不知道state的数据结构,仅仅是看到这些字节。
所有的function中都可以使用managed state,而raw state仅可以在实现operator时使用。推荐使用managed state,因为当job的并发度发生改变时,flink可以自动的对managed state进行重分配,并且使用managed state有更好的内存管理。
注意:如果你的managed state需要自定义的序列化逻辑,可以参阅corresponding guide来确保兼容性。Flink的默认序列化器不需要特殊处理。
Using Managed Keyed State
managed key state 接口提供了访问不同类型state能力,这些state都对应于输入流中数据的key。这意味着,managed key state只能在 keyedStream 中使用,一般可以通过 stream.keyBy(...)来创建 keyedStream。
现在,我们会先看一下哪些类型的state可用,然后会展示如何在代码中使用。下面是可用的基本state:
- ValueState<T>:维持一个可以被更新与获取的value(就如上文说的,该state对应于一个key,因此对于一个operator实例来说,可以获取到其中各个key的对应的state),state的值可以通过 update(T) 来更新,可以通过 T value()来获取。
- ListState<T>:这个state会维持一个list。你可以追加元素,也可以获取到当前存储的所有元素的 Iterable。通过 add(T) / addAll(List<T>) 来添加元素,通过 Iterable<T> get() 方法获取迭代器。你也可以使用 update(List<T>) 来覆盖state。
- ReducingState<T>:这个state维持一个值,该值代表所有添加到这个状态的值的聚合。接口和ListState类似,但是使用 add(T)添加元素,会导致该值进入 ReduceFunction 执行聚合操作。
- AggregatingState<IN,OUT>:这个state维持一个值,该值代表所有添加到这个状态的值的聚合。与ReducingState不一样的是,聚合后的值的类型可以和输入值的类型不同。接口和ListState类似,但是使用 add(T)添加元素,会导致该值进入 AggregateFunction 执行聚合操作。
- FoldingState<T,ACC):这个state在1.4就弃用了,在后面的版本中会完全删除,可以使用 AggregatingState代替
- MapState<UK,UV>:这个state维持了一个map。你可以将key-value对put到state中也可以获取当前map的Iterable。可以使用 put(UK,UV)/putAll(Map<UK,UV>)来添加map。获取某个user key的值,可以使用 get(UK).Iterable 可以使用 entries(),keys(),values()方法。
所有类型的state都有clear()方法,该方法会清除当前key对应的状态。
第一件重要的事情是,这些state对象都是仅用来连接state的,state不一定存储在内存中,也有可能在磁盘上或者其他地方。第二件重要的事情是,你从state中获取到的value取决于输入流中的key。因此如果你在自定义函数中调用了state的方法获取值时,在不同的key时,返回的结果是不同的。
为了手动获取state,你必须创建 StateDescriptor。这个类会保存这个state的名称,state中value的类型,以及可能会有的自定义函数,如 ReduceFunction(你可以定义好几个state,使用不同的名字命名,这样可以通过名称来获取它们)。取决于你想要使用何种类型的state,你可以选择创建如下的descriptor:ValueStateDescriptor, ListStateDescriptor, ReducingStateDescriptor, FoldingStateDescriptor 或者 MapStateDescriptor。
state 可以通过 RuntimeContext 来获取,因此只有在 rich function中才可以使用state。RichFunction中的RuntimeContext有如下方法可以获取state:
- ValueState<T> getState( ValueStateDescriptor<T>)
- ReducingState<T> getReducingState(ReducingStateDescriptor<T>)
- ListState<T> getListState(ListStateDescriptor<T>)
- AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)
- FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC>)
- MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)
下面的示例使用 FlatMapFunctino 展示了如何使用上面说的特性:
public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
/**
* The ValueState handle. The first field is the count, the second field a running sum.
*/
private transient ValueState<Tuple2<Long, Long>> sum;
@Override
public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
// access the state value
Tuple2<Long, Long> currentSum = sum.value();
// update the count
currentSum.f0 += 1;
// add the second field of the input value
currentSum.f1 += input.f1;
// update the state
sum.update(currentSum);
// if the count reaches 2, emit the average and clear the state
if (currentSum.f0 >= 2) {
out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
sum.clear();
}
}
@Override
public void open(Configuration config) {
ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
new ValueStateDescriptor<>(
"average", // the state name
TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
sum = getRuntimeContext().getState(descriptor);
}
}
// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
.keyBy(0)
.flatMap(new CountWindowAverage())
.print();
// the printed output will be (1,4) and (1,5)
这个示例中,使用tuple的第一个field作为key(在示例中,只有一个相同的key:1),在function中,存储该key出现的次数以及计算总和,放入Valuestate中。一旦次数大于等于2,就会计算并发射平均值,然后清空state,这样我们就会再次从0开始。注意的是,如果我们的输入有多个key的话,对于不同的key会有不同的state value。
State Time-To-Live(TTL)
任何类型的state都可以指定ttl。当配置了TTL并且state value过期后,存储的value会已一种高效的方式清除,下面会详细介绍。
state的集合类型支持per-entry TTL,这意味着list中的元素或者map中的entry的过期时间是相互独立的。
为了使用state的TTL功能,必须要构造一个 StateTtlConfig 的配置对象。然后就可以通过向state descriptor中传configration的方式实现对应state的TTL功能。
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);
configuration有几个需要注意的点:
newBuilder 方法的第一个参数是固定的,就是 time-to-live 的值
update type 配置什么时候state的TTL被刷新(默认是 onCreateAndWrite)
- StateTtlConfig.UpdateType.OnCreateAndWrite - 仅当创建或写入时刷新
- StateTtlConfig.UpdateType.OnReadAndWrite - 创建,写入或读取时都可以刷新
state visibility 配置了当过期数据未被清除时,读取该数据是否会返回数据的值。(默认是 NeverRetrunExpired)
- StateTtlConfig.StateVisibility.NeverReturnExpired - 过期的数据不会返回
- StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanUp - 如果还未被清除则返回过期数据
当使用 NeverReturnExpired 时,过期state的表现就像是真的不存在了一样,即便它又可能还未被完全移除。这个策略在数据过去后就不可见的场景中很适合,例如与敏感隐私数据交互的应用。
另一个选项 ReturnExpiredIfNotCleanUp 允许返回未被清除的过期数据。
注意:
- state数据进行最后修改的时间戳会和state一起存储在状态后端,也意味着会增加状态存储的消耗。使用堆内存作为状态后端时,会使用另一个类,该类拥有指向用户state对象的引用以及一个private long值。使用RocksDB作为状态后端时,会在每个存储的value,每个list的元素以及每个map的entry前添加8字节。
- 当前仅支持 TTL 的时间引用 processing time
- 使用TTL重新存储之前为配置TTL时的state,会导致兼容故障与 StateMigrationException,反之亦然。
- TTL 配置不是checkpoint/savepoint 的一部分,而是一种说明flink如何处理当前正在运行的job。
- 使用TTL配置的map state,如果user value的序列化器可以处理null值,则允许其写入null。如果序列化器不支持null值,它可以被 NullableSerializer 包装起来,但是会在序列化框架中多耗费额外的字节。
Cleanup of Expired State
默认情况下,过期的数据只有在读取超时时,被清除,也就是调用 ValueState.value()。
注意:这意味着,默认情况下,如果过期state没有被read,则不会被清除,可能会导致state不断增长。这可能会在以后的版本中解决。
在snapshot中进行cleanup
除了上面的默认方式外,你也可以在执行 full state snapshot时触发cleanup,这样也会减少state snapshot 的大小。在目前的实现中,本地state不会被清除,但是当从以前的snapshot恢复时,不会加载snapshot中过期的state。这种方式可以在 StateTtlConfig中配置。
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.cleanupFullSnapshot()
.build();
这种方式不合适RocksDB状态后端的incremental checkpoint。
注意:
- 对于已经存在的job来说,clearup 策略可以随时使用 StateTtlConfig 来生效或者失效,如:修改配置后,从savepoint处重启
Incremental cleanup
另一个选项是,当一些state有增量变化时,触发cleanup。可以在每次访问state或者处理每条数据时的回调函数中触发cleanup。如果对某些state使用了这种cleanup策略,存储后端就会维持一个包含所有state值的iterator。每次触发cleanup时,都会迭代一遍,被扫描的state值都会被检查,过期的值就会被清除。
这个策略可以在 StateTtlConfig中配置:
import org.apache.flink.api.common.state.StateTtlConfig;
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.cleanupIncrementally()
.build();
这个策略有两个参数。第一个参数为数字,代表每次触发cleanup时需要检查的state的值的数量。如果启用了这个值,每次state被访问时,都会触发cleanup。第二个参数定义了是否需要在每次数据处理是调用cleanup。
注意:
- 如果该state被访问或者没有数据被处理,过期的数据不会被清除
- 为了cleanup所花费的时间,会导致处理延迟的增加
- 目前,incremental cleanup的实现仅针对 Heap 状态后端。如果设置的状态后端为 RocksDB,则不会生效
- 如果Heap状态后端使用同步snapshot机制,全局iterator会在迭代时,维持所有key的副本,因为目前它不支持并发修改,因此这会增加内存消耗。异步snapshot不会有这个问题
- 对于已经在运行的job,可以通过修改 StateTtlConfig来激活/失效cleanup 策略,然后通过savepoint重启。
Cleanup during RocksDB compaction
如果使用RocksDB作为状态后端,另一个cleanup策略是启用Flink的 compaction filter。RocksDB周期性的异步的 压缩合并的state减少存储。Flink compaction filter会检查state的过期数据,这些数据会被排除。
这个特性默认是关闭的。首先需要配置状态后端为RocksDB,然后可以通过设置 state.backend.rocksdb.ttl.compaction.filter.enabled或者调用RocksDBStateBackend::enableTtlCompactionFilter。这样,配置使用TTL的state就可以使用该策略清除数据了。
import org.apache.flink.api.common.state.StateTtlConfig;
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.cleanupInRocksdbCompactFilter()
.build();
RocksDB compaction filter会在处理完特定数量的state值后,从flink查询当前timestamp用于检查过期。这个数值默认是100.你可以通过 StateTtlConfig.newBuilder(...).cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries)
来修改。经常更新timestamp可以提高cleanup的速度,但是会掉那个地compaction的性能,因为compaction使用JNI来调用native code。
你可以配置 FlinkCompactionFilter 来启动 RocksDB filter 的debug 日志:
log4j.logger.org.rocksdb.FlinkCompactionFilter=DEBUG
注意:
- 在 compaction 是调用 TTL filter会降低它的速度。TTL filter要解析最后一个访问state的时间戳,然后在正在 compact 的数据中判断每个数据是否过期。如果state 的类型是集合(如list或map),那么就是检查集合的每个元素。
- 如果这个策略使用在list state中,且list中的元素的字节长度不固定,原生的TTL filter会针对集合的元素,额外的调用flink的java type serializer,来处理集合中每个元素,至少是第一个过期的元素,以便决定下一个过期元素的offset。
- 对于已经在运行的job,可以通过修改 StateTtlConfig来激活/失效cleanup 策略,然后通过savepoint重启。
State in the Scala DataStream API
scala的api暂不翻译
使用 Managed Operator 状态
为了使用 managed operator state,一个有状态的function必须实现 CheckpointedFunction 接口或
ListCheckpointed<T extends Serializable> 接口。
CheckpointedFunction
CheckpointedFunction 接口提供了访问 non-keyed state 以及 不同重分配方案的能力。他要求实现两个方法:
void snapshotState(FunctionSnapshotContext context) throws Exception;
void initializeState(FunctionInitializationContext context) throws Exception;
一旦checkpoint被执行,就会调用 snapshotState() 方法。与之相对的,initializeState() 方法在每次用户自定义函数初始化时被调用,可以使第一次初始化时,也可以是从某个checkpoint故障恢复时。基于此,initializeState() 方法不仅是各种state初始化的地方,也是state从故障处恢复的地方。
目前,managed operator state支持list-style 的值。List的元素需要是一个可序列化的对象,且相互之间独立,这样才可以在应用伸缩时进行state的重分配。换句话说,这些对象是在重新分配non-key state时的最小粒度。取决于状态的访问方法,定义了下面两个重分配方案:
- Even-split redistribution:每个operator都有会一个含有state数值的list。逻辑上讲,所有的state是所有list的合并。当重启或者重分配时,list会被均匀的划分到每个并发操作符实例中。每一个操作符都会有一个sublist,它可以是空,也可以有一个或多个元素。如,在并发度为1时,操作符包含两个元素 e1,e2,当增加并发度到2时,可能e1在第一个操作符实例,e2在第二个操作符实例中。
- Union redistribution:每个operator都有会一个含有state数值的list。逻辑上讲,所有的state是所有list的合并。当重启或者重分配时,每个操作符都持有含有所有的state数值的list。
下面的示例展示了一个实现了 CheckpointedFunction 的 SinkFunction ,它会在将数据写入其他系统时,缓存数据。它展示了 even-split redistribution :
public class BufferingSink
implements SinkFunction<Tuple2<String, Integer>>,
CheckpointedFunction {
private final int threshold;
private transient ListState<Tuple2<String, Integer>> checkpointedState;
private List<Tuple2<String, Integer>> bufferedElements;
public BufferingSink(int threshold) {
this.threshold = threshold;
this.bufferedElements = new ArrayList<>();
}
@Override
public void invoke(Tuple2<String, Integer> value, Context contex) throws Exception {
bufferedElements.add(value);
if (bufferedElements.size() == threshold) {
for (Tuple2<String, Integer> element: bufferedElements) {
// send it to the sink
}
bufferedElements.clear();
}
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
checkpointedState.clear();
for (Tuple2<String, Integer> element : bufferedElements) {
checkpointedState.add(element);
}
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
ListStateDescriptor<Tuple2<String, Integer>> descriptor =
new ListStateDescriptor<>(
"buffered-elements",
TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
checkpointedState = context.getOperatorStateStore().getListState(descriptor);
if (context.isRestored()) {
for (Tuple2<String, Integer> element : checkpointedState.get()) {
bufferedElements.add(element);
}
}
}
}
initializeState 的入参为 FunctionInitializationContext。它被用来初始化 non-keyed state。上面的示例中,使用了 ListState 类型的state,用来存储需要被checkpoint的数据。
注意state是如何初始化的,同 keyed state类似,使用 StateDescriptor来获取初始化的state。
ListStateDescriptor<Tuple2<String, Integer>> descriptor =
new ListStateDescriptor<>(
"buffered-elements",
TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));
checkpointedState = context.getOperatorStateStore().getListState(descriptor);
获取state的方法的名称代表了该state所使用的重分配策略。例如,想要对list state使用 union redistribution策略,则需要在获取state时调用 getUnionListState(descriptor)方法。如果方法名不包含重分配策略的信息,如:getListState(descriptor),那么,它默认实现的是 even-split redistribution 策略来重分配。
初始化 state 后,我们使用context的 isRestored() 方法来判断是否是故障恢复的情况。如果是的话,就需要在初始化时,应用我们自定义的故障恢复逻辑。
就像在 BufferingSink 中演示的那样,在状态初始化期间生成的 ListState 会保存在类变量中,以供将来可以在 snapshotState() 方法中使用。snapshotState() 方法中,先调用 ListState.clear() 方法将上次checkpoint的数据清空,再填上这次checkpoint需要的数据。
作为旁注:keyed state也可以在 initializeState() 方法中初始化。可以使用提供的 FunctionInitializationContext 来完成。
ListCheckpointed
ListCheckpointed 是一个在 CheckpointedFunction 基础上加了限制的变种,它仅支持list-style 状态使用 even-split 重分配策略。它也要求实现下面的两个方法:
List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
void restoreState(List<T> state) throws Exception;
snapshotState() 方法应该返回需要被checkpoint的方法,而restoreState 会处理一个类似的list用于故障恢复。如果state不是 re-partitionable 的,你可以在snapshotState()返回 Collections.singletonList(MY_STATE).
Stateful Source Functions
stateful source与其他操作符相比,需要更多的操作。为了使状态的更新与数据的发射变得原子性(要求在故障/恢复时做到exactly-once语义),用户需要从source context中获取一个所。
public static class CounterSource
extends RichParallelSourceFunction<Long>
implements ListCheckpointed<Long> {
/** current offset for exactly once semantics */
private Long offset = 0L;
/** flag for job cancellation */
private volatile boolean isRunning = true;
@Override
public void run(SourceContext<Long> ctx) {
final Object lock = ctx.getCheckpointLock();
while (isRunning) {
// output and state update are atomic
synchronized (lock) {
ctx.collect(offset);
offset += 1;
}
}
}
@Override
public void cancel() {
isRunning = false;
}
@Override
public List<Long> snapshotState(long checkpointId, long checkpointTimestamp) {
return Collections.singletonList(offset);
}
@Override
public void restoreState(List<Long> state) {
for (Long s : state)
offset = s;
}
}
某些operator可能需要知道Flink是什么时候确认checkpoint完成的,然后将这个信息与外部系统交互。这种情况可以参阅 org.apache.flink.runtime.state.CheckpointListener 接口。