1. Add the Maven dependency
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.2.RELEASE</version>
</dependency>
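spring-kafka 2.2.x is the line aligned with Spring Boot 2.1.x, so choose the version that matches your Boot release. If the project inherits the Spring Boot parent (or imports the Boot BOM), the explicit <version> can usually be dropped and Boot's dependency management will supply a compatible one, roughly like this sketch:
<!-- sketch: version managed by Spring Boot's dependency management -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>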
2. Kafka consumer factory configuration
Tip: the configuration below covers only the basics; add other properties as your business requires.
package com.xxx.xxx.config;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import java.util.HashMap;
import java.util.Map;
/**
 * @description: Kafka consumer factory configuration
 * @author : xxx
 * @date: 2020/5/25 09:00
 */
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Value("${kafka.bootstrap.servers}")
private Stringservers;
@Value("${kafka.enable.auto.commit}")
private boolean enableAutoCommit;
@Value("${kafka.session.timeout.ms}")
private StringsessionTimeout;
@Value("${kafka.group.id}")
private StringgroupId;
@Value("${kafka.max.poll.records}")
private int maxPollRecords;
@Bean
public KafkaListenerContainerFactory>kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory =new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
// 并发创建的消费者数量
factory.setConcurrency(1);
// 开启批处理
factory.setBatchListener(true);
factory.getContainerProperties().setPollTimeout(2000);
return factory;
}
@Bean
public ConsumerFactoryconsumerFactory() {
return new DefaultKafkaConsumerFactory<>(getCommonPropertis(groupId));
}
private MapgetCommonPropertis(String groupId) {
Map props =new HashMap<>(11);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
return props;
}
}
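For reference, here is a minimal sketch of the application.properties entries behind the @Value placeholders above. The property names come from this class and from the listener in step 3; the values are only assumed examples and must be adjusted to your environment:
# assumed example values, adjust to your environment
kafka.bootstrap.servers=localhost:9092
kafka.enable.auto.commit=true
kafka.session.timeout.ms=15000
kafka.group.id=route-push-group
kafka.max.poll.records=50
# comma-separated topics consumed by the listener in step 3
kafka.route.push.topics=routeTopicA,routeTopicB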
3. Consuming data with @KafkaListener
package com.kafka;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.List;
/**
 * @description: Kafka consumer
 * @author : xxx
 * @date: 2020/5/25 09:00
 */
@Slf4j
@Component
public class KafkaConsumer {
    @Resource
    private RedisUtil redisUtil;

    @KafkaListener(id = "routePush", topics = "#{'${kafka.route.push.topics}'.split(',')}")
    public void listenPartition0(List<ConsumerRecord<String, String>> records) {
        try {
            log.info("routePush Received size: " + records.size());
            for (ConsumerRecord<String, String> record : records) {
                log.info("routePush Received: " + record);
                String value = record.value();
                JSONObject jsonObject = JSONObject.parseObject(value);
                // business logic TODO
            }
        } catch (Exception e) {
            log.error("routePush consume error", e);
        }
    }
}
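One wiring note: @KafkaListener looks up the container factory bean named kafkaListenerContainerFactory by default, which is exactly the bean defined in step 2, so no extra attribute is needed here. If you register the factory under a different bean name, point the listener at it with the containerFactory attribute. A minimal sketch, assuming a hypothetical bean name batchFactory:
// Minimal sketch: selecting a container factory explicitly by bean name.
// "batchFactory" is a hypothetical bean name used only for illustration.
@KafkaListener(id = "routePushExplicit",
        topics = "#{'${kafka.route.push.topics}'.split(',')}",
        containerFactory = "batchFactory")
public void listenWithExplicitFactory(List<ConsumerRecord<String, String>> records) {
    log.info("routePush Received size: " + records.size());
}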
My skills are limited, so corrections are welcome; I hope we can all improve together. Thanks! O(∩_∩)O