Spring-KafkaListener使用

1.导包

  <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.2.7.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>5.3.1</version>
        </dependency>

2.Kafka配置类

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import lvye.java.datacenter.mq.KafkaConsumer;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.support.PropertiesLoaderUtils;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.*;

import java.io.IOException;
import java.util.*;


@Configuration
@EnableKafka
public class KafkaConfig {

    /** Comma-separated Kafka broker list, e.g. host1:9092,host2:9092. */
    @Value("${kafka.bootstrap.servers}")
    private String kafkaBootstrapServers;
    /** Consumer session timeout (ms) before the broker considers the consumer dead. */
    @Value("${kafka.session.timeout.ms}")
    private Integer sessionTimeoutMs;
    /** Whether offsets are committed automatically; false here since ack mode is MANUAL_IMMEDIATE. */
    @Value("${kafka.enable.auto.commit}")
    private boolean enableAutoCommit;
    /** Auto-commit interval (ms); only relevant when enable.auto.commit is true. */
    @Value("${kafka.auto.commit.interval.ms}")
    private Integer autoCommitIntervalMs;
    /** Offset reset policy when no committed offset exists ("earliest"/"latest"). */
    @Value("${kafka.auto.offset.reset}")
    private String autoOffsetReset;
    /** Consumer group id. */
    @Value("${kafka.group.id}")
    private String groupId;
    /** Confluent schema registry URL used by the Avro deserializer. */
    @Value("${kafka.avro.schema.registry.url}")
    private String schemaRegistryUrl;
    /** Maximum records returned by a single poll(); Kafka parses the String form. */
    @Value("${kafka.max_poll_record}")
    private String maxPollRecord;

    /**
     * topic name -> mapped entity class, populated once by {@link #loadTopic()}.
     * Read by the consumer (KafkaConfig.config) to deserialize message payloads.
     */
    public static Map<String, Class<?>> config;

    /**
     * Batch listener container factory: manual-immediate acks, 30s poll timeout,
     * custom batch error handler. Concurrency is left at the framework default
     * (a single consumer thread per container).
     *
     * @param batchErrorHandler handler invoked when a batch of records fails
     * @return the configured listener container factory
     */
    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, GenericRecord>> kafkaListenerContainerFactory(@Autowired KafkaBatchExceptionHandler batchErrorHandler) {
        ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // Listeners receive List<ConsumerRecord<...>> instead of single records.
        factory.setBatchListener(true);
        // Offsets are committed explicitly via Acknowledgment.acknowledge().
        factory.getContainerProperties()
                .setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.getContainerProperties().setPollTimeout(30000);
        factory.setBatchErrorHandler(batchErrorHandler);
        return factory;
    }

    /** Consumer factory backed by the property map below. */
    @Bean
    public ConsumerFactory<String, GenericRecord> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /**
     * Raw Kafka consumer properties. Keys are Strings, Avro values are decoded
     * through the Confluent schema registry into GenericRecord.
     *
     * @return consumer configuration map
     */
    @Bean(name = "topicConfigs")
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        // Was a hardcoded 6, silently ignoring ${kafka.max_poll_record}=1000.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecord);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor");
        props.put("schema.registry.url", schemaRegistryUrl);
        return props;
    }

    /**
     * Loads kafka-topic-mapping.properties (topic name = fully-qualified entity
     * class name) from the classpath and resolves each class eagerly so a bad
     * mapping fails the application at startup instead of at message time.
     *
     * @return topic -> entity class map (also cached in the static {@code config} field)
     * @throws RuntimeException if the file cannot be read or a class cannot be resolved
     */
    @Bean
    public static HashMap<String, Class<?>> loadTopic() {
        Properties properties;
        HashMap<String, Class<?>> topics = new HashMap<>();
        try {
            properties = PropertiesLoaderUtils.loadAllProperties("kafka-topic-mapping.properties");
        } catch (IOException e) {
            throw new RuntimeException("load kafka-topic-mapping file error", e);
        }
        for (Map.Entry<Object, Object> kv : properties.entrySet()) {
            try {
                topics.put((String) kv.getKey(), Class.forName((String) kv.getValue()));
            } catch (ClassNotFoundException e) {
                // Fail fast: a missing class would otherwise leave a silent hole
                // in the topic mapping (previously only printStackTrace'd).
                throw new RuntimeException(
                        "entity class not found for topic " + kv.getKey() + ": " + kv.getValue(), e);
            }
        }
        config = topics;
        return topics;
    }

    /** The message-consuming bean (declared explicitly rather than scanned). */
    @Bean
    public KafkaConsumer consumer() {
        return new KafkaConsumer();
    }


}

3.配置文件(application.properties)

kafka.bootstrap.servers=121.102.172.42:9092,121.102.172.43:9092
kafka.avro.schema.registry.url=http://121.102.218.111:8081
kafka.session.timeout.ms=300000
kafka.enable.auto.commit=false
kafka.auto.commit.interval.ms=60000
kafka.auto.offset.reset=earliest
kafka.max_poll_record=1000
kafka.group.id=DmSync

4.定义Consumer类

@Component("DcKafkaConsumer")
public class KafkaConsumer {

    // Debezium-style CDC op codes: the existing snapshot code "r" below matches
    // the Debezium convention, so these follow it.
    // NOTE(review): originally referenced but never declared — confirm values
    // against the actual producer's message schema.
    private static final String operate = "op";
    private static final String insert = "c";
    private static final String update = "u";
    private static final String delete = "d";

    /**
     * Returns the topic names to subscribe to — the property keys of
     * kafka-topic-mapping.properties on the classpath.
     *
     * @return array of topic names
     * @throws RuntimeException if the mapping file cannot be read
     */
    public String[] getTopics() {
        Properties properties;
        try {
            properties = PropertiesLoaderUtils.loadAllProperties("kafka-topic-mapping.properties");
        } catch (IOException e) {
            throw new RuntimeException("load kafka-topic-mapping file error", e);
        }
        return properties.keySet().toArray(new String[properties.keySet().size()]);
    }

    /**
     * Batch listener: processes every record in the poll, then manually
     * acknowledges the whole batch (container ack mode is MANUAL_IMMEDIATE).
     * Topics are resolved at startup via a SpEL bean reference — note the "@"
     * prefix, required for SpEL to resolve the bean named "DcKafkaConsumer".
     *
     * @param msgs batch of Avro records polled from the subscribed topics
     * @param ack  manual acknowledgment for committing the batch's offsets
     */
    @KafkaListener(topics = "#{@DcKafkaConsumer.getTopics()}")
    public void listen(List<ConsumerRecord<String, GenericRecord>> msgs, Acknowledgment ack) {
        // Was dealMsg(p, session): dealMsg takes one argument and "session" was undefined.
        msgs.forEach(this::dealMsg);
        ack.acknowledge();
    }

    /**
     * Dispatches one CDC record by its operation code: insert/update/snapshot
     * deserialize the "after" image, delete deserializes the "before" image.
     * The target class comes from the topic->class map built at startup.
     *
     * @param msg one consumed record; its value is a CDC envelope GenericRecord
     */
    private void dealMsg(ConsumerRecord<String, GenericRecord> msg) {
        Object o;
        String opt = msg.value().get(operate).toString();
        String snapshot = "r";
        if (insert.equals(opt) || update.equals(opt)) {
            o = JSON.parseObject(msg.value().get("after").toString(), KafkaConfig.config.get(msg.topic()));
            //todo
        } else if (delete.equals(opt)) {
            o = JSON.parseObject(msg.value().get("before").toString(), KafkaConfig.config.get(msg.topic()));
            //todo
        } else if (snapshot.equals(opt)) {
            o = JSON.parseObject(msg.value().get("after").toString(), KafkaConfig.config.get(msg.topic()));
            //todo
        }
    }


}

5.定义kafka-topic与实体类映射文件kafka-topic-mapping.properties
内容

xxx-topic1=类1全路径
xxx-topic2=类2全路径
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 136,064评论 19 139
  • 1. 概念 介绍 Kafka是一个分布式的、可分区的、可复制的消息系统。它提供了普通消息系统的功能,但具有自己独特...
    EmmaQin阅读 11,185评论 0 1
  • 之前自己写过一篇入门文章kafka简单入门及与spring boot整合,主要是结合kafka官方的文档入门,学习...
    非典型_程序员阅读 8,102评论 0 3
  • 我是黑夜里大雨纷飞的人啊 1 “又到一年六月,有人笑有人哭,有人欢乐有人忧愁,有人惊喜有人失落,有的觉得收获满满有...
    陌忘宇阅读 12,748评论 28 53
  • 信任包括信任自己和信任他人 很多时候,很多事情,失败、遗憾、错过,源于不自信,不信任他人 觉得自己做不成,别人做不...
    吴氵晃阅读 11,356评论 4 8

友情链接更多精彩内容