【方案白嫖】Kafka如何监听动态改变的topic

问题简述: 服务运行过程中,需要根据实际情况(配置)动态改变监听的topic。

方案一:

如果想改变的topic可以符合一定的规则,能做到正则限定范围,在限定的范围内变动,可以直接配置KafkaListener监听正则规则。

@Configuration
@EnableKafka
public class KafkaConfig {

    private static final String KAFKA_SERVERS_CONFIG = "192.168.77.202:9092";
    private static final String LOCAL_GROUP_ID = "cctest";

    @Bean
    ConcurrentKafkaListenerContainerFactory<Integer, String>
    kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVERS_CONFIG);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, LOCAL_GROUP_ID);
        // kv都用string来序列化
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVERS_CONFIG);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<String, String>(producerFactory());
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @KafkaListener(topicPattern = "${topicPattern}")
    public void listen(String data) {
        System.out.println("message have been consumed:"+data);
    }
}

方案弊端:

  1. 变动的topic只能在限定范围内,如果新增topic不符合规则,无法消费。
  2. 因为每个topic只有一个partition,单线程消费性能低下,如果线上的数据量太大,消费一个大的topic时其他topic无法消费。

方案二:

用Spring管理的SingletonBean缓存kafkaconsumer配置,并设置定时任务,每三分钟check一次,从数据库读取相应配置,将已有配置写入缓存,当读取的配置和缓存不一致时,销毁bean里已有消费者,创建新的消费者。

管理消费者bean
@Component
@Data
public class ResourceNotifyConsumer {

    private Logger logger = LoggerFactory.getLogger(ResourceNotifyConsumer.class);

    @Autowired
    // 消费数据的service,异步处理
    private MonitorPoolService monitorPoolService;

    private KafkaConsumer<String, String> consumer = null;

    public void closeConsumer() {
        // 如果consumer.wakeup()停掉当前poll并抛出异常,在没阻塞的时候,会在下一次poll抛出异常,但下一次poll已经是新的consumer对象。
        //  同时,consumer不支持多线程同时操作,所以这里把引用去掉,靠gc回收旧consumer。
        consumer = null;
    }

    public void onMessage() {
        while (consumer != null) {
            // 从kafka中取出100毫秒的数据
            List<Map<String, Object>> datas = new ArrayList<>();
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                Map<String, Object> recordData = new HashMap<>(8);
                recordData.put(record.key(), record.value());
                datas.add(recordData);
            }
            // 处理消息
            if (CollectionUtils.isNotEmpty(datas)) {
                monitorPoolService.dealResource(datas);
            }
        }
    }
}

定时调用接口
@GetMapping("/check")
public ResponseMessage checkConfig() {
    // 定时查询配置是否有变动,因场景这里只做增加检查,如果场景有修改,可以缓存旧数据与配置做对比。
    List<DeviceTopicConfig> deviceTopicConfigs = topicConfigDao.findAll();
    List<DeviceTopicConfig> topicWithoutListening = deviceTopicConfigs.stream().filter(t -> !t.getListening()).collect(Collectors.toList());
    if (CollectionUtils.isNotEmpty(topicWithoutListening)) {
        resourceNotifyConsumer.closeConsumer();
        // 新的监听topic集合
        List<String> topics = deviceTopicConfigs.stream().map(DeviceTopicConfig::getTopic).collect(Collectors.toList());
        // 此方法类似上面配置新建监听新topic集合的consumer,并调用bean的onMessage方法
        resourceService.buildConsumer(topics);
        // 更新配置的状态
        topicWithoutListening.forEach(deviceTopicConfig -> deviceTopicConfig.setListening(true));
        topicConfigDao.saveAll(topicWithoutListening);
    }
    return ResponseMessage.success();

方案弊端:

  1. 如果提交offset不合理,很可能因consumer配置的不同出现重复消费或者未消费情况。
  2. 消费数据采用线程池,如果监听的topic接收消息过多可能触发RejectedExecutionHandler。

方案三:

  1. 独立kafka消费模块为一个单独的jar文件
  2. 另起一个系统,定时查询数据库,发现topic改变后就java调用linux命令杀掉kafka的jar进程

方案弊端:

  1. 能在模块内解决的问题,尽量不变复杂。。

总结:

最后选择了方案二,因为尽管有topic的增加,但实际场景只可能增加一两个topic,两个问题的权衡:1.在增加的时候消息丢失或重复消费都可以接受,并且这个问题只需要多手动提交(比如在接口调用时和旧消费者置空之前)可以避免。2.评估后消息不会太多。

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 先讲一下如何做,以及设计的原因,给出一部分示例demo作为参考 方案 Kafka作为消息中间件,Redis的作用就...
    slyxk阅读 2,557评论 0 0
  • 什么是Kafka Kafka是一款分布式消息发布和订阅系统,它的特点是高性能、高吞吐量。 最早设计的目的是作为Li...
    WEIJAVA阅读 8,555评论 4 76
  • Kafka Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(...
    redleaf阅读 349评论 0 2
  • 久违的晴天,家长会。 家长大会开好到教室时,离放学已经没多少时间了。班主任说已经安排了三个家长分享经验。 放学铃声...
    飘雪儿5阅读 7,552评论 16 22
  • 今天感恩节哎,感谢一直在我身边的亲朋好友。感恩相遇!感恩不离不弃。 中午开了第一次的党会,身份的转变要...
    迷月闪星情阅读 10,606评论 0 11