一、整体架构和核心概念
作用:异步、消峰、解耦
二、使用springboot收发消息(Demo)
1. 引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>3.2.2</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
2. 配置rabbitmq
- 创建虚拟主机(Virtual host):
/wlhost
- 创建队列(queue):
wl.queue
- 将
wl.queue
绑定在/wlhost
上(用可以控制此虚拟主机的用户创建该队列即可)
rabbitmq:
host: 192.168.0.100
port: 5672
username: admin
password: 123456
virtual-host: /wlhost
3. 发送消息代码
@SpringBootTest
public class SpringBootAmqpTest {
@Resource
private AmqpTemplate amqpTemplate;
@Test
void testSendMsg() {
amqpTemplate.convertAndSend("wl.queue", "hello amqp!");
}
}
4. 接收消息代码
@Slf4j
@Component
public class MqListener {
@RabbitListener(queues = "wl.queue")
public void receivedMsg(String msg) {
log.info("收到了消息:{}", msg);
}
}
三、work模型(能者多劳)
1. 配置prefetch(处理完成才会接收消息,未处理完不会接收消息)
rabbitmq:
host: 192.168.0.100
port: 5672
username: admin
password: 123456
virtual-host: /wlhost
listener:
simple:
prefetch: 1
2. 发送消息代码
@Test
void testSendWorkMsg() {
for (int i = 1; i <= 50; i++) {
amqpTemplate.convertAndSend("work.queue", "hello worker, message_" + i);
}
}
3. 接收消息代码
@RabbitListener(queues = "work.queue")
public void worker1ReceivedMsg(String msg) throws InterruptedException {
log.info("收到了消息:{}", msg);
Thread.sleep(20);
}
@RabbitListener(queues = "work.queue")
public void worker2ReceivedMsg(String msg) throws InterruptedException {
log.warn("收到了消息:{}", msg);
Thread.sleep(200);
}
四、四种交换机
4.1 fanout交换机
fanout exchange
会将收到的消息广播到每一个跟其绑定的queue
,因此也叫广播模式
所有的微服务以及对应的集群节点都会收到fanout exchange
交换机跟其绑定的queue
的消息
1. 配置fanout exchange
添加
wl.fanout
交换机,并为其添加两个队列fanout.queue1
,fanout.queue2
2. 发送消息代码
@Test
void fanoutSendMsg() {
String exchangeName = "wl.fanout";
amqpTemplate.convertAndSend(exchangeName, "", "hello everyone!");
}
3. 接收消息代码
@RabbitListener(queues = "fanout.queue1")
public void fanout1ReceivedMsg(String msg) {
log.info("消费者1收到了消息:{}", msg);
}
@RabbitListener(queues = "fanout.queue2")
public void fanout2ReceivedMsg(String msg) {
log.info("消费者2收到了消息:{}", msg);
}
4.2 direct交换机
direct exchange
会将收到的消息根据路由规则到指定queue
,因此也叫定向路由
- 每一个
queue
都与Exchange
设置一个BindingKey
- 发布者发送消息时,指定消息的
BindingKey
Exchange
将消息路由到BindingKey
与消息BindingKey
一致的队列若所有的
BindingKey
均一致,direct exchange
也能实现fanout exchange
的广播效果
1. 配置direct exchange
添加
wl.direct
交换机,并为其添加两个队列direct.queue1
,direct.queue2
2. 发送消息代码
@Test
void directSendMsg() {
String exchangeName = "wl.direct";
amqpTemplate.convertAndSend(exchangeName, "red", "hello red!");
}
3. 接收消息代码
@RabbitListener(queues = "direct.queue1")
public void direct1ReceivedMsg(String msg) {
log.info("消费者1可以收到routingKey为blue和red的消息:{}", msg);
}
@RabbitListener(queues = "direct.queue2")
public void direct2ReceivedMsg(String msg) {
log.info("消费者2可以收到routingKey为yellow和red的消息:{}", msg);
}
4. 将发送消息
routingKey
改为yellow
@Test
void directSendMsg() {
String exchangeName = "wl.direct";
amqpTemplate.convertAndSend(exchangeName, "yellow", "hello red!");
}
4.3 topic交换机(推荐使用)
topic exchange
与direct exchange
类似,区别咋子与routingKey
可以使多个单词的列表,并以.
分割
queue
与exchange
指定routingKey
时可以使用通配符
- “#”:代指0个或多个单词
- “*”:代指1个单词
1. 配置topic exchange
添加
wl.topic
交换机,并为其添加两个队列topic.queue1
,topic.queue2
2. 发送消息代码
@Test
void topicSendMsg() {
String exchangeName = "wl.topic";
amqpTemplate.convertAndSend(exchangeName, "china.weather", "这是中国天气消息!");
}
3. 接收消息代码
@RabbitListener(queues = "topic.queue1")
public void topic1ReceivedMsg(String msg) {
log.info("消费者1可以收到routingKey含weather的消息:{}", msg);
}
@RabbitListener(queues = "topic.queue2")
public void topic2ReceivedMsg(String msg) {
log.info("消费者2可以收到routingKey含china的消息:{}", msg);
}
4.4 headers交换机
不依赖路由键匹配规则路由消息。是根据发送消息内容中的headers属性进行匹配。性能差,基本用不到
五、创建(声明)队列和交换机
1. 使用注解创建(推荐使用)
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "wl.direct", type = ExchangeTypes.DIRECT),
key = {"red", "blue"}
))
public void direct1ReceivedMsg(String msg) {
log.info("消费者1可以收到routingKey为blue和red的消息:{}", msg);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "wl.direct", type = ExchangeTypes.DIRECT),
key = {"red", "yellow"}
))
public void direct2ReceivedMsg(String msg) {
log.info("消费者2可以收到routingKey为yellow和red的消息:{}", msg);
}
2. 使用Bean注入创建
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MqConfig {
@Bean
public FanoutExchange fanoutExchange() {
// ExchangeBuilder.fanoutExchange("wl.fanout").build();
return new FanoutExchange("wl.fanout");
}
@Bean
public Queue queue() {
// return QueueBuilder.durable("fanout.queue1").build();
return new Queue("fanout.queue1");
}
@Bean
public Binding binding() {
return BindingBuilder.bind(queue()).to(fanoutExchange());
}
}
六、消息转换器
发送消息和接收消息方都需要
引入依赖
,注入消息转换器
否则会使用jdk
自带的序列化方式ObjectOutputStream
1. 引入依赖
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
2. 注入消息转换器
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
@SpringBootApplication
public class SpringbootConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(SpringbootConsumerApplication.class, args);
}
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}
3. 发送消息代码
@Test
void testSendObjectMsg() {
String exchangeName = "wl.object.exchange";
Student student = Student.builder()
.studentId(111)
.studentName("张三")
.gender("男")
.age(24)
.tel("13265659898")
.clazzId(4)
.build();
amqpTemplate.convertAndSend(exchangeName, "", student);
}
4. 接收消息代码
@RabbitListener(queues = "wl.object.queue")
public void objectReceivedMsg(Map<String, String> map) {
log.warn("收到了消息:{}", map);
}
七、消息可靠性问题
7.1 生产者重连(rabbitmq服务器宕机,重连)
解决消息发送给服务器是丢失问题
当网络不稳定时,利用重试机制可以提高消息发送的成功率,不过SpirngAMQP的重试机制是
阻塞式
的,也就是多次重试等待的过程中,当前线程是被阻塞的,会影响业务性能如果对于 业务性能有要求,建议
禁用
重试机制。如一定要用,请设置合理的等待时长
和重试次数
,当然也可以考虑用异步
线程来执行发送消息的代码。
rabbitmq:
connection-timeout: 1s #设置mq的连接超时时间
template:
retry:
enabled: true #开启超时重试机制
initial-interval: 1000ms #失败后的初始等待时间
multiplier: 1 #失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier
7.2 生产者确认机制
rabbitMQ有
publisher return
和publisher confirm
两种确认机制。开启这两种确认及之后,MQ成功收到消息后会返回确认消息给生产者。返回的结果有以下几种情况:
exchange
存在,routingKey
不存在,MQ返回ack,但会通过publisher return
返回路由异常,告知投递成功exchange
、routingKey
均存在,临时消息入队成功,返回ack
,告知投递成功exchange
、routingKey
均存在,持久消息入队成功且完成持久化,返回ack
,告知投递成功- 其他情况都会返回
nack
,告知投递失败
publisher-returns:默认为false,不开启publisher return机制
publisher-confirm-type:默认为none,publisher confirm机制
- none:关闭confirm
- simple:同步阻塞等待mq的回执消息
- correlated:异步回调返回mq的回执消息
1. 配置生产者确认机制
rabbitmq:
host: 192.168.0.100
port: 5672
username: admin
password: 123456
virtual-host: /wlhost
connection-timeout: 1s #设置mq的连接超时时间
publisher-returns: true #开启publisher return机制
#none:关闭confirm,simple:同步阻塞等待mq的回执消息,correlated:异步回调返回mq的回执消息
publisher-confirm-type: correlated #publisher confirm机制;
2. 设置publisher return
@Configuration
@Slf4j
public class MqConfig {
@Resource
private RabbitTemplate rabbitTemplate;
@PostConstruct
public RabbitTemplate rabbitTemplate() {
rabbitTemplate.setReturnsCallback(returnedMessage -> {
log.info("收到消息的return callback,exchange:{},routingKey:{},msg:{},replyCode:{},replyText:{}",
returnedMessage.getMessage(),
returnedMessage.getRoutingKey(),
returnedMessage.getMessage(),
returnedMessage.getReplyCode(),
returnedMessage.getReplyText()
);
});
return rabbitTemplate;
}
}
3. 发送消息代码,设置publisher confirm
@Test
void publisherConfirmSendMsg() {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
correlationData.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
@Override
public void onFailure(Throwable throwable) {
log.error("消息回调失败", throwable);
}
@Override
public void onSuccess(CorrelationData.Confirm confirm) {
log.info("收到消息回执");
if (confirm.isAck()) {
log.info("消息发送成功,收到ack");
} else {
log.error("消息发送失败,收到nack,原因:{}", confirm.getReason());
}
}
});
String exchangeName = "wl.topic";
// rabbitTemplate.convertAndSend(exchangeName, "aaa.bbb", "这是中国天气消息!", correlationData);
rabbitTemplate.convertAndSend(exchangeName, "china.weather", "这是中国天气消息!", correlationData);
}
若将
exchangeName
修改为不存在的名称wls.topic
若将
routingKey
修改为不存在的名称aaa.bbb
7.3 消费者确认机制
当开启消费者确认机制
Consumer Acknowledgement
后,消费者处理消息结束后,会向MQ发送一个回执,告知rabbitmq
消息处理的状态。回执有三种可选值:
- ack:成功处理消息,
rabbitmq
从queue
中删除该消息- nack:处理消息失败,
rabbitmq
需要再次投递消息- reject:处理消息失败并拒绝该消息,
rabbitmq
从queue
中删除该消息
针对
rabbitmq
的消费者确认机制,SpringAMQP
已经实现了消息确认功能,并允许我们通过配置文件的方式选择ack处理方式:
- none:不处理。消息投递给消费者后立即返回
ack
,消息从mq删除。不建议使用- manual:手动模式。需要自己在业务代码中调用
api
,发送ack
或reject
,存在业务入侵,但更灵活- auto:自动模式。利用
AOP
对接收消息的处理逻辑做环绕增强,业务正常执行自动返回ack
,推荐使用
当业务出现异常时,根据异常判断返回不同结果:
- 业务异常,例如
RuntimeException
,自动返回nack
- 消息处理或校验异常,例如
MessageCoversionException
,自动返回reject
1. 配置消费者确认机制
rabbitmq:
host: 192.168.0.100
port: 5672
username: admin
password: 123456
virtual-host: /wlhost
listener:
simple:
acknowledge-mode: auto #none:不处理;manual:手动模式;auto:自动模式
2. 配置消费者重试机制
此时,若抛出
RuntimeException
,mq的消息没有出队,会无限重新投递消息,给mq服务器带来不必要的压力,影响mq的性能,利用Spring
的retry
机制,在消费者出现异常时利用本地重试。配置如下:
rabbitmq:
host: 192.168.0.100
port: 5672
username: admin
password: 123456
virtual-host: /wlhost
listener:
simple:
acknowledge-mode: auto
retry:
enabled: true
max-attempts: 3 #最大重试次数
stateless: true #true无状态,false有状态。如果业务中包含事务,这里改为false
注:重试次数耗尽之后,不管消息是否消费成功,消息都会出队,会导致消息丢失
3. 配置失败消息处理策略
在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有
MessageRecoverer
接口来处理,三种实现:
RejectAndDontRequeueRecoverer
:重试耗尽后,直接reject
,丢弃消息。默认方式ImmediateRequeueMessageRecoverer
:重试耗尽后,返回nack
,消息重新入队RepublishMessageRecoverer
:重试耗尽后,将失败的消息投递到指定的交换机
配置失败消息处理策略信息
@Configuration
// 只有spring.rabbitmq.listener.simple.retry.enabled = true时,此配置才会生效
@ConditionalOnProperty(prefix = "spring.rabbitmq.listener.simple.retry", name = "enabled", havingValue = "true")
public class ErrorMqConfig {
@Bean
public DirectExchange errorExchange() {
return new DirectExchange("error.exchange");
}
@Bean
public Queue errorQueue() {
return new Queue("error.queue");
}
@Bean
public Binding errorBinding(DirectExchange errorExchange, Queue errorQueue) {
return BindingBuilder.bind(errorQueue).to(errorExchange).with("error"); // error是routingKey
}
@Bean
public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {
return new RepublishMessageRecoverer(rabbitTemplate, "error.exchange", "error");
}
}
生产者代码
@Test
void testSendObjectMsg() {
String exchangeName = "wl.object.exchange";
Student student = Student.builder()
.studentId(111)
.studentName("张三")
.gender("男")
.age(24)
.tel("13265659898")
.clazzId(4)
.build();
rabbitTemplate.convertAndSend(exchangeName, "", student);
}
消费者代码
@RabbitListener(queues = "wl.object.queue")
public void objectReceivedMsg(Map<String, String> map) {
log.warn("收到了消息:{}", map);
// throw new RuntimeException("抛出业务异常");
throw new MessageConversionException("抛出业务异常");
}
4. 业务幂等性
方案一:唯一消息id
- 为每一条消息生成一个唯一的id,与消息一起发送给消费者
- 消费者收到消息处理完业务后,将消息id保存到数据库
- 如果下次收到相同的消息,去数据库查询判断是否存在,存在则不处理本次消息
开启默认的messageId,converter.setCreateMessageIds(true);
@SpringBootApplication
public class SpringbootConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(SpringbootConsumerApplication.class, args);
}
@Bean
public MessageConverter messageConverter() {
Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
converter.setCreateMessageIds(true);
return converter;
}
}
此id为
UUID.randomUUID().toString()
,可根据自己的需求更改成其他id,如分布式id、雪花算法id等
if (this.createMessageIds && messageProperties.getMessageId() == null) {
messageProperties.setMessageId(UUID.randomUUID().toString());
}
方案二:结合业务逻辑
- 比如更新订单状态(未支付=>已支付),可以先查询当前订单是否为已支付,若为已支付,不处理本次消息
八、数据持久化
消息有两种类型,临时消息
non-persistent
和持久化消息persistent
,通过springAMQP发送的消息默认都是持久化的。 发送消息时可以指定消息类型,deliver mode = 1
是临时消息,deliver mode = 2
是持久化消息
临时消息存在内存中,持久化消息存在磁盘上,当内存消息满时,会触发pageout
,此时mq是阻塞的,持久化消息不会触发pageout
mq的消息存在内存中,易丢失,需要采用数据持久化和改变队列模式(
lazy queue
)来解决消息丢失和阻塞问题
lazy queue
优化了消息写入磁盘的效率。保证队列和消息都持久化,再使用Lazy Queue
队列模式保证消息的可靠性,但内存只会保存最近的消息
声明Lazy Queue
的两种方式代码
@Bean
public Queue queue() {
return QueueBuilder.durable("fanout.queue1").lazy().build();
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "wl.direct", type = ExchangeTypes.DIRECT),
key = {"red", "blue"},
arguments = @Argument(name = "x-queue-mode", value = "lazy")
))
public void direct1ReceivedMsg(String msg) {
log.info("消费者1可以收到routingKey为blue和red的消息:{}", msg);
}
消息持久化代码
@Test
void persistentSendMsg() {
Message message = MessageBuilder
.withBody("hello world".getBytes())
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
.build();
String exchangeName = "wl.topic";
rabbitTemplate.convertAndSend(exchangeName, "china.weather", message);
}
九、死信交换机
当一个队列中的消息满足下列情况之一时,就会成为死信
dead letter
- 消费者使用
basic.reject
或basic.nack
声明消费失败,并且消息的requeue参数设置为false- 消息是一个过期消息(达到了队列或消息本身设置的过期时间),超时无人消费
- 要投递的队列消息堆积满了,最早的消息可能成为死信
如果队列通过
dead-letter-exchange
属性指定了一个交换机,那么该队列中的死信就会投递到这个交换机中,这个交换机被称为死信交换机(dead letter exchange
,简称DLX)死信交换机可以实现延迟消息的效果,但官网提供了延迟消息的插件,更方便实用
十、延迟消息
1. 安装插件 rabbitmq_delayed_message_exchange,安装对应mq的版本即可,将其放到plugins目录下执行以下命令,并重启mq服务。官网地址:https://www.rabbitmq.com/community-plugins
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
2. 生产者代码,设置messageProperties
的delay
即可,单位ms
@Test
void testSendDelayMsg() {
String exchangeName = "wl.delay.exchange";
Student student = Student.builder()
.studentId(111)
.studentName("张三")
.gender("男")
.age(24)
.tel("13265659898")
.clazzId(4)
.build();
// rabbitTemplate.convertAndSend(exchangeName, "delayMsg", student, message -> {
// message.getMessageProperties().setDelay(10000);
// return message;
// });
rabbitTemplate.convertAndSend(exchangeName, "delayMsg", student, getMessagePostProcessor(10000));
log.info("发送延迟消息:{}", student);
}
public MessagePostProcessor getMessagePostProcessor(Integer delayTime) {
return message -> {
message.getMessageProperties().setDelay(delayTime);
return message;
};
}
3. 消费者代码,将交换机的delayed
属性置true
即可
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "delay.queue"),
exchange = @Exchange(name = "wl.delay.direct", type = ExchangeTypes.DIRECT, delayed = "true"),
key = {"delayMsg"}
))
public void delayReceivedMsg(Map<String, String> map) {
log.info("收到延迟消息:{}", map);
}