rabbitmq发送消息确认分为两个阶段:
- 生产者->Broker
- 交换机 -> 队列
在第一阶段,rabbitmq提供了一个ConfirmCallback的回调函数,当消息被Broker接收到就会触发ConfirmCallback回调。
//配置消息投递交换机回调机制
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
* 消息投递交换机回调方法(生产者投递到交换机)
* @param correlationData 当前消息的唯一关联数据(消息唯一ID)
* @param ack 消息是否成功收到(是否投递成功)
* @param cause 消息投递失败原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("消息投递交换机反馈,correlationData:"+correlationData+" ack:"+ack+" cause:"+cause);
}
});
同时提供了一个配置参数publisher-confirm-type:
- publisher-confirm-type: none
表示禁用发布确认模式,默认值,使用此模式之后,不管消息有没有发送到Broker都不会触发ConfirmCallback回调 - publisher-confirm-type: correlated
表示消息成功到达Broker后触发ConfirmCalllBack回调 - publisher-confirm-type: simple
simple模式下如果消息成功到达Broker后一样会触发ConfirmCalllBack回调,发布消息成功后使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法等待broker节点返回发送结果,根据返回结果来判定下一步的逻辑。
在第二阶段中,broker中交换机发送消息到队列失败后,提供了一个ReturnCallBack回调函数,可以通过该函数对消息进行补偿等业务操作。
//配置消息交换机投递到队列回调机制
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
/**
* 消息投递回调(交换机投递到队列),消息投递失败才触发该机制
* @param message 投递失败的消息详细信息
* @param replyCode 回复的状态码
* @param replyText 回复的文本内容
* @param exchange 消息投递的交换机
* @param routingKey 消息投递的路由键
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("消息投递队列失败回调,Message"+message+" code:"+replyCode+" text:"+replyText+" exchange:"+exchange+" routingKey:"+routingKey);
}
});
该回调函数需要在配置文件中通过参数publisher-returns: true开启。