问题简述: 服务运行过程中,需要根据实际情况(配置)动态改变监听的topic。
方案一:
如果想改变的topic可以符合一定的规则,能做到正则限定范围,在限定的范围内变动,可以直接配置KafkaListener监听正则规则。
@Configuration
@EnableKafka
public class KafkaConfig {
private static final String KAFKA_SERVERS_CONFIG = "192.168.77.202:9092";
private static final String LOCAL_GROUP_ID = "cctest";
@Bean
ConcurrentKafkaListenerContainerFactory<Integer, String>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVERS_CONFIG);
props.put(ConsumerConfig.GROUP_ID_CONFIG, LOCAL_GROUP_ID);
// kv都用string来序列化
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVERS_CONFIG);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<String, String>(producerFactory());
}
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@KafkaListener(topicPattern = "${topicPattern}")
public void listen(String data) {
System.out.println("message have been consumed:"+data);
}
}
方案弊端:
- 变动的topic只能在限定范围内,如果新增topic不符合规则,无法消费。
- 因为每个topic只有一个partition,单线程消费性能低下,如果线上的数据量太大,消费一个大的topic时其他topic无法消费。
方案二:
用Spring管理的SingletonBean缓存kafkaconsumer配置,并设置定时任务,每三分钟check一次,从数据库读取相应配置,将已有配置写入缓存,当读取的配置和缓存不一致时,销毁bean里已有消费者,创建新的消费者。
管理消费者bean
@Component
@Data
public class ResourceNotifyConsumer {
private Logger logger = LoggerFactory.getLogger(ResourceNotifyConsumer.class);
@Autowired
// 消费数据的service,异步处理
private MonitorPoolService monitorPoolService;
private KafkaConsumer<String, String> consumer = null;
public void closeConsumer() {
// 如果consumer.wakeup()停掉当前poll并抛出异常,在没阻塞的时候,会在下一次poll抛出异常,但下一次poll已经是新的consumer对象。
// 同时,consumer不支持多线程同时操作,所以这里把引用去掉,靠gc回收旧consumer。
consumer = null;
}
public void onMessage() {
while (consumer != null) {
// 从kafka中取出100毫秒的数据
List<Map<String, Object>> datas = new ArrayList<>();
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
Map<String, Object> recordData = new HashMap<>(8);
recordData.put(record.key(), record.value());
datas.add(recordData);
}
// 处理消息
if (CollectionUtils.isNotEmpty(datas)) {
monitorPoolService.dealResource(datas);
}
}
}
}
定时调用接口
@GetMapping("/check")
public ResponseMessage checkConfig() {
// 定时查询配置是否有变动,因场景这里只做增加检查,如果场景有修改,可以缓存旧数据与配置做对比。
List<DeviceTopicConfig> deviceTopicConfigs = topicConfigDao.findAll();
List<DeviceTopicConfig> topicWithoutListening = deviceTopicConfigs.stream().filter(t -> !t.getListening()).collect(Collectors.toList());
if (CollectionUtils.isNotEmpty(topicWithoutListening)) {
resourceNotifyConsumer.closeConsumer();
// 新的监听topic集合
List<String> topics = deviceTopicConfigs.stream().map(DeviceTopicConfig::getTopic).collect(Collectors.toList());
// 此方法类似上面配置新建监听新topic集合的consumer,并调用bean的onMessage方法
resourceService.buildConsumer(topics);
// 更新配置的状态
topicWithoutListening.forEach(deviceTopicConfig -> deviceTopicConfig.setListening(true));
topicConfigDao.saveAll(topicWithoutListening);
}
return ResponseMessage.success();
方案弊端:
- 如果提交offset不合理,很可能因consumer配置的不同出现重复消费或者未消费情况。
- 消费数据采用线程池,如果监听的topic接收消息过多可能触发RejectedExecutionHandler。
方案三:
- 独立kafka消费模块为一个单独的jar文件
- 另起一个系统,定时查询数据库,发现topic改变后就java调用linux命令杀掉kafka的jar进程
方案弊端:
- 能在模块内解决的问题,尽量不变复杂。。
总结:
最后选择了方案二,因为尽管有topic的增加,但实际场景只可能增加一两个topic,两个问题的权衡:1.在增加的时候消息丢失或重复消费都可以接受,并且这个问题只需要多手动提交(比如在接口调用时和旧消费者置空之前)可以避免。2.评估后消息不会太多。