消息的不丢失,在MQ角度考虑,一般有三种途径:
- 生产者不丢数据
- MQ服务器不丢数据
- 消费者不丢数据
其余的方式是根据业务来说的,比如消息落库等等,这篇咱们就研究下MQ的机制。
1. 生产者不丢失数据
1.1 开启事务模式
amqp事务仅仅适用于publish和ack,rabbitmq新增了reject的事务,其他操作不具备事务的特性。也就是说:rabbit事务只可以保证:
- 生产者发出的消息成功被MQ服务器收到(不保证进入queue);
- 消费者发出的确认消息成功的被MQ服务器收到;
consumer
端的具体消费逻辑如果需要使用事务,只能引入外部事务。
RabbitTemplate代码:
@Bean
@Autowired
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
rabbitTemplate.setChannelTransacted(true);
return rabbitTemplate;
}
(1)生产者发送消息,若是MQ服务器挂掉,那么程序会不断尝试重试,直至broker
恢复,会重新接收这个消息。
(2)若是消费者在ack之前挂掉,MQ服务器会将这条消息恢复,若是长时间没有收到consumer
端的确认消息,那么会将消息从unacked
状态转化为ready
状态。
(3)若是消费者处理消息期间抛出异常,MQ服务器会收到一个nack
或者reject
,MQ服务器也会恢复这条消息。
开启事务会大幅降低消息发送及接收效率,因为当已经有一个事务存在时,后面的消息是不能被发送或者接收(对同一个consumer而言)的。
1.2 confirm模式
为了producer
端知道消息是否进入queue
,可以使用confirm
和return
来代替事务。
confirm和return的配置:
//消息的确认机制(confirm);
connectionFactory.setPublisherConfirms(true);
connectionFactory.setPublisherReturns(true);
confirm和return的区别和联系:
- confirmCallBack:消息从生产者到达
exchange
时返回ack
,消息未到达exchange
返回nack
; - returnCallBack:消息进入
exchange
但未进入queue
时会被调用。
官网对confirm和return的描述:
When a rabbit template send operation completes, the channel is closed; this would preclude the reception of confirms or returns in the case when the connection factory cache is full (when there is space in the cache, the channel is not physically closed and the returns/confirms will proceed as normal). When the cache is full, the framework defers the close for up to 5 seconds, in order to allow time for the confirms/returns to be received. When using confirms, the channel will be closed when the last confirm is received. When using only returns, the channel will remain open for the full 5 seconds. It is generally recommended to set the connection factory’s channelCacheSize to a large enough value so that the channel on which a message is published is returned to the cache instead of being closed. You can monitor channel usage using the RabbitMQ management plugin; if you see channels being opened/closed rapidly you should consider increasing the cache size to reduce overhead on the server.
大概意思就是:在rabbit Template
发送操作完成时,channels
才会关闭。在连接工厂(ConnectionCache
)满的情况下,缓存中有空间时,channel
不会关闭,直到confirm/return
处理完成。
使用confirm
时,将在接受到ack
时关闭channels
;使用return
时,通道会整整持续5s的时间。通常建议将Connection Cache
设置足够大的值,以便发布的消息的channel
可以返回连接池而不是关闭。当看到channel
快速打开或者关闭时,应该考虑增加连接池大小以减少服务器的开销。
1.2.1 生产者消息丢失的情况
(1)rabbitmq由于短暂的网络异常,导致消息发送了出去,但是未到exchange,连接可以短时间恢复。
(2)rabbitmq服务器挂掉且长时间无法恢复,消息无法发送。
1.2.2 生产者消息发送失败(网络异常)
(埋坑)我们知道调用
ConfirmCallback
若是ack
返回false的消息未必没有到达exchange
(因为confirm
是异步的,在ack回来之前,Connection
异常中断,ConfirmCallback
立刻返回false)。但是ack返回false的消息一定未到达exchange。
于是我们可以统一处理ack=false
的情况,将消息再次发送一次。但是这不可避免的会导致重复消费。
当消息发送失败,一般两种方式处理这个消息:
- 自动重发;
- 系统预警人工处理;
配置文件源码:
rabbitTemplate.setConfirmCallback(new ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause);
}
});
rabbitTemplate.setReturnCallback(new ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message);
}
});
我们可以看到,在ReturnCallback
中,返回的参数是Message
对象,我们可以获取消息内容
,exchange
,routingKey
这些信息的。
但是在ConfirmCallback
中,确是没有消息信息,只有一个correlationData[ˌkɒrəˈleɪʃn] 相关性的
,并且我们看到他的日志,打印出来还是null。
输出日志:
2019-03-10-00-52 [AMQP Connection 127.0.0.1:5672] [com.Configuration.MyAMQPConfig]
[INFO] - 消息发送成功:correlationData(null),ack(false),cause
(channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no
exchange 'exchange_direct_no' in vhost '/', class-id=60, method-id=40))
于是我们打开correlationData
的源码:
可以看到里面的属性只有一个id。
public class CorrelationData implements Correlation {
private volatile String id;
public CorrelationData() {
}
public CorrelationData(String id) {
this.id = id;
}
public String getId() {
return this.id;
}
public void setId(String id) {
this.id = id;
}
public String toString() {
return "CorrelationData [id=" + this.id + "]";
}
}
于是我们打开RabbitTemplate send/convertAndSend
方法的源码:
发现里面含有CorrelationData对象,很显然我们在发送消息的时候,消息信息和correnlationData.id
属性进行了绑定,我们若是可以根据id拿到消息,那么就可以进行“重试”或者“预警”等操作了。
于是我们扩展correlationData
类,将id和消息属性绑定起来。
public class CorrelationData extends
org.springframework.amqp.rabbit.support.CorrelationData {
//消息体
private volatile Object message;
//交换机
private String exchange;
//路由键
private String routingKey;
//重试次数
private int retryCount = 0;
我们在发送消息的时候,可以发送correlationData
扩展对象,在我们confirm
的ack=false
的情况下,于是我们就可以拿到消息主体了。
@Test
public void contextLoads() {
Map<String, Object> map = new HashMap<>();
Book book = new Book("西游记", "120.00");
//使用继承扩展的CorrelationData 、id消息流水号
CorrelationData correlationData =
new CorrelationData(UUID.randomUUID().toString());
correlationData.setMessage(book);
correlationData.setExchange("exchange_direct_no");
correlationData.setRoutingKey("ord");
try {
rabbitTemplate.convertAndSend("exchange_direct_no", "ord", book, correlationData);
} catch (AmqpConnectException e) {
System.out.println("保存信息编号:" + correlationData);
}
}
现在我们可以拿到消息主体,也可以拿到rabbitTemplate,那么我们是否可以在confirm回调方法中再次重试?
ConfirmCallback回调函数:
public class MessageConfirmCallback implements RabbitTemplate.ConfirmCallback {
private static final Logger logger = LoggerFactory.getLogger(MessageConfirmCallback.class);
private RabbitTemplate rabbitTemplate;
public MessageConfirmCallback(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack == true) {
logger.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause);
} else {
if (correlationData instanceof com.Configuration.CorrelationData) {
com.Configuration.CorrelationData messageCorrelationData = (com.Configuration.CorrelationData) correlationData;
String exchange = messageCorrelationData.getExchange();
Object message = messageCorrelationData.getMessage();
String routingKey = messageCorrelationData.getRoutingKey();
int retryCount = messageCorrelationData.getRetryCount();
//重试次数+1
((com.Configuration.CorrelationData) correlationData).setRetryCount(retryCount + 1);
rabbitTemplate.convertSendAndReceive(exchange, routingKey, message, correlationData);
}
}
}
}
注意事项:
但是在主线程发送消息的过程中,rabbitMQf服务器关闭,这时候主程序和ConfirmCallback
线程都会等待Connection恢复,然后重新启动rabbitmq
,当应用程序重新建立connection
之后,两个线程都会死锁。
解决方案:
当ack=fasle
的情况下,可以将消息存到缓存中,定时发起任务重发。
1.2.2 生产者消息发送失败(MQ服务器异常)
对于这种情况,confirm是没有发送出去的,但是消息丢失怎么处理,但是会抛出AmqpConnectException
异常,我们可以捕获该异常,然后将msgId
也就是CorrelationData
对象保存即可。
@Test
public void contextLoads() {
Map<String, Object> map = new HashMap<>();
Book book = new Book("西游记", "120.00");
CorrelationData msgId=new CorrelationData();
try {
rabbitTemplate.convertAndSend("exchange_direct_no", "ord", book,msgId);
}catch (AmqpConnectException e){
System.out.println("保存信息编号:"+msgId);
}
推荐阅读:
https://www.jianshu.com/p/9aec19a910b1
https://blog.csdn.net/qq315737546/article/details/66475103
https://www.colabug.com/2325507.html
https://github.com/wyh-spring-ecosystem-student/spring-boot-student/tree/releases