RabbitMQ之消息确认与Return机制

上一篇:https://www.jianshu.com/p/44b2885a5253


在上一篇中,笔者介绍了怎么让RabbitMQ如何保证数据不丢失, 但除此之外,我们还会遇到一个问题,当消息的生产者将消息发送出去之后,消息到底有没有正确地到达RabbitMQ服务器呢?如果不进行特殊配置,默认情况下发送消息的操作是不会返回任何信息给生产者的,也就是默认情况下生产者是不知道消息有没有正确地到达RabbitMQ服务器的。如果在消息到达服务器之前己经丢失,持久化操作也解决不了这个问题,因为消息根本没有到达服务器,何谈持久化?

RabbitMQ 针对这个问题,提供了两种解决方式:

  • 通过事务机制实现

  • 通过发送方确认机制实现

一、事务机制


RabbitMQ 客户端中与事务机制相关的方法有3个: channel.txSelect,channel.txCommit,channel.txRollback。channel.txSelect 用于开启事务, channel.txCommit 用于提交事务; channel.txRollback 用于回滚事务。在通过 channel.txSelect 方法开启事务之后,我们便可以发布消息给 RabbitMQ了,如果事务提交成功,则消息一定到达了 RabbitMQ 中,如果在事务提交执行之前由于 RabbitMQ异常崩溃或者其他原因抛出异常,这个时候我们便可以将其捕获,进而通过执行channel.txRollback 方法来实现事务回滚。

public static void main(String[] args) throws IOException {
     //...
        //开启事务
        channel.txSelect();
        try{
            channel.basicPublish(exchange,"key-1",null,"发送路由key为 = key-1 的消息".getBytes());
            //提交事务
            channel.txCommit();
            System.out.println("发送成功");
        }catch (Exception e){
            System.out.println("发送失败,进行日志记录");
            //回滚事务
            channel.txRollback();
        }
        //...
    }

输出结果:发送成功
流程图

根据上图可以看出开启事务机制与未开启事务机制多了四个步骤:

  • 1、客户端发送 Tx.Select ,将信道置为事务模式。
  • 2、 Broker 回复 Tx.Select-Ok ,确认己将信道置为事务模式。
  • 3、在发送完消息之后,客户端发送 Tx.Commit 提交事务。
  • 4、 Broker回复 Tx.Commit.Ok ,确认事务提交。
    下面来看一下事务回滚,上代码
 //开启事务
        channel.txSelect();
        try{
            channel.basicPublish(exchange,"key-1",null,"发送路由key为 = key-1 的消息".getBytes());
            int i = 1/0;
            //提交事务
            channel.txCommit();
            System.out.println("发送成功");
        }catch (Exception e){
            System.out.println("发送失败,进行日志记录");
            //回滚事务
            channel.txRollback();
        }

输出结果:发送失败,进行日志记录
image.png

在事务提交之前捕获到异常,之后显式地提交事务回滚。

如果要发送多条消息,则将 channel.basicPublish,channel.txCommit 等方法包裹进循环内即可。

 //开启事务
        channel.txSelect();
        for (int a = 0; a < 10; a++) {
            try{
                channel.basicPublish(exchange,"key-1",null,"发送路由key为 = key-1 的消息".getBytes());
                int i = 1/0;
                //提交事务
                channel.txCommit();
                System.out.println("发送成功");
            }catch (Exception e){
                System.out.println("发送失败,进行日志记录");
                //回滚事务
                channel.txRollback();
            }
        }

事务确实能够解决消息发送方和 RabbitMQ 之间消息确认的问题,只有消息成功被RabbitMQ 接收,事务才能提交成功,否则便可在捕获异常之后进行事务回滚,与此同时可以进行消息重发。但是使用事务机制会降低RabbitMQ 的性能,那么有没有更好的方法既能保证消息发送方确认消息已经正确送达,又能基本上不带来性能上的损失呢? 下面就来介绍RabbitMQ提供另外一种方式:发送方确认机制。


发送方确认机制

注:发送方确认机制是确认生产者是否成功发送消息到交换机

一、原理

生产者通过调用channel.confirmSelect 方法将channel设置成confirm模式,一旦channel进入confirm模式,所有在该channel上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会将消息写入磁盘之后发出,broker回传给生产者的确认消息中的deliver-tag包含了确认消息的序号,此外broker也可以设置basic.ack的multiple参数,表示到这个序号之前的所有消息都已经得到了处理。

事务机制在一条消息发送之后会使发送端阻塞,以等待 RabbitMQ 的回应,之后才能继续发送下一条消息。confirm模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等channel返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack(Basic.Nack)命令,生产者应用程序同样可以在回调方法中处理该nack消息。

在channel被设置成 confirm模式之后,所有被发送的后续消息都将被 ack或者被nack一次。,不会出现一条消息既被 ack 又被 nack 情况,并且 RabbitMQ 没有对消息被 confirm 的快慢做任何保证。

原生api方式

1、普通confirm模式

每发送一条消息后就调用 channe.waitForConfirms方法,之后等待服务端的确认,这实际上是一种串行同步等待的方式。和事务机制一样。

 //将信道置为 publisher confirm 模式
        channel.confirmSelect();
        channel.basicPublish(exchange,"key-2",null,"发送路由key为 = key-1 的消息".getBytes());
        boolean b = channel.waitForConfirms();
        System.out.println("发送成功" + b);

如果发送多条消息,只需要将 channel.basicPublish、channel.waitForConfirms方法包裹在循环里面即可。

//将信道置为 publisher confirm 模式
    channel.confirmSelect();
        for (int i = 1; i < 10; i++) {
            channel.basicPublish("exchange-1","key-3",null,"发送路由key为 = key-1 的消息".getBytes());
            boolean b = channel.waitForConfirms();
            System.out.println("发送成功" + b);
        }

2、批量confirm

每发送一批消息后,调用 channel.waitForConfirms 方法,等待服务器的确认返回(也是同步的,只是一次发送多条信息,然后统一确定)。

 //将信道置为 publisher confirm 模式
        channel.confirmSelect();
        for (int i = 1; i < 10; i++) {
            channel.basicPublish("exchange-1", "key-1", null, "发送路由key为 = key-1 的消息".getBytes());
        }
//批量确认信息,发送的消息中,如果有一条是失败的,则所有消息发送都会失败
        boolean b = channel.waitForConfirms();
        System.out.println("发送成功" + b);

3、异步confirm

异步 confirm 方法的编程实现最为复杂。在客户端 Channel 接口中提供的
addConfirmListener方法可以添加 ConfirmListener这个回调接口,这个
ConfirmListener 接口包含两个方法: handleAck、handleNack,分别用来处理RabbitMQ 回传的 Basic.Ack、Basic.Nack 。在这两个方法中都包含有两个参数 deliveryTag(标记消息的唯一有序序号)multiple(是否批量confirm true代表是)

        channel.confirmSelect();
        for (int i = 0; i < 10; i++) {
            channel.basicPublish("exchange-1", "key-1", null, "发送路由key为 = key-1 的消息".getBytes());
        }
        channel.addConfirmListener(new ConfirmListener() {
            //参数一:deliveryTag: 消息的编号
            //参数二:multiple:是否批量confirm true 是
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("消息发送到交换机成功,deliveryTag: " + deliveryTag + ", multiple: " + multiple);
            }

            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("消息发送到交换机失败, deliveryTag: " + deliveryTag + ", multiple: " + multiple);
                //注意这里需要添加处理消息重发的场景
            }
        });
        System.out.println("其他逻辑");
//异步就不需要关闭连接了

SpringBoot(2.2.4.RELEASE)

在yml中配置是否需要消息确认

 publisher-confirm-type: CORRELATED

publisher-confirm-type有三个选项:

  • NONE:禁用发布确认模式,是默认值
  • CORRELATED:发布消息成功到交换器后会触发回调方法
  • SIMPLE:经测试有两种效果,其一效果和CORRELATED值一样会触发回调方法,其二在发布消息成功后使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法等待broker节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie方法如果返回false则会关闭channel,则接下来无法发送消息到broker。

编码

实现ConfirmCallback

@Component
public class InfoConfirm implements RabbitTemplate.ConfirmCallback {

   Logger logger = LoggerFactory.getLogger(InfoConfirm.class);

   @Autowired
   private RabbitTemplate rabbitTemplate;

    /**
     * 需要给ConfirmCallback赋值 不然不会走回调方法,默认是null
     */
   @PostConstruct
   public void init(){
       rabbitTemplate.setConfirmCallback(this);
   }

    /**
     * 此方法用于监听消息是否发送到交换机
     * @param correlationData
     * @param ack
     * @param cause
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if(ack){
            logger.info("消息成功发送到交换机");
            logger.info("id = {} ",correlationData.getId());
            byte[] body = correlationData.getReturnedMessage().getBody();
            logger.info("message = {}",new String(body));
        }else {
            logger.info("消息发送到交换机失败");
            logger.info("cause = {}",cause);
            logger.info("id = {} ",correlationData.getId());
            byte[] body = correlationData.getReturnedMessage().getBody();
            logger.info("message = {}",new String(body));

        }
    }

}

实现接口 ConfirmCallback ,重写其confirm()方法,方法内有三个参数correlationData、ack、cause。

  • correlationData:对象内部只有一个 id 属性,用来表示当前消息的唯一性。
  • ack:消息投递到broker 的状态,true表示成功。
  • cause:表示投递失败的原因。

发送方法

   @GetMapping
    public void send(){
        CorrelationData correlation = new CorrelationData(UUID.randomUUID().toString());
        Message message = new Message("returnedMessage:哈哈哈".getBytes(),null);
        correlation.setReturnedMessage(message);
        rabbitTemplate.convertAndSend("exchange-1","key-1","发送消息",correlation);
    }

Return机制

上面已经讲了发送方确认机制,我们已经知道发送方确认机制是确认生产者是否成功发送消息到交换机。交换机是否发送到具体的队列那我们就不知道了。如果想知道交换机是否将消息发送到队列,就需要用到return机制:监控交换机是否将消息发送到队列

在客户端 Channel 接口中提供的addReturnListener方法,可以添加 ReturnListener这个回调接口,这个ReturnListener接口包含一个方法:handleReturn,用来处理交换机发送消息到队列失败,则执行此方法。

原生api

         channel.confirmSelect();
            /**
             * mandatory:当mandatory 参数设为 true 时,
             * 交换器无法根据自身的类型和路由键找到一个符合条件的队列时,
             * 那么 RabbitMQ 会调用 Basic.Return 命令将消息返回给生产者。
             * 当 mandatory参数设置为 false 时,出现上述情形,则消息直接被丢弃。
             */
            channel.basicPublish("exchange-1", "key-4",true, null, "发送路由key为 = key-1 的消息".getBytes());
          //消息确认机制
            channel.addConfirmListener(new ConfirmListener() {
                //参数一:deliveryTag: 消息的编号
                //参数二:multiple:是否批量confirm true 是
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println("消息发送到交换机成功,deliveryTag: " + deliveryTag + ", multiple: " + multiple);
                }

                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println("消息发送到交换机失败, deliveryTag: " + deliveryTag + ", multiple: " + multiple);
                    //注意这里需要添加处理消息重发的场景
                }
            });

            //Return机制
            channel.addReturnListener(new ReturnListener() {
                /*
                 * 参数1:响应code
                 * 参数2:响应文本
                 * 参数3:交换机名称
                 * 参数4:路由key
                 * 参数5:消息的基本属性集
                 * 参数6:消息内容
                 */
                public void handleReturn(int replyCode, String replyText,
                                         String exchange, String routingKey,
                                         AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //交换机发送消息到队列失败,则执行此方法
                    System.out.println("replyCode =" + replyCode);
                    System.out.println("replyText =" + replyText);
                    System.out.println("exchange =" + exchange);
                    System.out.println("routingKey =" + routingKey);
                    System.out.println("properties =" + properties);
                    System.out.println("body =" + new String(body));
                }
            });
            System.out.println("其他逻辑");
image.png

注:调用channel.basicPublish时,需要将mandatory参数设置为true

SpringBoot

在yml中配置开启Return机制

publisher-returns: true

编码

实现 ReturnCallback 接口

@Component
public class Return implements RabbitTemplate.ReturnCallback {

    Logger logger = LoggerFactory.getLogger(InfoConfirm.class);

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 需要给ConfirmCallback赋值 不然不会走回调方法,默认是null
     */
    @PostConstruct
    public void init(){
        rabbitTemplate.setReturnCallback(this);
    }
    //处理交换机发送消息到队列失败,则执行此方法。
    @Override
    public void returnedMessage(Message message,
                                int replyCode, String replyText,
                                String exchange, String routingKey) {
        logger.info("交换机到队列失败=====》");
        logger.info("message = {}",new String(message.getBody()));
        logger.info("replyCode = {}",replyCode);
        logger.info("replyText = {}",replyText);
        logger.info("exchange = {}",exchange);
        logger.info("routingKey = {}",routingKey);
    }
}

发送消息

    @GetMapping
    public void send(){
        CorrelationData correlation = new CorrelationData(UUID.randomUUID().toString());
        Message message = new Message("returnedMessage:哈哈哈".getBytes(),null);
        correlation.setReturnedMessage(message);
        rabbitTemplate.convertAndSend("exchange-1","key-55","发送消息",correlation);
    }

草图一张

https://blog.csdn.net/u013256816/article/details/55515234

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,470评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,393评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,577评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,176评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,189评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,155评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,041评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,903评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,319评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,539评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,703评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,417评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,013评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,664评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,818评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,711评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,601评论 2 353

推荐阅读更多精彩内容