KafkaStream Local Store和Global Store区别和用法

前言

使用kafkaStream进行流式计算时,如果需要对数据进行状态处理,那么常用的会遇到kafkaStream的store,而store也有Local Store以及Global Store,当然也可以使用其他方案的来进行状态保存,文本主要理清楚kafkaStream中的Local Store以及Global Store之间的区别和用法,以及什么时候选择何种store和当store无法满足我们需求时,应该如何使用其他方案来进行数据的状态保存

本文所有方法和代码皆只针对kafka-streams的3.7.0版本,pom如下:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>3.7.0</version>
</dependency>

由于不同版本的KafkaStream在使用上有较大区别,也因为KafkaStream不同版本API改动较大,所以如果版本不一致,使用方法甚至是一些核心概念都会跟本文讲述有所出入,并且KafkaStream由于相对小众,文档也很少,官网的文档也只是一些简单介绍,所以需要注意避坑

Local Store和Global Store的共同点和区别点

共同点:

1、都是用于流式计算中进行状态存储的

2、具体结构类似,使用的都是如:KeyValueStore,SessionStore等类

3、实际机制类似,会通过内存、本地目录和kafka Topic的变更记录等方式来进行缓存数据更新和恢复

不同点

1、适用场景不同

Local Store 适合用于单个实例的状态管理,适合处理单个分区的数据,并且缓存数据不会多个实例共享

Global Store 适用于跨实例共享数据状态,多个实例通过Topic中的更新记录来跟新进程中的数据

2、使用方法不同

Local Store 可以直接在代码中调用对应类型存储(如:KeyValueStore)的put方法进行更新数据,不需要考虑数据一致性(因为可见性只有单个实例)

Global Store 不能直接调用对应的put和delete方法,所有更新和删除缓存都需要通过发送数据到Global 配置的topic中,然后自行实现Topic数据消费者(实现:org.apache.kafka.streams.processor.api.Processor类),在消费者类中进行数据更新等操作,同时因为需要自己实现更新实例中的数据逻辑,数据一致性也需要开发者自行处理,虽然正常来说利用Kafka本身的特性很少出现数据一致性问题,但是如果多实例之间性能差异和网络环境等差异,容易将数据不一致的时长延长,如果要求Store一致性强且容忍数据不一致时限短,则需要注意考虑Store更新数据消费者的处理能力

3、扩展性

Local Store:可以通过增加输入主题的分区数来扩展处理能力,但每个实例仍然独立运行。

Global Store:需要在多个实例之间共享状态,因此在设计时需要考虑如何高效地管理和同步状态。

常见的Store 类型

org.apache.kafka.streams.state.KeyValueStore
org.apache.kafka.streams.state.SessionStore
org.apache.kafka.streams.state.TimestampedKeyValueStore
org.apache.kafka.streams.state.VersionedKeyValueStore
org.apache.kafka.streams.state.WindowStore

需要根据实际使用场景选择合适的状态存储类

用法

Local Store

第一步,先生成对应类型的StoreBuilder对象,如我需要用KeyValueStore,然后状态存储的名字是:testLocalStore(这个名字不能重复,因为会根据消费者id加储存名称创建对应的Topic,当然如果是不同的KafkaStream程序,消费者id不一致,那么重复就没有关系了),因为是KeyValue类型的储存,所以需要设定对应的Key和Value数据的序列化对象,具体代码如下:

StoreBuilder<KeyValueStore<String, String>> kvBuilder = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("testLocalStore"), Serdes.String(), Serdes.String());

其中Stores.persistentKeyValueStore代表的我得存储是持久化的,正常都是会用持久化,当然也有存储一些不重要或者程序重启丢失也无所谓的状态数据,可以使用Stores.inMemoryKeyValueStore以及基于LRU淘汰机制的储存Stores.lruMap,第二个参数Serdes.String()代表存储数据的key是字符串,第三个参数同理,如果是要存储一些对象,也可以使用自定义的序列化类,实现

org.apache.kafka.common.serialization.Serializer

序列化类,以及反序列化类

org.apache.kafka.common.serialization.Deserializer

然后定义好即可,如:

new Serdes.WrapperSerde<>(new KryoSerializer<>(TestStoreBean.class),
                new KryoDeserializer<>(TestStoreBean.class)

其中KryoSerializer和KryoDeserializer是我自定义的使用Kryo序列化Java对象的类,TestStoreBean是我保存的状态的数据封装bean

KryoSerializer代码如下:

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Output;
import org.apache.kafka.common.serialization.Serializer;

import java.io.ByteArrayOutputStream;

/**
 * kryo序列化类
 * @author Raye
 * @since 2024-6-4
 */
public class KryoSerializer<T> implements Serializer<T> {
    private static final ThreadLocal<Kryo> KRYO_LOCAL = new ThreadLocal<Kryo>() {
        @Override
        protected Kryo initialValue() {
            Kryo kryo = new Kryo();

            /**
             * 不要轻易改变这里的配置!更改之后,序列化的格式就会发生变化,
             * 上线的同时就必须清除 Redis 里的所有缓存,
             * 否则那些缓存再回来反序列化的时候,就会报错
             */
            //支持对象循环引用(否则会栈溢出)
            kryo.setReferences(true); //默认值就是 true,添加此行的目的是为了提醒维护者,不要改变这个配置

            //不强制要求注册类(注册行为无法保证多个 JVM 内同一个类的注册编号相同;而且业务系统中大量的 Class 也难以一一注册)
            kryo.setRegistrationRequired(false); //默认值就是 false,添加此行的目的是为了提醒维护者,不要改变这个配置


            return kryo;
        }
    };
    /**
     * 获得当前线程的 Kryo 实例
     *
     * @return 当前线程的 Kryo 实例
     */
    public static Kryo getInstance() {
        return KRYO_LOCAL.get();
    }
    private Class<T> clz;
    public KryoSerializer(Class<T> clz) {
        this.clz = clz;
    }
    @Override
    public byte[] serialize(String s, T t) {
        if(t == null){
            return null;
        }
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        Output output = new Output(byteArrayOutputStream);

        Kryo kryo = getInstance();
        kryo.writeObjectOrNull(output, t,clz);
        output.flush();

        return byteArrayOutputStream.toByteArray();
    }
}

KryoDeserializer代码如下:

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import org.apache.kafka.common.serialization.Deserializer;

import java.io.ByteArrayInputStream;

/**
 * kryo反序列化类
 * @author Raye
 * @since 2024-6-4
 */
public class KryoDeserializer<T> implements Deserializer<T> {
    private static final ThreadLocal<Kryo> KRYO_LOCAL = new ThreadLocal<Kryo>() {
        @Override
        protected Kryo initialValue() {
            Kryo kryo = new Kryo();

            /**
             * 不要轻易改变这里的配置!更改之后,序列化的格式就会发生变化,
             * 上线的同时就必须清除 Redis 里的所有缓存,
             * 否则那些缓存再回来反序列化的时候,就会报错
             */
            //支持对象循环引用(否则会栈溢出)
            kryo.setReferences(true); //默认值就是 true,添加此行的目的是为了提醒维护者,不要改变这个配置

            //不强制要求注册类(注册行为无法保证多个 JVM 内同一个类的注册编号相同;而且业务系统中大量的 Class 也难以一一注册)
            kryo.setRegistrationRequired(false); //默认值就是 false,添加此行的目的是为了提醒维护者,不要改变这个配置


            return kryo;
        }
    };
    /**
     * 获得当前线程的 Kryo 实例
     *
     * @return 当前线程的 Kryo 实例
     */
    public static Kryo getInstance() {
        return KRYO_LOCAL.get();
    }

    private Class<T> clz;
    public KryoDeserializer(Class<T> clz) {
        this.clz = clz;
    }

    @Override
    public T deserialize(String s, byte[] bytes) {
        if(bytes == null){
            return null;
        }
        ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
        Input input = new Input(byteArrayInputStream);

        Kryo kryo = getInstance();
        try {
            return kryo.readObjectOrNull(input, clz);
        }catch (Exception e){
            e.printStackTrace();
        }
        return null;

    }
}

同理,使用LocalStore时,可以将代码替换成以下内容:

StoreBuilder<KeyValueStore<String, TestStoreBean>> kvBuilder = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("testLocalStore"), Serdes.String(), new Serdes.WrapperSerde<>(new KryoSerializer<>(TestStoreBean.class),
                new KryoDeserializer<>(TestStoreBean.class));

有了StoreBuilder对象之后,直接在StreamsBuilder对象中添加即可

streamsBuilder.addStateStore(kvBuilder);

需要使用时,先在处理数据的Processor类中的init方法获取对应的状态存储对象

this.testLocalStore = context.getStateStore("testLocalStore");

然后就可以在process方法中调用testLocalStore的get、put、delete等方法操作状态存储数据了,具体代码如下

    @Slf4j
    public static class StreamProcessor implements Processor<String,String,String,String> {

        private KeyValueStore<String,String> testLocalStore;
        private ProcessorContext context;
        
        private String toTopic;
        @Override
        public void init(ProcessorContext context) {
            this.context = context;
            this.testLocalStore = context.getStateStore("testLocalStore");
        }

        public StreamProcessor(String toTopic) {
            this.toTopic = toTopic;
        }

        @Override
        public void process(Record<String, String> record) {
            testLocalStore.put("key1","testValue1");
            log.info("testLocalStore key1 : {}",testLocalStore.get("key1"));
            testLocalStore.delete("key1");
            context.forward(record,toTopic);
        }
    }

其中实现的Processor类全称是:org.apache.kafka.streams.processor.api.Processor,上面代码只是在数据处理流程中简单保存了数据,然后获取出来以及删除,没有对流数据做任何处理,就直接发送到输出的topic了

完整代码如下:

    @Bean
    public KStream<String,String> kStreamTestStore(StreamsBuilder streamsBuilder){
        log.info("init kStreamTestStore");
        StoreBuilder<KeyValueStore<String, String>> kvBuilder = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("testLocalStore"), Serdes.String(), Serdes.String());
        streamsBuilder.addStateStore(kvBuilder);
        KStream<String, String> stream = streamsBuilder.stream(fromTopic);
        stream.process(()->new StreamProcessor(toTopic), Named.as(fromTopic),"testLocalStore");
        streamsBuilder.build().addSink(toTopic,toTopic,fromTopic);
        return stream;
    }

注意:由于使用Store需要通过ProcessorContext对象来获取Store对象,所以在KafkaStream常用的一些map,mapValue,flatMapValues这些流式计算方法中是没办法使用的,只能在一些更底层的Api中去使用,如process

Global Store

同Local Store一样,需要先生成对应类型的StoreBuilder对象,代码跟Local Store一样

StoreBuilder<KeyValueStore<String, String>> kvBuilder = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("testGlobalStore"), Serdes.String(), Serdes.String());

然后定义处理状态更新日志的Processor类,在这个类中,可以对缓存数据进行更新和删除操作(其他地方都是不能直接修改Global Store的)

public class GlobalStoreHandleProcessor<K, V> implements Processor<K,V,Void,Void> {
    private KeyValueStore<K, V> store;
    private String storeName;

    public GlobalStoreHandleProcessor(String storeName) {
        this.storeName = storeName;
    }
    
    @Override
    public void process(Record<K,V> record) {
        if(record == null || record.value() == null) {
            return;
        }
        store.put(record.key(), record.value());
    }

    @Override
    public void init(ProcessorContext context) {
        this.store = context.getStateStore(storeName);
    }

}

跟KafkaStream的process是一样的,只需要在process方法中对缓存进行更新或者删除操作即可,我这里只是简单put操作,具体逻辑可以根据自己情况进行处理

在StreamsBuilder对象中添加StoreBuilder对象

streamsBuilder.addGlobalStore(kvBuilder,"testGlobalStore", Consumed.with(Serdes.String(),Serdes.String()),
                ()->new GlobalStoreHandleProcessor<>("testGlobalStore"));

其中第二个参数testGlobalStore是Global Store绑定的数据变更记录的Topic,如果要更新,则需要通过向这个topic发送数据来进行更新Global Store中的数据

处理数据的Processor类实例代码

public static class StreamProcessor implements Processor<String,String,String,String> {

        private KeyValueStore<String,String> testGlobalStore;
        private ProcessorContext context;

        private String toTopic;

        @Override
        public void init(ProcessorContext context) {
            this.context = context;
            this.testGlobalStore = context.getStateStore("testGlobalStore");
        }

        public StreamProcessor(String toTopic) {
            this.toTopic = toTopic;
        }

        @Override
        public void process(Record<String, String> record) {
           testLocalStore.put(jsonObject.getString("key"),jsonObject.getString("value"));
            log.info("testLocalStore key1 : {}",testGlobalStore.get("key1"));
            //发送更新Global Store的数据
            context.forward(new Record("testGlobalKey","global value",record.timestamp()),"testGlobalStore");
            context.forward(record,toTopic);
        }
    }

与Local Store不同的是,不能在处理数据流的时候,对缓存进行put操作,只能通过将数据发送到Global Store关联的topic中,在GlobalStoreHandleProcessor中去做更新

完整代码如下:

    @Bean
    public KStream<String,String> kStreamTestStore(StreamsBuilder streamsBuilder){
        log.info("init kStreamTestStore");
        StoreBuilder<KeyValueStore<String, String>> kvBuilder = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("testGlobalStore"), Serdes.String(), Serdes.String());
        streamsBuilder.addGlobalStore(kvBuilder,"testGlobalStore", Consumed.with(Serdes.String(),Serdes.String()),
                ()->new GlobalStoreHandleProcessor<>("testGlobalStore"));
        KStream<String, String> stream = streamsBuilder.stream(fromTopic);
        stream.process(()->new StreamProcessor(toTopic), Named.as(fromTopic));
        streamsBuilder.build().addSink(toTopic,toTopic,fromTopic);
        streamsBuilder.build().addSink("testGlobalStore","testGlobalStore",fromTopic);
        return stream;
    }

与Local Store不同点在于,不需要在process方法中添加store的名字,但是因为要从process方法中直接将更新Store的数据发送到topic,所以需要添加一个Global Store绑定的Topic的输出扩展,也就是下面这行代码

streamsBuilder.build().addSink("testGlobalStore","testGlobalStore",fromTopic);

不适合的场景

由于KafkaStream Store 没有自动过期数据和过期数据自动删除的概率(可能是有,但是我没有找到对应文档),所以如果我们存储的key集合特别大,并且需要自动过期和自动删除,那么就不适合使用Store来处理了,因为需要我们自行处理删除逻辑,尤其是有些场景中,并不会对过期的key进行访问,所以采用惰性删除基本上不现实,但是定时删除,因为Store会存储到磁盘,如果存储的key很多,删除对应数据的时候耗时很长,尤其是单次删除大量key的时候,可能会直接超时,并且还必须要自己处理定时删除的逻辑,想要更好的去删除,就需要大量时间去开发和优化。

虽然使用内存的Store能稍微好点,但是毕竟单个进程内存有限,并且正常流处理中,如果需要保存状态,那么肯定是希望进程重启之后,能恢复数据,避免计算出错的,所以如果是有大量不重复key,并且数据需要到期自动删除的话,可以直接使用Redis做状态存储,并且进过我得实际测试,使用Redis并不比Store慢,并且在key量越来越大的情况下,Redis的性能是完全优于Store的(只针对持久化的Store),当然使用Redis,还是会更使用Global Store一样,需要考虑数据一致性的问题,不过这个问题可以通过将相同key的数据从Kafka Topic就分配到同一个Topic分区中来避免

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,258评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,335评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,225评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,126评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,140评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,098评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,018评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,857评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,298评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,518评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,678评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,400评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,993评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,638评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,801评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,661评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,558评论 2 352

推荐阅读更多精彩内容