【方案白嫖】Kafka如何监听动态改变的topic

问题简述: 服务运行过程中,需要根据实际情况(配置)动态改变监听的topic。

方案一:

如果想改变的topic可以符合一定的规则,能做到正则限定范围,在限定的范围内变动,可以直接配置KafkaListener监听正则规则。

@Configuration
@EnableKafka
public class KafkaConfig {

    private static final String KAFKA_SERVERS_CONFIG = "192.168.77.202:9092";
    private static final String LOCAL_GROUP_ID = "cctest";

    @Bean
    ConcurrentKafkaListenerContainerFactory<Integer, String>
    kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVERS_CONFIG);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, LOCAL_GROUP_ID);
        // kv都用string来序列化
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVERS_CONFIG);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<String, String>(producerFactory());
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @KafkaListener(topicPattern = "${topicPattern}")
    public void listen(String data) {
        System.out.println("message have been consumed:"+data);
    }
}

方案弊端:

  1. 变动的topic只能在限定范围内,如果新增topic不符合规则,无法消费。
  2. 因为每个topic只有一个partition,单线程消费性能低下,如果线上的数据量太大,消费一个大的topic时其他topic无法消费。

方案二:

用Spring管理的SingletonBean缓存kafkaconsumer配置,并设置定时任务,每三分钟check一次,从数据库读取相应配置,将已有配置写入缓存,当读取的配置和缓存不一致时,销毁bean里已有消费者,创建新的消费者。

管理消费者bean
@Component
@Data
public class ResourceNotifyConsumer {

    private Logger logger = LoggerFactory.getLogger(ResourceNotifyConsumer.class);

    @Autowired
    // 消费数据的service,异步处理
    private MonitorPoolService monitorPoolService;

    private KafkaConsumer<String, String> consumer = null;

    public void closeConsumer() {
        // 如果consumer.wakeup()停掉当前poll并抛出异常,在没阻塞的时候,会在下一次poll抛出异常,但下一次poll已经是新的consumer对象。
        //  同时,consumer不支持多线程同时操作,所以这里把引用去掉,靠gc回收旧consumer。
        consumer = null;
    }

    public void onMessage() {
        while (consumer != null) {
            // 从kafka中取出100毫秒的数据
            List<Map<String, Object>> datas = new ArrayList<>();
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                Map<String, Object> recordData = new HashMap<>(8);
                recordData.put(record.key(), record.value());
                datas.add(recordData);
            }
            // 处理消息
            if (CollectionUtils.isNotEmpty(datas)) {
                monitorPoolService.dealResource(datas);
            }
        }
    }
}

定时调用接口
@GetMapping("/check")
public ResponseMessage checkConfig() {
    // 定时查询配置是否有变动,因场景这里只做增加检查,如果场景有修改,可以缓存旧数据与配置做对比。
    List<DeviceTopicConfig> deviceTopicConfigs = topicConfigDao.findAll();
    List<DeviceTopicConfig> topicWithoutListening = deviceTopicConfigs.stream().filter(t -> !t.getListening()).collect(Collectors.toList());
    if (CollectionUtils.isNotEmpty(topicWithoutListening)) {
        resourceNotifyConsumer.closeConsumer();
        // 新的监听topic集合
        List<String> topics = deviceTopicConfigs.stream().map(DeviceTopicConfig::getTopic).collect(Collectors.toList());
        // 此方法类似上面配置新建监听新topic集合的consumer,并调用bean的onMessage方法
        resourceService.buildConsumer(topics);
        // 更新配置的状态
        topicWithoutListening.forEach(deviceTopicConfig -> deviceTopicConfig.setListening(true));
        topicConfigDao.saveAll(topicWithoutListening);
    }
    return ResponseMessage.success();

方案弊端:

  1. 如果提交offset不合理,很可能因consumer配置的不同出现重复消费或者未消费情况。
  2. 消费数据采用线程池,如果监听的topic接收消息过多可能触发RejectedExecutionHandler。

方案三:

  1. 独立kafka消费模块为一个单独的jar文件
  2. 另起一个系统,定时查询数据库,发现topic改变后就java调用linux命令杀掉kafka的jar进程

方案弊端:

  1. 能在模块内解决的问题,尽量不变复杂。。

总结:

最后选择了方案二,因为尽管有topic的增加,但实际场景只可能增加一两个topic,两个问题的权衡:1.在增加的时候消息丢失或重复消费都可以接受,并且这个问题只需要多手动提交(比如在接口调用时和旧消费者置空之前)可以避免。2.评估后消息不会太多。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,335评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,895评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,766评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,918评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,042评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,169评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,219评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,976评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,393评论 1 304
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,711评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,876评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,562评论 4 336
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,193评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,903评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,142评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,699评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,764评论 2 351

推荐阅读更多精彩内容

  • 先讲一下如何做,以及设计的原因,给出一部分示例demo作为参考 方案 Kafka作为消息中间件,Redis的作用就...
    slyxk阅读 2,321评论 0 0
  • 什么是Kafka Kafka是一款分布式消息发布和订阅系统,它的特点是高性能、高吞吐量。 最早设计的目的是作为Li...
    WEIJAVA阅读 8,477评论 4 76
  • Kafka Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(...
    redleaf阅读 339评论 0 2
  • 久违的晴天,家长会。 家长大会开好到教室时,离放学已经没多少时间了。班主任说已经安排了三个家长分享经验。 放学铃声...
    飘雪儿5阅读 7,517评论 16 22
  • 今天感恩节哎,感谢一直在我身边的亲朋好友。感恩相遇!感恩不离不弃。 中午开了第一次的党会,身份的转变要...
    迷月闪星情阅读 10,561评论 0 11