MQ系列:
kafka系列一: kafka简介
kafka系列二: kafka部署
kafka系列三: Spring kafka
kafka系列四:动态添加监听器
kafka系列五:失败后重试机制
前言
上次介绍过,在springboot中,使用Kafka时可以使用@KafkaListener很方便的对topic进行监听。但是对于在项目启动时,动态增加topic的监听,这种方式就无法实现,因此需要一种动态添加监听topic的方式。
明显,我们的目标在于如何通过代码实现和@KafkaListener同样的效果。要做到这样,那就必须要了解@KafkaListener的原理。
@KafkaListener的原理
通过阅读Spring-kafka代码可以了解到@KafkaListener的工作原理如下图所示
从图中不难理解@KafkaListener从启动到拉取消息的过程,可以看到最终是调用KafkaMessageListenerContainer的start()方法,启动线程调用kafkaConsumer的poll()方法和被注解的方法。
动态方案
从上面已经可以看出最终是调用KafkaMessageListenerContainer的start()方法进行监听kafka topic的消息,那么我们将动态变化的kafka配置生成一个KafkaMessageListenerContainer,并启动即可。
以下源码是KafkaMessageListenerContainer的构造函数
public KafkaMessageListenerContainer(ConsumerFactory<? super K, ? super V> consumerFactory,
ContainerProperties containerProperties) {
this(null, consumerFactory, containerProperties, (TopicPartitionInitialOffset[]) null);
}
因此我们需要构建ConsumerFactory和ContainerProperties,对于ConsumerFactory,其实现类为DefaultKafkaConsumerFactory,构造函数为:
public DefaultKafkaConsumerFactory(Map<String, Object> configs,
@Nullable Deserializer<K> keyDeserializer,
@Nullable Deserializer<V> valueDeserializer) {
this.configs = new HashMap<>(configs);
this.keyDeserializer = keyDeserializer;
this.valueDeserializer = valueDeserializer;
}
定义consumerFactory
通过kafka的属性和序列化方式即可初始化DefaultKafkaConsumerFactory。
因此,定义以下消费者工厂方法
/***
* 消费者工厂
* @param groupId
* @return
*/
private DefaultKafkaConsumerFactory<String, String> consumerFactory(String groupId) {
// consumer配置
Map<String, Object> configMap = new HashMap();
// 采用手动提交的方式
configMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enable_auto_commit);
configMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, auto_offset_reset);
configMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, auto_commit_interval);
configMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
configMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configMap.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
// 序列化
Deserializer<String> stringDeserializer = new StringDeserializer();
return new DefaultKafkaConsumerFactory<>(configMap, stringDeserializer, stringDeserializer);
}
定义addKafkaListener
ContainerProperties存放了kafka监听器运行时的相关属性,因此在初始化后,还需要将kafka的相关属性赋值进去。
public void addKafkaListener(String topic, String groupId) {
// kafka 消费者
DefaultKafkaConsumerFactory<String, String> factory = consumerFactory(groupId);
// 相关属性
ContainerProperties props = new ContainerProperties(topic);
// 设置监听器
props.setMessageListener(new CustomerMsgHandler());
props.setGroupId(groupId);
props.setAckMode(ContainerProperties.AckMode.MANUAL);
// 构造 ListenerContainer
ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(factory, props);
// 启动
container.start();
}
定义CustomerMsgHandler
另外,还需要实现AcknowledgingMessageListener接口(onMessage方法),定义自己处理消息的类:
@Slf4j
public class CustomerMsgHandler implements AcknowledgingMessageListener<String, String> {
@Override
public void onMessage(ConsumerRecord<String, String> data, Acknowledgment acknowledgment) {
// TODO doSomething
// 前面设置了手动提交ack的方式,这里需要在消息处理完成后提交ack
acknowledgment.acknowledge();
}
}
以上,可以通过读取配置,实例化KafkaMessageListenerContainer并调用其start()方法,即可实现动态kafka topic的监听。
总结
要实现动态添加新Topic的监听器,实例化KafkaMessageListenerContainer并调用其start()方法,即可实现动态kafka topic的监听。
另外还需要在配置文件中配置kafka的相关属性。