Spring-KafkaListener使用

1.导包

  <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.2.7.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>5.3.1</version>
        </dependency>

2.Kafka配置类

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import lvye.java.datacenter.mq.KafkaConsumer;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.support.PropertiesLoaderUtils;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.*;

import java.io.IOException;
import java.util.*;


@Configuration
@EnableKafka
public class KafkaConfig {

    @Value("${kafka.bootstrap.servers}")
    private String kafkaBootstrapServers;
    @Value("${kafka.session.timeout.ms}")
    private Integer sessionTimeoutMs;
    @Value("${kafka.enable.auto.commit}")
    private boolean enableAutoCommit;
    @Value("${kafka.auto.commit.interval.ms}")
    private Integer autoCommitIntervalMs;
    @Value("${kafka.auto.offset.reset}")
    private String autoOffsetReset;
    @Value("${kafka.group.id}")
    private String groupId;
    @Value("${kafka.avro.schema.registry.url}")
    private String schemaRegistryUrl;
    @Value("${kafka.max_poll_record}")
    private String  maxPollRecord;

    public static Map<String, Class<?>> config ;


    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, GenericRecord>> kafkaListenerContainerFactory(@Autowired KafkaBatchExceptionHandler batchErrorHandler) {
        ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(null);
        factory.setBatchListener(true);
        factory.getContainerProperties()
                .setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.getContainerProperties().setPollTimeout(30000);
        factory.setBatchErrorHandler(batchErrorHandler);
        return factory;
    }

    @Bean
    public ConsumerFactory<String, GenericRecord> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean(name="topicConfigs")
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
       props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 6);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.RoundRobinAssignor");
        props.put("schema.registry.url", schemaRegistryUrl);
        return props;
    }


    @Bean
    public static HashMap<String,Class<?>> loadTopic () {
        Properties propertie = null;
        HashMap<String, Class<?>> topics = new HashMap<>();
        try {
            propertie = PropertiesLoaderUtils.loadAllProperties("kafka-topic-mapping.properties");
        } catch (IOException e) {
            throw new RuntimeException("load kafka-topic-mapping file error");
        }
        Set< Map.Entry<Object,Object>> kvs = propertie.entrySet();
        try {
            for(Map.Entry<Object,Object> kv:kvs){
                topics.put((String)kv.getKey(), Class.forName((String)kv.getValue()));
            }
        }catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
        config = topics;
        return topics;
    };

    @Bean
    public KafkaConsumer consumer() {
        return new KafkaConsumer();
    }


}

3.配置文件(application.properties)

kafka.bootstrap.servers=121.102.172.42:9092,121.102.172.42:9092,121.102.172.43:9092
kafka.avro.schema.registry.url=http://121.102.218.111:8081
kafka.session.timeout.ms=300000
kafka.enable.auto.commit=false
kafka.auto.commit.interval.ms=60000
kafka.auto.offset.reset=earliest
kafka.max_poll_record=1000
kafka.group.id=DmSync

3.定义Consumer类

@Component("DcKafkaConsumer")
public class KafkaConsumer {
 
   public String[] getTopics() {
        Properties properties = null;
        try {
            properties = PropertiesLoaderUtils.loadAllProperties("kafka-topic-mapping.properties");
        } catch (IOException e) {
            throw new RuntimeException("load kafka-topic-mapping file error");
        }
        return properties.keySet().toArray(new String[properties.keySet().size()]);
    }

   @KafkaListener(topics = "#{DcKafkaConsumer.getTopics()}")
    public void listen(List<ConsumerRecord<String, GenericRecord>> msgs, Acknowledgment ack) {
        msgs.forEach(p -> dealMsg(p, session));
        ack.acknowledge();
    }

      private void dealMsg(ConsumerRecord<String, GenericRecord> msg) {
        Object o;
        String opt = msg.value().get(operate).toString();
        String snapshot = "r";
        if (insert.equals(opt) || update.equals(opt)) {
            o = JSON.parseObject(msg.value().get("after").toString(), KafkaConfig.config.get(msg.topic()));
            //todo
        } else if (delete.equals(opt)) {
            o = JSON.parseObject(msg.value().get("before").toString(), KafkaConfig.config.get(msg.topic()));
           //todo
        } else if (snapshot.equals(opt)) {
            o = JSON.parseObject(msg.value().get("after").toString(), KafkaConfig.config.get(msg.topic()));
            //todo
        }
    }


}

4.定义kafka-topic与实体类映射文件kafka-topic-mapping.properties
内容

xxx-topic1=类1全路径
xxx-topic2=类2全路径
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,941评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,397评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,345评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,851评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,868评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,688评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,414评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,319评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,775评论 1 315
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,945评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,096评论 1 350
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,789评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,437评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,993评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,107评论 1 271
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,308评论 3 372
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,037评论 2 355

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,659评论 18 139
  • 1. 概念 介绍 Kafka是一个分布式的、可分区的、可复制的消息系统。它提供了普通消息系统的功能,但具有自己独特...
    EmmaQin阅读 5,719评论 0 1
  • 之前自己写过一篇入门文章kafka简单入门及与spring boot整合,主要是结合kafka官方的文档入门,学习...
    非典型_程序员阅读 2,972评论 0 3
  • 我是黑夜里大雨纷飞的人啊 1 “又到一年六月,有人笑有人哭,有人欢乐有人忧愁,有人惊喜有人失落,有的觉得收获满满有...
    陌忘宇阅读 8,536评论 28 53
  • 信任包括信任自己和信任他人 很多时候,很多事情,失败、遗憾、错过,源于不自信,不信任他人 觉得自己做不成,别人做不...
    吴氵晃阅读 6,190评论 4 8