有状态函数(Stateful function)和算子(Operator)在处理独立的数据或事件时存储数据,使得状态(State)成为任何复杂算子中的关键部分,例如:
- 当应用检索特定的事件模式,State 将存储到接收到的事件的序列。
- 当按照分钟/小时/天的唯独聚合事件时,状态将保留待处理的聚合状态。
- 当在流数据上进行机器学习的模型训练时,状态保存模型当前版本的参数。
- 当需要管理历史数据时,状态允许有效地访问过去发生的事件。
使用 Flink 需要了解 State,以便应用的检查点(checkpoint)容错和保存点(savepoint)可用。
State 的使用
Keyed State 和 Operator State
Flink 中的状态分为两种:Keyed state 和 Operator state
Operator State
每个 Operator state(non-keyed state) 都绑定到一个并行的算子实例上。可以参考 Kafka connector 的例子,Kafka 消费者的每个并行实例都维护一个 topic 分区和偏移(offset)的对应关系作为 Operator state。Operator state 支持当并行度更改时,在并行的算子实例间重新分配 State,进行这种重新分配有多种不同的方案(后面会介绍)。
Keyed State
Keyed state 与键(key)相关,只能在 KeyedStream 上应用的函数和算子内使用。可以将 Keyed state 认为是分区后的 Operator state,每个 key 有一个状态的分区(state-partition)。逻辑上,每个 Keyed state 绑定唯一的 <parallel-operator-instatnce, key>
(算子并行实例和 key 的一对元组),可以将其简单地视为 <operator, key>
(因为每个 key 属于算子的唯一一个并行实例)。
Keyed state 进一步被组织称为 Key groups。Key groups 是 Flink 重新分配 Keyed state 的原子单位,Key groups 的数量与最大并行度相同。在程序执行期间 keyed operator 的每个并行实例都使用一个或多个 Key groups 的 keys。
Raw State 和 Managed State
Keyed state 和 Operator state 有两种形式:托管状态(managed)和原始状态(raw)。
托管状态(managed state) 由 Flink runtime 管理的数据结构表示,例如内部哈希表或 RocksDB。Flink runtime 对 State 进行编码并写入 checkpoint。
原始状态(raw state) 是在算子内部的数据结构中的保存。Checkpoint 只会保存 State 内容的字节序列,State 的真实数据结构对 Flink 是透明的。
所有数据流函数都可以使用托管状态(managed state),而原始状态(raw state)只能在具体实现算子时使用。建议使用托管状态(而不是原始状态),因为在托管状态下,Flink 能够在并行度改变时自适应地重新分配 State,并且在内存管理方面可以做的更好。
如果要为托管状态自定义序列化的逻辑,请参考 自定义序列化 以确保将来的兼容性。默认序列化方法不需要特殊处理。
托管的 Keyed State
托管 Keys state 提供多种不同类型 State,作用域都是当前输入数据的键,只能用于 KeyedStream,可以通过 stream.keyBy(…)
创建。
首先看不同类型的状态,以及它们如何在程序中使用:
ValueState<T>
:保存了一个值,可以更新和读取(算子操作的每个 key 可能有一个 value)
update(T)
更新
T value()
取值ListState<T>
:保存了一个列表 List。可以追加元素,也可以获取到一个包含所有当前存储的元素的迭代器(Iterable)
add(T)
或addAll(List<T>)
添加到列表
Iterable<T> get()
获取迭代器
update(List<T>)
使用新的列表覆盖现有列表ReducingState<T>
:保存一个值,表示添加到 State 的所有值的聚合结果。提供的接口类似于ListState
add(T)
函数会使用指定的函数(ReduceFunction
)对添加的值进行聚合AggregatingState<IN, OUT>
:保存一个值,表示添加到 State 的所有值的聚合结果。与ReducingState
不同的是,聚合结果的数据类型可以与添加到 State 的元素的数据类型不同。接口同样类似于ListState
add(IN)
函数会使用指定的函数(AggregateFunction
)对添加的值进行聚合FoldingState<T, ACC>
:保存一个值,表示添加到 State 的所有值的聚合结果。与ReducingState
不同的事,聚合结果的数据类型可以与添加到 State 的元素的数据类型不同。接口同样类似于ListState
。已经过期的API。
add(IN)
函数会使用指定的函数(FoldFunction
)对添加的值进行聚合MapState<UK, UV>
:保存一个映射表 Map。可以将 key/value 存入 State,也可以获取到一个包含所有当前存储的元素的迭代器(Iterable)
put(UK, UV)
或putAll(Map<UK, UV>)
添加 key/value 到 Map
get(UK)
获取与指定 key 的 value
entries()
、keys()
和values()
对 Map 的元素/键/值遍历访问
所有类型的都有
clear()
方法来清除当前状态。FoldingState
和FoldingStateDescriptor
已在 Flink 1.4 中弃用,未来版本将被完全删除。可以使用AggregatingState
和AggregatingStateDescriptor
代替。
首先,这些对象仅用于与 State 交互,State 不一定存储在内存,也可能存储在磁盘或其他位置。第二点,从 State 获得的值取决于输入数据的的 key,如果处理的 keys 不同,定义的函数的调用结果会不同。
要向操作 State 对象句柄,首先必须创建一个 StateDescriptor
,该对象拥有 State 名称(可以创建多个 State,必须具有唯一的名称来引用 State),State 所持有的值的类型,可能还有用户指定的函数(例如 ReduceFunction
)。对应不同的 State 类型,有如下类对象:ValueStateDescriptor
、ListStateDescriptor
、ReducingStateDescriptor
、FoldingStateDescriptor
和 MapStateDescriptor
。然后使用 RuntimeContext
可以才访问到 State,因此只能在 RichFunction 中使用,在 RichFunction 方法中 RuntimeContext 访问不通类型 State 的方法:
- ValueState<T> getState(ValueStateDescriptor<T>)
- ReducingState<T> getReducingState(ReducingStateDescriptor<T>)
- ListState<T> getListState(ListStateDescriptor<T>)
- AggregatingState<IN, OUT> getAggregatingState(AggregatingState<IN, OUT>)
FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC>)- MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)
下面是在 FlatMapFunction
中的使用示例:
实现了一个简单的计数窗口,通过输入元组的第一个参数分组,在分组的流中,每接收到两个元组,返回两那个元组的第二个参数的平均值
class CountWindowAverage extends RichFlatMapFunction[(Long, Long), (Long, Long)] {
// 使用 ValueState 保存元素求和值
// 元组第一个参数为求和个数 count,第二个参数为求和值 sum
private var sum: ValueState[(Long, Long)] = _
override def flatMap(input: (Long, Long), out: Collector[(Long, Long)]): Unit = {
// 访问 State 的值
val tmpCurrentSum = sum.value
// 初始值 (0, 0)
val currentSum = if (tmpCurrentSum != null) {
tmpCurrentSum
} else {
(0L, 0L)
}
// 求和并更新 State
val newSum = (currentSum._1 + 1, currentSum._2 + input._2)
sum.update(newSum)
// 如果 state value 达到 2, 发送统计的平均值并清空 state
if (newSum._1 >= 2) {
out.collect((input._1, newSum._2 / newSum._1))
sum.clear()
}
}
override def open(parameters: Configuration): Unit = {
// 通过 RuntimeContext 和 ValueStateDescriptor 获取 ValueState
sum = getRuntimeContext.getState(
new ValueStateDescriptor[(Long, Long)]("average", createTypeInformation[(Long, Long)])
)
}
}
object ExampleCountWindowAverage extends App {
val env = StreamExecutionEnvironment.getExecutionEnvironment
// create keyed stream and call CountWindowAverage
env.fromCollection(List(
(1L, 3L), (1L, 5L), (1L, 7L), (1L, 4L), (1L, 2L)
))
.keyBy(_._1)
.flatMap(new CountWindowAverage())
.print()
// 输出 (1,4) (1,5)
env.execute("ExampleManagedState")
}
状态生存周期 (TTL)
生存期(TTL)可以被指定给任何类型的 Keyed state,如果配置了TTL并且 State value 已过期,State value 会被清除。所有集合类型 State (ListState 和 MapState)都支持为每个条目设置TTL。为了启用 State TTL,首先需要构建 StateTtlConfig
对象,然后通过在 ValueStateDescriptor
(其他类型同理)构造中传入该对象来启用TTL,参考下面的例子:
// Time.seconds(1) 生存时间
//
// StateTtlConfig.UpdateType 更新类型
// - UpdateType.OnCreateAndWrite - 创建和写入时更新(默认)
// - UpdateType.OnReadAndWrite - 读取和写入时更新
//
// StateTtlConfig.StateVisibility 状态可见性,访问时是否返回已经过期的值
// - StateVisibility.NeverReturnExpired - 永远不会返回过期的值(默认)
// - StateVisibility.ReturnExpiredIfNotCleanedUp -如果可以读到会返回
//
val ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build
// 构建 ValueStateDescriptor
val stateDescriptor = new ValueStateDescriptor[String]("text state", classOf[String])
// 启用TTL
stateDescriptor.enableTimeToLive(ttlConfig)
补充:
- State backends 存储最新一次修改的时间戳以及更新值,因此启用TTL会增加存储的消耗。
- 目前TTL仅支持处理时间(processing time)。
- 如果之前没有配置TTL,而状态恢复时启用TTL(相反的情况同样),会引起兼容性错误和 StateMigrationException 异常。
- TTL配置不是 checkpoint 或 savepoint 的一部分,而是在 Flink 运行时做处理。
- 对于 Map 类型,只有序列化方法支持空值时,TTL的设置才支持空值,否则需要使用
NullableSerializer
进行封装(序列化会占用额外的字节)。
过期数据清理
目前过期值只有在显式访问时才会被删除(如调用 ValueState.value()
)。如果未读取过期状态的数据,则不会将其删除,这可能会导致状态不断增长。将来的版本中可能会发生变化。
此外,可以指定在获取完整的 State 快照时激活清理方法,以减小快照的大小。当从上一个快照恢复时,不会包括已删除的过期 State。StateTtlConfig
可以指定(不适用于 RocksDB 增量 checkpoint 的情况):
val ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.cleanupFullSnapshot
.build
Scala 中的快捷接口
除了上述接口外,Scala API 可以在 KeyedStream 上的有状态 map()
或 flatMap()
函数中使用 ValueState
。如下获取 ValueState
当前值,并必须返回用于更新的新值:
val stream: DataStream[(String, Int)] = ...
// 调用 mapWithState
val counts: DataStream[(String, Int)] = stream
.keyBy(_._1)
.mapWithState((in: (String, Int), count: Option[Int]) =>
count match {
case Some(c) => ( (in._1, c), Some(c + in._2) )
case None => ( (in._1, 0), Some(in._2) )
})
托管的 Operator State
Operator state 的数据结构不像 Keyed state 丰富,只支持 List,可以认为是可序列化对象的列表,彼此独立。这些对象在动态扩展时是可以重新分配 non-keyed state 的最小单元。目前支持几种动态扩展方式:
Even-split redistribution:算子并发度发生改变的时候,并发的每个实例取出 State 列表,合并到一个新的列表上,形成逻辑上完整的 State。然后根据列表元素的个数,均匀分配给新的并发实例(Task)。
例如,如果并行度为1,算子的 State checkpoint 包含数据元 element1 和 element2,当并行度增加到2时,element1 会在 算子实例0中,而element2在算子实例1中。Union redistribution:相比于平均分配更加灵活,把完整 State 划分的方式交给用户去做。并发度发生改变的时候,按同样的方式取到完整的 State 列表,然后直接交给每个实例。
使用托管的 Operator State,有状态函数需要实现 CheckpointedFunction 接口(更通用的)或 ListCheckpointed<T extends Serializable>接口。
CheckpointedFunction
CheckpointedFunction
需要实现两个方法:
// 执行 Checkpoint 时调用
void snapshotState(FunctionSnapshotContext context) throws Exception;
// 初始化函数时调用
void initializeState(FunctionInitializationContext context) throws Exception;
initializeState()
函数不止在首次初始化函数调用,从 checkpoint 恢复时同样会调用。因此,不仅要初始化 State 的逻辑,还包括 State 恢复的逻辑。
下面的例子是有状态的 SinkFunction
,利用 CheckpointedFunction
在将数据元发送之前进行缓存,checkpoint 时缓存写入 State,启动时判断是否使用 重发的 State 恢复缓存:
class BufferingSink(threshold: Int = 0) extends SinkFunction[(String, Int)]
with CheckpointedFunction {
@transient
private var checkpointedState: ListState[(String, Int)] = _
// 输入数据缓存
private val bufferedElements = ListBuffer[(String, Int)]()
override def invoke(value: (String, Int)): Unit = {
// 输入数据存入 buffer
bufferedElements += value
// 当缓存数量达到阈值,发送数据,并清理 buffer
if (bufferedElements.size == threshold) {
for (element <- bufferedElements) {
// send it to the sink
}
bufferedElements.clear()
}
}
// 执行 Checkpoint 时调用
// 清除前一个检查点包含的所有对象
// 缓存数据添加到 State
override def snapshotState(context: FunctionSnapshotContext): Unit = {
checkpointedState.clear()
for (element <- bufferedElements) {
checkpointedState.add(element)
}
}
// 用于首次初始化或从 checkpoint 恢复
override def initializeState(context: FunctionInitializationContext): Unit = {
val descriptor = new ListStateDescriptor[(String, Int)](
"buffered-elements",
TypeInformation.of(new TypeHint[(String, Int)]() {})
)
// 使用 ctx 和 ListStateDescriptor 访问 ListState
// 注意调用方法 getListState(descriptor)
checkpointedState = context.getOperatorStateStore.getListState(descriptor)
// 根据 isRestored() 方法来检查当前是否是 checkpoint 恢复的情况
if(context.isRestored) {
// 如果 true,表示是恢复失败的情况,应用恢复数据的逻辑:
// 恢复数据添加到 buffer 中
for(element <- checkpointedState.get()) {
bufferedElements += element
}
}
}
}
获取 State 句柄函数的命名中约定包含其重新分发模式,其次时状态结构。上面的例子中调用方法 getListState
表示使用了平均分发(even-split)的方式。如果要使用 union 的方式,使用 getUnionListState(descriptor)
方法返回的 ListState
,会包含完整的 State 列表,由用户决定处理恢复哪些数据。
ListCheckpointed
ListCheckpointed
是 CheckpointedFunction
的一个有限变体,只支持平均分发(even-split)的情况,同样需要实现两个方法:
// 执行 Checkpoint 时调用
List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
// 初始化函数时调用
void restoreState(List<T> state) throws Exception;
有状态的 Source Functions
在 Source 方法中,更新状态和输出应该在一个原子操作下完成(为了满足精确一次的语义下的故障恢复),因此用户需要取一个锁,可以从ctx中获得。
class CounterSource extends RichParallelSourceFunction[Long]
with ListCheckpointed[Long] {
@volatile
private var isRunning = true
private var offset = 0L
override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
// 获取锁
val lock = ctx.getCheckpointLock
while (isRunning) {
// 输出和更新在一个原子操作中
lock.synchronized({
ctx.collect(offset)
offset += 1
})
}
}
override def cancel(): Unit = isRunning = false
override def restoreState(state: util.List[Long]): Unit =
for (s <- state) {
offset = s
}
override def snapshotState(checkpointId: Long, timestamp: Long): util.List[Long] =
Collections.singletonList(offset)
}
Broadcast State
前面提到了两种 Operator state 支持的动态扩展方法:even-split(算子的每个平行任务平均的恢复状态的一部分)和 union(每个任务获得完整的状态恢复)。Broadcast State 是 Flink 支持的另一种扩展方式。用来支持将某一个流的数据广播到所有下游任务,数据被存储在本地,接受到广播的流在操作时可以使用这些数据。
Broadcast state 的特点是:
- 使用 Map 类型的数据结构
- 仅适用于同时具有广播流和非广播流作为数据输入的特定算子
- 可以具有多个不同名称的 Broadcast state
Broadcast State API
下面是一个例子来展示 Broadcast State API 的使用,在这个示例中,要处理的数据流中是有不同颜色(Color)和形状(Shape)属性的对象,希望在流中找到一对具有相同颜色的,并且遵循一个特定的形状模式的对象组(例如,一个红色长方形后面紧跟着一个红色三角形的组合)。
第一个数据流是要处理的数据源,流中的对象具有 Color 和 Shape 属性
// 使用 Color 作为键来分组,保证相同 Color 的数据进入到同一个子任务中
KeyedStream<Item, Color> colorPartitionedStream = shapeStream
.keyBy(new KeySelector<Shape, Color>(){...});
第二个数据流是要广播的数据,流中的对象是匹配规则(Rules)
// MapState 中保存 (RuleName,Rule) ,在描述类中指定 State name
MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>(
"RulesBroadcastState",
BasicTypeInfo.STRING_TYPE_INFO,
TypeInformation.of(new TypeHint<Rule>() {}));
// ruleStream 使用 MapStateDescriptor 作为参数广播,得到广播流
BroadcastStream<Rule> ruleBroadcastStream = ruleStream
.broadcast(ruleStateDescriptor);
然后需要做的是,连接两个流并且指定两个连接后的处理逻辑(使用广播的 rules 解析流中匹配的数据组,并返回)
DataStream<Match> output = colorPartitionedStream
.connect(ruleBroadcastStream)
.process(
new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() {
// matching logic
}
)
stream.connect(BroadcastStream)
方法用来连接广播流和非广播流,BroadcastStream
作为参数,返回一个 BroadcastConnectedStream
对象。BroadcastConnectedStream
调用 process()
方法执行处理逻辑,需要指定一个逻辑实现类作为参数,而具体的需要实现的抽象类取决于非广播流的类型:
- 如果非广播流是 keyed stream,需要实现 KeyedBroadcastProcessFunction
- 如果非广播流是 non-keyed stream,需要实现 BroadcastProcessFunction
这里注意下
KeyedBroadcastProcessFunction<Color, Item, Rule, String>
各个参数含义:
Color,非广播流 keyed stream 的键的类型
Item,非广播流的对象类型
Rule,广播流的对象类型
String,返回结果的类型
BroadcastProcessFunction 和 KeyedBroadcastProcessFunction
这两个抽象函数有两个相同的需要实现的接口:
-
processBroadcastElement()
处理广播流中接收的数据元 -
processElement()
处理非广播流数据的方法
用于处理非广播流是 non-keyed stream 的情况
public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction {
public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;
public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
}
用于处理非广播流是 keyed stream 的情况
public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> {
public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;
public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception;
}
可以看到这两个接口提供的上下文对象有所不同。非广播方(processElement
)使用 ReadOnlyContext
,而广播方(processBroadcastElement
)使用 Context
。
这两个上下文对象(简称ctx)通用的方法接口:
- 访问 Broadcast state:
ctx.getBroadcastState(MapStateDescriptor<K, V> stateDescriptor)
- 查询数据元的时间戳:
ctx.timestamp()
- 获取当前水印:
ctx.currentWatermark()
- 获取当前处理时间:
ctx.currentProcessingTime()
- 向旁路输出(side-outputs)发送数据:
ctx.output(OutputTag<X> outputTag, X value)
这两者不同之处在于对 Broadcast state 的访问限制:广播方对其具有读和写的权限(read-write),非广播方只有读的权限(read-only)
这么设计的原因是,保证 Broadcast state 在算子的所有并行实例中是相同的。由于 Flink 中没有跨任务的通信机制,在一个任务实例中的修改不能在并行任务间传递。而广播端在所有并行任务中都能看到相同的数据元,只对广播端提供可写的权限。
同时要求在广播端的每个并行任务中,对接收数据的处理是相同的。如果忽略此规则会破坏 State 的一致性保证,从而导致不一致且难以诊断的结果。也就是说,
processBroadcast()
的实现逻辑必须在所有并行实例中具有相同的确定性行为。
回到上一个例子,KeyedBroadcastProcessFunction
的实现可能看起来如下:
new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() {
// store partial matches, i.e. first elements of the pair waiting for their second element
// we keep a list as we may have many first elements waiting
private final MapStateDescriptor<String, List<Item>> mapStateDesc =
new MapStateDescriptor<>(
"items",
BasicTypeInfo.STRING_TYPE_INFO,
new ListTypeInfo<>(Item.class));
// identical to our ruleStateDescriptor above
private final MapStateDescriptor<String, Rule> ruleStateDescriptor =
new MapStateDescriptor<>(
"RulesBroadcastState",
BasicTypeInfo.STRING_TYPE_INFO,
TypeInformation.of(new TypeHint<Rule>() {}));
@Override
public void processBroadcastElement(Rule value,
Context ctx,
Collector<String> out) throws Exception {
ctx.getBroadcastState(ruleStateDescriptor).put(value.name, value);
}
@Override
public void processElement(Item value,
ReadOnlyContext ctx,
Collector<String> out) throws Exception {
final MapState<String, List<Item>> state = getRuntimeContext().getMapState(mapStateDesc);
final Shape shape = value.getShape();
for (Map.Entry<String, Rule> entry:
ctx.getBroadcastState(ruleStateDescriptor).immutableEntries()) {
final String ruleName = entry.getKey();
final Rule rule = entry.getValue();
List<Item> stored = state.get(ruleName);
if (stored == null) {
stored = new ArrayList<>();
}
if (shape == rule.second && !stored.isEmpty()) {
for (Item i : stored) {
out.collect("MATCH: " + i + " - " + value);
}
stored.clear();
}
// there is no else{} to cover if rule.first == rule.second
if (shape.equals(rule.first)) {
stored.add(value);
}
if (stored.isEmpty()) {
state.remove(ruleName);
} else {
state.put(ruleName, stored);
}
}
}
}
重要考虑因素
使用 Broadcast state 时要注意的是:
- 没有跨任务的通信,这就是为什么只有广播方可以修改 Broadcast state 的原因。
- 用户必须确保所有任务以相同的方式为每个传入的数据元更新 Broadcast state。否则,可能导致不一致的结果。
- 跨任务的 Broadcast state 中的事件顺序可能不同,尽管广播的元素可以保证所有元素都将转到所有下游任务,但元素可能以不同的顺序到达。因此,Broadcast state 更新不能依赖传入事件的顺序。
- 所有任务都会把 Broadcast state 存入 checkpoint,而不仅仅是其中一,虽然 checkpoint 发生时所有任务在具有相同的 broadcast state。这是为了避免在恢复期间所有任务从同一文件中进行恢复(避免热点),然而代价是做 state checkpoint 的大小增加了并行度数量的倍数。
- Flink 确保在恢复或改变并行度时不会有重复数据,也不会丢失数据。在具有相同或更小并行度的恢复的情况下,每个任务读取其状态检查点。在并行度放大时,每个任务都会读取自己的状态,多余的任务以循环方式读取前面任务的检查点。
- 不支持 RocksDB state backend,broadcast state 在运行时保存在内存中。
Reference:
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/state/
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/state/broadcast_state.html
https://flink.xskoo.com/dev/stream/state/state.html