Kafka配置类
@Configuration
@EnableKafka
public class KafkaConfig {
private String brokerAddress = "BBBB";
private String groupId = "BBBB";
private String brokerAddressA = "AAAA";
private String groupIdA = "AAAA";
Bean("AAAA")
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactoryCapability() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new @ConcurrentKafkaListenerContainerFactory<>();
ConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerConfigs(this.groupIdA, this.brokerAddressA));
factory.setConsumerFactory(consumerFactory);
factory.setConcurrency(1);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
@Bean("BBBB")
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
ConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerConfigs(this.groupId, this.brokerAddress));
factory.setConsumerFactory(consumerFactory);
factory.setConcurrency(1);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
private Map<String, Object> consumerConfigs(String groupId, String brokerAddress) {
Map<String, Object> properties = new HashMap<>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerAddress);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
properties.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000");
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
properties.put("sasl.mechanism", "GSSAPI");
return properties;
}
}
Kafka消费者
/**
 * Kafka consumer component with one listener per topic. Each listener names
 * the container factory bean it uses through {@code containerFactory}.
 */
@Component
@Slf4j
public class KafkaConsumer {

    // Injected by Spring; not used by the listener methods in this class.
    @Autowired
    private KafkaMessageHandler<String> messageHandler;

    /**
     * Receives records from {@code topic_BBBB} via the {@code "BBBB"} container
     * factory and logs the payload at DEBUG level.
     *
     * @param content the record value
     */
    @KafkaListener(topics = "topic_BBBB", containerFactory = "BBBB")
    public void processMessage(String content) {
        // Parameterized form: the message is only formatted when DEBUG is enabled.
        log.debug("recieved topic_BBBB = {}", content);
    }

    /**
     * Receives records from {@code topic_AAAA} via the {@code "AAAA"} container
     * factory and logs the payload at DEBUG level.
     *
     * @param content the record value
     */
    @KafkaListener(topics = "topic_AAAA", containerFactory = "AAAA")
    public void processCapabilityChangeMessage(String content) {
        log.debug("recieved topic_AAAA = {}", content);
    }
}
=====================以上代码在 spring-kafka 1.0.6.RELEASE 上亲测通过===========