RabbitMQ消息可靠投递及消费机制

Consumer Acknowledgements机制

当consumer消费端成功消费完消息后,返回给broker确认通知,告诉broker移除队列中已经消费成功的消息,如果消费端消费失败,可以通知broker将消费失败的消息重新放回队列中,以便继续消费。

  1. channel.basicAck(deliveryTag, multiple);

    consumer处理成功后,通知broker删除队列中的消息,如果设置multiple=true,表示支持批量确认机制以减少网络流量。

    例如:有值为5,6,7,8 deliveryTag的投递

    如果此时channel.basicAck(8, true);则表示前面未确认的5,6,7投递也一起确认处理完毕。

    如果此时channel.basicAck(8, false);则仅表示deliveryTag=8的消息已经成功处理。

  2. channel.basicNack(deliveryTag, multiple, requeue);

    consumer处理失败后,例如:有值为5,6,7,8 deliveryTag的投递。

    如果channel.basicNack(8, true, true);表示deliveryTag=8之前未确认的消息都处理失败且将这些消息重新放回队列中。

    如果channel.basicNack(8, true, false);表示deliveryTag=8之前未确认的消息都处理失败且将这些消息直接丢弃。

    如果channel.basicNack(8, false, true);表示deliveryTag=8的消息处理失败且将该消息重新放回队列。

    如果channel.basicNack(8, false, false);表示deliveryTag=8的消息处理失败且将该消息直接丢弃。

  3. channel.basicReject(deliveryTag, requeue);

    相比channel.basicNack,除了没有multiple批量确认机制之外,其他语义完全一样。

    如果channel.basicReject(8, true);表示deliveryTag=8的消息处理失败且将该消息重新放回队列。

    如果channel.basicReject(8, false);表示deliveryTag=8的消息处理失败且将该消息直接丢弃。

参数字段类型:

  • deliveryTag:long - 消息投递的唯一标识,作用域为当前channel
  • multiple:boolean - 是否启用批量确认机制
  • requeue:boolean - 消息处理失败是重新放回队列还是直接丢弃

Publisher Acknowledgements机制

使用标准AMQP 0-9-1协议,保证消息不丢失的唯一方法是使用事务 —— 通道事务,发布消息并提交。在这种情况下,事务是非常重量级的操作,会使得broker消息吞吐量降低250倍左右。那么,为了解决使用事务确保消息不丢失所带来的性能损耗。我们参考Consumer Acknowledgements确认机制的原理引入了Publisher Confirms确认机制。消息生产者发送消息给broker,当broker收到消息,将消息持久化到磁盘并同步至所有的镜像节点之后,才会返回给客户端消息投递成功确认。从而保证消息在投递过程中不会因为网络拥塞,服务宕机,机房断电等突发情况导致消息投递失败而丢失。当由于broker内部消息处理发生异常时,将返回给客户端basic.nack通知;当消息投递成功时,broker则返回给客户端basic.ack通知。

  1. 对于可路由的消息,当所有的队列接收到消息后,broker向client发送basic.ack确认通知;

  2. 对于路由到持久队列的持久化消息,当消息持久化到磁盘后,broker向client发送basic.ack确认通知;

  3. 对于路由到镜像队列的消息,当所有的镜像队列都接收到消息后,broker向client发送basic.ack确认通知;

  4. 对于不可路由的消息,broker一旦确认该消息不可路由时,则向client发送basic.nack确认通知;

  5. 对于不可路由且mandatory强制投递的消息,broker一旦确认该消息不可路由时,先向client发送basic.return通知,
    然后发送basic.nack确认通知;

    basic.nack will only be delivered if an internal error occurs in the Erlang process responsible for a queue.

延迟确认Ack:

对于持久化消息,是需要等待消息成功持久化到磁盘之后,broker才会返回给客户端basic.ack通知,为了提升IO吞吐量,broker并不会实时将消息刷回到磁盘,而是先将消息存储到内存中,在一定时间间隔后(几百毫秒)或当队列空闲时,批量将消息持久化到磁盘,然后在返回给客户端basic.ack确认通知。这就意味着在恒定负载下,basic.ack的延迟可以达到几百毫秒。那么,为了提升消息系统的吞吐量,强烈建议客户端应用程序采用异步方式处理消息basic.ack确认通知。

代码实现

@Bean
public RabbitTemplate myRabbitTemplate(){
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    // Mandatory = true,则生产者发送消失失败时,会先调用ReturnCallback,再调用ConfirmCallback
    rabbitTemplate.setMandatory(true);
    rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
        System.out.println("消息发送失败");
        System.out.println("message : "+message.getMessageProperties().getCorrelationId());
        System.out.println("replyCode : "+replyCode);
        System.out.println("replyText : "+replyText);
        System.out.println("exchange : "+exchange);
        System.out.println("routingKey : "+routingKey);
    });

    rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
        String id = correlationData.getId();
        if(ack){
            // 成功发送到队列后后调(与消费者是否成功消费无关)
            System.out.println("消息 :"+id+"  收到确认信息");
        }else {
            // basic.nack will only be delivered if an internal error
            // occurs in the Erlang process responsible for a queue.
            // 仅在Erlang发生内部错误,未能将消息发送至队列时调用
            System.out.println("消息 :"+id+"  发送失败");
            System.out.println("casue :"+cause);
        }
    });
    return rabbitTemplate;
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,125评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,293评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,054评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,077评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,096评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,062评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,988评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,817评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,266评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,486评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,646评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,375评论 5 342
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,974评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,621评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,796评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,642评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,538评论 2 352

推荐阅读更多精彩内容

  • 1.什么是消息队列 消息队列允许应用间通过消息的发送与接收的方式进行通信,当消息接收方服务忙或不可用时,其提供了一...
    zhuke阅读 4,467评论 0 12
  • RabbitMQ核心基础概念 Server:又称之为Broker,接受客户端的连接,实现AMQP实体服务。 Con...
    Java大宝宝阅读 3,224评论 0 2
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,651评论 18 139
  • RabbitMQ 简介 MQ 消息队列,上承生产者,下接消费者。从生产者侧获取消息,然后将消息转发给消费者。由此可...
    2205阅读 3,493评论 1 11
  • 本文章翻译自http://www.rabbitmq.com/api-guide.html,并没有及时更新。 术语对...
    joyenlee阅读 7,651评论 0 3