引言
在微服务架构大行其道的今天,消息队列作为解耦、异步处理和流量削峰的利器,已成为系统中不可或缺的组件。Apache RocketMQ作为阿里开源的一款分布式、队列模型的消息中间件,以其低延迟、高可靠、万亿级消息容量和灵活的扩展性,受到了广大开发者的青睐。
本文将带你从零开始,学习如何在Spring Boot项目中集成和使用RocketMQ,并深入探讨其集群模式与核心消息机制,助你从入门走向精通。
第一部分:入门篇 - Spring Boot快速集成RocketMQ
目标: 在Spring Boot中成功发送和接收第一个消息。
1. 环境准备
- 一个RocketMQ服务。你可以从官网下载,在本地启动NameServer和Broker。最简单的方式是使用Docker:
# 启动NameServer docker run -d --name rmqnamesrv -p 9876:9876 rocketmqinc/rocketmq sh mqnamesrv # 启动Broker docker run -d --name rmqbroker --link rmqnamesrv:namesrv -e "NAMESRV_ADDR=namesrv:9876" -p 10911:10911 -p 10909:10909 rocketmqinc/rocketmq sh mqbroker -c /opt/rocketmq-4.9.4/conf/broker.conf - 一个Spring Boot项目(推荐使用Spring Initializr创建)。
2. 添加依赖
在项目的pom.xml中添加RocketMQ Spring Boot Starter依赖。
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.3.0</version> <!-- 请使用最新版本 -->
</dependency>
3. 配置连接信息
在application.yml中配置RocketMQ NameServer的地址。
rocketmq:
name-server: 127.0.0.1:9876 # NameServer地址
producer:
group: my-producer-group # 生产者组名,必填
4. 编写生产者发送消息
使用RocketMQTemplate可以非常方便地发送消息。
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class MessageProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 发送简单消息
* @param topic 主题
* @param message 消息内容
*/
public void sendMessage(String topic, String message) {
rocketMQTemplate.convertAndSend(topic, message);
System.out.println("发送消息: " + message);
}
}
5. 编写消费者接收消息
使用@RocketMQMessageListener注解来监听指定主题的消息。
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
@Service
@RocketMQMessageListener(
topic = "TestTopic", // 监听的Topic
consumerGroup = "my-consumer-group" // 消费者组名
)
public class MessageConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
// 处理收到的消息
System.out.println("接收到消息: " + message);
}
}
6. 测试
编写一个Controller或单元测试,调用MessageProducer的sendMessage方法。如果控制台打印出“发送消息”和“接收到消息”,恭喜你,第一步成功了!
第二部分:进阶篇 - 详解RocketMQ消息机制
1. 消息类型
-
同步消息: 发送后等待Broker返回确认,适用于重要通知、短信等场景。
SendResult sendResult = rocketMQTemplate.syncSend("SyncTopic", "这是一条同步消息"); System.out.println("发送状态: " + sendResult.getSendStatus()); -
异步消息: 发送后立即返回,通过回调函数处理Broker的响应,适用于链路耗时长但对响应时间敏感的场景。
rocketMQTemplate.asyncSend("AsyncTopic", "这是一条异步消息", new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.println("异步发送成功"); } @Override public void onException(Throwable e) { System.out.println("异步发送失败: " + e.getMessage()); } }); -
单向消息: 只负责发送,不等待响应,也不关心结果,适用于日志收集等可靠性要求不高的场景。
rocketMQTemplate.sendOneWay("OneWayTopic", "这是一条单向消息"); -
顺序消息: 保证消息在同一个消息队列(MessageQueue)内被顺序消费。通过选择器(
MessageQueueSelector)将同一业务ID的消息发送到同一个队列。// 发送顺序消息,将orderId相同的消息发到同一个队列 rocketMQTemplate.syncSendOrderly("OrderTopic", message, orderId);消费者需要实现
RocketMQListener并保证消费逻辑是幂等的。 -
延迟消息: 消息发送后不会立即被消费,而是在指定的延迟时间后才会投递给消费者。
Message<String> message = MessageBuilder.withPayload("这是一条延迟消息").build(); // “2”代表延迟级别为2,即延迟10秒。RocketMQ预设了1s, 5s, 10s, 30s, 1m等级别。 rocketMQTemplate.syncSend("DelayTopic", message, 3000, 2); 事务消息: 用于解决分布式事务问题,保证本地事务和消息发送的最终一致性。实现过程较为复杂,需要实现
RocketMQLocalTransactionListener接口。
2. 消息过滤
-
Tag过滤: 最常用的方式。生产者可以为消息设置Tag,消费者可以只订阅感兴趣Tag的消息。
- 生产者:
rocketMQTemplate.syncSend("TopicA:TagA", "带Tag的消息"); - 消费者:
@RocketMQMessageListener(topic = "TopicA", consumerGroup = "cg1", selectorExpression = "TagA")
- 生产者:
- SQL92过滤: 通过消息的UserProperty属性,使用SQL表达式进行过滤,功能更强大,但性能开销比Tag大。
第三部分:精通篇 - RocketMQ集群模式解析
单机模式无法满足生产环境的高可用要求,因此必须采用集群部署。
1. 集群架构核心角色
- NameServer集群: 无状态节点,负责服务发现和路由管理。Broker会向所有NameServer注册,Producer/Consumer从NameServer获取路由信息。集群间节点互不通信,通过多个节点来实现高可用。
-
Broker集群: 负责消息的存储、投递和查询。是RocketMQ的核心。
- Master: 提供读写服务。
- Slave: 仅提供读服务,作为Master的热备,在Master宕机后可以转换为Master。
2. 集群部署模式
-
多Master模式(无Slave):
- 优点: 配置简单,性能最高。
- 缺点: 如果某个Master宕机,该机器上的消息在恢复前无法订阅,可靠性不高。
-
多Master多Slave模式(异步复制):
- 优点: 即使一台Master宕机,Slave仍然可以提供读服务,且消息几乎无延迟。性能和可用性兼顾。
- 缺点: 主备有短暂延迟,Master宕机后可能有少量消息丢失。
-
多Master多Slave模式(同步双写):
- 优点: 主备都写成功后才返回成功,数据可靠性极高,消息无丢失。
- 缺点: 性能比异步复制低,目前主版本已不再推荐使用。
生产环境推荐使用 多Master多Slave(异步复制) 模式。
3. Spring Boot连接集群
在Spring Boot中连接集群非常简单,只需在配置中指定多个NameServer地址即可。
rocketmq:
name-server: 192.168.1.100:9876;192.168.1.101:9876 # 用分号隔开多个NameServer
producer:
group: my-producer-group
当某个NameServer不可用时,客户端会自动切换到其他可用的NameServer,从而实现高可用。
总结
通过本文的学习,我们完成了RocketMQ在Spring Boot中的三部曲:
- 入门: 学会了如何引入依赖、配置、并发送接收最简单的消息。
- 进阶: 深入了解了同步、异步、顺序、延迟等多种消息类型及其适用场景,并掌握了Tag过滤的基本用法。
- 精通: 剖析了RocketMQ高可用集群的核心架构与部署模式,理解了NameServer和Broker的分工,并学会了如何在Spring Boot中配置连接集群。
RocketMQ功能强大且复杂,本文仅涵盖了其核心部分。要真正达到“精通”,还需要在实践中不断探索,例如深入理解其存储机制、消息重试、死信队列等高级特性。希望这篇文章能为你打开RocketMQ的大门,助你在分布式系统开发中游刃有余。