Flink RocksDB状态缩放加速:RocksDB原生DeleteRange原理简析

又见Rescale

笔者在很久之前的一篇文章(传送门)中讲解过Flink的状态缩放(Rescale)和键组(Key Group)设计,相信各位看官对下面这张图已经很熟悉了。

简言之,Flink通过引入Key Group,将状态Rescale时从远端DFS恢复数据的操作从随机读尽量优化成顺序读,I/O瓶颈大大减轻。

但是,当Flink应用的Sub-task和状态Key非常多时,改变有状态算子的并行度仍然可能要花费较长时间恢复。例如我们负责的比较大的一个Flink任务(版本1.14),状态Key总数接近10亿,并行度从240扩容到480,在TaskManager资源充足且HDFS吞吐无瓶颈的情况下,状态数据完全恢复也需要20+分钟时间。除了数据量大之外,还有一个关键原因是:并行度增加后,新的Sub-task拿到原来的状态数据,需要将不属于自己的Key Group裁剪掉,否则会与其他Sub-task冲突。以上图为例,扩容后的Sub-task 1需要将扩容前Sub-task 0和1的状态文件都恢复到RocksDB,并且裁剪掉KG-1KG-2KG-5KG-6的数据,只保留KG-3KG-4。而我们知道,RocksDB的删除操作是产生Tombstone记录,本质上与写入无异,所以这种情况下TaskManager本地磁盘仍然有较大的I/O压力。

不过,上述问题在Flink 1.16有了一定改善,因为RocksDB状态后端运用了RocksDB原生的DeleteRange API来快速删除指定区间内的Keys,在我们的实测中,大状态任务恢复速度最多可以提升60%。下面讨论DeleteRange的实现原理。

RocksDB原生DeleteRange原理

在没有DeleteRange API的时候,区间删除只能采用传统的迭代器遍历操作:

Slice start, end;
auto it = db->NewIterator(ReadOptions());
for (it->Seek(start); cmp->Compare(it->key(), end) < 0; it->Next()) {
  db->Delete(WriteOptions(), it->key());
}

有了DeleteRange API,就简单很多:

Slice start, end;
db->DeleteRange(WriteOptions(), start, end);

为了实现区间删除,RocksDB在原始的MemTable(称为Point-key MemTable)之外,又新增了Range Tombstone MemTable,专门缓存区间删除的数据。同理,在SST文件中也对应新增了包含区间删除信息的元数据块Range Tombstone Block(Seqnum为写入序列号)。如下两图所示。

由此可见,如果我们要删除包含10000个连续Key的集合,传统方式会产生10000个Tombstone,而DeleteRange方式只会产生1个Range Tombstone,能够有效降低读写放大。

在写入过程中,Range Tombstone也需要参与Compaction流程,以及时删除无效Tombstone。此处细节很多,简单概括来讲:

  • 在Compaction开始时,收集所有源SST文件的Range Tombstone区间,形成一个包含所有区间删除Key的最小堆。
  • 对于每个输入Key,判断它是Merge类型还是Put类型:Merge操作则将该Key的所有历史版本合并,如果历史版本没有被Snapshot引用,则可以删除对应的Tombstone;Put操作说明该Key是新写入数据,所有Tombstone都可以被清理掉。
  • 清理完成后,将剩余的有效Tombstone重新写回新SST文件的Range Tombstone Block。

引入Range Tombstone后,RocksDB读取操作面临一个新问题:如何快速判断要读取的Key是否位于某个已经标记删除的区间中?答案是分段(RocksDB内部称为"Fragmentation"),本质上与天际线问题(The Skyline Problem)的解法相同,见Leetcode 218

如上图所示,在RocksDB的语义下,X轴表示Key,Y轴表示Seqnum。在打开一个SST文件时,RocksDB会扫描该文件中所有的Range Tombstone区间(图A中不同颜色的色块),并将它们整合成互不重叠的子区间。将这些子区间按照左值升序排序并缓存下来(图B),就可以根据Key进行高效的二分查找了。

Flink对DeleteRange的运用

回到Flink,以1.16版本为例,当任务发生Rescale并从状态恢复到RocksDB时,实际上是调用RocksDBIncrementalRestoreOperation#restoreWithRescaling()方法:

    private void restoreWithRescaling(Collection<KeyedStateHandle> restoreStateHandles)
            throws Exception {

        // Prepare for restore with rescaling
        KeyedStateHandle initialHandle =
                RocksDBIncrementalCheckpointUtils.chooseTheBestStateHandleForInitial(
                        restoreStateHandles, keyGroupRange, overlapFractionThreshold);

        // Init base DB instance
        if (initialHandle != null) {
            restoreStateHandles.remove(initialHandle);
            initDBWithRescaling(initialHandle);
        } else {
            this.rocksHandle.openDB();
        }
    // ..................................
    }

其中,RocksDBIncrementalCheckpointUtils#chooseTheBestStateHandleForInitial()方法负责从所有要恢复的状态句柄中,尽量选择出与当前Sub-task负责的KeyGroupRange重合比例最高的一个,用来初始化本地RocksDB实例,以尽量降低后续裁剪的压力。

接下来通过initDBWithRescaling()方法调用RocksDBIncrementalCheckpointUtils#clipDBWithKeyGroupRange()方法,按照KeyGroupRange的范围进行裁剪。从文章开头的图示可知,由于Key Group已经是有序的,因此在扩容的情况下,新Sub-task不再负责的Key Group一定位于头尾,因此只需要比较两者的startKeyGroupendKeyGroup即可。

    public static void clipDBWithKeyGroupRange(
            @Nonnull RocksDB db,
            @Nonnull List<ColumnFamilyHandle> columnFamilyHandles,
            @Nonnull KeyGroupRange targetKeyGroupRange,
            @Nonnull KeyGroupRange currentKeyGroupRange,
            @Nonnegative int keyGroupPrefixBytes)
            throws RocksDBException {

        final byte[] beginKeyGroupBytes = new byte[keyGroupPrefixBytes];
        final byte[] endKeyGroupBytes = new byte[keyGroupPrefixBytes];

        if (currentKeyGroupRange.getStartKeyGroup() < targetKeyGroupRange.getStartKeyGroup()) {
            CompositeKeySerializationUtils.serializeKeyGroup(
                    currentKeyGroupRange.getStartKeyGroup(), beginKeyGroupBytes);
            CompositeKeySerializationUtils.serializeKeyGroup(
                    targetKeyGroupRange.getStartKeyGroup(), endKeyGroupBytes);
            deleteRange(db, columnFamilyHandles, beginKeyGroupBytes, endKeyGroupBytes);
        }

        if (currentKeyGroupRange.getEndKeyGroup() > targetKeyGroupRange.getEndKeyGroup()) {
            CompositeKeySerializationUtils.serializeKeyGroup(
                    targetKeyGroupRange.getEndKeyGroup() + 1, beginKeyGroupBytes);
            CompositeKeySerializationUtils.serializeKeyGroup(
                    currentKeyGroupRange.getEndKeyGroup() + 1, endKeyGroupBytes);
            deleteRange(db, columnFamilyHandles, beginKeyGroupBytes, endKeyGroupBytes);
        }
    }

deleteRange()方法就是代理了RocksDB JNI的同名方法,进行高效的区间删除。

    private static void deleteRange(
            RocksDB db,
            List<ColumnFamilyHandle> columnFamilyHandles,
            byte[] beginKeyBytes,
            byte[] endKeyBytes)
            throws RocksDBException {

        for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) {
            // Using RocksDB's deleteRange will take advantage of delete
            // tombstones, which mark the range as deleted.
            //
            // https://github.com/ververica/frocksdb/blob/FRocksDB-6.20.3/include/rocksdb/db.h#L363-L377
            db.deleteRange(columnFamilyHandle, beginKeyBytes, endKeyBytes);
        }
    }

读者也可以手动翻一下Flink 1.14或更早版本的源码, deleteRange()方法是通过遍历Key进行前缀比较,并执行WriteBatch操作批量删除不符合条件的Key,相比原生DeleteRange的效率要低很多。

The End

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,884评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,347评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,435评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,509评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,611评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,837评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,987评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,730评论 0 267
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,194评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,525评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,664评论 1 340
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,334评论 4 330
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,944评论 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,764评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,997评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,389评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,554评论 2 349

推荐阅读更多精彩内容