又见Rescale
笔者在很久之前的一篇文章(传送门)中讲解过Flink的状态缩放(Rescale)和键组(Key Group)设计,相信各位看官对下面这张图已经很熟悉了。
简言之,Flink通过引入Key Group,将状态Rescale时从远端DFS恢复数据的操作从随机读尽量优化成顺序读,I/O瓶颈大大减轻。
但是,当Flink应用的Sub-task和状态Key非常多时,改变有状态算子的并行度仍然可能要花费较长时间恢复。例如我们负责的比较大的一个Flink任务(版本1.14),状态Key总数接近10亿,并行度从240扩容到480,在TaskManager资源充足且HDFS吞吐无瓶颈的情况下,状态数据完全恢复也需要20+分钟时间。除了数据量大之外,还有一个关键原因是:并行度增加后,新的Sub-task拿到原来的状态数据,需要将不属于自己的Key Group裁剪掉,否则会与其他Sub-task冲突。以上图为例,扩容后的Sub-task 1需要将扩容前Sub-task 0和1的状态文件都恢复到RocksDB,并且裁剪掉KG-1
、KG-2
、KG-5
、KG-6
的数据,只保留KG-3
和KG-4
。而我们知道,RocksDB的删除操作是产生Tombstone记录,本质上与写入无异,所以这种情况下TaskManager本地磁盘仍然有较大的I/O压力。
不过,上述问题在Flink 1.16有了一定改善,因为RocksDB状态后端运用了RocksDB原生的DeleteRange
API来快速删除指定区间内的Keys,在我们的实测中,大状态任务恢复速度最多可以提升60%。下面讨论DeleteRange
的实现原理。
RocksDB原生DeleteRange原理
在没有DeleteRange
API的时候,区间删除只能采用传统的迭代器遍历操作:
Slice start, end;
auto it = db->NewIterator(ReadOptions());
for (it->Seek(start); cmp->Compare(it->key(), end) < 0; it->Next()) {
db->Delete(WriteOptions(), it->key());
}
有了DeleteRange
API,就简单很多:
Slice start, end;
db->DeleteRange(WriteOptions(), start, end);
为了实现区间删除,RocksDB在原始的MemTable(称为Point-key MemTable)之外,又新增了Range Tombstone MemTable,专门缓存区间删除的数据。同理,在SST文件中也对应新增了包含区间删除信息的元数据块Range Tombstone Block(Seqnum为写入序列号)。如下两图所示。
由此可见,如果我们要删除包含10000个连续Key的集合,传统方式会产生10000个Tombstone,而DeleteRange
方式只会产生1个Range Tombstone,能够有效降低读写放大。
在写入过程中,Range Tombstone也需要参与Compaction流程,以及时删除无效Tombstone。此处细节很多,简单概括来讲:
- 在Compaction开始时,收集所有源SST文件的Range Tombstone区间,形成一个包含所有区间删除Key的最小堆。
- 对于每个输入Key,判断它是Merge类型还是Put类型:Merge操作则将该Key的所有历史版本合并,如果历史版本没有被Snapshot引用,则可以删除对应的Tombstone;Put操作说明该Key是新写入数据,所有Tombstone都可以被清理掉。
- 清理完成后,将剩余的有效Tombstone重新写回新SST文件的Range Tombstone Block。
引入Range Tombstone后,RocksDB读取操作面临一个新问题:如何快速判断要读取的Key是否位于某个已经标记删除的区间中?答案是分段(RocksDB内部称为"Fragmentation"),本质上与天际线问题(The Skyline Problem)的解法相同,见Leetcode 218。
如上图所示,在RocksDB的语义下,X轴表示Key,Y轴表示Seqnum。在打开一个SST文件时,RocksDB会扫描该文件中所有的Range Tombstone区间(图A中不同颜色的色块),并将它们整合成互不重叠的子区间。将这些子区间按照左值升序排序并缓存下来(图B),就可以根据Key进行高效的二分查找了。
Flink对DeleteRange的运用
回到Flink,以1.16版本为例,当任务发生Rescale并从状态恢复到RocksDB时,实际上是调用RocksDBIncrementalRestoreOperation#restoreWithRescaling()
方法:
private void restoreWithRescaling(Collection<KeyedStateHandle> restoreStateHandles)
throws Exception {
// Prepare for restore with rescaling
KeyedStateHandle initialHandle =
RocksDBIncrementalCheckpointUtils.chooseTheBestStateHandleForInitial(
restoreStateHandles, keyGroupRange, overlapFractionThreshold);
// Init base DB instance
if (initialHandle != null) {
restoreStateHandles.remove(initialHandle);
initDBWithRescaling(initialHandle);
} else {
this.rocksHandle.openDB();
}
// ..................................
}
其中,RocksDBIncrementalCheckpointUtils#chooseTheBestStateHandleForInitial()
方法负责从所有要恢复的状态句柄中,尽量选择出与当前Sub-task负责的KeyGroupRange
重合比例最高的一个,用来初始化本地RocksDB实例,以尽量降低后续裁剪的压力。
接下来通过initDBWithRescaling()
方法调用RocksDBIncrementalCheckpointUtils#clipDBWithKeyGroupRange()
方法,按照KeyGroupRange
的范围进行裁剪。从文章开头的图示可知,由于Key Group已经是有序的,因此在扩容的情况下,新Sub-task不再负责的Key Group一定位于头尾,因此只需要比较两者的startKeyGroup
和endKeyGroup
即可。
public static void clipDBWithKeyGroupRange(
@Nonnull RocksDB db,
@Nonnull List<ColumnFamilyHandle> columnFamilyHandles,
@Nonnull KeyGroupRange targetKeyGroupRange,
@Nonnull KeyGroupRange currentKeyGroupRange,
@Nonnegative int keyGroupPrefixBytes)
throws RocksDBException {
final byte[] beginKeyGroupBytes = new byte[keyGroupPrefixBytes];
final byte[] endKeyGroupBytes = new byte[keyGroupPrefixBytes];
if (currentKeyGroupRange.getStartKeyGroup() < targetKeyGroupRange.getStartKeyGroup()) {
CompositeKeySerializationUtils.serializeKeyGroup(
currentKeyGroupRange.getStartKeyGroup(), beginKeyGroupBytes);
CompositeKeySerializationUtils.serializeKeyGroup(
targetKeyGroupRange.getStartKeyGroup(), endKeyGroupBytes);
deleteRange(db, columnFamilyHandles, beginKeyGroupBytes, endKeyGroupBytes);
}
if (currentKeyGroupRange.getEndKeyGroup() > targetKeyGroupRange.getEndKeyGroup()) {
CompositeKeySerializationUtils.serializeKeyGroup(
targetKeyGroupRange.getEndKeyGroup() + 1, beginKeyGroupBytes);
CompositeKeySerializationUtils.serializeKeyGroup(
currentKeyGroupRange.getEndKeyGroup() + 1, endKeyGroupBytes);
deleteRange(db, columnFamilyHandles, beginKeyGroupBytes, endKeyGroupBytes);
}
}
而deleteRange()
方法就是代理了RocksDB JNI的同名方法,进行高效的区间删除。
private static void deleteRange(
RocksDB db,
List<ColumnFamilyHandle> columnFamilyHandles,
byte[] beginKeyBytes,
byte[] endKeyBytes)
throws RocksDBException {
for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) {
// Using RocksDB's deleteRange will take advantage of delete
// tombstones, which mark the range as deleted.
//
// https://github.com/ververica/frocksdb/blob/FRocksDB-6.20.3/include/rocksdb/db.h#L363-L377
db.deleteRange(columnFamilyHandle, beginKeyBytes, endKeyBytes);
}
}
读者也可以手动翻一下Flink 1.14或更早版本的源码, deleteRange()
方法是通过遍历Key进行前缀比较,并执行WriteBatch
操作批量删除不符合条件的Key,相比原生DeleteRange
的效率要低很多。