Spring Kafka 教程 – spring读取和发送kakfa消息

本文代码格式不好调整,可以参考本人在其他地方的同篇博文 https://blog.csdn.net/russle/article/details/80296006

Apache Kafka, 分布式消息系统,

非常流行。Spring是非常流行的Java快速开发框架。将两者无缝平滑结合起来可以快速实现很多功能。本文件简要介绍Spring

Kafka,如何使用 KafkaTemplate发送消息到kafka的broker上, 如何使用“listener

container“接收Kafka消息。

1,Spring Kafka的组成

这一节我们首先介绍Spring Kafka的各个组成部分。

1.1  发送消息

与 JmsTemplate 或者JdbcTemplate类似,Spring Kafka提供了 KafkaTemplate.  该模板封装了Kafka消息生产者并提供各种消息发送方法。

消息发送的各种方法。

```

ListenableFuture> send(Stringtopic, V data);ListenableFuture> send(Stringtopic,Kkey, V data);ListenableFuture> send(Stringtopic, int partition, V data);ListenableFuture> send(Stringtopic, int partition,Kkey, V data);ListenableFuture> send(Message message);

```

1.2 接收消息

要接收消息,我们需要配置MessageListenerContainer并提供一个Message Listener,或者使用 @KafkaListener注解。

MessageListenserContainer

MessageListenserContainer 有以下两个实现类:

KafkaMessageListenerContainer

ConcurrentMessageListenerContainer

KafkaMessageListenerContainer可以让我们使用单线程消费Kafka topic的消息,而ConcurrentMessageListenerContainer 可以让我们多线程消费消息。

@KafkaListener 注解

Spring Kafka提供的 @KafkaListener注解,可以让我们监听某个topic或者topicPattern的消息。

监听符合topicPattern = “topic.*”的所有topic的消息

```

@Component@Slf4jpublicclassCmdReceiver {    @KafkaListener(topicPattern ="topic.*")publicvoidlisten(ConsumerRecord record, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {        Optional kafkaMessage = Optional.ofNullable(record.value());if(kafkaMessage.isPresent()) {            Object message = kafkaMessage.get();            log.info("----------------- record =topic:"+ topic+", "+ record);            log.info("------------------ message =topic:"+ topic+", "+ message);        }    }}

```


监听某个topic的消息publicclassListener {    @KafkaListener(id ="id01", topics ="Topic1")publicvoidlisten(String data) {    }}

2, Spring Kafka 例子

下面我们介绍一个具体的例子, 这个例会发送和接收指定topic的消息。

准备工作

kafka_2.11-1.1.0.tgz和zookeeper-3.4.10.tar.gz

JDK jdk-8u171-linux-x64.tar.gz

IDE (Eclipse or IntelliJ)

Build tool (Maven  or Gradle)

本文不涉及安装Kafka的介绍,请自行搜索,或者看官方文档。

pom文件

也就是我们的依赖包. 这是笔者使用的依赖版本,仅供参考。

4.0.0com.yqkafkademo1.0-SNAPSHOTorg.springframework.bootspring-boot-starter-parent1.5.12.RELEASEUTF-8UTF-81.8org.springframework.bootspring-boot-starter-weborg.projectlomboklomboktrueorg.springframework.bootspring-boot-starter-testtestorg.springframework.kafkaspring-kafka1.1.8.RELEASEcom.google.code.gsongson2.8.2org.apache.kafkakafka-clients0.10.1.1io.springfoxspringfox-swagger22.7.0io.springfoxspringfox-swagger-ui2.7.0io.springfoxspringfox-spring-web2.7.0com.alibabafastjson1.1.33org.springframework.bootspring-boot-maven-plugin

*KafkaDemoApplication*

我们使用springboot的框架,这是我们程序的入口点。

@SpringBootApplicationpublicclassKafkaDemoApplication{privatestaticfinalLogger logger = LoggerFactory.getLogger(KafkaDemoApplication.class);publicstaticvoidmain(String[] args) {        ConfigurableApplicationContext context = SpringApplication.run(KafkaDemoApplication.class, args);        logger.info("Done start Spring boot");    }}

ProducerConfig

其实我们可以可以不用编写KafkaProducerConfig,直接使用KafkaTemplate(当然前提是我们要设置好producer需要的配置项,例如spring.kafka.bootstrap-servers, spring.kafka.producer.key-serializer, spring.kafka.producer.retries等等)

@Configuration@EnableKafkapublicclassKafkaProducerConfig{@BeanpublicProducerFactoryproducerFactory() {returnnewDefaultKafkaProducerFactory<>(producerConfigs());    }@BeanpublicMapproducerConfigs() {        Map props =newHashMap<>();        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092(根据实际情况修改)");        props.put(ProducerConfig.RETRIES_CONFIG,0);        props.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);        props.put(ProducerConfig.LINGER_MS_CONFIG,1);        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);returnprops;    }@BeanpublicKafkaTemplatekafkaTemplate() {returnnewKafkaTemplate(producerFactory());    }}

KafkaConsumerConfig

同理,其实我们可以可以不用编写KafkaConsumerConfig,直接使用 @KafkaListener(当然前提是我们要设置好consumer需要的配置项,例如spring.kafka.bootstrap-servers, spring.kafka.consumer.key-deserializer, spring.kafka.consumer.group-id、spring.kafka.consumer.auto-offset-reset等等)

@Configuration@EnableKafkapublicclassKafkaConsumerConfig{@BeanKafkaListenerContainerFactory> kafkaListenerContainerFactory() {        ConcurrentKafkaListenerContainerFactory factory =newConcurrentKafkaListenerContainerFactory<>();        factory.setConsumerFactory(consumerFactory());        factory.setConcurrency(3);        factory.getContainerProperties().setPollTimeout(3000);returnfactory;    }@BeanpublicConsumerFactoryconsumerFactory() {returnnewDefaultKafkaConsumerFactory<>(consumerConfigs());    }@BeanpublicMapconsumerConfigs() {        Map propsMap =newHashMap<>();        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092(根据实际情况修改)");        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"100");        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,"15000");        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG,"group1");        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");returnpropsMap;    }@BeanpublicMyListenerlistener() {returnnewMyListener();    }}

定义了ProducerConfig和ConsumerConfig后我们需要实现具体的生产者和消费者。

本文的KafkaListenerContainerFactory 中使用了ConcurrentKafkaListenerContainer, 我们将使用多线程消费消息。

注意消息代理的地址是localhost:9092,

需要根据实际情况修改。需要特别注意的是,我在windows运行程序,kafka在我的linux虚拟机,

我需要配置windows的hosts文件,配置虚拟机hostname和ip的映射,例如192.168.119.131       

ubuntu01

开发Listener

我们来开发自己的Listener监听具体的topic, 这里例子中我们监听以topic开头的主题,不做其他业务,只是打印出来。

@Component@Slf4jpublicclassMyListener{    @KafkaListener(topicPattern ="topic.*")publicvoidlisten(ConsumerRecord record, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {        Optional kafkaMessage = Optional.ofNullable(record.value());if(kafkaMessage.isPresent()) {            Object message = kafkaMessage.get();            log.info("------------------ message =topic:"+ topic+", "+ message);        }    }}

开发producer

我在程序中增加了controller,这样我们可以通过controller给topic发消息。consumer一直在监听,只要有消息发送过去,就会打印出来。controller中调用了ProducerServiceImpl , 具体代码比较简单就不再罗列。

我们producerServiceImpl主要是有这句, 通过KafkaTemplate 发送消息。

@Autowired

private KafkaTemplate template;

@ServicepublicclassProducerServiceImplimplementsProducerService{privatestaticfinalLogger logger = LoggerFactory.getLogger(ProducerServiceImpl.class);privateGson gson =newGsonBuilder().create();@AutowiredprivateKafkaTemplate template;//发送消息方法publicvoidsendJson(String topic, String json) {        JSONObject jsonObj = JSON.parseObject(json);        jsonObj.put("topic", topic);        jsonObj.put("ts", System.currentTimeMillis() +"");        logger.info("json+++++++++++++++++++++  message = {}", jsonObj.toJSONString());        ListenableFuture> future = template.send(topic, jsonObj.toJSONString());        future.addCallback(newListenableFutureCallback>() {@OverridepublicvoidonSuccess(SendResult result) {                System.out.println("msg OK."+ result.toString());            }@OverridepublicvoidonFailure(Throwable ex) {                System.out.println("msg send failed: ");            }        });    }

运行程序

运行第一步,确保Kafka broker配置正确,笔者的程序在Windows10机器上,Kafka在虚拟机上,因为我的地址是192.168.119.129:9092, 而不是localhost:9092.

运行第二步骤,在IDEA中选中KafkaDemoApplication ,  单击鼠标右键,选择 Run KafkaDemoApplication

效果图

kafka段命令行接收到的消息

3,总结

Spring Kafka提供了很好的集成,我们只需配置properties文件,就可以直接使用KafkaTemplate发送消息,使用@KafkaListener监听消息。

参考文档:

https://docs.spring.io/spring-kafka/docs/2.1.6.RELEASE/reference/html/_reference.html#kafka-template

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,686评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,668评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,160评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,736评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,847评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,043评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,129评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,872评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,318评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,645评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,777评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,470评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,126评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,861评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,095评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,589评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,687评论 2 351

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,637评论 18 139
  • Kafka入门经典教程-Kafka-about云开发 http://www.aboutyun.com/threa...
    葡萄喃喃呓语阅读 10,812评论 4 54
  • 一、基本概念 介绍 Kafka是一个分布式的、可分区的、可复制的消息系统。它提供了普通消息系统的功能,但具有自己独...
    ITsupuerlady阅读 1,627评论 0 9
  • 前言 在微服务架构的系统中,我们通常会使用轻量级的消息代理来构建一个共用的消息主题让系统中所有微服务实例都连接上来...
    Chandler_珏瑜阅读 6,572评论 2 39
  • 真正的读书,须得你捧起完整漫长的字句,心无旁骛地走进作者设定的世界里,与千年前的古人对话,观别人的不可思议人生,去...
    旧城梦境exo阅读 224评论 0 0