SpringBoot整合Kafka实现发布订阅
新建SpringBoot项目
基于JDK版本1.8,SpringBoot版本1.5.9.RELEASE
1、pom.xml中添加依赖
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the spring-boot-kafka-demo sample application. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!-- Project coordinates; the artifact is packaged as a jar. -->
<groupId>com.sunlong</groupId>
<artifactId>spring-boot-kafka-demo</artifactId>
<version>1.0.0</version>
<packaging>jar</packaging>
<name>spring-boot-kafka-demo</name>
<description>Demo project for Spring Boot</description>
<!-- Inherits from the Spring Boot 1.5.9.RELEASE starter parent. The dependencies and the plugin
     below declare no <version>, so their versions are expected to come from this parent. -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.9.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<!-- UTF-8 for sources and reports; Java level 1.8. -->
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<!-- Web starter: supplies the org.springframework.web annotations used by the REST controller. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Spring for Apache Kafka: supplies KafkaTemplate and @KafkaListener. -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- Test starter, limited to the test scope. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Spring Boot Maven plugin; no extra configuration is declared here. -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
2、配置application.yml文件
spring:
  kafka:
    # Kafka broker address list in host:port form (no URL scheme);
    # separate multiple brokers with commas.
    bootstrap-servers: kafkahost:9092
    consumer:
      # Default consumer group id.
      group-id: myGroup
    template:
      # Default topic, used when a message is sent without an explicit topic.
      default-topic: tsc_dsc_newMsg
    listener:
      # Number of threads in the listener container; raise it for more concurrency.
      concurrency: 5
    producer:
      # Producer batch size. This maps to Kafka's batch.size, which is measured
      # in bytes, not in number of messages.
      batch-size: 1000
server:
  port: 8888
3、模拟生产者Producer
package com.sunlong.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * Demo REST endpoint that publishes messages to Kafka (kafkaDemo).
 *
 * <p>Created on 2018/1/19.
 *
 * @author Sun Long
 */
@RestController
public class SampleController {

    /** Template used to publish records with String keys and String values. */
    @Autowired
    private KafkaTemplate<String, String> template;

    /**
     * Publishes one record to Kafka.
     *
     * <p>The result returned by the template is not inspected, so {@code "success"} only means
     * the record was handed to the template, not that a broker has acknowledged it.
     *
     * @param topic target topic; when missing or empty, the template's default topic
     *              ({@code spring.kafka.template.default-topic}) is used instead
     * @param key   record key, may be {@code null}
     * @param data  record value
     * @return the literal string {@code "success"}
     */
    @GetMapping("/send")
    String send(String topic, String key, String data) {
        if (topic == null || topic.isEmpty()) {
            // A request without ?topic= would otherwise pass a null topic to send(),
            // which the Kafka client rejects; fall back to the configured default topic.
            template.sendDefault(key, data);
        } else {
            template.send(topic, key, data);
        }
        return "success";
    }
}
4、消息监听Consumer
可以建立多个消费者
package com.sunlong.listenner;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
/**
 * Kafka consumer demo (kafkaDemo): one listener method per subscribed topic.
 *
 * <p>Created on 2018/1/19.
 *
 * @author Sun Long
 */
@Component
public class Listenner {

    /**
     * Handles records from {@code topic1} by printing them to standard output.
     *
     * @param cr the received record
     */
    @KafkaListener(topics = "topic1")
    public void listenT1(ConsumerRecord<?, ?> cr) throws Exception {
        printRecord("listenT1收到消息!! topic:>>> ", cr);
    }

    /**
     * Handles records from {@code topic2} by printing them to standard output.
     *
     * @param cr the received record
     */
    @KafkaListener(topics = "topic2")
    public void listenT2(ConsumerRecord<?, ?> cr) throws Exception {
        printRecord("listenT2收到消息!! topic:>>> ", cr);
    }

    /** Writes the prefix followed by the record's topic, key and value as one console line. */
    private static void printRecord(String prefix, ConsumerRecord<?, ?> received) {
        StringBuilder line = new StringBuilder(prefix);
        line.append(received.topic());
        line.append(" key:>> ").append(received.key());
        line.append(" value:>> ").append(received.value());
        System.out.println(line);
    }
}
5、测试
5.1 启动项目
5.2 打开浏览器输入
http://localhost:8888/send?topic=topic1&key=msg&data=testmessage
5.3 可以看到控制台打印
listenT1收到消息!! topic:>>> topic1 key:>> msg value:>> testmessage