pom.xml文件
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
</dependencies>
-
生产者(发布消息)
application.yml文件
server:
port: 8091
spring:
application:
name: producer
kafka:
# kafka 配置
bootstrap-servers: 192.168.75.149:9092 # 本地利用centOs7搭建kafka的地址
producer:
acks: all
client-id: producer-demo
retries: 0
# 每次批量发送消息的数量
batch-size: 16384
buffer-memory: 33554432
# 指定消息key和消息体的编解码方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
KafkaConfig
/**
* 启动默认创建topic,分区,副本数量,topic存在则自动忽略
* @author wushiyi
**/
@Configuration
@EnableKafka
public class KafkaConfig {
@Bean
public NewTopic initNewTopic(){
return new NewTopic("userInfo",1,(short) 1);
}
}
发布消息
/**
* @author wushiyi
**/
@Service
@Slf4j
public class UserService {
@Resource
private KafkaTemplate<String,Object> kafkaTemplate;
private Gson gson = new GsonBuilder().create();
/**
* 发送消息
*/
public void send() {
User user = new User("ZhangSan","男","12345678901");
log.info("向kafka推送用户信息:{}",gson.toJson(user));
this.kafkaTemplate.send("userInfo",gson.toJson(user));
}
}
调用接口控制台打印
-
消费者(订阅消息)
application.yml文件
server:
port: 8092
spring:
application:
name: consumer
kafka:
# kafka配置
bootstrap-servers: 192.168.75.149:9092
consumer:
# 指定默认消费者group id
group-id: test-consumer-group
auto-offset-reset: earliest
enable-auto-commit: false
auto-commit-interval: 5000
# 指定消息key和消息体的编解码方式
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
消费
/**
* @author wushiyi
**/
@Slf4j
@Component
public class UserConsumer {
private Gson gson = new GsonBuilder().create();
@KafkaListener(topics = "userInfo")
public void userConsumer(String message) {
log.info("receive msg {}" , message);
}
}
订阅结果
-
补充
producer启动出现以下报错检查防火墙是否开启,把防火墙关闭即可