kafka系列四:动态添加监听器


MQ系列:
kafka系列一: kafka简介
kafka系列二: kafka部署
kafka系列三: Spring kafka
kafka系列四:动态添加监听器
kafka系列五:失败后重试机制


前言

上次介绍过,在springboot中,使用Kafka时可以使用@KafkaListener很方便的对topic进行监听。但是对于在项目启动时,动态增加topic的监听,这种方式就无法实现,因此需要一种动态添加监听topic的方式。

明显,我们的目标在于如何通过代码实现和@KafkaListener同样的效果。要做到这样,那就必须要了解@KafkaListener的原理。

@KafkaListener的原理

通过阅读Spring-kafka代码可以了解到@KafkaListener的工作原理如下图所示

@KafkaListener的工作原理

从图中不难理解@KafkaListener从启动到拉取消息的过程,可以看到最终是调用KafkaMessageListenerContainer的start()方法,启动线程调用kafkaConsumer的poll()方法和被注解的方法。

动态方案

从上面已经可以看出最终是调用KafkaMessageListenerContainer的start()方法进行监听kafka topic的消息,那么我们将动态变化的kafka配置生成一个KafkaMessageListenerContainer,并启动即可。
以下源码是KafkaMessageListenerContainer的构造函数

public KafkaMessageListenerContainer(ConsumerFactory<? super K, ? super V> consumerFactory,
            ContainerProperties containerProperties) {

        this(null, consumerFactory, containerProperties, (TopicPartitionInitialOffset[]) null);
    }

因此我们需要构建ConsumerFactory和ContainerProperties,对于ConsumerFactory,其实现类为DefaultKafkaConsumerFactory,构造函数为:

public DefaultKafkaConsumerFactory(Map<String, Object> configs,
            @Nullable Deserializer<K> keyDeserializer,
            @Nullable Deserializer<V> valueDeserializer) {
        this.configs = new HashMap<>(configs);
        this.keyDeserializer = keyDeserializer;
        this.valueDeserializer = valueDeserializer;
    }

定义consumerFactory

通过kafka的属性和序列化方式即可初始化DefaultKafkaConsumerFactory。
因此,定义以下消费者工厂方法

    /***
     * 消费者工厂
     * @param groupId
     * @return
     */
    private DefaultKafkaConsumerFactory<String, String> consumerFactory(String groupId) {
        // consumer配置
        Map<String, Object> configMap = new HashMap();
        // 采用手动提交的方式
        configMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enable_auto_commit);
        configMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, auto_offset_reset);
        configMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, auto_commit_interval);
        configMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        configMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        configMap.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());

        // 序列化
        Deserializer<String> stringDeserializer = new StringDeserializer();

        return new DefaultKafkaConsumerFactory<>(configMap, stringDeserializer, stringDeserializer);
    }

定义addKafkaListener

ContainerProperties存放了kafka监听器运行时的相关属性,因此在初始化后,还需要将kafka的相关属性赋值进去。

public void addKafkaListener(String topic, String groupId) {
        // kafka 消费者
        DefaultKafkaConsumerFactory<String, String> factory = consumerFactory(groupId);
        
        // 相关属性
        ContainerProperties props = new ContainerProperties(topic);

        // 设置监听器
        props.setMessageListener(new CustomerMsgHandler());
        props.setGroupId(groupId);
        props.setAckMode(ContainerProperties.AckMode.MANUAL);

        // 构造 ListenerContainer
        ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(factory, props);
        // 启动
        container.start();
    }

定义CustomerMsgHandler

另外,还需要实现AcknowledgingMessageListener接口(onMessage方法),定义自己处理消息的类:

@Slf4j
public class CustomerMsgHandler implements AcknowledgingMessageListener<String, String> {

    @Override
    public void onMessage(ConsumerRecord<String, String> data, Acknowledgment acknowledgment) {
        // TODO doSomething

        // 前面设置了手动提交ack的方式,这里需要在消息处理完成后提交ack
        acknowledgment.acknowledge();
    }
}

以上,可以通过读取配置,实例化KafkaMessageListenerContainer并调用其start()方法,即可实现动态kafka topic的监听。

总结

要实现动态添加新Topic的监听器,实例化KafkaMessageListenerContainer并调用其start()方法,即可实现动态kafka topic的监听。

另外还需要在配置文件中配置kafka的相关属性。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容