浅谈消息队列(消息中间件)

一、什么是消息队列

消息(Message),指程序之间传递的数据。

消息队列(Message Queue),简称MQ,是保存消息的一个容器,本质是一个容器。分布式系统中常称为消息中间件,可将消息保存在内存和磁盘上,其他程序可以使用其取出消息。

二、为什么使用消息队列

随着业务的不断扩张,分布式系统中成百上千的服务之间相互调用,使得调用链非常复杂。
用户的不断增加,使得系统需要承载大量的请求,并在合理时间内给予响应。

消息队列在上诉背景下,起到的作用:

  • 异步处理
    随着业务的发展,调用的服务会越来越多,导致响应时间越来越长。例如电子支付,刚开始只是支付下单,慢慢就加上短信服务、钱包服务等,这样请求调用下来,接口耗时会增加不少。
    业务流程中,比较耗时的操作,或者不需要及时响应的操作,都可以做异步处理。
    这里的短信、钱包业务,不需要及时响应,就可以做异步处理。在支付下单完成后,将消息放入消息队列中,就可以直接返回响应了,这样大大减少了响应时间,提升系统的吞吐量,提高用户体验。

  • 服务解耦
    业务的扩充会有下游服务的增加,为了下游服务的使用,就需要修改原有的服务。例如支付服务里支付下单后,可能要增加统计服务、数据分析服务等,这都需要修改原有的订单服务。
    对于扩充的下游服务,可以进行解耦,上游服务将消息放入消息队列,下游服务自己去消息队列取。
    这里的支付服务里支付下单完成后,将消息放入消息队列中,新增的统计服务、数据分析服务,自己去消息队列取,就不用修改原有的订单服务了,降低服务之间的耦合性,提升了系统的扩展性。

  • 流量削峰
    后端服务的承载能力有一定的限度,在请求量激增的时候,会超过服务的承载上限。例如秒杀活动,一瞬间大量的下单请求,超过了后端服务的处理能力,可能会使服务宕机。
    对于高并发的情况下,可将消息放入消息队列中,后端服务按自己的速度去消息队列取消息处理。
    这里可将高并发的下单请求,放入消息队列中,订单服务按自己最大的能力去消息队列取消息处理,降低了高并发的请求量,提升了服务的稳定性。

三、基本概念

  • 生产者(Producer)
    生产消息,并将消息发送至Broker

  • 消息队列服务端(Broker)
    存储消息

  • 消费者(Consumer)
    从Broker获取消息,并处理消息

  • 发布/订阅
    生产者发布消息至Broker中的指定队列,这个指定队列中的每一条消息,都可以被多个消费者处理,也就是1对多的关系


    1690275541538.png

四、如何保证消息不丢失

一条消息经过的完整流程,分为三个阶段:生产消息、存储消息、消费消息。
消息的可靠性增强,会导致性能下降,实际要看具体业务,是追求可靠性,还是追求性能,例如传输日志,丢一两条消息关系不大,可以适当的降低可靠性。

4.1 生产消息

生产者发送消息至Broker,需要处理Broker返回的响应。如果Broker返回写入失败,或者网络超时等原因,则需要重试发送,多次发送失败需要记录日志。这个做法的前提条件是,消费者做幂等。

4.2 存储消息

Broker接收到消息,存储完毕后,再给生产者响应。
如果Broker是集群部署,那至少要两台Broker都存储完毕后,再给生产者响应。

4.3 消费消息

消费者拿到消息,消费完毕后,再给Broker响应。

五、幂等处理重复消息

5.1 重复消息的产生

一方面是生产者产生了多条重复消息,另一方面是消费者对同一条消息进行重复消费。

  • 为了保证消息不丢失,在生产消息阶段,如果Broker存储了消息,但在响应时,由于网络等原因超时了,导致生产者没有收到响应,那么生产者就会重复发送,因此有了重复消息。

  • 消费者拿到消息,消费完毕后,由于网络等原因超时了,导致Broker没有收到响应,导致Broker没更新offset,新消费者拿到刚才的消息,又消费了一遍,因此有了同一条消息被重复消费。

5.2 消费者幂等

对于生产者产生的重复消息,属于正常情况,为了保证消息不丢失,这是不可避免的。那么只能从消费者方面,去处理重复消息。

幂等:对于接口,同样的参数,调用一次和多次,产生的结果的相同的。

消费者也是一个接口,消费者幂等也相当于接口幂等,思路是相同的。

接口幂等的三个层面:接口外、接口内的业务逻辑、数据库。

  • 在消息进入接口之前,对消息中的唯一流水号加分布式锁,例如订单号,加锁中的消息不进行处理,防止同一时间对同一条消息同时处理。

  • 消息进入接口内的业务逻辑,查询此时的业务状态是否被执行过,是则不对消息进行处理,并给Broker返回响应。

  • 消息的业务逻辑执行完,结果保存于数据库,在数据库设置唯一键或唯一约束,或更新时加上状态条件,如where status='processing',如果违反了数据库的唯一设置,或者更新的条数为0,那么意味着这条消息已被处理过,事务回滚,并给Broker返回响应。

六、如何处理消息堆积

6.1 消息堆积的原因

  • 达到消费者消费能力上限,生产者的生产速度与消费者的消费速度不匹配。
  • 消息消费失败,反复去重试消费。

6.2 处理消息堆积

对于消费失败,则需要定位失败原因。
对于消费速度不匹配,可以优化业务逻辑,也可以水平扩容,增加消费者的数量。

七、消息中间件的推拉模式

有推模式和拉模式两种。

7.1 Producer与Broker

Producer与Broker默认是推模式,即Producer将消息推送至Broker。

不采用拉模式的原因:

  • 如果是拉模式,消息要保存在Producer,等待Broker去拉取,消息可靠需要Producer保证,生产者很多的话,很难保证每一个生产者都消息可靠。
  • Broker同一时间去拉取很多的Producer,对于Broker的性能是个考验。

7.2 Consumer与Broker

7.2.1 推模式

Broker将消息推向Consumer。

优点:Broker接受到消息,立马就推给Consumer,消息延迟低。

缺点:推送速度难以匹配消费速度,推送速度大于消费速度时,会超过消费者的承载上限。

7.2.2 拉模式

Consumer到Broker主动拉取消息。

优点:消费者可根据自己的消费速度去拉取消息。

缺点:消费者无法及时的知道消息是否到达Broker,只能间隔的拉取,消息延迟高。

7.2.3 长轮询

长轮询是对Consumer采用拉模式轮询的一种优化,Consumer去Broker拉取消息时,如果Broker没有消息,那么请求并不是立即返回,而是保持连接一段时间。
这段时间内,如果有消息到来,则可以返回消息,如果还是没有消息,则按超时处理。避免了Broker没有消息时,Consumer发出轮询,产生的无效请求。

八、消息事务

消息事务类似于分布式事务。

8.1 分布式事务

8.1.1 2PC

即两阶段提交,只适用于数据库层面的事务,分为准备阶段、提交阶段。

  • 准备阶段:协调者向各个参与者发送准备命令,各个参与者执行自己的事务(不提交),并向协调者发送执行结果。

  • 提交阶段:协调者接收到各个参与者的执行结果,如果都成功,则向各个协调者发送提交命令,如果其中有一个不成功,则向各个协调者发送回滚命令。

整个流程是同步阻塞的,各个参与者都需要互相等待,效率低。

8.1.2 TCC

即try-confirm-cancel,既适用与数据库层面,也适用于业务层面,如上传图片。

  • try:各个参与者预执行,并设置相应的预状态。
  • confirm:如果try阶段都成功,各个参与者就可以将预执行结果提交。
  • cancel:如果try阶段有一个失败了,那么执行回滚,将预执行结果设置为失败,表示后续不继续提交。

需要在业务代码中添加try-confirm-cancel的相关代码,对业务的耦合性比较大。

8.2 消息事务

消息事务是解决Producer与Broker之间的数据一致问题。

8.2.1 RocketMQ事务消息

  • Producer先发给Broker一个半消息,存在一个特殊队列,Consumer不可见。
  • 发完半消息之后,再执行本地事务,根据本地执行的结果,向Broker发送提交命令或回滚命令。
  • 如果执行结果成功,那么将半消息放入正常队列中。同时Producer需要提供接口,供Broker反查执行结果。


    1690510921661.png

九、如何设计一个消息中间件

明确消息中间件的几个角色:生产者,Broker,消费者,注册中心。

明确每个角色起到的作用:生产者负责生产消息,Broker负责存储消息,消费者负责处理消息,注册中心负责服务发现(生产者的发现,Broker的发现,消费者的发现)

明确消息的完整流程:生产者生产消息,发至Broker,Broker将消息存储,消费者从Broker获取消息进行处理。

明确实现要点:各个服务基于Netty通信,自定义消息协议,注册中心的选用或自己实现,分区理念,集群部署,选举算法保证消息队列的高可用,Broker利用本地文件系统和顺序写存储消息,根据消息队列的特性利用内存映射、零拷贝提升性能,长轮询。

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容