前言
抱歉起这种烂大街的日本轻小说风格标题来吸引注意力。原本我认为这是常识,不需要专门写一篇文章来讲解如此细碎的点。但是在最近工作巡检中发现了越来越多如同ValueState<Map>
的状态用法(当然大部分是历史遗留),部分Flink作业深受性能问题困扰,所以还是抽出点时间快速聊一聊,顺便给出不算优雅但还算有效的挽救方案。
基于RocksDB的状态序列化
我们已经知道,RocksDB是基于二进制流的内嵌K-V存储,所以Flink任务使用RocksDB状态后端时,写/读操作的状态数据都需要经过序列化和反序列化,从而利用TaskManager本地磁盘实现海量的状态存储。
举个栗子,RocksDBValueState的取值和更新方法如下:
class RocksDBValueState<K, N, V> extends AbstractRocksDBState<K, N, V>
implements InternalValueState<K, N, V> {
@Override
public TypeSerializer<K> getKeySerializer() {
return backend.getKeySerializer();
}
@Override
public TypeSerializer<N> getNamespaceSerializer() {
return namespaceSerializer;
}
@Override
public TypeSerializer<V> getValueSerializer() {
return valueSerializer;
}
@Override
public V value() {
try {
byte[] valueBytes =
backend.db.get(columnFamily, serializeCurrentKeyWithGroupAndNamespace());
if (valueBytes == null) {
return getDefaultValue();
}
dataInputView.setBuffer(valueBytes);
return valueSerializer.deserialize(dataInputView);
} catch (IOException | RocksDBException e) {
throw new FlinkRuntimeException("Error while retrieving data from RocksDB.", e);
}
}
@Override
public void update(V value) {
if (value == null) {
clear();
return;
}
try {
backend.db.put(
columnFamily,
writeOptions,
serializeCurrentKeyWithGroupAndNamespace(),
serializeValue(value));
} catch (Exception e) {
throw new FlinkRuntimeException("Error while adding data to RocksDB", e);
}
}
}
可见key和value都需要经过对应类型的TypeSerializer
的处理,即如果将状态声明为ValueState<Map<K, V>>
,那么将由MapSerializer<K, V>
负责值的正反序列化。特别注意,serializeCurrentKeyWithGroupAndNamespace()
方法中,key需要加上它所对应的KeyGroup编号和对应的Namespace(Namespace是窗口信息),形成一个复合key,即:CompositeKey(KG, K, NS)
,RocksDB实际存储的状态数据的key都类似如此。具体可参看SerializedCompositeKeyBuilder
类,不再赘述。
接下来再看一下RocksDBMapState的部分实现。
class RocksDBMapState<K, N, UK, UV> extends AbstractRocksDBState<K, N, Map<UK, UV>>
implements InternalMapState<K, N, UK, UV> {
@Override
public TypeSerializer<K> getKeySerializer() {
return backend.getKeySerializer();
}
@Override
public TypeSerializer<N> getNamespaceSerializer() {
return namespaceSerializer;
}
@Override
public TypeSerializer<Map<UK, UV>> getValueSerializer() {
return valueSerializer;
}
@Override
public UV get(UK userKey) throws IOException, RocksDBException {
byte[] rawKeyBytes =
serializeCurrentKeyWithGroupAndNamespacePlusUserKey(userKey, userKeySerializer);
byte[] rawValueBytes = backend.db.get(columnFamily, rawKeyBytes);
return (rawValueBytes == null
? null
: deserializeUserValue(dataInputView, rawValueBytes, userValueSerializer));
}
@Override
public void put(UK userKey, UV userValue) throws IOException, RocksDBException {
byte[] rawKeyBytes =
serializeCurrentKeyWithGroupAndNamespacePlusUserKey(userKey, userKeySerializer);
byte[] rawValueBytes = serializeValueNullSensitive(userValue, userValueSerializer);
backend.db.put(columnFamily, writeOptions, rawKeyBytes, rawValueBytes);
}
@Override
public void putAll(Map<UK, UV> map) throws IOException, RocksDBException {
if (map == null) {
return;
}
try (RocksDBWriteBatchWrapper writeBatchWrapper =
new RocksDBWriteBatchWrapper(
backend.db, writeOptions, backend.getWriteBatchSize())) {
for (Map.Entry<UK, UV> entry : map.entrySet()) {
byte[] rawKeyBytes =
serializeCurrentKeyWithGroupAndNamespacePlusUserKey(
entry.getKey(), userKeySerializer);
byte[] rawValueBytes =
serializeValueNullSensitive(entry.getValue(), userValueSerializer);
writeBatchWrapper.put(columnFamily, rawKeyBytes, rawValueBytes);
}
}
}
@Override
public Iterable<Map.Entry<UK, UV>> entries() {
return this::iterator;
}
@Override
public Iterator<Map.Entry<UK, UV>> iterator() {
final byte[] prefixBytes = serializeCurrentKeyWithGroupAndNamespace();
return new RocksDBMapIterator<Map.Entry<UK, UV>>(
backend.db, prefixBytes, userKeySerializer, userValueSerializer, dataInputView) {
@Override
public Map.Entry<UK, UV> next() {
return nextEntry();
}
};
}
由于MapState的本身有用户定义的key UK
,所以RocksDB存储它时,会在上文所述的复合key后面,再加上UK
的值,即:CompositeKey(KG, K, NS) :: UK
。这样,同属于一个KeyContext的所有用户键值对就存在一个连续的存储空间内,可以通过RocksDB WriteBatch机制攒批,实现批量写(putAll()
方法),也可以通过RocksDB Iterator机制做前缀扫描,实现批量读(entries()
方法)。
问题的症结
代码读完了。假设我们在某个key下有5条数据的状态,若使用ValueState<Map<String, String>>
来存储,按照MapSerializer
的序列化方式,其存储可以记为:
(1, k, VoidNamespace) -> [5, k1, false, v1, k2, false, v2, k3, true, k4, false, v4, k5, false, v5]
注意对于无窗口上下文的状态,NS为VoidNamespace
。且序列化Map时,会加上Map的大小,以及表示每个value是否为NULL的标记。
如果使用MapState<String, String>
存储,可以记为:
(1, k, VoidNamespace) :: k1 -> v1
(1, k, VoidNamespace) :: k2 -> v2
(1, k, VoidNamespace) :: k3 -> NULL
(1, k, VoidNamespace) :: k4 -> v4
(1, k, VoidNamespace) :: k5 -> v5
如果我们获取或修改一条状态数据,前者需要将所有数据做一遍序列化和反序列化,而后者只需要处理一条。在Map比较小的情况下可能没有明显的性能差异,但是如果Map有几十个甚至上百个键值对,或者某些value的长度很长(如各类打标标记串等),ValueState<Map>
的性能退化就会非常严重,造成反压。
有的同学可能会问:我对状态数据的操作基本都是“整存整取”(即读/写整个Map),也不建议使用ValueState<Map>
吗?答案仍然是不建议。除了前面提到的WriteBatch和Iterator为MapState
带来的优化之外,RocksDB更可以利用多线程进行读写,而单个大value不仅不能享受这个便利,还会挤占Block Cache空间,在出现数据倾斜等场景时,磁盘I/O可能会打到瓶颈。所以,我们在开始编写作业时就应该正确使用MapState
。
平滑迁移
为了消除此类状态误用的影响,常见的重构方式是将ValueState<Map>
修改为MapState
,重置位点后消费历史数据,积攒状态,并替换掉旧任务。但是对于状态TTL较长、size较大的场景(例如物流监控场景经常有30天TTL、十几TB大的State),这样显然非常不方便,下面提供一种简单的平滑迁移方式。
假设原本误用的状态为mainState
,我们声明两个新的状态,一个是新的MapState newMainState
,一个是布尔型ValueState isMigratedState
,表示该key对应的状态是否已经迁移成了新的,即:
private transient ValueState<Map<String, String>> mainState;
private transient ValueState<Boolean> isMigratedState;
private transient MapState<String, String> newMainState;
当然,它们的TTL等参数要完全相同。
写两个新的方法,负责在读写mainState
时将其迁移成newMainState
,并做上相应的标记。不存在历史状态的,直接以新格式存储。再强调一遍,newMainState.entries()
和newMainState.putAll()
的性能很不错,不必过于担心。
private Map<String, String> wrapGetMainState() throws Exception {
Boolean isMigrated = isMigratedState.value();
if (isMigrated == null || !isMigrated) {
Map<String, String> oldStateData = mainState.value();
if (oldStateData != null) {
newMainState.putAll(mainState.value());
}
isMigratedState.update(true);
mainState.clear();
}
Map<String, String> result = new HashMap<>();
for (Entry<String, String> e : newMainState.entries()) {
result.put(e.getKey(), e.getValue());
}
return result;
}
private void wrapUpdateMainState(Map<String, String> data) throws Exception {
Boolean isMigrated = isMigratedState.value();
if (isMigrated == null || !isMigrated) {
Map<String, String> oldStateData = mainState.value();
if (oldStateData != null) {
newMainState.putAll(mainState.value());
}
isMigratedState.update(true);
mainState.clear();
}
newMainState.putAll(data);
}
再将历史代码中的状态访问全部替换成wrapGetMainState()
和wrapUpdateMainState()
方法的调用即可。表面上看是由一个状态句柄变成了两个状态句柄,但是标记状态的访问十分轻量级,且随着程序的运行,旧状态的数据渐进式地替换完毕之后,就可以安全地删除mainState
和isMigratedState
了。当然,托管内存的设置要科学,并添加一些有利于RocksDB状态吞吐量的参数,如:
state.backend.rocksdb.predefined-options SPINNING_DISK_OPTIMIZED_HIGH_MEM
state.backend.rocksdb.memory.partitioned-index-filters true
基于堆的状态呢?
与RocksDB相反,基于堆的JobManager和FileSystem状态后端无需序列化和反序列化,当然状态的大小就要受制于TaskManager内存。不过,如果我们采用这两种状态后端,ValueState<Map>
和MapState
也就没有明显的性能差别了,因为HeapValueState
和HeapMapState
的底层都是相同的,即CopyOnWriteStateTable
,本质上是内存中的状态映射表。读者有兴趣可以自行参考对应的Flink源码,这里不再啰嗦了。
ValueState、ListState、MapState三者在RocksDB状态后端和基于堆的状态后端中的异同点可以概括成下表。