有状态函数(Stateful function)和算子(Operator)在处理独立的数据或事件时存储数据,使得状态(State)成为任何复杂算子中的关键部分,例如:
- 当应用检索特定的事件模式,State 将存储到接收到的事件的序列。
- 当按照分钟/小时/天的唯独聚合事件时,状态将保留待处理的聚合状态。
- 当在流数据上进行机器学习的模型训练时,状态保存模型当前版本的参数。
- 当需要管理历史数据时,状态允许有效地访问过去发生的事件。
使用 Flink 需要了解 State,以便应用的检查点(checkpoint)容错和保存点(savepoint)可用。
State 的使用
Keyed State 和 Operator State
Flink 中的状态分为两种:Keyed state 和 Operator state
Operator State
每个 Operator state(non-keyed state) 都绑定到一个并行的算子实例上。可以参考 Kafka connector 的例子,Kafka 消费者的每个并行实例都维护一个 topic 分区和偏移(offset)的对应关系作为 Operator state。Operator state 支持当并行度更改时,在并行的算子实例间重新分配 State,进行这种重新分配有多种不同的方案(后面会介绍)。
Keyed State
Keyed state 与键(key)相关,只能在 KeyedStream 上应用的函数和算子内使用。可以将 Keyed state 认为是分区后的 Operator state,每个 key 有一个状态的分区(state-partition)。逻辑上,每个 Keyed state 绑定唯一的 <parallel-operator-instatnce, key>(算子并行实例和 key 的一对元组),可以将其简单地视为 <operator, key>(因为每个 key 属于算子的唯一一个并行实例)。
Keyed state 进一步被组织称为 Key groups。Key groups 是 Flink 重新分配 Keyed state 的原子单位,Key groups 的数量与最大并行度相同。在程序执行期间 keyed operator 的每个并行实例都使用一个或多个 Key groups 的 keys。
Raw State 和 Managed State
Keyed state 和 Operator state 有两种形式:托管状态(managed)和原始状态(raw)。
托管状态(managed state) 由 Flink runtime 管理的数据结构表示,例如内部哈希表或 RocksDB。Flink runtime 对 State 进行编码并写入 checkpoint。
原始状态(raw state) 是在算子内部的数据结构中的保存。Checkpoint 只会保存 State 内容的字节序列,State 的真实数据结构对 Flink 是透明的。
所有数据流函数都可以使用托管状态(managed state),而原始状态(raw state)只能在具体实现算子时使用。建议使用托管状态(而不是原始状态),因为在托管状态下,Flink 能够在并行度改变时自适应地重新分配 State,并且在内存管理方面可以做的更好。
如果要为托管状态自定义序列化的逻辑,请参考 自定义序列化 以确保将来的兼容性。默认序列化方法不需要特殊处理。
托管的 Keyed State
托管 Keys state 提供多种不同类型 State,作用域都是当前输入数据的键,只能用于 KeyedStream,可以通过 stream.keyBy(…) 创建。
首先看不同类型的状态,以及它们如何在程序中使用:
ValueState<T>:保存了一个值,可以更新和读取(算子操作的每个 key 可能有一个 value)
update(T)更新
T value()取值ListState<T>:保存了一个列表 List。可以追加元素,也可以获取到一个包含所有当前存储的元素的迭代器(Iterable)
add(T)或addAll(List<T>)添加到列表
Iterable<T> get()获取迭代器
update(List<T>)使用新的列表覆盖现有列表ReducingState<T>:保存一个值,表示添加到 State 的所有值的聚合结果。提供的接口类似于ListState
add(T)函数会使用指定的函数(ReduceFunction)对添加的值进行聚合AggregatingState<IN, OUT>:保存一个值,表示添加到 State 的所有值的聚合结果。与ReducingState不同的是,聚合结果的数据类型可以与添加到 State 的元素的数据类型不同。接口同样类似于ListState
add(IN)函数会使用指定的函数(AggregateFunction)对添加的值进行聚合FoldingState<T, ACC>:保存一个值,表示添加到 State 的所有值的聚合结果。与ReducingState不同的事,聚合结果的数据类型可以与添加到 State 的元素的数据类型不同。接口同样类似于ListState。已经过期的API。
add(IN)函数会使用指定的函数(FoldFunction)对添加的值进行聚合MapState<UK, UV>:保存一个映射表 Map。可以将 key/value 存入 State,也可以获取到一个包含所有当前存储的元素的迭代器(Iterable)
put(UK, UV)或putAll(Map<UK, UV>)添加 key/value 到 Map
get(UK)获取与指定 key 的 value
entries()、keys()和values()对 Map 的元素/键/值遍历访问
所有类型的都有
clear()方法来清除当前状态。FoldingState和FoldingStateDescriptor已在 Flink 1.4 中弃用,未来版本将被完全删除。可以使用AggregatingState和AggregatingStateDescriptor代替。
首先,这些对象仅用于与 State 交互,State 不一定存储在内存,也可能存储在磁盘或其他位置。第二点,从 State 获得的值取决于输入数据的的 key,如果处理的 keys 不同,定义的函数的调用结果会不同。
要向操作 State 对象句柄,首先必须创建一个 StateDescriptor,该对象拥有 State 名称(可以创建多个 State,必须具有唯一的名称来引用 State),State 所持有的值的类型,可能还有用户指定的函数(例如 ReduceFunction)。对应不同的 State 类型,有如下类对象:ValueStateDescriptor、ListStateDescriptor、ReducingStateDescriptor、FoldingStateDescriptor 和 MapStateDescriptor。然后使用 RuntimeContext 可以才访问到 State,因此只能在 RichFunction 中使用,在 RichFunction 方法中 RuntimeContext 访问不通类型 State 的方法:
- ValueState<T> getState(ValueStateDescriptor<T>)
- ReducingState<T> getReducingState(ReducingStateDescriptor<T>)
- ListState<T> getListState(ListStateDescriptor<T>)
- AggregatingState<IN, OUT> getAggregatingState(AggregatingState<IN, OUT>)
FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC>)- MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)
下面是在 FlatMapFunction 中的使用示例:
实现了一个简单的计数窗口,通过输入元组的第一个参数分组,在分组的流中,每接收到两个元组,返回两那个元组的第二个参数的平均值
class CountWindowAverage extends RichFlatMapFunction[(Long, Long), (Long, Long)] {
// 使用 ValueState 保存元素求和值
// 元组第一个参数为求和个数 count,第二个参数为求和值 sum
private var sum: ValueState[(Long, Long)] = _
override def flatMap(input: (Long, Long), out: Collector[(Long, Long)]): Unit = {
// 访问 State 的值
val tmpCurrentSum = sum.value
// 初始值 (0, 0)
val currentSum = if (tmpCurrentSum != null) {
tmpCurrentSum
} else {
(0L, 0L)
}
// 求和并更新 State
val newSum = (currentSum._1 + 1, currentSum._2 + input._2)
sum.update(newSum)
// 如果 state value 达到 2, 发送统计的平均值并清空 state
if (newSum._1 >= 2) {
out.collect((input._1, newSum._2 / newSum._1))
sum.clear()
}
}
override def open(parameters: Configuration): Unit = {
// 通过 RuntimeContext 和 ValueStateDescriptor 获取 ValueState
sum = getRuntimeContext.getState(
new ValueStateDescriptor[(Long, Long)]("average", createTypeInformation[(Long, Long)])
)
}
}
object ExampleCountWindowAverage extends App {
val env = StreamExecutionEnvironment.getExecutionEnvironment
// create keyed stream and call CountWindowAverage
env.fromCollection(List(
(1L, 3L), (1L, 5L), (1L, 7L), (1L, 4L), (1L, 2L)
))
.keyBy(_._1)
.flatMap(new CountWindowAverage())
.print()
// 输出 (1,4) (1,5)
env.execute("ExampleManagedState")
}
状态生存周期 (TTL)
生存期(TTL)可以被指定给任何类型的 Keyed state,如果配置了TTL并且 State value 已过期,State value 会被清除。所有集合类型 State (ListState 和 MapState)都支持为每个条目设置TTL。为了启用 State TTL,首先需要构建 StateTtlConfig 对象,然后通过在 ValueStateDescriptor(其他类型同理)构造中传入该对象来启用TTL,参考下面的例子:
// Time.seconds(1) 生存时间
//
// StateTtlConfig.UpdateType 更新类型
// - UpdateType.OnCreateAndWrite - 创建和写入时更新(默认)
// - UpdateType.OnReadAndWrite - 读取和写入时更新
//
// StateTtlConfig.StateVisibility 状态可见性,访问时是否返回已经过期的值
// - StateVisibility.NeverReturnExpired - 永远不会返回过期的值(默认)
// - StateVisibility.ReturnExpiredIfNotCleanedUp -如果可以读到会返回
//
val ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build
// 构建 ValueStateDescriptor
val stateDescriptor = new ValueStateDescriptor[String]("text state", classOf[String])
// 启用TTL
stateDescriptor.enableTimeToLive(ttlConfig)
补充:
- State backends 存储最新一次修改的时间戳以及更新值,因此启用TTL会增加存储的消耗。
- 目前TTL仅支持处理时间(processing time)。
- 如果之前没有配置TTL,而状态恢复时启用TTL(相反的情况同样),会引起兼容性错误和 StateMigrationException 异常。
- TTL配置不是 checkpoint 或 savepoint 的一部分,而是在 Flink 运行时做处理。
- 对于 Map 类型,只有序列化方法支持空值时,TTL的设置才支持空值,否则需要使用
NullableSerializer进行封装(序列化会占用额外的字节)。
过期数据清理
目前过期值只有在显式访问时才会被删除(如调用 ValueState.value())。如果未读取过期状态的数据,则不会将其删除,这可能会导致状态不断增长。将来的版本中可能会发生变化。
此外,可以指定在获取完整的 State 快照时激活清理方法,以减小快照的大小。当从上一个快照恢复时,不会包括已删除的过期 State。StateTtlConfig 可以指定(不适用于 RocksDB 增量 checkpoint 的情况):
val ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.cleanupFullSnapshot
.build
Scala 中的快捷接口
除了上述接口外,Scala API 可以在 KeyedStream 上的有状态 map() 或 flatMap() 函数中使用 ValueState。如下获取 ValueState 当前值,并必须返回用于更新的新值:
val stream: DataStream[(String, Int)] = ...
// 调用 mapWithState
val counts: DataStream[(String, Int)] = stream
.keyBy(_._1)
.mapWithState((in: (String, Int), count: Option[Int]) =>
count match {
case Some(c) => ( (in._1, c), Some(c + in._2) )
case None => ( (in._1, 0), Some(in._2) )
})
托管的 Operator State
Operator state 的数据结构不像 Keyed state 丰富,只支持 List,可以认为是可序列化对象的列表,彼此独立。这些对象在动态扩展时是可以重新分配 non-keyed state 的最小单元。目前支持几种动态扩展方式:
Even-split redistribution:算子并发度发生改变的时候,并发的每个实例取出 State 列表,合并到一个新的列表上,形成逻辑上完整的 State。然后根据列表元素的个数,均匀分配给新的并发实例(Task)。
例如,如果并行度为1,算子的 State checkpoint 包含数据元 element1 和 element2,当并行度增加到2时,element1 会在 算子实例0中,而element2在算子实例1中。Union redistribution:相比于平均分配更加灵活,把完整 State 划分的方式交给用户去做。并发度发生改变的时候,按同样的方式取到完整的 State 列表,然后直接交给每个实例。
使用托管的 Operator State,有状态函数需要实现 CheckpointedFunction 接口(更通用的)或 ListCheckpointed<T extends Serializable>接口。
CheckpointedFunction
CheckpointedFunction 需要实现两个方法:
// 执行 Checkpoint 时调用
void snapshotState(FunctionSnapshotContext context) throws Exception;
// 初始化函数时调用
void initializeState(FunctionInitializationContext context) throws Exception;
initializeState()函数不止在首次初始化函数调用,从 checkpoint 恢复时同样会调用。因此,不仅要初始化 State 的逻辑,还包括 State 恢复的逻辑。
下面的例子是有状态的 SinkFunction,利用 CheckpointedFunction 在将数据元发送之前进行缓存,checkpoint 时缓存写入 State,启动时判断是否使用 重发的 State 恢复缓存:
class BufferingSink(threshold: Int = 0) extends SinkFunction[(String, Int)]
with CheckpointedFunction {
@transient
private var checkpointedState: ListState[(String, Int)] = _
// 输入数据缓存
private val bufferedElements = ListBuffer[(String, Int)]()
override def invoke(value: (String, Int)): Unit = {
// 输入数据存入 buffer
bufferedElements += value
// 当缓存数量达到阈值,发送数据,并清理 buffer
if (bufferedElements.size == threshold) {
for (element <- bufferedElements) {
// send it to the sink
}
bufferedElements.clear()
}
}
// 执行 Checkpoint 时调用
// 清除前一个检查点包含的所有对象
// 缓存数据添加到 State
override def snapshotState(context: FunctionSnapshotContext): Unit = {
checkpointedState.clear()
for (element <- bufferedElements) {
checkpointedState.add(element)
}
}
// 用于首次初始化或从 checkpoint 恢复
override def initializeState(context: FunctionInitializationContext): Unit = {
val descriptor = new ListStateDescriptor[(String, Int)](
"buffered-elements",
TypeInformation.of(new TypeHint[(String, Int)]() {})
)
// 使用 ctx 和 ListStateDescriptor 访问 ListState
// 注意调用方法 getListState(descriptor)
checkpointedState = context.getOperatorStateStore.getListState(descriptor)
// 根据 isRestored() 方法来检查当前是否是 checkpoint 恢复的情况
if(context.isRestored) {
// 如果 true,表示是恢复失败的情况,应用恢复数据的逻辑:
// 恢复数据添加到 buffer 中
for(element <- checkpointedState.get()) {
bufferedElements += element
}
}
}
}
获取 State 句柄函数的命名中约定包含其重新分发模式,其次时状态结构。上面的例子中调用方法 getListState 表示使用了平均分发(even-split)的方式。如果要使用 union 的方式,使用 getUnionListState(descriptor) 方法返回的 ListState,会包含完整的 State 列表,由用户决定处理恢复哪些数据。
ListCheckpointed
ListCheckpointed 是 CheckpointedFunction 的一个有限变体,只支持平均分发(even-split)的情况,同样需要实现两个方法:
// 执行 Checkpoint 时调用
List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
// 初始化函数时调用
void restoreState(List<T> state) throws Exception;
有状态的 Source Functions
在 Source 方法中,更新状态和输出应该在一个原子操作下完成(为了满足精确一次的语义下的故障恢复),因此用户需要取一个锁,可以从ctx中获得。
class CounterSource extends RichParallelSourceFunction[Long]
with ListCheckpointed[Long] {
@volatile
private var isRunning = true
private var offset = 0L
override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
// 获取锁
val lock = ctx.getCheckpointLock
while (isRunning) {
// 输出和更新在一个原子操作中
lock.synchronized({
ctx.collect(offset)
offset += 1
})
}
}
override def cancel(): Unit = isRunning = false
override def restoreState(state: util.List[Long]): Unit =
for (s <- state) {
offset = s
}
override def snapshotState(checkpointId: Long, timestamp: Long): util.List[Long] =
Collections.singletonList(offset)
}
Broadcast State
前面提到了两种 Operator state 支持的动态扩展方法:even-split(算子的每个平行任务平均的恢复状态的一部分)和 union(每个任务获得完整的状态恢复)。Broadcast State 是 Flink 支持的另一种扩展方式。用来支持将某一个流的数据广播到所有下游任务,数据被存储在本地,接受到广播的流在操作时可以使用这些数据。
Broadcast state 的特点是:
- 使用 Map 类型的数据结构
- 仅适用于同时具有广播流和非广播流作为数据输入的特定算子
- 可以具有多个不同名称的 Broadcast state
Broadcast State API
下面是一个例子来展示 Broadcast State API 的使用,在这个示例中,要处理的数据流中是有不同颜色(Color)和形状(Shape)属性的对象,希望在流中找到一对具有相同颜色的,并且遵循一个特定的形状模式的对象组(例如,一个红色长方形后面紧跟着一个红色三角形的组合)。
第一个数据流是要处理的数据源,流中的对象具有 Color 和 Shape 属性
// 使用 Color 作为键来分组,保证相同 Color 的数据进入到同一个子任务中
KeyedStream<Item, Color> colorPartitionedStream = shapeStream
.keyBy(new KeySelector<Shape, Color>(){...});
第二个数据流是要广播的数据,流中的对象是匹配规则(Rules)
// MapState 中保存 (RuleName,Rule) ,在描述类中指定 State name
MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>(
"RulesBroadcastState",
BasicTypeInfo.STRING_TYPE_INFO,
TypeInformation.of(new TypeHint<Rule>() {}));
// ruleStream 使用 MapStateDescriptor 作为参数广播,得到广播流
BroadcastStream<Rule> ruleBroadcastStream = ruleStream
.broadcast(ruleStateDescriptor);
然后需要做的是,连接两个流并且指定两个连接后的处理逻辑(使用广播的 rules 解析流中匹配的数据组,并返回)
DataStream<Match> output = colorPartitionedStream
.connect(ruleBroadcastStream)
.process(
new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() {
// matching logic
}
)
stream.connect(BroadcastStream) 方法用来连接广播流和非广播流,BroadcastStream 作为参数,返回一个 BroadcastConnectedStream 对象。BroadcastConnectedStream 调用 process() 方法执行处理逻辑,需要指定一个逻辑实现类作为参数,而具体的需要实现的抽象类取决于非广播流的类型:
- 如果非广播流是 keyed stream,需要实现 KeyedBroadcastProcessFunction
- 如果非广播流是 non-keyed stream,需要实现 BroadcastProcessFunction
这里注意下
KeyedBroadcastProcessFunction<Color, Item, Rule, String>各个参数含义:
Color,非广播流 keyed stream 的键的类型
Item,非广播流的对象类型
Rule,广播流的对象类型
String,返回结果的类型
BroadcastProcessFunction 和 KeyedBroadcastProcessFunction
这两个抽象函数有两个相同的需要实现的接口:
-
processBroadcastElement()处理广播流中接收的数据元 -
processElement()处理非广播流数据的方法
用于处理非广播流是 non-keyed stream 的情况
public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction {
public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;
public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
}
用于处理非广播流是 keyed stream 的情况
public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> {
public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;
public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception;
}
可以看到这两个接口提供的上下文对象有所不同。非广播方(processElement)使用 ReadOnlyContext,而广播方(processBroadcastElement)使用 Context。
这两个上下文对象(简称ctx)通用的方法接口:
- 访问 Broadcast state:
ctx.getBroadcastState(MapStateDescriptor<K, V> stateDescriptor) - 查询数据元的时间戳:
ctx.timestamp() - 获取当前水印:
ctx.currentWatermark() - 获取当前处理时间:
ctx.currentProcessingTime() - 向旁路输出(side-outputs)发送数据:
ctx.output(OutputTag<X> outputTag, X value)
这两者不同之处在于对 Broadcast state 的访问限制:广播方对其具有读和写的权限(read-write),非广播方只有读的权限(read-only)
这么设计的原因是,保证 Broadcast state 在算子的所有并行实例中是相同的。由于 Flink 中没有跨任务的通信机制,在一个任务实例中的修改不能在并行任务间传递。而广播端在所有并行任务中都能看到相同的数据元,只对广播端提供可写的权限。
同时要求在广播端的每个并行任务中,对接收数据的处理是相同的。如果忽略此规则会破坏 State 的一致性保证,从而导致不一致且难以诊断的结果。也就是说,
processBroadcast()的实现逻辑必须在所有并行实例中具有相同的确定性行为。
回到上一个例子,KeyedBroadcastProcessFunction 的实现可能看起来如下:
new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() {
// store partial matches, i.e. first elements of the pair waiting for their second element
// we keep a list as we may have many first elements waiting
private final MapStateDescriptor<String, List<Item>> mapStateDesc =
new MapStateDescriptor<>(
"items",
BasicTypeInfo.STRING_TYPE_INFO,
new ListTypeInfo<>(Item.class));
// identical to our ruleStateDescriptor above
private final MapStateDescriptor<String, Rule> ruleStateDescriptor =
new MapStateDescriptor<>(
"RulesBroadcastState",
BasicTypeInfo.STRING_TYPE_INFO,
TypeInformation.of(new TypeHint<Rule>() {}));
@Override
public void processBroadcastElement(Rule value,
Context ctx,
Collector<String> out) throws Exception {
ctx.getBroadcastState(ruleStateDescriptor).put(value.name, value);
}
@Override
public void processElement(Item value,
ReadOnlyContext ctx,
Collector<String> out) throws Exception {
final MapState<String, List<Item>> state = getRuntimeContext().getMapState(mapStateDesc);
final Shape shape = value.getShape();
for (Map.Entry<String, Rule> entry:
ctx.getBroadcastState(ruleStateDescriptor).immutableEntries()) {
final String ruleName = entry.getKey();
final Rule rule = entry.getValue();
List<Item> stored = state.get(ruleName);
if (stored == null) {
stored = new ArrayList<>();
}
if (shape == rule.second && !stored.isEmpty()) {
for (Item i : stored) {
out.collect("MATCH: " + i + " - " + value);
}
stored.clear();
}
// there is no else{} to cover if rule.first == rule.second
if (shape.equals(rule.first)) {
stored.add(value);
}
if (stored.isEmpty()) {
state.remove(ruleName);
} else {
state.put(ruleName, stored);
}
}
}
}
重要考虑因素
使用 Broadcast state 时要注意的是:
- 没有跨任务的通信,这就是为什么只有广播方可以修改 Broadcast state 的原因。
- 用户必须确保所有任务以相同的方式为每个传入的数据元更新 Broadcast state。否则,可能导致不一致的结果。
- 跨任务的 Broadcast state 中的事件顺序可能不同,尽管广播的元素可以保证所有元素都将转到所有下游任务,但元素可能以不同的顺序到达。因此,Broadcast state 更新不能依赖传入事件的顺序。
- 所有任务都会把 Broadcast state 存入 checkpoint,而不仅仅是其中一,虽然 checkpoint 发生时所有任务在具有相同的 broadcast state。这是为了避免在恢复期间所有任务从同一文件中进行恢复(避免热点),然而代价是做 state checkpoint 的大小增加了并行度数量的倍数。
- Flink 确保在恢复或改变并行度时不会有重复数据,也不会丢失数据。在具有相同或更小并行度的恢复的情况下,每个任务读取其状态检查点。在并行度放大时,每个任务都会读取自己的状态,多余的任务以循环方式读取前面任务的检查点。
- 不支持 RocksDB state backend,broadcast state 在运行时保存在内存中。
Reference:
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/state/
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/state/broadcast_state.html
https://flink.xskoo.com/dev/stream/state/state.html