一、为什么使用 MQ?
1.1 解耦
1.1.1 解耦1
例如电商系统核心是交易服务,交易服务要调用另外三个服务,订单服务、库存服务、仓储服务。
这三个服务如果有一个服务不可用,交易服务就无法正常运行,所以交易服务是强耦合另外三个服务。
引入MQ之后,交易服务只跟MQ交互,把消息发到MQ里面就行了,无需关心另外三个服务是否可用。这时候交易服务跟另外三个服务就是弱耦合的关系,耦合性被降低了。
哪怕是另外三个服务暂时不可用,也不影响交易服务的运行,只要其他服务运行起来后,把MQ里面的消息消费了就行。(降级接口)
1.1.2 解耦2
假设 A 系统在用户发生某个操作的时候,需要把用户提交的数据同时推送到 B、C 两个系统的时候。这个时候负责 A 系统的哥们想:没事啊,B、C 两个系统给我提供一个 HTTP 接口或者 RPC 接口,我把数据推送过去不就完事了嘛,负责 A 系统的哥们美滋滋。
一切看起来很美好,但是随着业务快速迭代,这个时候系统 D 也想要这个数据。那既然这样,A 系统的开发同学就改咯,在发送数据给 B、C 的同时加上一个 D。但是,越到后面越发现,麻烦来了。整个系统好像不止这个数据要发送给 B、C、D、还有第二、第三个数据要发送给 B、C、D。甚至有时候又加入了 E、F 等系统,他们也要这个数据。并且有时候可能 B 系统突然又不要这个数据了,A 系统改来改去,A 系统的开发哥们头皮发麻。更复杂的场景是,数据通过接口传给其他系统有时候还要考虑重试、超时等一些异常情况。
这个时候,就该我们的 MQ 粉墨登场了,这种情况下使用 MQ 来解耦是再合适不过了,因为负责 A 系统的哥们只需要把消息扔到 MQ 就行了,其他系统按需来订阅消息就好了。就算某个系统不需要这个数据了,也不会需要 A 系统改动代码。
1.2 异步
没有引入MQ的时候,交易服务需要同步调用三个服务,如果调用一个服务需要耗时1秒,那么同步调用三个服务需要耗时3秒。在引入MQ之后,全都改成了异步调用,整个耗时不到1秒,大大提高了接口的性能。
1.3 削峰
如果一秒内同时来了5000笔交易,而订单服务每秒只能处理100笔交易,那么后面的4900笔交易失败。在引入MQ之后,交易服务可以把交易数据先发送到MQ中,而订单服务再慢慢从MQ拉取交易信息处理。从而避免突发流量压垮服务器。
1.3.1 削峰填谷
举个例子,比如我们的订单系统,在下单的时候就会往数据库写数据。但是数据库只能支撑每秒 1000 左右的并发写入,并发量再高就容易宕机。低峰期的时候并发也就 100 多个,但是在高峰期时候,并发量会突然激增到 5000 以上,这个时候数据库肯定死了。
但是使用了 MQ 之后,情况就变了,消息被 MQ 保存起来了,然后系统就可以按照自己的消费能力来消费,比如每秒 1000 个数据,这样慢慢写入数据库,这样就不会打死数据库了。
至于为什么叫做削峰填谷呢?如果没有用 MQ 的情况下,并发量高峰期的时候是有一个“顶峰”的,然后高峰期过后又是一个低并发的“谷”。但是使用了 MQ 之后,限制消费消息的速度为 1000QPS,但是这样一来,高峰期产生的数据势必会被积压在 MQ 中,高峰就被“削”掉了。但是因为消息积压,在高峰期过后的一段时间内,消费消息的速度还是会维持在 1000QPS,直到消费完积压的消息,这就叫做“填谷”。
二、引入MQ之后的问题
2.1 系统可用性降低
本来整个系统有四个服务,我们只需要保证这四个服务可用就行了。现在又多引入了一个MQ,我们还要保证MQ的可用,所以整个系统的可用性降低。
2.2 系统复杂性提高
本来交易服务是同步调用另外三个服务,如果另外三个服务不可用,交易服务能立即感知到。引入MQ之后,整个系统的稳定性就要靠MQ保证了。
这时候,我们就要考虑到发到MQ里面的消息怎么避免丢失的问题?顺序性消费的问题,就是同一笔交易的下单消息应该比撤单消息先处理。重复性消费的问题,就是同一笔下单交易的消息可能被多次处理。
当然,每种问题都有具体的解决方案,避免消息丢失可以使用MQ集群,顺序性消费可以把消息发到同一个分区,重复性消费可以在消费端做幂等性处理。
2.3 重复消费问题
2.3.1 问题场景
重复消费问题可以说是 MQ 中普遍存在的问题, 不管你用哪种 MQ 都无法避免。有哪些场景会出现重复的消息呢?
- 消息生产者产生了重复的消息;
- Kafka 和 RocketMQ 的 offset 被回调了;
- 消息消费者确认失败;
- 消息消费者确认时超时;
- 业务系统主动发起重试。
如果重复消息不做正确的处理,会对业务造成很大的影响,产生重复数据或者导致数据异常,比如会员系统多开通了一个月的会员等。
2.3.2 解决方案
不管是由于生产者产生的重复消息,还是由于消费者导致的重复消息,我们都可以在消费者中解决这个问题。
这就要求消费者在做业务处理时,要做幂等设计。在这里我推荐增加一张消费消息表,来解决 MQ的这类问题。
消费消息表中,使用 messageId 做唯一索引。在处理业务逻辑之前,先根据 messageId 查询一下该消息有没有处理过。如果已经处理过了则直接返回成功,如果没有处理过,则继续做业务处理。
补充:RocketMQ消费过程幂等
以下内容来自RocketMQ官网:
RocketMQ无法避免消息重复(Exactly-Once),所以如果业务对消费重复非常敏感,务必要在业务层面进行去重处理。可以借助关系数据库进行去重。首先需要确定消息的唯一键,可以是msgId,也可以是消息内容中的唯一标识字段,例如订单Id等。在消费之前判断唯一键是否在关系数据库中存在。如果不存在则插入,并消费,否则跳过。(实际过程要考虑原子性问题,判断是否存在可以尝试插入,如果报主键冲突,则插入失败,直接跳过)
msgId一定是全局唯一标识符,但是实际使用中,可能会存在相同的消息有两个不同msgId的情况(消费者主动重发、因客户端重投机制导致的重复等),这种情况就需要使业务字段进行重复消费。
2.4 数据一致性问题(异步分布式事务问题)
2.4.1 问题场景
当服务间是同步调用的时候,我们还可以使用本地事务来控制数据的一致性。但是引入MQ之后,服务间的调用都是异步了,就没办法使用本地事务,也就无法做到数据的强一致性了。
例如,调用订单服务下单成功了,但是调用库存服务扣减库存失败,就会导致超卖,是严重的线上事故。
这时候怎么办?
方案一:需要事务强一致的,不用消息异步,如下单、减库存要放在一个事务里控制,加积分这种非核心的业务才用消息异步处理。
方案二:可以使用MQ事务消息(只有RocketMQ才有事务消息功能,RocketMQ收发事务消息)。
事务状态有以下三种:
- TransactionStatus.CommitTransaction:提交事务,允许订阅方消费该消息。
- TransactionStatus.RollbackTransaction:回滚事务,消息将被丢弃不允许消费。
- TransactionStatus.Unknow:无法判断状态,期待消息队列RocketMQ版的Broker向发送方再次询问该消息对应的本地事务的状态。
步骤一: A 服务向消息中间件发布消息
在服务A处理任务A前,首先向消息中间件发送一条半信息。
消息中间件收到后将该消息持久化,但不进行投递。持久化成功后,向A服务返回确认应答。
服务A收到确认应答后,便可以开始处理任务A。
任务A处理完成后,服务A便会向消息中间件发送Commit 或者 Rollback 请求,该请求发送完成后,服务A的工作任务就结束了,该事务的处理过程也就结束了。
在消息中间件收到 Commit 后,便会向 B 服务投递消息,如果收到 Rollback 便会直接丢弃消息。
如果消息中间件在最后的过程中,长时间没有收到服务A 发送的 Commit 或 Rollback 指令,这个时候就需要依靠 超时询问机制。
步骤二: 消息中间件向B服务投递消息
消息中间件收到A服务的提交 Commit指令后便会将该消息投递给B服务,然后将自己的状态置为阻塞等待状态。B服务收到消息中间件发送的消息后便开始处理任务B,处理完成后便会向消息中间件发出回应。但是在消息中间件阻塞等待的时候同样会出现问题。
- 正常情况:消息中间件投递完消息后,进入阻塞等待状态,在收到确认应答后便认为事务处理完成,该流程结束。
- 等待超时情况:在等待确认应答超时之后就会重新进行投递,直到B服务器返回消费成功响应为止。而消息重试的次数和时间间隔都可以设置,如果最终还是不能成功进行投递,则需要人工干预。
2.4.2 解决方案
我们都知道数据一致性分为:强一致性、弱一致性、最终一致性。
而 MQ 为了性能考虑使用的是最终一致性,那么必定会出现数据不一致的问题。这类问题大概率是因为消费者读取消息后,业务逻辑处理失败导致的。这时候可以增加重试机制。重试分为同步重试和异步重试。
有些消息量比较小的业务场景,可以采用同步重试。在消费消息时如果处理失败,立刻重试 3-5 次,如果还是失败则写入到记录表中。但如果消息量比较大,则不建议使用这种方式。因为如果出现网络异常,可能会导致大量的消息不断重试,影响消息读取速度造成消息堆积。
消息量比较大的业务场景,建议采用异步重试。在消费者处理失败之后,立刻写入重试表,有个 job(如采用xxljob) 专门定时重试。
还有一种做法:如果消费失败,自己给同一个 topic 发一条消息。在后面的某个时间点,自己又会消费到那条消息,起到了重试的效果。如果对消息顺序要求不高的场景,可以使用这种方式。
2.5 消息丢失问题
2.5.1 问题场景
同样消息丢失问题,也是 MQ 中普遍存在的问题,不管你用哪种 MQ 都 无法避免。有哪些 场景会出现消息丢失问题呢?
- 生产者产生消息时,由于网络原因发送到 MQ 失败了;
- MQ 服务器持久化,存储磁盘时出现异常;
- Kafka和RocketMQ 的 offset 被回调时,略过了很多消息;
- 消费者刚读取消息,已经 ACK 确认,但业务还没处理完,服务就被重启了。
导致消息丢失问题的原因挺多的, 生产者、 MQ 服务器、 消费者都有可能产生问题。我在这里就不一一列举了。最终的结果会导致消费者无法正确的处理消息,而导致数据不一致的情况。
2.5.2 解决方案
不管你是否承认,有时候消息真的会丢。即使这种概率非常小,也会对业务有影响。生产者、MQ 服务器、消费者都有可能会导致消息丢失的问题。为了解决这个问题,我们可以增加一张消息发送表。
当生产者发完消息之后,会往该表中写入一条数据,状态 status 标记为待确认;
如果消费者读取消息之后,调用生产者的 API 更新该消息的status为已确认;
有个job(xxljob) 每隔一段时间检查一次消息发送表,如果5分钟(这个时间可以根据实际情况来定)后还有状态是待确认的消息,则认为该消息已经丢失了,重新发条消息。
这样不管是由于生产者、 MQ服务器、还是消费者导致的消息丢失问题,job 都会重新发消息。
2.6 消息顺序问题
2.6.1 问题场景
有些业务数据是有状态的,比如订单有下单、支付、完成、退货等状态。 如果订单数据作为消息体,就会涉及顺序问题了。
例如消费者收到同一个订单的两条消息。第一条消息的状态是下单,第二条消息的状态是支付,这是没问题的。但如果第一条消息的状态是支付,第二条消息的状态是下单就会有问题了。没有下单就先支付了?
消息顺序问题是一个非常棘手的问题,比如:
Kafka 同一个 partition 中能保证顺序,但是不同的 partition 无法保证顺序;
RabbitMQ的同一个queue能够保证顺序,但是如果多个消费者同一个queue 也会有顺序问题。
如果消费者使用多线程消费消息,也无法保证顺序。
如果消费消息时同一个订单的多条消息中,中间的一条消息出现异常情况,顺序将会被打乱。
还有如果生产者发送到 MQ中的路由规则,跟消费者不一样,也无法保证顺序。
2.6.2 解决方案
消息顺序问题是一种常见问题。我们以 Kafka 消费订单消息为例,订单有下单、 支付、 完成、 退货等状态。这些状态是有先后顺序的,如果顺序错了会导致业务异常。
解决这类问题之前,我们需要先确认:消费者是否真的需要知道中间状态,只知道最终状态行不行?
其实很多时候,我真的需要知道的是最终状态。这时可以把流程优化一下:
这种方式可以解决大部分的消息顺序问题。
但如果真的有需要保证消息顺序的需求,那么可以将订单号路由到不同的 partition。同一个订单号的消息,每次到发到同一个partition。
2.7 消息堆积
2.7.1 问题场景
如果消息消费者读取消息的速度,能够跟上消息生产者的节奏,那么整套 MQ 机制就能发挥最大作用。
但是很多时候,由于某些批处理或者其他原因,导致消费速度小于生产速度。这样会直接导致消息堆积问题,从而影响业务功能。
这里以下单 开通会员为例,如果消息出现堆积会导致用户下单之后,很久之后才能变成会员。这种情况肯定会引起大量用户投诉。
2.7.2 解决方案
那么消息堆积问题该如何解决呢?这个要看消息是否需要保证顺序。如果不需要保证顺序,可以读取消息之后用多线程处理业务逻辑。
这样就能增加业务逻辑处理速度,解决消息堆积问题。但是线程池的核心线程数和最大线程数需要合理配置,不然可能会浪费系统资源。
如果需要保证顺序,可以读取消息之后将消息按照一定的规则分发到多个队列中,然后在队列中用单线程处理。
资料来源:
面试官竟然问我为啥要用MQ,幸亏我看了参考答案