按照RabbitMQ正常使用流程,生产者会发送一条消息到RabbitMQ服务器,消费者接收到消息进行消费。但是在实际情况下,生产者很有可能在到达RabbitMQ服务器后,由于服务器的某种原因导致消息丢失(因为RabbitMQ默认是将消息存储在内存中),一旦丢失就是再也找不到了。
那么我们如何保证消息不丢失呢?
RabbitMQ有相应的持久化机制,可以将Exchange、Queue、Message全部持久化到磁盘。
那如果在将消息持久化到磁盘的过程中服务器挂了呢?
那么则需要通过数据保护机制来保证我们的消息一定能存储到磁盘,如果不成功,消息生产者则一直发送这条消息。
在RabbitMq中有两种数据保护机制:
1. 事物机制:
当消息达到服务器,开启事物,只有当消息存储完毕才提交事物,向生产者发送成功通知。如果失败,也会向生产者发送失败消息,生产者接收失败消息则继续发送此消息。在这个过程中生产者需要同步等待,所以,事物机制虽然可以保证消息可靠性,但是采用的是同步方式,会造成性能下降。
2. confirm机制:
一旦消息投递到队列,队列则会向生产者发送一个通知,如果设置了消息持久化到磁盘,则会等待消息持久化到磁盘之后再发送通知。生产者在发送完消息后不会等待回应,所以confirm机制性能相对比事物机制高。
如何开启RabbtiMQ的confirm模式:
需要在配置文件中配置如下:
rabbitmq:
publisher-confirms: true
如何开启队列持久化:
在声明队列时,设置durable属性为true
消息默认就是持久化到磁盘的。
具体生产者代码:
@Component
public class MessageSenderimplements RabbitTemplate.ConfirmCallback {
@Autowired
private RabbitTemplaterabbitTemplate;
@Autowired
private RedisTemplateredisTemplate;
public MessageSender(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
rabbitTemplate.setConfirmCallback(this);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String s) {
if (ack) {//ack : 成功或失败的布尔值
//成功
//在消息发送时,将消息存入redis中,方便消息发送失败时,从redis中取值
//correlationData.getId()在发送消息时,需要生成的唯一标识
redisTemplate.delete(correlationData.getId());
}else {
//失败
//从redis中获取参数
Map map =redisTemplate.opsForHash().entries("message_" + correlationData.getId());
String exchange = map.get("exchange");
String message = map.get("message");
String routingKey = map.get("routingKey");
//重新发送消息
rabbitTemplate.convertAndSend(exchange, routingKey, message);
}
}
//自定义发送消息方法
public void sendMessage(String exchange, String routingKey, String message) {
//设置消息的唯一标识,CorrelationData会在confirm方法中作为参数传给我们
CorrelationData correlationData =new CorrelationData(UUID.randomUUID().toString());
//将消息存入redis
redisTemplate.opsForValue().set(correlationData.getId(), message);
//将本次发送消息的元信息存入redis,方便后面失败重新发送
Map metaDataMap =new HashMap<>();
metaDataMap.put("exchange", exchange);
metaDataMap.put("routingKey", routingKey);
metaDataMap.put("message", message);
redisTemplate.opsForHash().putAll("message_" + correlationData.getId(), metaDataMap);
//发送消息时,需要把correlationData对象带过去
rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
}
}