RabbitMQ 中的 broker 是指什么?cluster 又是指什么?
broker 是指一个或多个 erlang node 的逻辑分组,且 node 上运行着 RabbitMQ 应用程序。
cluster 是在 broker 的基础之上,增加了 node 之间共享元数据的约束。
什么是元数据?元数据分为哪些类型?包括哪些内容?与 cluster 相关的元数据有哪些?元数据是如何保存的?元数据在 cluster 中是如何分布的?
在非 cluster 模式下,元数据主要分为 Queue 元数据(queue 名字和属性等)、Exchange 元数据(exchange 名字、类型和属性等)、Binding 元数据(存放路由关系的查找表)、Vhost 元数据(vhost 范围内针对前三者的名字空间约束和安全属性设置)。
在 cluster 模式下,还包括 cluster 中 node 位置信息和 node 关系信息。元数据按照 erlang node 的类型确定是仅保存于 RAM 中,还是同时保存在 RAM 和 disk 上。元数据在 cluster 中是全 node 分布的。
RabbitMQ 概念里的 channel、exchange 和 queue 是逻辑概念,还是对应着进程实体?分别起什么作用?
Queue 具有自己的 erlang 进程;exchange 内部实现为保存 binding 关系的查找表;channel 是实际进行路由工作的实体,即负责按照 routing_key 将 message 投递给 queue 。由 AMQP 协议描述可知,channel 是真实 TCP 连接之上的虚拟连接,所有 AMQP 命令都是通过 channel 发送的,且每一个 channel 有唯一的 ID。一个 channel 只能被单独一个操作系统线程使用,故投递到特定 channel 上的 message 是有顺序的。但一个操作系统线程上允许使用多个 channel 。
消息基于什么传输?
由于TCP连接的创建和销毁开销较大,且并发数受系统资源限制,会造成性能瓶颈。RabbitMQ使用信道的方式来传输数据。信道是建立在真实的TCP连接内的虚拟连接,且每条TCP连接上的信道数量没有限制。
vhost 是什么?起什么作用?
vhost 可以理解为虚拟 broker ,即 mini-RabbitMQ server。其内部均含有独立的 queue、exchange 和 binding 等,但最最重要的是,其拥有独立的权限系统,可以做到 vhost 范围的用户控制。当然,从 RabbitMQ 的全局角度,vhost 可以作为不同权限隔离的手段(一个典型的例子就是不同的应用可以跑在不同的 vhost 中)。
消息如何分发?
- 循环轮询 (round-robin) ;
- 公平分发 (Fair).
消息怎么路由?
从概念上来说,消息路由必须有三部分:交换器、路由、绑定。生产者把消息发布到交换器上;绑定决定了消息如何从路由器路由到特定的队列;消息最终到达队列,并被消费者接收。
消息发布到交换器时,消息将拥有一个路由键(routing key),在消息创建时设定。
通过队列路由键,可以把队列绑定到交换器上。
消息到达交换器后,RabbitMQ会将消息的路由键与队列的路由键进行匹配(针对不同的交换器有不同的路由规则)。如果能够匹配到队列,则消息会投递到相应队列中;如果不能匹配到任何队列,消息将进入 “黑洞”。
常用的交换器主要分为三种:
direct:如果路由键完全匹配,消息就被投递到相应的队列(Simple,WorkQueues,RoutingKey)
fanout:如果交换器收到消息,将会广播到所有绑定的队列上(订阅/发布模式)
topic:可以使来自不同源头的消息能够到达同一个队列。使用topic交换器时,可以使用通配符。(Topics)
比如:“*” 匹配特定位置的任意文本, “.” 把路由键分为了几部分,“#” 匹配所有规则等。
特别注意:发往topic交换器的消息不能随意的设置选择键(routing_key),必须是由"."隔开的一系列的标识符组成。
死信队列和延迟队列的使用?
死信消息:
- 消息被拒绝(Basic.Reject或Basic.Nack)并且设置 requeue 参数的值为 false;
- 消息过期;
- 队列达到最大的长度,先入队的消息会变成DL。
过期消息:
在 rabbitmq 中存在2种方可设置消息的过期时间:
第一种通过对队列进行设置,这种设置后,该队列中所有的消息都存在相同的过期时间;
第二种通过对消息本身进行设置,那么每条消息的过期时间都不一样。如果同时使用这2种方法,那么以过期时间小的那个数值为准。当消息达到过期时间还没有被消费,那么那个消息就成为了一个 死信 消息。
- 队列设置:在队列申明的时候使用 x-message-ttl 参数,单位为 毫秒
- 单个消息设置:是设置消息属性的 expiration 参数的值,单位为 毫秒
延时队列:在rabbitmq中不存在延时队列,但是我们可以通过设置消息的过期时间和死信队列来模拟出延时队列。消费者监听死信交换器绑定的队列,而不要监听消息发送的队列。
有了以上的基础知识,我们完成以下需求:
需求:用户在系统中创建一个订单,如果超过时间用户没有进行支付,那么自动取消订单。
分析:
1、上面这个情况,我们就适合使用延时队列来实现,那么延时队列如何创建
2、延时队列可以由 过期消息+死信队列 来时间
3、过期消息通过队列中设置 x-message-ttl 参数实现
4、死信队列通过在队列申明时,给队列设置 x-dead-letter-exchange 参数,然后另外申明一个队列绑定x-dead-letter-exchange对应的交换器。
如何保证消息的可靠性投递
发送方确认模式
将信道设置成confirm模式(发送方确认模式),则所有在信道上发布的消息都会被指派一个唯一的ID。
一旦消息被投递到目的队列后,或者消息被写入磁盘后(可持久化的消息),信道会发送一个确认给生产者(包含消息唯一ID)。
如果RabbitMQ发生内部错误从而导致消息丢失,会发送一条nack(not acknowledged,未确认)消息。
发送方确认模式是异步的,生产者应用程序在等待确认的同时,可以继续发送消息。当确认消息到达生产者应用程序,生产者应用程序的回调方法就会被触发来处理确认消息。
消息队列持久化
处理消息队列丢数据的情况,一般是开启持久化磁盘的配置。这个持久化配置可以和confirm机制配合使用,你可以在消息持久化磁盘后,再给生产者发送一个Ack信号。这样,如果消息持久化磁盘之前,rabbitMQ阵亡了,那么生产者收不到Ack信号,生产者会自动重发。
那么如何持久化呢,其实也很容易,就下面两步
- 将queue的持久化标识durable设置为true,则代表是一个持久的队列
- 发送消息的时候将deliveryMode=2
这样设置以后,rabbitMQ就算挂了,重启后也能恢复数据。在消息还没有持久化到硬盘时,可能服务已经死掉,这种情况可以通过引入mirrored-queue即镜像队列,但也不能保证消息百分百不丢失(整个集群都挂掉)
接收方确认机制
接收方消息确认机制:消费者接收每一条消息后都必须进行确认(消息接收和消息确认是两个不同操作)。只有消费者确认了消息,RabbitMQ才能安全地把消息从队列中删除。
这里并没有用到超时机制,RabbitMQ仅通过Consumer的连接中断来确认是否需要重新发送消息。也就是说,只要连接不中断,RabbitMQ给了Consumer足够长的时间来处理消息。保证数据的最终一致性;
下面罗列几种特殊情况:
- 如果消费者接收到消息,在确认之前断开了连接或取消订阅,RabbitMQ会认为消息没有被分发,然后重新分发给下一个订阅的消费者。(可能存在消息重复消费的隐患,需要去重)
- 如果消费者接收到消息却没有确认消息,连接也未断开,则RabbitMQ认为该消费者繁忙,将不会给该消费者分发更多的消息。
消息重复投递和重复消费
在消息生产时,生产者用一个唯一标识 (时间戳,uuid等)给messageid赋值,作为去重和幂等的依据;
在消息消费时,根据消息id去判断该消息是否已被消费过,如果已经消费过,则不处理该消息,否则正常消费,并且进行入库操作(消息全局ID作为数据库表的主键,防止重复)
这个问题针对业务场景来答分以下几点:
比如,你拿到这个消息做数据库的insert操作。那就容易了,给这个消息做一个唯一主键,那么就算出现重复消费的情况,就会导致主键冲突,避免数据库出现脏数据。
再比如,你拿到这个消息做redis的set的操作,那就容易了,不用解决,因为你无论set几次结果都是一样的,set操作本来就算幂等操作。
如果上面两种情况还不行,上大招。准备一个第三方介质,来做消费记录。以redis为例,给消息分配一个全局id,只要消费过该消息,将<id,message>以K-V形式写入redis。那消费者开始消费前,先去redis中查询有没消费记录即可。
消息顺序性
解决办法就是把需要保证顺序的一系列消息,按顺序依次发送到同一个队列中。
如何解决消息积压
出现消息积压原因和解决办法:
消费者消费消息的速度赶不上生产速度,这总问题主要是业务逻辑没设计好消费者和生产者之间的平衡,需要改业务流程或逻辑已保证消费度跟上生产消息的速度,譬如增加消费者的数量等。
消费者出现异常,导致一直无法接收新的消息,这种问题需要排查消费的逻辑是不是又问题,需要优化程序。
其他原因,例如消费者做了限速,不能增加更多消费者; 那么当消息生产过快,一定会积压。
可以拆分MQ,生产者一个MQ,消费者一个MQ。 生产者MQ和消费者MQ用一个程序监控,当生产者生产过快时,可以发送到延时队列,延时再发送到消费者MQ。
消息接收模式
推模式(Consume)和拉模式(Get)
集群模式
普通模式:默认的集群模式。
- 对于Queue来说,消息实体只存在于其中一个节点,A、B两个节点仅有相同的元数据,即队列结构。
- 当消息进入A节点的Queue中后,consumer从B节点拉取时,RabbitMQ会临时在A、B间进行消息传输,把A中的消息实体取出并经过B发送给consumer。
- 所以consumer应尽量连接每一个节点,从中取消息。即对于同一个逻辑队列,要在多个节点建立物理Queue。否则无论consumer连A或B,出口总在A,会产生瓶颈。
- 该模式存在一个问题就是当A节点故障后,B节点无法取到A节点中还未消费的消息实体。
- 如果做了消息持久化,那么得等A节点恢复,然后才可被消费;如果没有持久化的话,然后就没有然后了。
镜像模式:把需要的队列做成镜像队列,存在于多个节点,属于RabbitMQ的HA方案。
- 该模式解决了上述问题,其实质和普通模式不同之处在于,消息实体会主动在镜像节点间同步,而不是在consumer取数据时临时拉取。
- 该模式带来的副作用也很明显,除了降低系统性能外,如果镜像队列数量过多,加之大量的消息进入,集群内部的网络带宽将会被这种同步通讯大大消耗掉。
- 所以在对可靠性要求较高的场合中适用于该模式。
先搭建一个普通集群模式,在这个模式基础上再配置镜像模式实现高可用,Rabbit集群前增加一个反向代理,生产者、消费者通过反向代理访问RabbitMQ集群。
多个RabbitMQ运行在不同的物理服务器上,实现高可用。
该设计架构可以如下:在一个集群里,有3台机器,其中1台使用磁盘模式,另2台使用内存模式。2台内存模式的节点,无疑速度更快,因此客户端(consumer、producer)连接访问它们。而磁盘模式的节点,由于磁盘IO相对较慢,因此仅作数据备份使用,另外一台作为反向代理。