1 资源
资源信息 | 版本号 | 备注 |
---|---|---|
rocketMQ | 4.8.0 | IP: 192.168.51.4 |
springboot | 2.4.0 |
springboot-rocketmq-demo 源码 下载
2 rocketmq安装
需要安装rocketmq,如果未安装,可参考博文:
3 springboot整合
3.1 pom文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.0</version>
<relativePath/>
</parent>
<groupId>com.auskat.demo.rocketmq</groupId>
<artifactId>springboot-rocketmq-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- 移除了Junit4,引入JUnit5 单元测试 -->
<dependency>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- springboot 整合的rocketmq -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<!-- rocketmq client 端 -->
<!-- <dependency>-->
<!-- <groupId>org.apache.rocketmq</groupId>-->
<!-- <artifactId>rocketmq-client</artifactId>-->
<!-- <version>4.3.0</version>-->
<!-- </dependency>-->
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
3.2 配置信息
3.2.1 application.yml
# rocketmq 相关配置信息
rocketmq:
nameServer: 192.168.51.4:9876
# 生产者配置
producer:
group: boot-group-test
maxMessageSize: 4096 # 消息最大长度 默认1024*4(4M)
sendMessageTimeout: 3000 # 发送消息超时时间,默认3000
retryTimesWhenSendFailed: 2 # 发送消息失败重试次数,默认2
# 消费者配置
consumers:
topic: boot-topic-test
listeners:
registeredListener:
tag: registered
group: consumer_registered
modifyListener:
tag: modify
group: consumer_modify
3.2.2 配置类
package com.auskat.demo.rocketmq.config;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
* 类文件: ConsumerConfiguration
* <p>
* <p>
* 类描述:配置类
* <p>
* 作 者: AusKa_T
* <p>
* 日 期: 2021/4/3 0003
* <p>
* 时 间: 13:44
* <p>
*/
@Component
@ConfigurationProperties(prefix = "consumers")
public class ConsumerConfiguration {
private String topic;
public static class listeners {
public static class registeredListener {
private String tag;
private String group;
public String getTag() {
return tag;
}
public void setTag(String tag) {
this.tag = tag;
}
public String getGroup() {
return group;
}
public void setGroup(String group) {
this.group = group;
}
}
public static class modifyListener {
private String tag;
private String group;
public String getTag() {
return tag;
}
public void setTag(String tag) {
this.tag = tag;
}
public String getGroup() {
return group;
}
public void setGroup(String group) {
this.group = group;
}
}
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
}
3.3 功能实现
3.3.1 messge对象
package com.auskat.demo.rocketmq.entity;
import java.io.Serializable;
/**
* 类文件: Message
* <p>
* <p>
* 类描述:消息对象
* <p>
* 作 者: AusKa_T
* <p>
* 日 期: 2021/4/3 0003
* <p>
* 时 间: 10:40
* <p>
*/
public class Message implements Serializable {
private static final long serialVersionUID = 4767442453746333787L;
/**
* 消息主题 可以理解为 一级分类
*/
private String topic;
/**
* 消息标签 可以理解为 二级分类
*/
private String tags;
/**
* 消息内容
*/
private String content;
@Override
public String toString() {
return "Message{" +
"topic='" + topic + '\'' +
", tags='" + tags + '\'' +
", content='" + content + '\'' +
'}';
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public String getTags() {
return tags;
}
public void setTags(String tags) {
this.tags = tags;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
}
3.3.2 消息生产
接口类
package com.auskat.demo.rocketmq.service;
import com.auskat.demo.rocketmq.entity.Message;
import java.util.List;
/**
* 类文件: RocketMQService
* <p>
* <p>
* 类描述:RocketMQ业务接口层
* <p>
* 作 者: AusKa_T
* <p>
* 日 期: 2021/4/3 0003
* <p>
* 时 间: 10:48
* <p>
*/
public interface RocketMQService {
/**
* 同步发送消息
*
* 当发送的消息很重要是,且对响应时间不敏感的时候采用sync方式;
*
* @param message 发送消息实体类
*/
void send(Message message);
/**
* 异步发送消息,异步返回消息结果
*
* 当发送的消息很重要,且对响应时间非常敏感的时候采用async方式;
*
* @param message 发送消息实体类
*/
void asyncSend(Message message);
/**
* 直接发送发送消息,不关心返回结果,容易消息丢失,适合日志收集、不精确统计等消息发送;
*
* 当发送的消息不重要时,采用one-way方式,以提高吞吐量;
*
* @param message 发送消息实体类
*/
void syncSendOrderly(Message message);
/**
* 发送延迟消息
* 比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存
* 现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18
* 消息消费失败会进入延时消息队列,消息发送时间与设置的延时等级和重试次数有关
*
* @param message 发送消息实体类
*/
void delaySend(Message message);
/**
* 批量发送消息
* 批量发送消息能显著提高传递小消息的性能。
* 限制是这些批量消息应该有相同的topic,相同的waitStoreMsgOK,而且不能是延时消息。
* 此外,这一批消息的总大小不应超过4MB。
* @param messages 消息集合
*/
void batchSend(List<Message> messages);
}
接口实现类
package com.auskat.demo.rocketmq.service.impl;
import com.auskat.demo.rocketmq.common.callback.CustomSendCallBack;
import com.auskat.demo.rocketmq.entity.Message;
import com.auskat.demo.rocketmq.service.RocketMQService;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import java.util.List;
/**
* 类文件: RocketMQServiceImpl
* <p>
* <p>
* 类描述:RocketMQ 业务实现层
* <p>
* 作 者: AusKa_T
* <p>
* 日 期: 2021/4/3 0003
* <p>
* 时 间: 11:01
* <p>
*/
@Service
public class RocketMQServiceImpl implements RocketMQService {
private static final Logger logger = LoggerFactory.getLogger(RocketMQServiceImpl.class);
/**
* rocketMQ 模板操作类注入
*/
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 同步发送消息
* <p>
* 当发送的消息很重要是,且对响应时间不敏感的时候采用sync方式;
*
* @param message 发送消息实体类
*/
@Override
public void send(Message message) {
logger.info("send sync message to rocketmq {}",message);
rocketMQTemplate.send(message.getTopic() + ":" + message.getTags(), MessageBuilder.withPayload(message.getContent()).build());
}
/**
* 异步发送消息,异步返回消息结果
* <p>
* 当发送的消息很重要,且对响应时间非常敏感的时候采用async方式;
*
* @param message 发送消息实体类
*/
@Override
public void asyncSend(Message message) {
logger.info("send async message to rocketmq {}",message);
rocketMQTemplate.asyncSend(message.getTopic() + ":" + message.getTags(), message.getContent(), new CustomSendCallBack(message));
}
/**
* 直接发送发送消息,不关心返回结果,容易消息丢失,适合日志收集、不精确统计等消息发送;
* <p>
* 当发送的消息不重要时,采用one-way方式,以提高吞吐量;
*
* @param message 发送消息实体类
*/
@Override
public void syncSendOrderly(Message message) {
logger.info("send one way message to rocketmq {}",message);
rocketMQTemplate.sendOneWay(message.getTopic() + ":" + message.getTags(), message.getContent());
}
/**
* 发送延迟消息
* 比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存
* 现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18
* 消息消费失败会进入延时消息队列,消息发送时间与设置的延时等级和重试次数有关
*
* @param message 发送消息实体类
*/
@Override
public void delaySend(Message message) {
}
/**
* 批量发送消息
* 批量发送消息能显著提高传递小消息的性能。
* 限制是这些批量消息应该有相同的topic,相同的waitStoreMsgOK,而且不能是延时消息。
* 此外,这一批消息的总大小不应超过4MB。
*
* @param messages 消息集合
*/
@Override
public void batchSend(List<Message> messages) {
}
}
3.3.3 消息监听
注册监听
package com.auskat.demo.rocketmq.listener;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
/**
* 类文件: MqRegisteredListener
* <p>
* <p>
* 类描述:注册tag监听类
* <p>
* 作 者: AusKa_T
* <p>
* 日 期: 2021/4/3 0003
* <p>
* 时 间: 13:24
* <p>
*/
@Service
@RocketMQMessageListener(topic = "${consumers.topic}"
, consumerGroup = "${consumers.listeners.registeredListener.group}"
, selectorExpression = "${consumers.listeners.registeredListener.tag}")
public class MqRegisteredListener implements RocketMQListener<String> {
private static final Logger log = LoggerFactory.getLogger(MqRegisteredListener.class);
@Override
public void onMessage(String message) {
log.info("received registered message: {}", message);
}
}
修改监听
package com.auskat.demo.rocketmq.listener;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
/**
* 类文件: MqModifyListener
* <p>
* <p>
* 类描述:修改tag监听类
* <p>
* 作 者: AusKa_T
* <p>
* 日 期: 2021/4/3 0003
* <p>
* 时 间: 13:24
* <p>
*/
@Service
@RocketMQMessageListener(topic = "${consumers.topic}"
, consumerGroup = "${consumers.listeners.modifyListener.group}"
, selectorExpression = "${consumers.listeners.modifyListener.tag}")
public class MqModifyListener implements RocketMQListener<String> {
private static final Logger log = LoggerFactory.getLogger(MqModifyListener.class);
@Override
public void onMessage(String message) {
log.info("received modify message: {}", message);
}
}
3.3.4 自定义回调
package com.auskat.demo.rocketmq.common.callback;
import com.auskat.demo.rocketmq.entity.Message;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 类文件: SendCallBack
* <p>
* <p>
* 类描述:
* <p>
* 作 者: AusKa_T
* <p>
* 日 期: 2021/4/3 0003
* <p>
* 时 间: 11:35
* <p>
*/
public class CustomSendCallBack implements SendCallback {
/**
* 日志
*/
private static final Logger logger = LoggerFactory.getLogger(CustomSendCallBack.class);
private final Message message;
public CustomSendCallBack(Message message) {
this.message = message;
}
/**
* 消息发送成功 调用方法
* @param sendResult 发送成功返回的结果信息
*/
@Override
public void onSuccess(SendResult sendResult) {
logger.info("send async message to rocketmq success,message: {}",message);
}
/**
* 消息发送失败 调用方法
* @param throwable 异常信息
*/
@Override
public void onException(Throwable throwable) {
logger.error("send async message to rocketmq fail,message: {},exception: {}",message,throwable.getMessage());
}
}
4 功能测试
package com.auskat.demo.rocketmq;
import com.auskat.demo.rocketmq.entity.Message;
import com.auskat.demo.rocketmq.service.RocketMQService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.HashMap;
import java.util.Map;
/**
* 类文件: ApplicationTest
* <p>
* <p>
* 类描述:
* <p>
* 作 者: AusKa_T
* <p>
* 日 期: 2021/4/3 0003
* <p>
* 时 间: 13:54
* <p>
*/
@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTest {
@Autowired
private RocketMQService rocketMQService;
@Test
public void testSendMessage() {
Message message = new Message();
message.setTopic("boot-topic-test");
// 第一个发送注册消息
message.setTags("registered");
// 此处可为其他VO对象,替换掉Map
Map<String, String> userInfo = new HashMap<>();
userInfo.put("username", "zhangsan");
userInfo.put("age", "12");
// 此处可封装为json等格式
message.setContent(userInfo.toString());
rocketMQService.asyncSend(message);
// 第二个发送修改信息
message.setTags("modify");
userInfo.put("age", "18");
// 此处可封装为json等格式
message.setContent(userInfo.toString());
// 发送修改消息
rocketMQService.asyncSend(message);
}
}
5 相关信息
- 博文不易,辛苦各位猿友点个关注和赞,感谢