6.RabbitMQ 可靠性投递与高可用架构

1.可靠性投递

1)简述分析

1.可靠性投递:也就是在使用 RabbitMQ 实现异步通信的时候,消息丢了怎么办,消息重复消费怎么办?

2.明确一个问题:因为效率与可靠性是无法兼得的,如果要保证每一个环节都成功,势必会对消息的收发效率造成影响。所以如果是一些业务实时一致性要求不是特别高的场合,可以牺牲一些可靠性来换取效率。比如发送通知或者记录日志的这种场景,如果用户没有收到通知,不会造成业务影响,只要再次发送就行了

3.可靠性保证的环节

RabbitMQ 的工作模型

1.代表消息从生产者发送到 Broker:生产者把消息发到 Broker 之后,怎么知道自己的消息有没有被 Broker 成功接收?

2.代表消息从 Exchange 路由到 Queue:Exchange 是一个绑定列表,如果消息没有办法路由到正确的队列,会发生什么事情?应该怎么处理?

3.代表消息在 Queue 中存储:队列是一个独立运行的服务,有自己的数据库(Mnesia),它是真正用来存储消息的。如果还没有消费者来消费,那么消息要一直存储在队列里面。如果队列出了问题,消息肯定会丢失。怎么保证消息在队列稳定地存储呢?

4.代表消费者订阅 Queue 并消费消息:队列的特性是什么?FIFO。队列里面的消息是一条一条的投递的,也就是说,只有上一条消息被消费者接收以后,才能把这一条消息从数据库删掉,继续投递下一条消息。那么问题来了,Broker 怎么知道消费者已经接收了消息呢?

2)消息发送到 RabbitMQ 服务器

1.提供机制服务端确认机制,也就是在生产者发送消息给RabbitMQ 的服务端的时候,服务端会通过某种方式返回一个应答,只要生产者收到了这个应答,就知道消息发送成功了。

2.服务端确认机制-事务模式

1)事务模式怎么使用:我们通过一个 channel.txSelect()的方法把信道设置成事务模式,然后就可以发布消息给 RabbitMQ 了,如果 channel.txCommit();的方法调用成功,就说明事务提交成功,则消息一定到达了 RabbitMQ 中。如果在事务提交执行之前由于 RabbitMQ 异常崩溃或者其他原因抛出异常,这个时候我们便可以将其捕获,进而通过执行 channel.txRollback()方法来实现事务回滚。

事务确认机制图

2)缺点:在事务模式里面,只有收到了服务端的 Commit-OK 的指令,才能提交成功。所以可以解决生产者和服务端确认的问题。但是事务模式有一个缺点,它是阻塞的,一条消息没有发送完毕,不能发送下一条消息,它会榨干 RabbitMQ 服务器的性能。所以不建议大家在生产环境使用

3)Spring Boot 中的设置

SpringBoot

3)服务端确认机制-Confirm(确认)模式

1.普通确认模式:在生产者这边通过调用 channel.confirmSelect()方法将信道设置为 Confirm 模式,然后发送消息。一旦消息被投递到所有匹配的队列之后,RabbitMQ 就会发送一个确认(Basic.Ack)给生产者,也就是调用 channel.waitForConfirms()返回 true,这样生产者就知道消息被服务端接收了!----》方式消息效率还不是太高

2.批量确认模式:批量确认,就是在开启 Confirm 模式后,先发送一批消息。只要channel.waitForConfirmsOrDie();方法没有抛出异常,就代表消息都被服务端接收了。

批量确认的方式比单条确认的方式效率要高,但是也有两个问题,第一个就是批量的数量的确定。对于不同的业务,到底发送多少条消息确认一次?数量太少,效率提升不上去。数量多的话,又会带来另一个问题,比如我们发 1000 条消息才确认一次,如果前面 999 条消息都被服务端接收了,如果第 1000 条消息被拒绝了,那么前面所有的消息都要重发。

3.异步确认模式:一边发送一边确认!异步确认模式需要添加一个 ConfirmListener,并且用一个 SortedSet 来维护没有被确认的消息。Confirm 模式是在 Channel 上开启的,因为 RabbitTemplate 对 Channel 进行了封装,叫做 ConfimrCallback

异步确认

3)消息从交换机路由到队列

1.在什么情况下,消息会无法路由到正确的队列?可能因为路由键错误,或者队列不存在。

2.两种方式处理无法路由的消息:一种就是让服务端重发给生产者,一种是让交换机路由到另一个备份的交换机。

3.消息回发的方式:使用 mandatory 参数和 ReturnListener(在 Spring AMQP 中是ReturnCallback)。

处理路由-消息回发

4.备份交换机的方式:在创建交换机的时候,从属性中指定备份交换机。

处理路由-备份交换机

5.注意区别:队列可以指定死信交换机;交换机可以指定备份交换机!

4)消息在队列存储

1.问题:如果 RabbitMQ 的服务或者硬件发生故障,比如系统宕机、重启、关闭等等,可能会导致内存中的消息丢失,所以我们要把消息本身和元数据(队列、交换机、绑定)都保存到磁盘

2.解决方案:1)队列持久化 2)交换机持久化 3)消息持久化 4)集群

队列持久化
交换机持久化
消息持久化

集群:如果只有一个 RabbitMQ 的节点,即使交换机、队列、消息做了持久化,如果服务崩溃或者硬件发生故障,RabbitMQ 的服务一样是不可用的,所以为了提高 MQ 服务的可用性,保障消息的传输,我们需要有多个 RabbitMQ 的节点!

5)消息投递到消费者

1.问题:如果消费者收到消息后没来得及处理即发生异常,或者处理过程中发生异常,会导致4失败。服务端应该以某种方式得知消费者对消息的接收情况,并决定是否重新投递这条消息给其他消费者。

2.解决方案:RabbitMQ 提供了消费者的消息确认机制(message acknowledgement),消费者可以自动或者手动地发送 ACK 给服务端。没有收到 ACK 的消息,消费者断开连接后,RabbitMQ 会把这条消息发送给其他消费者。如果没有其他消费者,消费者重启后会重新消费这条消息,重复执行业务逻辑。

3.指定参数:消费者在订阅队列时,可以指定 autoAck 参数,当 autoAck 等于 false 时,RabbitMQ会等待消费者显式地回复确认信号后才从队列中移去消息。

4.如何设置手动 ACK?

设置手动

5.三个值的区别: NONE:自动 ACK   MANUAL: 手动 ACK   AUTO:如果方法未抛出异常,则发送 ack。

6.auto注意:如果方法未抛出异常,则发送 ack!当抛出 AmqpRejectAndDontRequeueException 异常的时候,则消息会被拒绝,且不重新入队。当抛出 ImmediateAcknowledgeAmqpException 异常,则消费者会发送 ACK。其他的异常,则消息会被拒绝,且 requeue = true 会重新入队。

7. Spring Boot 中,消费者又怎么调用 ACK,或者说怎么获得 Channel 参数呢?

SpringBoot中怎么使用

8.拒绝的方式:Basic.Reject()拒绝单条,Basic.Nack()批量拒绝。如果 requeue 参数设置为 true,可以把这条消息重新存入队列,以便发给下一个消费者(当然,只有一个消费者的时候,这种方式可能会出现无限循环重复消费的情况。可以投递到新的队列中,或者只打印异常日志)。

6)消费者回调

1.调用生产者 API:例如:提单系统给其他系统发送了碎屏保消息后,其他系统必须在处理完消息后调用提单系统提供的 API,来修改提单系统中数据的状态。只要 API 没有被调用,数据状态没有被修改,提单系统就认为下游系统没有收到这条消息。

2.发送响应消息给生产者:例如:商业银行与人民银行二代支付通信,无论是人行收到了商业银行的消息,还是商业银行收到了人行的消息,都必须发送一条响应消息(叫做回执报文)。

7)补偿机制

1.问题:如果生产者的 API 就是没有被调用,也没有收到消费者的响应消息,怎么办?

2.解决方案:设置超时时间

8)消息幂等性

1.消息出现重复可能会有两个原因:1).生产者的问题,环节1重复发送消息,比如在开启了 Confirm 模式但未收到确认,消费者重复投递。2).环节4出了问题,由于消费者未发送 ACK 或者其他原因,消息重复投递。3)生产者代码或者网络问题。

2.解决方案:对于重复发送的消息,可以对每一条消息生成一个唯一的业务 ID,通过日志或者消息落库来做重复控制。

9)最终一致

1.如果确实是消费者宕机了,或者代码出现了 BUG 导致无法正常消费,在我们尝试多次重发以后,消息最终也没有得到处理,怎么办?

例如存款的场景,客户的钱已经被吞了,但是余额没有增加,这个时候银行出现了长款,应该怎么处理?如果客户没有主动通知银行,这个问题是怎么发现的?银行最终怎么把这个账务做平?

在我们的金融系统中,都会有双方对账或者多方对账的操作,通常是在一天的业务结束之后,第二天营业之前。我们会约定一个标准,比如 ATM 跟核心系统对账,肯定是以核心系统为准。ATMC 获取到核心的对账文件,然后解析,登记成数据,然后跟自己记录的流水比较,找出核心有 ATM 没有,或者 ATM 有核心没有,或者两边都有但是金额不一致的数据。对账之后,我们再手工平账。比如取款记了账但是没吐钞的,做一笔冲正。存款吞了钞没记账的,要么把钱退给客户,要么补一笔账。

10)消息的顺序性

1.定义:消息的顺序性指的是消费者消费消息的顺序跟生产者生产消息的顺序是一致的。

2.举例:商户信息同步到其他系统,有三个业务操作:1、新增门店 2、绑定产品 3、激活门店,这种情况下消息消费顺序不能颠倒(门店不存在时无法绑定产品和激活)。

3.现状:在 RabbitMQ 中,一个队列有多个消费者时,由于不同的消费者消费消息的速度是不一样的,顺序无法保证。只有一个队列仅有一个消费者的情况才能保证顺序消费(不同的业务消息发送到不同的专用的队列)。

生产者消费者一一对应
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,794评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,050评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,587评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,861评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,901评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,898评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,832评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,617评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,077评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,349评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,483评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,199评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,824评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,442评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,632评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,474评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,393评论 2 352