RabbitMQ(十)消费者应答和发送者确认

文档:

https://www.rabbitmq.com/confirms.html

介绍:
使用像RabbitMQ这样的消息代理的系统是分布式的,所以消息是否能到达对端或是被成功处理是无法保证的。

所以,无论生产者还是消费者,都需要一种消息传递和处理的确认机制。

消费者传递应答

当RabbitMQ发送一个消息给消费者,他需要知道消息是什么时候被成功投递了。

1)投递的标识:Delivery Tags

当一个消费者向RabbitMQ注册后,RabbitMQ会用 basic.deliver 方法向消费者推送消息,这个方法携带了一个 delivery tag, 它在一个channel中唯一代表了一次投递。delivery tag的唯一标识范围限于channel.

delivery tag是单调递增的正整数,客户端获取投递的方法用用dellivery tag作为一个参数。

2)应答模式

根据所使用的确认模式,RabbitMQ可以考虑在发送(写入TCP套接字)之后立即成功传送消息,或者接收到显式(“手动”)客户机确认时成功传送。 手动发送的确认可以是肯定的或否定的,并且使用以下协议方法之一:

  • basic.ack is used for positive acknowledgements
  • basic.nack is used for negative acknowledgements
  • basic.reject is used for negative acknowledgements but has one limitations compared to basic.nack

积极的应答告诉RabbitMQ记录一个消息被投递了,像 basic.reject 这样的消极应答有着同样的作用。

积极应答假定消息被成功处理,消极应答表示投递没被处理,还是要被删掉。

3)一次应答多个投递

为了减少网络流量,手动应答可以被批处理。

ack unack 的mutiple参数设置为true,则可以一次性应答delivery_tag小于等于传入值的所有应答。

4)通道预取设置(QoS)

因为消息发送到客户端是异步的,在任何给定时刻在信道上通常存在多个消息竞争。此外,来自客户端的手动应答本质上也是异步的。所以总是存在一个消息未确认的滑动窗口。开发人员通常希望限制此窗口的大小,以避免消费者端的无界缓冲区问题。

可以通过使用 basicQos 这个方法来设置预取的个数。这个数值定义了一个通道最多有多少个未确认的消息。

值得重申的是,投递流程和手动客户端确认是完全异步的。 因此,如果在投递中已经有消息的情况下改变预取值,则会出现自然竞争条件,并且在信道上可能暂时存在多于预取未确认消息数量。

5)客户端错误:双重应答和未知 tag

如果客户端对同一个 delivery tag 应答超过一次,rabbitMQ会返回一个通道错误:

PRECONDITION_FAILED - unknown delivery tag 100

如果一个未知的 delivery tag 被使用的话,会返回同样的错误。

发送者确认

使用标准AMQP 0-9-1,保证消息不丢失的唯一方法是使用事务 - 使通道事务,发布消息,提交。 在这种情况下,事务不必要地重量级,并且吞吐量减少了250倍。为了补救这一点,引入了确认机制。 它模仿协议中已经存在的消费者确认机制。

为了启用确认机制,客户端发送 confirm.select 方法。 根据是否设置了 no-wait,代理可以用confirm.select-ok进行响应。 一旦在一个通道上使用 confirm.select 方法,它就被认为处于确认模式。 事务通道不能进入确认模式,一旦通道处于确认模式,它不能进行事务。

一旦通道处于确认模式,代理和客户端计数消息(计数从第一个确认选择的1开始计数)。 然后,代理程序在通过在同一个通道上发送basic.ack来处理消息时确认消息。 delivery-tag字段包含确认消息的序列号。 代理还可以在basic.ack中设置多个字段,以指示已经处理了直到并且包括具有序列号的消息的所有消息。

1)消极确认

在特殊情况下,代理无法成功处理消息,代理将发送basic.nack而不是basic.ack。 在这个上下文中,basic.nack的字段与basic.ack中的相应字段具有相同的含义,并且应忽略requeue字段。 通过nack一个或多个消息,代理指示它不能处理消息并拒绝对它们负责; 在这一点上,客户端可以选择重新发布消息。

在将通道置于确认模式后,所有后续发布的消息将被确认或nack一次。 不能保证消息被多久确认一次。 没有消息将被确认和nack。

只有在负责队列的Erlang进程中发生内部错误时,才会传递basic.nack。

2)消息多久被确认

对于一个不可路由的消息,一旦交换器证实消息不可能路由到任何队列,代理会发布一个确认。

如果消息发布设置为mandatory,basic.return在basic.ack之前发送到客户端。 对于否定确认(basic.nack)也是如此。

对于可路由消息,当所有队列接受消息时,发送basic.ack。 对于路由到持久队列的持久消息,这意味着持久存储到磁盘。 对于镜像队列,这意味着所有镜像都接受了消息。

3)持久化消息的ACK延迟

路由到持久队列的持久消息的basic.ack是在将消息保存到磁盘后才发送的。 RabbitMQ消息存储器在间隔(几百毫秒)后将消息批量保存到磁盘,以最小化fsync(2)调用的数量,或者当队列空闲时。 这意味着在恒定负载下,basic.ack的延迟可以达到几百毫秒。 为了提高吞吐量,强烈建议应用程序异步处理确认(作为流)或发布批量消息,并等待未完成的确认。 具体的API在不同的客户端库之间有所不同。

4)确认和保证投递

如果代理在消息写入磁盘之前崩溃,将丢失持久消息。 在某些条件下,这导致代理行为诡异。

例如,考虑这种情况:

  • 客户端向持久队列发布持久消息
  • 客户端消费了队列中的消息(注意消息是持久的,队列持久),但是还没有确认
  • 代理挂了并重启
  • 客户端重连并开始消费

在这一点上,客户端可以合理地假设消息将被再次传送。

在上述情景下,client 有理由认为消息需要被(broker)重新 deliver 。但这并非事实:重启(有可能)会令 broker 丢失消息。为了确保持久性,client 应该使用 confirm 机制。如果 publisher 使用的 channel 被设置为 confirm 模式,publisher 将不会收到已丢失消息的 ack(这是因为 consumer 没有对消息进行 ack ,同时该消息也未被写入磁盘)。

限制

1)最大 Delivery Tag

传递标记是一个64位长的值,因此其最大值为9223372036854775807。由于Delivery Tag唯一标识每个通道的每次投递,所以,发送者或客户端在实践中不太可能超过此值。

代码示例

package com.xc.rabbitmq;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;

/**
 * Created by xc.
 */
public class PublisherConfirms {

    private static final String QUEUE_NAME = "publisher-confirms";

    private static final int MSG_COUNT = 10;

    private static ConnectionFactory factory;

    static {
        factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("rabbit");
        factory.setPassword("carrot");
    }


    public static void main(String[] args) throws Exception {
        // Publish MSG_COUNT messages and wait for confirms.
        (new Thread(new Consumer())).start();
        // Consume MSG_COUNT messages.
        (new Thread(new Publisher())).start();

    }

    static class Publisher implements Runnable {

        volatile SortedSet<Long> ackSet = Collections.synchronizedSortedSet(new TreeSet<Long>());

         public void run() {
             try {
                 long startTime = System.currentTimeMillis();

                 // 创建一个新的连接
                 Connection connection = factory.newConnection();
                 // 创建一个频道
                 Channel channel = connection.createChannel();

                 channel.queueDeclare(QUEUE_NAME, true, false, false, null);

                 channel.confirmSelect();

                 channel.addConfirmListener(new ConfirmListener() {
                     public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                         if (multiple) {
                             for (long i = ackSet.first(); i <= deliveryTag; ++i) {
                                 System.out.println("handle ack multiple, tag : " + deliveryTag);
                                 ackSet.remove(i);
                             }
                         } else {
                             System.out.println("handle ack, tag : " + deliveryTag);
                             ackSet.remove(deliveryTag);
                         }
                     }

                     public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                         System.out.println("handle nack, tag : " + deliveryTag);
                     }
                 });

                 // Publish
                 for (long i = 0; i < MSG_COUNT; ++i) {
                     ackSet.add(i);
                     channel.basicPublish("", QUEUE_NAME,
                             MessageProperties.PERSISTENT_TEXT_PLAIN,
                             "nop".getBytes());
                     System.out.println("send msg : " + "nop");
                 }

                 // Wait
                 while (ackSet.size() > 0)
                     Thread.sleep(10);

                 // Cleanup
                 channel.close();
                 connection.close();

                 long endTime = System.currentTimeMillis();
                 System.out.printf("Test took %.3fs\n", (float) (endTime - startTime) / 1000);

             } catch (Throwable e) {
                 System.out.println("foobar :(");
                 e.printStackTrace();
             }
         }

    }

    static class Consumer implements Runnable {

        public void run() {
            try {
                // Setup
                Connection conn = factory.newConnection();
                Channel ch = conn.createChannel();
                ch.queueDeclare(QUEUE_NAME, true, false, false, null);

                // Consume
                QueueingConsumer qc = new QueueingConsumer(ch);
                ch.basicConsume(QUEUE_NAME, true, qc);
                for (int i = 0; i < MSG_COUNT; ++i) {
                    QueueingConsumer.Delivery delivery = qc.nextDelivery();
                    System.out.println("got msg : " + new String(delivery.getBody()));
                }

                // Consume
                ch.close();
                conn.close();
            } catch (Throwable e) {
                System.out.println("Whoosh!");
                e.printStackTrace();
            }
        }
    }
}

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,001评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,210评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,874评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,001评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,022评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,005评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,929评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,742评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,193评论 1 309
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,427评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,583评论 1 346
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,305评论 5 342
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,911评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,564评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,731评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,581评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,478评论 2 352

推荐阅读更多精彩内容

  • 本文章翻译自http://www.rabbitmq.com/api-guide.html,并没有及时更新。 术语对...
    joyenlee阅读 7,651评论 0 3
  • 来源 RabbitMQ是用Erlang实现的一个高并发高可靠AMQP消息队列服务器。支持消息的持久化、事务、拥塞控...
    jiangmo阅读 10,357评论 2 34
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,650评论 18 139
  • 1.什么是消息队列 消息队列允许应用间通过消息的发送与接收的方式进行通信,当消息接收方服务忙或不可用时,其提供了一...
    zhuke阅读 4,467评论 0 12
  • 什么叫消息队列 消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂...
    lijun_m阅读 1,343评论 0 1