此页面的目标是为需要使用自定义状态序列化的用户提供指导方针,介绍了如何提供自定义状态序列化器,以及实现允许状态模式演化的序列化器的指导方针和最佳实践。
如果您只是使用Flink自己的序列化器,那么这个页面是不相关的,可以忽略。
Using custom state serializers
当注册一个托管操作符或键控状态时,需要一个StateDescriptor来指定状态的名称,以及状态类型的信息。Flink的类型序列化框架使用类型信息为状态创建适当的序列化器。
也可以完全绕过这个问题,让Flink使用您自己的自定义序列化器来序列化托管状态,只需用您自己的类型序列化器实现直接实例化状态描述符即可
class CustomTypeSerializer extends TypeSerializer[(String, Integer)] {...}
val descriptor = new ListStateDescriptor[(String, Integer)](
"state-name",
new CustomTypeSerializer)
)
checkpointedState = getRuntimeContext.getListState(descriptor)
State serializers and schema evolution
本节解释与状态序列化和模式演化相关的面向用户的抽象,以及关于Flink如何与这些抽象交互的必要内部细节。
从保存点还原时,Flink允许更改用于读取和写入先前注册状态的序列化程序,这样用户就不会被锁定在任何特定的序列化模式中。 恢复状态后,将为该状态注册一个新的序列化程序(即,StateDescriptor附带的序列化程序用于访问恢复的作业中的状态)。 这个新的串行器可能具有与以前的串行器不同的架构。 因此,在实现状态序列化器时,除了读取/写入数据的基本逻辑外,还要牢记的另一件事是将来如何更改序列化架构。
当谈到模式时,在这个上下文中,这个术语在引用状态类型的数据模型和引用状态类型的序列化二进制格式之间是可以互换的。一般来说,模式在某些情况下会发生变化:
- 状态类型的数据模式已经发展,即从用作状态的POJO中添加或删除一个字段。
- 一般来说,在对数据模式进行更改之后,需要升级序列化器的序列化格式。
- 序列化器的配置已更改。
为了让新的执行具有状态的已写入模式的信息并检测模式是否已更改,在获取操作符s状态的保存点时,需要连同状态字节一起写入状态序列化器的快照。它抽象了一个TypeSerializerSnapshot,将在下一小节中进行解释。
The TypeSerializerSnapshot abstraction
public interface TypeSerializerSnapshot<T> {
int getCurrentVersion();
void writeSnapshot(DataOuputView out) throws IOException;
void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException;
TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(TypeSerializer<T> newSerializer);
TypeSerializer<T> restoreSerializer();
}
public abstract class TypeSerializer<T> {
// ...
public abstract TypeSerializerSnapshot<T> snapshotConfiguration();
}
序列化器类型erializersnapshot是一个时间点信息,作为状态序列化器编写模式的唯一真实来源,以及恢复与给定时间点相同的序列化器所必需的任何附加信息。在writeSnapshot和readSnapshot方法中定义了在还原时应该写入和读取什么作为序列化快照的逻辑。