-
注册 KafkaListenerContainerFactory
import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.listener.AbstractMessageListenerContainer; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import java.util.HashMap; import java.util.Map; @Configuration @EnableKafka public class KafkaConsumerConfig { @Value("${kafka.bootstrap-servers}") private String servers; @Value("${kafka.consumer.enable-auto-commit}") private boolean enableAutoCommit; @Value("${kafka.consumer.session-timeout}") private String sessionTimeout; @Value("${kafka.consumer.auto-commit-interval}") private String autoCommitInterval; @Value("${kafka.consumer.group-id}") private String groupId; @Value("${kafka.consumer.auto-offset-reset}") private String autoOffsetReset; @Value("${kafka.consumer.concurrency}") private int concurrency; @Value("${kafka.consumer.max-poll-records}") private int maxPollRecords; @Value("${kafka.sasl-mechanism}") private String saslMechanism; @Value("${kafka.ssl.truststore.location}") private String truststoreLocation; @Value("${kafka.ssl.truststore.password}") private String truststorePassword; @Value("${kafka.ssl.security-protocol}") private String securityProtocol; @Value("${java.security.auth.login.config}") private String authLoginConfig; @Bean public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); // 并发创建的消费者数量 factory.setConcurrency(concurrency); // 开启批处理 factory.setBatchListener(true); factory.getContainerProperties().setPollTimeout(1500); if (!enableAutoCommit) { factory.getContainerProperties().setAckMode((AbstractMessageListenerContainer.AckMode.MANUAL)); } return factory; } @Bean public ConsumerFactory<String, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(getCommonPropertis(groupId)); } private Map<String, Object> getCommonPropertis(String groupId) { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit); props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords); props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 10485760); props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 10485760); props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol); props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststoreLocation); props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, truststorePassword); props.put(SaslConfigs.SASL_MECHANISM, saslMechanism); return props; } }
-
消费 Kafka
@KafkaListener(id = "test", topics = "#{'${kafka.consumer.topics}'.split(',')}", containerFactory = "kafkaListenerContainerFactory") public void consumeList(List<ConsumerRecord<String, String>> records, Acknowledgment ack) { logger.info("=============== Total " + records.size() + " events in this batch .."); try { List<String> list = new ArrayList<String>(); for (ConsumerRecord<String, String> record : records) { Optional<?> kafkaMessage = Optional.ofNullable(record.value()); if (kafkaMessage.isPresent()) { String message = record.value(); list.add(message); String topic = record.topic(); logger.info("message = {}", message); } } boolean handle = this.batchHandle(list); if (handle) { // 直接提交 offset ack.acknowledge(); } } catch (Exception e) { logger.error(e.getMessage(), e); } }
Spring Boot 整合 Kafka 并使用 @KafkaListener 并发批量接收消息
©著作权归作者所有,转载或内容合作请联系作者
- 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
- 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
- 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
推荐阅读更多精彩内容
- 本文代码格式不好调整,可以参考本人在其他地方的同篇博文 https://blog.csdn.net/russle/...
- kafka消费模式 基于partition 指定offset 基于group auto.offset.reset ...