以下解决方案都以rabbitmq为例
一、消息丢失
可能出现的场景1:
消息生产者和mq服务器之间的网络突然中断,即消息发送出去,由于网络问题没有抵达mq服务器
解决方案:
- 做好容错,加上try-catch,发送消息可能会网络失败,失败后要有重试机制,保证消息一定会发送出去,每个消息都可以做好日志记录(给数据库保存每一个消息的详细信息),每个消息状态是否都被服务器收到都应该记录
如下表:
messge_id 消息id
content text 消息的json字符串文本
to_exchange 消息要发送给哪个交换机
routing_key 用的路由键是什么
class_type 我们要将消息的内容json转化回哪个类
message_status 0新建 1已发送 2错误抵达 3已抵达 消息的状态
create_time
update_time - 如果消息没发送成功,定期去数据库扫描发送失败的消息进行重试发送
可能出现的场景2:
消息从生产者发送给mq服务器(broker),服务器拿到消息以后,首先要将消息持久化到一个地方,然后再由交换机交给指定的队列。因此broker要将消息写入磁盘(持久化)才算成功,如果broker没持久化完成就宕机就会造成消息丢失,只要消息存到这个队列里面了,那我们就可以放心了,这个队列里面确实就有消息了,就等人消费了
解决方案:
发送端publisher必须引入消息确认机制-可靠抵达
- publisher 的confirmCallback消息抵达broker的回调
- publisher 的returnCallback消息未投递到队列退回回调,只要消息没有投递给指定的队列,就会触发该失败回调
可能出现的场景3:
自动ack的状态下,消费者收到消息,但还没来得及消费就宕机了
解决方案:
开启手动ack,消费成功才移除,失败或者没来得及处理就noAck并重新入队
二、消息重复
可能出现场景1:
消息消费成功了,业务已经完成(事务已经提交),ack消息的时候,机器宕机,导致没有ack成功,broker的消息从unack变为ready,broker又重新发送给其他消费者,即其他消费者收到以后,再调用一遍业务方法,那这样等于同样一个业务消息,收到了2遍
可能出现场景2:
消息消费失败了,由于我们拒绝了消息自己让它重新入队,这是我们允许的,因为第一遍是失败了,第二遍回来可能就成功了
解决方案:
- 消费者的业务消费接口都应该设计为幂等的,已经处理过的不用再处理
- 使用防重表,因为每个消息都有一个唯一id,只要这个消息被处理过了,我们就可以在这个防重表里面记录一下,它第二次来,就不处理它了,其实跟我们解决方式1来写接口的幂等是一模一样的效果
- rabbitmq的话,每个消息都有一个redelivered字段(
message.getMessageProperties().getRedelivered()
),可以获取是否被重新投递过来的,而不是第一次投递过来的。
缺点:存在风险,万一这个消息重新派送过来,是因为上一次没处理成功导致的
三、消息积压
如果是消息队列中的消息太多,肯定会影响mq很多性能,即会导致mq性能的下降,因此我们一定要解决消息挤压的问题
可能出现的场景:
- 消费者宕机导致的积压,因为消费者宕机了,但是消息生产者还源源不断的生产消息,那就会导致队列中的消息积压
- 消费者消费能力不足导致的积压
- 消息生产者发送流量过大
解决方案:
- 上线更多的消费者,进行正常消费
- 如果由于业务量太大,可能太慢了,那就可以上线一个专门的处理消息的消费者,那这个消费者有百万积压,从队列中批量拿出来,存储到数据库,这样百万处理也很快,可能数据库1min/2min把百万的消息都存储完了,那这样消息队列里面的消息空了,那你消息队列的性能就不影响了,然后我们再自己来编写一个离线处理业务,从数据库里面慢慢的取出一条一条去来处理这个消息,去来执行相关的业务逻辑