本文档解释了在开发应用程序时如何使用Flink的状态抽象。
Keyed State and Operator State
在Flink中有两种基本的状态:Keyed State and Operator State.
Keyed State
Keyed State总是相对于键,只能用于KeyedStream上的函数和操作符。
您可以将键控状态看作已分区或分片的操作符状态,每个键只有一个状态分区。每个键状态在逻辑上都绑定到一个唯一的组合:<parallel-operator-instance,key >,由于每个键都恰好属于一个键操作符的一个并行实例,我们可以把它简单地看作是<operator,key >。
Keyed State被进一步组织为所谓的Key Groups。Key Groups是Flink重新分配键控状态的原子单位;Key Groups的数量与定义的最大并行度完全相同。在执行期间,键操作符的每个并行实例与一个或多个键组的键一起工作。
Operator State
使用Operator State(或非键控状态),每个运算符状态都绑定到一个并行运算符实例。 Kafka Connector是在Flink中使用操作员状态的一个很好的例子。 Kafka使用者的每个并行实例都维护一个主题分区和偏移量的映射作为其操作员状态。
当并行性发生变化时,操作符状态接口支持在并行操作符实例之间重新分布状态。可以有不同的方案来进行这种重新分发。
Raw and Managed State
Keyed State和Operator State有两种形式: managed和raw。
Managed State(托管状态) 用Flink运行时控制的数据结构表示,比如内部哈希表或RocksDB。例如ValueState、ListState等。Flink的运行时编码状态并将其写入检查点。
Raw State(原始状态)是操作符保存在自己数据结构中的状态。当检查点指向时,它们只将一个字节序列写入检查点。Flink不知道状态的数据结构,只看到原始字节。
所有datastream函数都可以使用托管状态,但是原始状态接口只能在实现操作符时使用。建议使用托管状态(而不是原始状态),因为使用托管状态时,Flink能够在并行性发生改变时自动重新分发状态,而且还可以更好地进行内存管理。
注意 如果托管状态需要自定义序列化逻辑,请参阅相应的指南,以确保未来的兼容性。Flink的默认序列化器不需要特殊处理。
Using Managed Keyed State
托管键控状态接口提供了对不同类型的状态的访问,这些状态的作用域都与当前输入元素的键有关。这意味着这种类型的状态只能用于KeyedStream,而KeyedStream可以通过stream.keyBy()创建。
现在,我们将首先查看可用状态的不同类型,然后看看如何在程序中使用它们。可用的状态原语是:
- ValueState<T>: 这保留了一个可以更新和检索的值(如上所述,作用域为输入元素的键,因此操作看到的每个键可能都有一个值)。可以使用update(T)设置该值,并使用T value()检索该值。
- ListState<T>: 它保存了一个元素列表。您可以添加元素并在所有当前存储的元素上检索一个可迭代。使用add(T)或addAll(List<T>)添加元素,Iterable可以使用Iterable<T> get()检索。您还可以使用update覆盖现有的列表(list <T>)
- ReducingState<T>: 这将保留一个单独的值,该值表示添加到状态的所有值的聚合。该接口与ListState类似,但是使用add(T)添加的元素会使用指定的ReduceFunction将其减少为聚合
- AggregatingState<IN, OUT>: 这将保留一个单独的值,该值表示添加到状态的所有值的聚合。与ReducingState相反,聚合类型可能与添加到状态中的元素的类型不同。该接口与ListState相同,但是使用add(IN)添加的元素是使用指定的AggregateFunction聚合的。
-
FoldingState<T, ACC>: 这将保留一个单独的值,该值表示添加到状态的所有值的聚合。与ReducingState相反,聚合类型可能与添加到状态中的元素的类型不同。该接口与ListState类似,但是使用add(T)添加的元素使用指定的FoldFunction折叠成一个聚合。 - MapState<UK, UV>: 它保存了一个映射列表。您可以将键-值对放入状态,并在所有当前存储的映射上检索一个迭代。使用put(UK, UV)或putAll(Map<UK, UV>)添加映射。可以使用get(UK)检索与用户密钥关联的值。可以分别使用entries()、keys()和values()检索映射、键和值的可迭代视图。还可以使用isEmpty()检查这个映射是否包含任何键-值映射。
所有类型的状态都有一个clear()方法,该方法清除当前活动键(即输入元素的键)的状态。
注意 FoldingState和FoldingStateDescriptor已经在Flink 1.4中被弃用,将来会被完全删除。请使用AggregatingState和AggregatingStateDescriptor代替。
务必记住,这些状态对象仅用于与状态接口。状态不一定存储在内部,但可能驻留在磁盘或其他地方。要记住的第二件事是,从状态中获得的值依赖于输入元素的键。因此,如果涉及的键不同,在用户函数的一次调用中获得的值可能与另一次调用中的值不同。
要获得状态句柄,您必须创建一个StateDescriptor。持有状态的名称(稍后我们将看到,您可以创建几个状态,他们必须有唯一的名称,这样您就可以引用它们),国家持有的值的类型,可能还有一个指定的函数,比如ReduceFunction。根据要检索的状态类型,可以创建ValueStateDescriptor、ListStateDescriptor、ReducingStateDescriptor、FoldingStateDescriptor或MapStateDescriptor。
状态是使用RuntimeContext访问的,所以只有在富函数中才有可能。请在 这里查看相关信息,不过我们很快还会看到一个示例。在RichFunction中可用的RuntimeContext具有以下访问状态的方法
- ValueState<T> getState(ValueStateDescriptor<T>)
- ReducingState<T> getReducingState(ReducingStateDescriptor<T>)
- ListState<T> getListState(ListStateDescriptor<T>)
- AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)
- FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC>)
- MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)
这是一个FlatMapFunction示例,展示了如何将所有的部件组合在一起
class CountWindowAverage extends RichFlatMapFunction[(Long, Long), (Long, Long)] {
private var sum: ValueState[(Long, Long)] = _
override def flatMap(input: (Long, Long), out: Collector[(Long, Long)]): Unit = {
// access the state value
val tmpCurrentSum = sum.value
// If it hasn't been used before, it will be null
val currentSum = if (tmpCurrentSum != null) {
tmpCurrentSum
} else {
(0L, 0L)
}
// update the count
val newSum = (currentSum._1 + 1, currentSum._2 + input._2)
// update the state
sum.update(newSum)
// if the count reaches 2, emit the average and clear the state
if (newSum._1 >= 2) {
out.collect((input._1, newSum._2 / newSum._1))
sum.clear()
}
}
override def open(parameters: Configuration): Unit = {
sum = getRuntimeContext.getState(
new ValueStateDescriptor[(Long, Long)]("average", createTypeInformation[(Long, Long)])
)
}
}
object ExampleCountWindowAverage extends App {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.fromCollection(List(
(1L, 3L),
(1L, 5L),
(1L, 7L),
(1L, 4L),
(1L, 2L)
)).keyBy(_._1)
.flatMap(new CountWindowAverage())
.print()
// the printed output will be (1,4) and (1,5)
env.execute("ExampleManagedState")
}
这个例子实现了一个穷人的计数窗口。我们根据第一个字段为元组设置键值(在这个例子中,所有的键值都是1),函数将计数和运行求和存储在ValueState中。一旦计数达到2,它将释放平均值并清除状态以便我们从0开始。注意,如果我们在第一个字段中有具有不同值的元组,那么这将为每个不同的输入键保留不同的状态值。
State Time-To-Live (TTL)
可以为任意类型的键控状态分配一个生存时间(TTL)。如果配置了TTL并且状态值已经过期,则将尽可能地清除存储的值,下面将对此进行更详细的讨论。
所有状态集合类型都支持每个入口ttl。这意味着列表元素和映射项独立过期。
为了使用状态TTL,必须首先构建StateTtlConfig配置对象。然后,可以通过传递配置在任何状态描述符中启用TTL功能
import org.apache.flink.api.common.state.StateTtlConfig
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.common.time.Time
val ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build
val stateDescriptor = new ValueStateDescriptor[String]("text state", classOf[String])
stateDescriptor.enableTimeToLive(ttlConfig)
配置有几个选项需要考虑
newBuilder方法的第一个参数是强制的,它是生存时间值。
更新类型在状态TTL刷新时配置(默认OnCreateAndWrite)
- StateTtlConfig.UpdateType.OnCreateAndWrite - 只对创建和写访问
- StateTtlConfig.UpdateType.OnReadAndWrite - 还有读访问
状态可见性配置了过期的值是否在读访问时返回,如果它还没有被清除(默认情况下是NeverReturnExpired)。
- StateTtlConfig.StateVisibility.NeverReturnExpired - 过期的值永远不会返回
- StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp - 如果还可用就返回
在NeverReturnExpired的情况下,过期状态的行为就好像它不再存在一样,即使它仍然需要被删除。这个选项对于在TTL之后数据不可读的情况非常有用,例如应用程序处理隐私敏感的数据
另一个选项ReturnExpiredIfNotCleanedUp允许在清理之前返回过期状态。
注意:
- 状态后端存储最后一次修改的时间戳和用户值,这意味着启用此特性会增加状态存储的消耗。堆状态后端在内存中存储一个附加的Java对象,该对象带有对用户状态对象的引用和一个原语长值。RocksDB状态后端为每个存储的值、列表项或映射项添加8字节。
- 目前只支持与处理时间有关的TTLs。
- 尝试使用启用了TTL的描述符(或者相反)来恢复状态(以前在没有TTL的情况下配置)将导致兼容性失败和statmigrationexception。
- TTL配置不是检查或保存点的一部分,而是Flink在当前运行的作业中处理它的一种方式。
- 只有当用户值序列化器能够处理空值时,TTL的映射状态才支持空用户值。如果序列化器不支持空值,则可以用NullableSerializer对其进行包装,但要以额外的字节为代价以序列化的形式进行包装。
Cleanup of Expired State
默认情况下,过期的值会在读取时显式删除,比如ValueState#value,如果配置的状态后端支持,则会定期在后台收集垃圾。后台清理可以在StateTtlConfig中禁用
import org.apache.flink.api.common.state.StateTtlConfig
val ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.disableCleanupInBackground
.build