第五章: 消息的可靠性投递

rabbitmq 整个消息投递的路径为:producer--->rabbitmq broker--->exchange--->queue--->consumer

  • 消息从 producer 到 exchange 则会返回一个 confirmCallback
  • 消息从 exchange-->queue 投递失败 则会返回一个 returnCallback

confirm模式

1. 开启confirm配置

spring.rabbitmq.publisher-confirm-type=correlated

2. 编写回调函数

import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

@Component
public class TestConfirmFallback implements RabbitTemplate.ConfirmCallback {

    /**
     * 消息投递到交换机的回调
     * @param data 配置相关
     * @param ack 是否发送成功
     * @param cause 发送失败的原因
     */
    @Override
    public void confirm(CorrelationData data, boolean ack, String cause) {
        if (ack) {
            System.out.println("消息投递到交换机成功");
        } else {
            System.out.println("消息投递到交换机失败: " + cause);
        }
    }
}

3. 测试回调

    @Test
    public void test() {
        rabbitTemplate.setConfirmCallback(testConfirmFallback);
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "hello.world", "hello, world");
    }

return模式

1. 开启return模式

spring.rabbitmq.publisher-returns=true

2. 编写回调函数

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

@Component
public class TestReturnFallback implements RabbitTemplate.ReturnCallback {

    /**
     * 消息从交换机投递到队列的回调, 只有在投递失败时才会触发
     * @param message 消息对象
     * @param replyCode 响应码
     * @param replyText 响应消息
     * @param exchange 交换机名称
     * @param routingKey 路由
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        System.out.println(message);
        System.out.println(replyCode);
        System.out.println(replyText);
        System.out.println(exchange);
        System.out.println(routingKey);
    }
}

3. 设置消息处理模式

rabbitTemplate.setMandatory(true)

4. 测试

    @Test
    public void test() {
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback(testReturnFallback);
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "hello.world", "hello, world");
    }

事务

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容