1. Add the Maven dependencies
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.7.RELEASE</version>
</dependency>
<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>5.3.1</version>
</dependency>
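Note: kafka-avro-serializer is published to Confluent's own Maven repository rather than Maven Central, so the build may also need a repository entry such as the following (a sketch; adjust to your build conventions):
<repositories>
    <repository>
        <id>confluent</id>
        <url>https://packages.confluent.io/maven/</url>
    </repository>
</repositories>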
2. Kafka configuration class
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.support.PropertiesLoaderUtils;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.*;

import java.io.IOException;
import java.util.*;
@Configuration
@EnableKafka
public class KafkaConfig {

    @Value("${kafka.bootstrap.servers}")
    private String kafkaBootstrapServers;

    @Value("${kafka.session.timeout.ms}")
    private Integer sessionTimeoutMs;

    @Value("${kafka.enable.auto.commit}")
    private boolean enableAutoCommit;

    @Value("${kafka.auto.commit.interval.ms}")
    private Integer autoCommitIntervalMs;

    @Value("${kafka.auto.offset.reset}")
    private String autoOffsetReset;

    @Value("${kafka.group.id}")
    private String groupId;

    @Value("${kafka.avro.schema.registry.url}")
    private String schemaRegistryUrl;

    @Value("${kafka.max_poll_record}")
    private Integer maxPollRecord;

    // Topic-to-entity-class mapping, populated by loadTopic() below.
    public static Map<String, Class<?>> config;
    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, GenericRecord>> kafkaListenerContainerFactory(@Autowired KafkaBatchExceptionHandler batchErrorHandler) {
        ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // Number of listener threads; should not exceed the topic's partition count.
        factory.setConcurrency(3);
        factory.setBatchListener(true);
        // Offsets are committed only when the listener calls Acknowledgment.acknowledge().
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.getContainerProperties().setPollTimeout(30000);
        factory.setBatchErrorHandler(batchErrorHandler);
        return factory;
    }
    @Bean
    public ConsumerFactory<String, GenericRecord> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean(name = "topicConfigs")
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecord);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor");
        // Required by KafkaAvroDeserializer to fetch writer schemas.
        props.put("schema.registry.url", schemaRegistryUrl);
        return props;
    }
    @Bean
    public static HashMap<String, Class<?>> loadTopic() {
        Properties properties;
        HashMap<String, Class<?>> topics = new HashMap<>();
        try {
            properties = PropertiesLoaderUtils.loadAllProperties("kafka-topic-mapping.properties");
        } catch (IOException e) {
            throw new RuntimeException("load kafka-topic-mapping file error", e);
        }
        try {
            for (Map.Entry<Object, Object> kv : properties.entrySet()) {
                topics.put((String) kv.getKey(), Class.forName((String) kv.getValue()));
            }
        } catch (ClassNotFoundException e) {
            // Fail fast: a bad mapping entry would otherwise surface as a null class at runtime.
            throw new RuntimeException("unknown entity class in kafka-topic-mapping.properties", e);
        }
        config = topics;
        return topics;
    }
}
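The factory above wires in a KafkaBatchExceptionHandler, which the original does not show. A minimal sketch, assuming it is a plain implementation of Spring Kafka's BatchErrorHandler interface:
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.kafka.listener.BatchErrorHandler;
import org.springframework.stereotype.Component;

// Minimal sketch only; replace the logging with real error handling.
@Component
public class KafkaBatchExceptionHandler implements BatchErrorHandler {

    @Override
    public void handle(Exception thrownException, ConsumerRecords<?, ?> data) {
        // With AckMode.MANUAL_IMMEDIATE, the failed batch was never acknowledged,
        // so its offsets stay uncommitted and the records are redelivered later.
        System.err.println("Kafka batch processing failed: " + thrownException.getMessage());
    }
}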
3. Configuration file (application.properties)
kafka.bootstrap.servers=121.102.172.42:9092,121.102.172.43:9092
kafka.avro.schema.registry.url=http://121.102.218.111:8081
kafka.session.timeout.ms=300000
kafka.enable.auto.commit=false
kafka.auto.commit.interval.ms=60000
kafka.auto.offset.reset=earliest
kafka.max_poll_record=1000
kafka.group.id=DmSync
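Note that kafka.enable.auto.commit is false, which matches the MANUAL_IMMEDIATE ack mode configured in the factory: offsets are committed only when the listener explicitly acknowledges a batch.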
4. Define the consumer class
import com.alibaba.fastjson.JSON;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.core.io.support.PropertiesLoaderUtils;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.List;
import java.util.Properties;

@Component("DcKafkaConsumer")
public class KafkaConsumer {

    // Debezium-style change-event field and op codes (assumption: the upstream
    // producer emits Debezium envelopes, where "r" marks a snapshot read).
    private static final String operate = "op";
    private static final String insert = "c";
    private static final String update = "u";
    private static final String delete = "d";

    public String[] getTopics() {
        Properties properties;
        try {
            properties = PropertiesLoaderUtils.loadAllProperties("kafka-topic-mapping.properties");
        } catch (IOException e) {
            throw new RuntimeException("load kafka-topic-mapping file error", e);
        }
        return properties.keySet().toArray(new String[0]);
    }

    // The topic list is resolved at startup via a SpEL reference to this bean.
    @KafkaListener(topics = "#{@DcKafkaConsumer.getTopics()}")
    public void listen(List<ConsumerRecord<String, GenericRecord>> msgs, Acknowledgment ack) {
        msgs.forEach(this::dealMsg);
        ack.acknowledge();
    }

    private void dealMsg(ConsumerRecord<String, GenericRecord> msg) {
        Object o;
        String opt = msg.value().get(operate).toString();
        String snapshot = "r";
        if (insert.equals(opt) || update.equals(opt)) {
            o = JSON.parseObject(msg.value().get("after").toString(), KafkaConfig.config.get(msg.topic()));
            //todo
        } else if (delete.equals(opt)) {
            o = JSON.parseObject(msg.value().get("before").toString(), KafkaConfig.config.get(msg.topic()));
            //todo
        } else if (snapshot.equals(opt)) {
            o = JSON.parseObject(msg.value().get("after").toString(), KafkaConfig.config.get(msg.topic()));
            //todo
        }
    }
}
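For reference, dealMsg assumes each record value is a Debezium-style change envelope. A simplified example of one such value (field layout hedged to the common Debezium format):
{
  "op": "u",
  "before": { "id": 1, "name": "old" },
  "after":  { "id": 1, "name": "new" }
}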
5. Define the kafka-topic-to-entity mapping file kafka-topic-mapping.properties
Contents:
xxx-topic1=fully qualified name of class 1
xxx-topic2=fully qualified name of class 2
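For example, a hypothetical topic demo.users could map to a hypothetical entity class com.example.entity.User (demo.users=com.example.entity.User), which the listener then fills from the change event's "before"/"after" JSON:
package com.example.entity;

// Hypothetical entity class for the mapping example above; field names must
// match the JSON keys of the change event's "before"/"after" payload.
public class User {
    private Long id;
    private String name;

    public Long getId() { return id; }
    public void setId(Long id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
}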