https://blog.csdn.net/HoneyYHQ9988/article/details/105941328
现在稍微大点的项目都会有用到消息队列中间件,因为消息队列有异步解耦、流量削峰、数据分发等许多好处。但是在互联网应用中,尤其在网络不稳定的情况下,消息队列 RocketMQ 的消息有可能会出现重复。如果消息重复则会影响到我们正常的业务处理,这时就要对消息做幂等处理。最近,我所在项目中,就出现了消息重复被消费的问题,造成了对用户的过渡营销引起了投诉,于是我们项目立即召开会议讨论方案解决此问题。这里涉及到消息重复和幂等,下面先说下这两概念,然后再说下我们用到的解决方案。
1、消息重复的场景
发送时消息重复
当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务端对生产者的确认应答失败。如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。
投递时消息重复
消息消费的场景下,消息已投递到消费者并完成业务处理,当消费者给服务端反馈应答的时候网络闪断。为了保证消息至少被消费一次,消息队列 RocketMQ 的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。
负载均衡时消息重复
包括但不限于网络抖动、Broker 重启以及消费者应用重启。当消息队列 RocketMQ 的 Broker 或客户端重启、扩容或缩容时,会触发 Rebalance,此时消费者可能会收到重复消息。
2、什么是幂等
当出现消费者对某条消息重复消费的情况时,重复消费的结果与消费一次的结果是相同的,并且多次消费并未对业务系统产生任何负面影响,那么这整个过程就可实现消息幂等。
例如,在支付场景下,消费者消费扣款消息,对一笔订单执行扣款操作,扣款金额为 100 元。如果因网络不稳定等原因导致扣款消息重复投递,消费者重复消费了该扣款消息,但最终的业务结果是只扣款一次,扣费 100 元,且用户的扣款记录中对应的订单只有一条扣款流水,不会多次扣除费用。那么这次扣款操作是符合要求的,整个消费过程实现了消费幂等。
3、解决方案
上面我们讲了,既然在生产和消费过程中都有可能出现重复消费问题,那我们从消费的末端去处理,把识别出重复的消息,然后抛弃此消息,那不就能避免重复消息对业务的影响,这也就是幂等处理。行了, 下面说下我们项目中具体的实践方案。为了直观表达,我画了一张业务流程图:
1、生产者设置唯一标识的key值
既然后要能识别重复消息,那必须是此条消息有唯一的标识。到了这里,你肯定会想到用RocketMQ生成的MessageId不就可以了吗?但这不是最佳方法,因为 MessageId有可能出现冲突(重复)的情况,所以真正安全的幂等处理,不建议以MessageId作为处理依据。而最好的方式是以业务唯一标识作为幂等处理的关键依据(如订单ID)。
业务的唯一标识可以在生产者通过消息 Key 设置。实现代码如下:
Message message = new Message();
message.setKey("ORDERID_001");
SendResult sendResult = producer.send(message);
2、消费者识别key相同的重复消息
消费者收到消息时可以根据消息的 Key判断是否重复来实现消息幂等。这里我们用到了redis存放消息key值(因为redis读取快),并且对于key值大存放时长可以设置,超过了时长就会被清除掉。
consumer.subscribe("ons_test", "*", new MessageListener() {
public Action consume(Message message, ConsumeContext context) {
String key = message.getKey()
// 根据业务唯一标识的 Key 做幂等处理
}
});
这里我们是先判断key是否在redis中存在,如果存在则抛弃不走下面的业务逻辑;如果,在redis中没有查到,则继续下面的业务逻辑处理。