关于使用Flink RocksDB状态后端时一定要写MapState而非ValueState<Map>这档事(以及解决方法)

前言

抱歉起这种烂大街的日本轻小说风格标题来吸引注意力。原本我认为这是常识,不需要专门写一篇文章来讲解如此细碎的点。但是在最近工作巡检中发现了越来越多如同ValueState<Map>的状态用法(当然大部分是历史遗留),部分Flink作业深受性能问题困扰,所以还是抽出点时间快速聊一聊,顺便给出不算优雅但还算有效的挽救方案。

基于RocksDB的状态序列化

我们已经知道,RocksDB是基于二进制流的内嵌K-V存储,所以Flink任务使用RocksDB状态后端时,写/读操作的状态数据都需要经过序列化和反序列化,从而利用TaskManager本地磁盘实现海量的状态存储。

举个栗子,RocksDBValueState的取值和更新方法如下:

class RocksDBValueState<K, N, V> extends AbstractRocksDBState<K, N, V>
       implements InternalValueState<K, N, V> {
   @Override
   public TypeSerializer<K> getKeySerializer() {
       return backend.getKeySerializer();
   }

   @Override
   public TypeSerializer<N> getNamespaceSerializer() {
       return namespaceSerializer;
   }

   @Override
   public TypeSerializer<V> getValueSerializer() {
       return valueSerializer;
   }

   @Override
   public V value() {
       try {
           byte[] valueBytes =
                   backend.db.get(columnFamily, serializeCurrentKeyWithGroupAndNamespace());

           if (valueBytes == null) {
               return getDefaultValue();
           }
           dataInputView.setBuffer(valueBytes);
           return valueSerializer.deserialize(dataInputView);
       } catch (IOException | RocksDBException e) {
           throw new FlinkRuntimeException("Error while retrieving data from RocksDB.", e);
       }
   }

   @Override
   public void update(V value) {
       if (value == null) {
           clear();
           return;
       }

       try {
           backend.db.put(
                   columnFamily,
                   writeOptions,
                   serializeCurrentKeyWithGroupAndNamespace(),
                   serializeValue(value));
       } catch (Exception e) {
           throw new FlinkRuntimeException("Error while adding data to RocksDB", e);
       }
   }
}

可见key和value都需要经过对应类型的TypeSerializer的处理,即如果将状态声明为ValueState<Map<K, V>>,那么将由MapSerializer<K, V>负责值的正反序列化。特别注意,serializeCurrentKeyWithGroupAndNamespace()方法中,key需要加上它所对应的KeyGroup编号和对应的Namespace(Namespace是窗口信息),形成一个复合key,即:CompositeKey(KG, K, NS),RocksDB实际存储的状态数据的key都类似如此。具体可参看SerializedCompositeKeyBuilder类,不再赘述。

接下来再看一下RocksDBMapState的部分实现。

class RocksDBMapState<K, N, UK, UV> extends AbstractRocksDBState<K, N, Map<UK, UV>>
        implements InternalMapState<K, N, UK, UV> {
    @Override
    public TypeSerializer<K> getKeySerializer() {
        return backend.getKeySerializer();
    }

    @Override
    public TypeSerializer<N> getNamespaceSerializer() {
        return namespaceSerializer;
    }

    @Override
    public TypeSerializer<Map<UK, UV>> getValueSerializer() {
        return valueSerializer;
    }

    @Override
    public UV get(UK userKey) throws IOException, RocksDBException {
        byte[] rawKeyBytes =
                serializeCurrentKeyWithGroupAndNamespacePlusUserKey(userKey, userKeySerializer);
        byte[] rawValueBytes = backend.db.get(columnFamily, rawKeyBytes);

        return (rawValueBytes == null
                ? null
                : deserializeUserValue(dataInputView, rawValueBytes, userValueSerializer));
    }

    @Override
    public void put(UK userKey, UV userValue) throws IOException, RocksDBException {

        byte[] rawKeyBytes =
                serializeCurrentKeyWithGroupAndNamespacePlusUserKey(userKey, userKeySerializer);
        byte[] rawValueBytes = serializeValueNullSensitive(userValue, userValueSerializer);

        backend.db.put(columnFamily, writeOptions, rawKeyBytes, rawValueBytes);
    }

    @Override
    public void putAll(Map<UK, UV> map) throws IOException, RocksDBException {
        if (map == null) {
            return;
        }

        try (RocksDBWriteBatchWrapper writeBatchWrapper =
                new RocksDBWriteBatchWrapper(
                        backend.db, writeOptions, backend.getWriteBatchSize())) {
            for (Map.Entry<UK, UV> entry : map.entrySet()) {
                byte[] rawKeyBytes =
                        serializeCurrentKeyWithGroupAndNamespacePlusUserKey(
                                entry.getKey(), userKeySerializer);
                byte[] rawValueBytes =
                        serializeValueNullSensitive(entry.getValue(), userValueSerializer);
                writeBatchWrapper.put(columnFamily, rawKeyBytes, rawValueBytes);
            }
        }
    }

    @Override
    public Iterable<Map.Entry<UK, UV>> entries() {
        return this::iterator;
    }

    @Override
    public Iterator<Map.Entry<UK, UV>> iterator() {
        final byte[] prefixBytes = serializeCurrentKeyWithGroupAndNamespace();

        return new RocksDBMapIterator<Map.Entry<UK, UV>>(
                backend.db, prefixBytes, userKeySerializer, userValueSerializer, dataInputView) {
            @Override
            public Map.Entry<UK, UV> next() {
                return nextEntry();
            }
        };
    }

由于MapState的本身有用户定义的key UK,所以RocksDB存储它时,会在上文所述的复合key后面,再加上UK的值,即:CompositeKey(KG, K, NS) :: UK。这样,同属于一个KeyContext的所有用户键值对就存在一个连续的存储空间内,可以通过RocksDB WriteBatch机制攒批,实现批量写(putAll()方法),也可以通过RocksDB Iterator机制做前缀扫描,实现批量读(entries()方法)。

问题的症结

代码读完了。假设我们在某个key下有5条数据的状态,若使用ValueState<Map<String, String>>来存储,按照MapSerializer的序列化方式,其存储可以记为:

(1, k, VoidNamespace) -> [5, k1, false, v1, k2, false, v2, k3, true, k4, false, v4, k5, false, v5]

注意对于无窗口上下文的状态,NS为VoidNamespace。且序列化Map时,会加上Map的大小,以及表示每个value是否为NULL的标记。

如果使用MapState<String, String>存储,可以记为:

(1, k, VoidNamespace) :: k1 -> v1
(1, k, VoidNamespace) :: k2 -> v2
(1, k, VoidNamespace) :: k3 -> NULL
(1, k, VoidNamespace) :: k4 -> v4
(1, k, VoidNamespace) :: k5 -> v5

如果我们获取或修改一条状态数据,前者需要将所有数据做一遍序列化和反序列化,而后者只需要处理一条。在Map比较小的情况下可能没有明显的性能差异,但是如果Map有几十个甚至上百个键值对,或者某些value的长度很长(如各类打标标记串等),ValueState<Map>的性能退化就会非常严重,造成反压。

有的同学可能会问:我对状态数据的操作基本都是“整存整取”(即读/写整个Map),也不建议使用ValueState<Map>吗?答案仍然是不建议。除了前面提到的WriteBatch和Iterator为MapState带来的优化之外,RocksDB更可以利用多线程进行读写,而单个大value不仅不能享受这个便利,还会挤占Block Cache空间,在出现数据倾斜等场景时,磁盘I/O可能会打到瓶颈。所以,我们在开始编写作业时就应该正确使用MapState

平滑迁移

为了消除此类状态误用的影响,常见的重构方式是将ValueState<Map>修改为MapState,重置位点后消费历史数据,积攒状态,并替换掉旧任务。但是对于状态TTL较长、size较大的场景(例如物流监控场景经常有30天TTL、十几TB大的State),这样显然非常不方便,下面提供一种简单的平滑迁移方式。

假设原本误用的状态为mainState,我们声明两个新的状态,一个是新的MapState newMainState,一个是布尔型ValueState isMigratedState,表示该key对应的状态是否已经迁移成了新的,即:

    private transient ValueState<Map<String, String>> mainState;

    private transient ValueState<Boolean> isMigratedState;
    private transient MapState<String, String> newMainState;    

当然,它们的TTL等参数要完全相同。

写两个新的方法,负责在读写mainState时将其迁移成newMainState,并做上相应的标记。不存在历史状态的,直接以新格式存储。再强调一遍,newMainState.entries()newMainState.putAll()的性能很不错,不必过于担心。

    private Map<String, String> wrapGetMainState() throws Exception {
        Boolean isMigrated = isMigratedState.value();

        if (isMigrated == null || !isMigrated) {
            Map<String, String> oldStateData = mainState.value();
            if (oldStateData != null) {
                newMainState.putAll(mainState.value());
            }
            isMigratedState.update(true);
            mainState.clear();
        }

        Map<String, String> result = new HashMap<>();
        for (Entry<String, String> e : newMainState.entries()) {
            result.put(e.getKey(), e.getValue());
        }
        return result;
    }

    private void wrapUpdateMainState(Map<String, String> data) throws Exception {
        Boolean isMigrated = isMigratedState.value();

        if (isMigrated == null || !isMigrated) {
            Map<String, String> oldStateData = mainState.value();
            if (oldStateData != null) {
                newMainState.putAll(mainState.value());
            }
            isMigratedState.update(true);
            mainState.clear();
        }

        newMainState.putAll(data);
    }

再将历史代码中的状态访问全部替换成wrapGetMainState()wrapUpdateMainState()方法的调用即可。表面上看是由一个状态句柄变成了两个状态句柄,但是标记状态的访问十分轻量级,且随着程序的运行,旧状态的数据渐进式地替换完毕之后,就可以安全地删除mainStateisMigratedState了。当然,托管内存的设置要科学,并添加一些有利于RocksDB状态吞吐量的参数,如:

state.backend.rocksdb.predefined-options  SPINNING_DISK_OPTIMIZED_HIGH_MEM
state.backend.rocksdb.memory.partitioned-index-filters  true

基于堆的状态呢?

与RocksDB相反,基于堆的JobManager和FileSystem状态后端无需序列化和反序列化,当然状态的大小就要受制于TaskManager内存。不过,如果我们采用这两种状态后端,ValueState<Map>MapState也就没有明显的性能差别了,因为HeapValueStateHeapMapState的底层都是相同的,即CopyOnWriteStateTable,本质上是内存中的状态映射表。读者有兴趣可以自行参考对应的Flink源码,这里不再啰嗦了。

ValueState、ListState、MapState三者在RocksDB状态后端和基于堆的状态后端中的异同点可以概括成下表。

The End

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,992评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,212评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,535评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,197评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,310评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,383评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,409评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,191评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,621评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,910评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,084评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,763评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,403评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,083评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,318评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,946评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,967评论 2 351

推荐阅读更多精彩内容