rabbitmq 整个消息投递的路径为:producer--->rabbitmq broker--->exchange--->queue--->consumer
- 消息从 producer 到 exchange 则会返回一个 confirmCallback
- 消息从 exchange-->queue 投递失败 则会返回一个 returnCallback
confirm模式
1. 开启confirm配置
spring.rabbitmq.publisher-confirm-type=correlated
2. 编写回调函数
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
@Component
public class TestConfirmFallback implements RabbitTemplate.ConfirmCallback {
/**
* 消息投递到交换机的回调
* @param data 配置相关
* @param ack 是否发送成功
* @param cause 发送失败的原因
*/
@Override
public void confirm(CorrelationData data, boolean ack, String cause) {
if (ack) {
System.out.println("消息投递到交换机成功");
} else {
System.out.println("消息投递到交换机失败: " + cause);
}
}
}
3. 测试回调
@Test
public void test() {
rabbitTemplate.setConfirmCallback(testConfirmFallback);
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "hello.world", "hello, world");
}
return模式
1. 开启return模式
spring.rabbitmq.publisher-returns=true
2. 编写回调函数
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
@Component
public class TestReturnFallback implements RabbitTemplate.ReturnCallback {
/**
* 消息从交换机投递到队列的回调, 只有在投递失败时才会触发
* @param message 消息对象
* @param replyCode 响应码
* @param replyText 响应消息
* @param exchange 交换机名称
* @param routingKey 路由
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println(message);
System.out.println(replyCode);
System.out.println(replyText);
System.out.println(exchange);
System.out.println(routingKey);
}
}
3. 设置消息处理模式
rabbitTemplate.setMandatory(true)
4. 测试
@Test
public void test() {
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback(testReturnFallback);
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "hello.world", "hello, world");
}