Configuring Multi-Source Kafka Consumption in Spring Boot

Kafka configuration class

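import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

@Configuration
@EnableKafka
public class KafkaConfig {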
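    // Bootstrap servers and consumer group for cluster B (placeholder values)
    private String brokerAddress = "BBBB";

    private String groupId = "BBBB";

    // Bootstrap servers and consumer group for cluster A (placeholder values)
    private String brokerAddressA = "AAAA";

    private String groupIdA = "AAAA";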

    // Container factory for cluster A; referenced by bean name from @KafkaListener
    @Bean("AAAA")
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactoryCapability() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        ConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerConfigs(this.groupIdA, this.brokerAddressA));
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(1);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }
    
    @Bean("BBBB")
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        ConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerConfigs(this.groupId, this.brokerAddress));
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(1);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    private Map<String, Object> consumerConfigs(String groupId, String brokerAddress) {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerAddress);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
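        // Auto-commit is disabled, so the listener container commits offsets;
        // the interval below only takes effect if auto-commit is re-enabled.
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");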
        properties.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000");
        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
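        // Kerberos (GSSAPI) over SASL_PLAINTEXT; the JAAS login configuration and
        // Kerberos service name must be supplied separately.
        properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        properties.put("sasl.mechanism", "GSSAPI");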
        return properties;
    }
}
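The point of consumerConfigs above is that every container factory receives its own property map, so the two clusters never share connection or group settings. A minimal plain-Java sketch of that idea follows, assuming nothing beyond the JDK: the class name ClusterConsumerProps and the host names are hypothetical, and the string keys are the literal property names that the ConsumerConfig constants stand for.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of the original article: builds one
// independent consumer property map per Kafka cluster.
class ClusterConsumerProps {

    static Map<String, Object> consumerConfigs(String groupId, String brokerAddress) {
        Map<String, Object> properties = new HashMap<>();
        // Cluster-specific settings
        properties.put("bootstrap.servers", brokerAddress);
        properties.put("group.id", groupId);
        // Settings shared by both clusters
        properties.put("enable.auto.commit", false);
        properties.put("auto.offset.reset", "latest");
        return properties;
    }

    public static void main(String[] args) {
        // Hypothetical addresses; substitute the real bootstrap servers.
        Map<String, Object> clusterA = consumerConfigs("AAAA", "kafka-a.example:9092");
        Map<String, Object> clusterB = consumerConfigs("BBBB", "kafka-b.example:9092");
        System.out.println("A -> " + clusterA.get("bootstrap.servers"));
        System.out.println("B -> " + clusterB.get("bootstrap.servers"));
    }
}
```

Because each call returns a fresh map, changing a setting for one cluster (for example its security protocol) does not leak into the other.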

Kafka consumer

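import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class KafkaConsumer {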
    @Autowired
    private KafkaMessageHandler<String> messageHandler;

    // Consumes from cluster B through the "BBBB" container factory
    @KafkaListener(topics = "topic_BBBB", containerFactory = "BBBB")
    public void processMessage(String content) {
        log.debug("received topic_BBBB = {}", content);
    }

    // Consumes from cluster A through the "AAAA" container factory
    @KafkaListener(topics = "topic_AAAA", containerFactory = "AAAA")
    public void processCapabilityChangeMessage(String content) {
        log.debug("received topic_AAAA = {}", content);
    }
}

===================== The above was tested with spring-kafka 1.0.6.RELEASE =====================

@KafkaListener(topics = "xxxxx", groupId = "xxxx")

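It was not until spring-kafka 1.3.x that @KafkaListener gained a groupId attribute. It makes it easy to give each topic its own consumer group, so listeners no longer have to share a single group.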

For the detailed configuration, see: springboot kafka group.id multi-consumer-group configuration
