kafka系列四:动态添加监听器


MQ系列:
kafka系列一: kafka简介
kafka系列二: kafka部署
kafka系列三: Spring kafka
kafka系列四:动态添加监听器
kafka系列五:失败后重试机制


前言

上次介绍过,在springboot中,使用Kafka时可以使用@KafkaListener很方便的对topic进行监听。但是对于在项目启动时,动态增加topic的监听,这种方式就无法实现,因此需要一种动态添加监听topic的方式。

明显,我们的目标在于如何通过代码实现和@KafkaListener同样的效果。要做到这样,那就必须要了解@KafkaListener的原理。

@KafkaListener的原理

通过阅读Spring-kafka代码可以了解到@KafkaListener的工作原理如下图所示

@KafkaListener的工作原理

从图中不难理解@KafkaListener从启动到拉取消息的过程,可以看到最终是调用KafkaMessageListenerContainer的start()方法,启动线程调用kafkaConsumer的poll()方法和被注解的方法。

动态方案

从上面已经可以看出最终是调用KafkaMessageListenerContainer的start()方法进行监听kafka topic的消息,那么我们将动态变化的kafka配置生成一个KafkaMessageListenerContainer,并启动即可。
以下源码是KafkaMessageListenerContainer的构造函数

public KafkaMessageListenerContainer(ConsumerFactory<? super K, ? super V> consumerFactory,
            ContainerProperties containerProperties) {

        this(null, consumerFactory, containerProperties, (TopicPartitionInitialOffset[]) null);
    }

因此我们需要构建ConsumerFactory和ContainerProperties,对于ConsumerFactory,其实现类为DefaultKafkaConsumerFactory,构造函数为:

public DefaultKafkaConsumerFactory(Map<String, Object> configs,
            @Nullable Deserializer<K> keyDeserializer,
            @Nullable Deserializer<V> valueDeserializer) {
        this.configs = new HashMap<>(configs);
        this.keyDeserializer = keyDeserializer;
        this.valueDeserializer = valueDeserializer;
    }

定义consumerFactory

通过kafka的属性和序列化方式即可初始化DefaultKafkaConsumerFactory。
因此,定义以下消费者工厂方法

    /***
     * 消费者工厂
     * @param groupId
     * @return
     */
    private DefaultKafkaConsumerFactory<String, String> consumerFactory(String groupId) {
        // consumer配置
        Map<String, Object> configMap = new HashMap();
        // 采用手动提交的方式
        configMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enable_auto_commit);
        configMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, auto_offset_reset);
        configMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, auto_commit_interval);
        configMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        configMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        configMap.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());

        // 序列化
        Deserializer<String> stringDeserializer = new StringDeserializer();

        return new DefaultKafkaConsumerFactory<>(configMap, stringDeserializer, stringDeserializer);
    }

定义addKafkaListener

ContainerProperties存放了kafka监听器运行时的相关属性,因此在初始化后,还需要将kafka的相关属性赋值进去。

public void addKafkaListener(String topic, String groupId) {
        // kafka 消费者
        DefaultKafkaConsumerFactory<String, String> factory = consumerFactory(groupId);
        
        // 相关属性
        ContainerProperties props = new ContainerProperties(topic);

        // 设置监听器
        props.setMessageListener(new CustomerMsgHandler());
        props.setGroupId(groupId);
        props.setAckMode(ContainerProperties.AckMode.MANUAL);

        // 构造 ListenerContainer
        ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(factory, props);
        // 启动
        container.start();
    }

定义CustomerMsgHandler

另外,还需要实现AcknowledgingMessageListener接口(onMessage方法),定义自己处理消息的类:

@Slf4j
public class CustomerMsgHandler implements AcknowledgingMessageListener<String, String> {

    @Override
    public void onMessage(ConsumerRecord<String, String> data, Acknowledgment acknowledgment) {
        // TODO doSomething

        // 前面设置了手动提交ack的方式,这里需要在消息处理完成后提交ack
        acknowledgment.acknowledge();
    }
}

以上,可以通过读取配置,实例化KafkaMessageListenerContainer并调用其start()方法,即可实现动态kafka topic的监听。

总结

要实现动态添加新Topic的监听器,实例化KafkaMessageListenerContainer并调用其start()方法,即可实现动态kafka topic的监听。

另外还需要在配置文件中配置kafka的相关属性。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,332评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,508评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,812评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,607评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,728评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,919评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,071评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,802评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,256评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,576评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,712评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,389评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,032评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,798评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,026评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,473评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,606评论 2 350

推荐阅读更多精彩内容