「中间件」RocketMQ解决消息顺序和重复性消费问题整理

前言

现在越来越多的产品采用的是分布式架构,部署的时候也同样是分布式部署,那么各个应用间的异步通信大多选择消息中间件MQ来处理,那么就回避不了两个问题:

1. 发送消息的顺序性
2. 消息被重复消费

目前在生产环境,使用较多的消息队列中间件有ActiveMQ,RabbitMQ,Kafka,RocketMQ等,本文的设计是以RocketMQ为例来解决这两个问题。

一、发送消息的顺序性

1、 什么是顺序消息?

顺序消息即有序消息,发送者(Producer)按照顺序发送消息,消费者(Consumer)按照消息的发送顺序进行消费。例如:我们在某宝购买一款笔记本电脑,需要下单、支付和订单完成这3个流程,相对应的产生3条消息,分别是创建订单——订单支付——订单完成,为了保证业务的完整性肯定需要按照这个顺序依次消费才能达到预期目的。

2、 第一种模型

但是在生产环境MQ肯定是集群部署,例如多Master模式、多Master多Slave模式(异步复制)、多Master多Slave模式(同步双写)等模式。为了保证消息的顺序模型可能是这样的:

第一种模型

M1:创建订单、M2:订单支付、M3:订单完成
由于MQ Service是集群部署,假设M1发送到MQ Service1,M2发送到MQ Service2,依次类推。如果要保证M1最先被消费,那么需要M1到达消费端被消费后,通知MQ Service2,然后MQ Service2再将M2发送到消费端,M2被消费后,再通知MQ Service3,将M3发送到消费端。

问题:三条消息分别发送到三台或者其中两台Service上,就很难保证M1第一个到达MQ集群,也不能保证最先被消费。加入M2、M3其中任意一个优先于M1到达MQ集群,并且优先于M1被消费,那么就没有顺序可言了。综合分析这个架构模式并不能保证消息被MQ顺序消费。

3、 第二种模型

基于第一种模型分析来看要想保证M1、M2、M3能够顺序消费,首先要保证能够顺序发送到同一个MQ Service中,改进后模型如下:

第二种模型

如上图所示,将三条消息按照顺序发送到同一个MQ Service中,基于先到先被消费原则,依次消费的顺序为M1 > M2 > M3,这样就保证了消息的顺序性。
如果使用这种设计在正常情况下是没问题的,但是在实际场景中很可能会遇到下面的问题:

网络延迟问题

生产者、MQ集群和消费者不可能发布在同一台服务器中,那么消息在传输过程中就会遇到网络延迟问题。如上图所示,M1和M2在发送给“消费者1”的过程中遇到了延迟问题,M3先于M1和M2被消费,那问题又回到了原点,这种方案依然不能解决消息被顺序消费的问题。

4、 第三种模型

第二种模型宣告失败,接着分析,我们保证了生产者将3个消息按照顺序发送给同一个 MQ Service这个逻辑是没问题的,那么为了解决上方网络延迟问题,那我们就把3个消息发送给同一个“消费者”被消费呢?即使遇到网络问题或者消费者响应问题,M1被消费失败,为了保证消息一定会被消费,肯定会选择重发消息到另一个“消费者”端,如下图所示:

第三种模型

如上图所示,将3条消息发送给“消费者1”,M1被消费的时候遇到问题,没有被消费成功,那么会将消息发送给“消费者2”进行重试,这样就保证了消息的顺序性。
但是可能会遇到另一个问题,“消费者1”没有响应有两种情况,一种是M1在网络传输过程中丢失,另一种是“消费者1”已经消费成功了但是返回的响应信息没有被MQ Service收到。如果是第二种情况重发M1给“消费者2”就会造成M1被重复消费,也就引发了文章开头的第二个消息重复消费问题。
我们总结一下要保证消息严格的按照顺序消费,最可行的办法就是:

保证生产者 —— MQService —— 消费者 是一对一对一的关系

5、 MessageQueueSelector实现顺序发送和消费

上述办法虽然可行性最高,但是也存在更加严重的问题,例如:

1. 并行度就会成为消息系统的瓶颈(吞吐量不够)
2. 更多的异常处理,比如:只要消费端出现问题,就会导致整个处理流程阻塞,我们不得不花费更多的精力来解决阻塞的问题。

看到这里挠挠头又掉了一大把头发,过度设计将会造成效率低下,甚至浪费更多的资源。换种思路,从业务角度来看,保证消息的顺序性不仅仅是依靠消息系统,那就寻找更加合理的方式来解决。
RocketMQ本身具有发送顺序消息功能,那么通过源码角度来分析一下:

// RocketMQ通过MessageQueueSelector中实现的算法来确定消息发送到哪一个队列上
// RocketMQ默认提供了两种MessageQueueSelector实现:随机/Hash
// 当然你可以根据业务实现自己的MessageQueueSelector来决定消息按照何种策略发送到消息队列中
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        Integer id = (Integer) arg;
        int index = id % mqs.size();
        return mqs.get(index);
    }
}, orderId);

备注:send方法带有参数MessageQueueSelector,MessageQueueSelector是让用户自己决定消息发送到哪一个队列,如果是局部消息的话,用来决定消息与队列的对应关系。

6、 源码示例

接下来我们就用代码模拟一下MessageQueueSelector如何使用。
1、 创建一个生产者(Producer)

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.util.List;
public class Producer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQProducer producer = new DefaultMQProducer("quickstart_producer");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        for(int i = 0;i < 6 ;i++){
            int orderId=(int)((Math.random()*9+1)*10000000);
            for(int j = 0;j < 3 ;j++){
                Message msg = new Message("AAA","TagA",("推送的订单ID为="+orderId).getBytes());
                try {
                    SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                        @Override
                        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                            Integer id = (Integer) arg;
                            int index = id % mqs.size();
                            return mqs.get(index);
                        }
                    },orderId);
                    System.out.println(sendResult);
                } catch (RemotingException e) {
                    e.printStackTrace();
                } catch (MQBrokerException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

上述生产者代码中我们看到在发送消息的时候使用了两个for循环来模拟场景,第一个for循环是产生6个订单,按照MQ的负载策略6个订单将分别发送到不同的消费者端。第二个for循环是每个订单里面产生3条有序的订单消息(M1、M2、M3),订单id是随机生成不重复的9位数字(生产场景使用不同的规则)。

2、 创建两个消费者(Consumer)
消费者1:

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

import java.io.UnsupportedEncodingException;
import java.util.List;
public class Consumer1 {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("quickstart_consumer");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("AAA","TagA");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt msg : list){
                    try {
                        String topic = msg.getTopic();
                        String msgBody = new String(msg.getBody(),"utf-8");
                        String tags = msg.getTags();
                        System.out.println("接收到消息1:topic:"+topic+",tags:"+tags+",msg:"+msgBody);
                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer1 Started.");
    }
}

消费者2:

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

import java.io.UnsupportedEncodingException;
import java.util.List;
public class Consumer1 {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("quickstart_consumer");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("AAA","TagA");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt msg : list){
                    try {
                        String topic = msg.getTopic();
                        String msgBody = new String(msg.getBody(),"utf-8");
                        String tags = msg.getTags();
                        System.out.println("接收到消息2:topic:"+topic+",tags:"+tags+",msg:"+msgBody);
                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer1 Started.");
    }
}

3、 运行测试
先启动两个消费者,启动成功后再启动生产者,运行结果如下:
生产者执行信息

SendResult [sendStatus=SEND_OK, msgId=C0A81FD5467118B4AAC22B0061910000, offsetMsgId=3452373400002A9F000000000006325B, messageQueue=MessageQueue [topic=AAA, brokerName=broker-a, queueId=3], queueOffset=397]
SendResult [sendStatus=SEND_OK, msgId=C0A81FD5467118B4AAC22B00621C0001, offsetMsgId=3452373400002A9F0000000000063315, messageQueue=MessageQueue [topic=AAA, brokerName=broker-a, queueId=3], queueOffset=398]
SendResult [sendStatus=SEND_OK, msgId=C0A81FD5467118B4AAC22B0062610002, offsetMsgId=3452373400002A9F00000000000633CF, messageQueue=MessageQueue [topic=AAA, brokerName=broker-a, queueId=3], queueOffset=399]
SendResult [sendStatus=SEND_OK, msgId=C0A81FD5467118B4AAC22B0062A00003, offsetMsgId=3452373400002A9F0000000000063489, messageQueue=MessageQueue [topic=AAA, brokerName=broker-a, queueId=0], queueOffset=919]
SendResult [sendStatus=SEND_OK, msgId=C0A81FD5467118B4AAC22B0062DF0004, offsetMsgId=3452373400002A9F0000000000063543, messageQueue=MessageQueue [topic=AAA, brokerName=broker-a, queueId=0], queueOffset=920]
SendResult [sendStatus=SEND_OK, msgId=C0A81FD5467118B4AAC22B00631F0005, offsetMsgId=3452373400002A9F00000000000635FD, messageQueue=MessageQueue [topic=AAA, brokerName=broker-a, queueId=0], queueOffset=921]
SendResult [sendStatus=SEND_OK, msgId=C0A81FD5467118B4AAC22B00635F0006, offsetMsgId=3452373400002A9F00000000000636B7, messageQueue=MessageQueue [topic=AAA, brokerName=broker-a, queueId=1], queueOffset=348]
SendResult [sendStatus=SEND_OK, msgId=C0A81FD5467118B4AAC22B00639E0007, offsetMsgId=3452373400002A9F0000000000063771, messageQueue=MessageQueue [topic=AAA, brokerName=broker-a, queueId=1], queueOffset=349]
SendResult [sendStatus=SEND_OK, msgId=C0A81FD5467118B4AAC22B0063DE0008, offsetMsgId=3452373400002A9F000000000006382B, messageQueue=MessageQueue [topic=AAA, brokerName=broker-a, queueId=1], queueOffset=350]
SendResult [sendStatus=SEND_OK, msgId=C0A81FD5467118B4AAC22B0064210009, offsetMsgId=3452373400002A9F00000000000638E5, messageQueue=MessageQueue [topic=AAA, brokerName=broker-a, queueId=2], queueOffset=327]
SendResult [sendStatus=SEND_OK, msgId=C0A81FD5467118B4AAC22B006462000A, offsetMsgId=3452373400002A9F000000000006399F, messageQueue=MessageQueue [topic=AAA, brokerName=broker-a, queueId=2], queueOffset=328]
SendResult [sendStatus=SEND_OK, msgId=C0A81FD5467118B4AAC22B0064A2000B, offsetMsgId=3452373400002A9F0000000000063A59, messageQueue=MessageQueue [topic=AAA, brokerName=broker-a, queueId=2], queueOffset=329]
SendResult [sendStatus=SEND_OK, msgId=C0A81FD5467118B4AAC22B0064E5000C, offsetMsgId=3452373400002A9F0000000000063B13, messageQueue=MessageQueue [topic=AAA, brokerName=broker-a, queueId=2], queueOffset=330]
SendResult [sendStatus=SEND_OK, msgId=C0A81FD5467118B4AAC22B006525000D, offsetMsgId=3452373400002A9F0000000000063BCD, messageQueue=MessageQueue [topic=AAA, brokerName=broker-a, queueId=2], queueOffset=331]
SendResult [sendStatus=SEND_OK, msgId=C0A81FD5467118B4AAC22B006565000E, offsetMsgId=3452373400002A9F0000000000063C87, messageQueue=MessageQueue [topic=AAA, brokerName=broker-a, queueId=2], queueOffset=332]
SendResult [sendStatus=SEND_OK, msgId=C0A81FD5467118B4AAC22B0065A7000F, offsetMsgId=3452373400002A9F0000000000063D41, messageQueue=MessageQueue [topic=AAA, brokerName=broker-a, queueId=0], queueOffset=922]
SendResult [sendStatus=SEND_OK, msgId=C0A81FD5467118B4AAC22B0065E60010, offsetMsgId=3452373400002A9F0000000000063DFB, messageQueue=MessageQueue [topic=AAA, brokerName=broker-a, queueId=0], queueOffset=923]
SendResult [sendStatus=SEND_OK, msgId=C0A81FD5467118B4AAC22B0066250011, offsetMsgId=3452373400002A9F0000000000063EB5, messageQueue=MessageQueue [topic=AAA, brokerName=broker-a, queueId=0], queueOffset=924]

一共产生了6组数据,每组3条消息,共发送了18条消息。

消费者1执行信息:

接收到消息1:topic:AAA,tags:TagA,msg:推送的订单ID为=890566068
接收到消息1:topic:AAA,tags:TagA,msg:推送的订单ID为=890566068
接收到消息1:topic:AAA,tags:TagA,msg:推送的订单ID为=890566068
接收到消息1:topic:AAA,tags:TagA,msg:推送的订单ID为=944070249
接收到消息1:topic:AAA,tags:TagA,msg:推送的订单ID为=944070249
接收到消息1:topic:AAA,tags:TagA,msg:推送的订单ID为=944070249
接收到消息1:topic:AAA,tags:TagA,msg:推送的订单ID为=252781420
接收到消息1:topic:AAA,tags:TagA,msg:推送的订单ID为=252781420
接收到消息1:topic:AAA,tags:TagA,msg:推送的订单ID为=252781420

消费者1接收到了3组消息,共9条信息。

消费者2执行信息:

接收到消息2:topic:AAA,tags:TagA,msg:推送的订单ID为=171730631
接收到消息2:topic:AAA,tags:TagA,msg:推送的订单ID为=171730631
接收到消息2:topic:AAA,tags:TagA,msg:推送的订单ID为=171730631
接收到消息2:topic:AAA,tags:TagA,msg:推送的订单ID为=202312726
接收到消息2:topic:AAA,tags:TagA,msg:推送的订单ID为=202312726
接收到消息2:topic:AAA,tags:TagA,msg:推送的订单ID为=202312726
接收到消息2:topic:AAA,tags:TagA,msg:推送的订单ID为=999804694
接收到消息2:topic:AAA,tags:TagA,msg:推送的订单ID为=999804694
接收到消息2:topic:AAA,tags:TagA,msg:推送的订单ID为=999804694

消费者1接收到了3组消息,共9条信息。

备注:通过运行结果可以看出生产者发送的消息会根据MessageQueueSelector实现的算法来选择一个队列,那么相同的策orderId消息自然是发送到了同一个队列,那么在消费的时候也会被一起消费。

二、消息被重复消费

在上面设计订单顺序消费的时候有抛出一个新的问题,就是消息重复。但是熟读RocketMQ API的朋友应该都知道,RocketMQ是不提供重复消费问题的解决方案。那么先来了解一下什么是重复性消费:

1、 什么是消息重复消费?

在网络不可达的情况下,只要通过网络今夕数据交换,就不可避免的产生同一条消息被不同两个或两个以上的消费者消费。

2、 解决方案

如果消费端收到两条一样的信息,怎么处理呢?

  1. 消费端处理消息的业务逻辑保持幂等性
  2. 保证每条消息都有唯一编号且保证消息处理成功与去重表的日志同时出现

第1条的原理是,只要保持幂等性,不管来多少条重复消息,最后处理的结果都一样。第2条原理就是利用一张日志表来记录已经处理成功的msgId,如果新到的msgId已经在日志表中,那么就不再处理这条消息。
第1条解决方案,很明显应该在消费端实现,不属于消息系统要实现的功能。第2条可以消息系统实现,也可以业务端实现。正常情况下出现重复消息的概率其实很小,如果由消息系统来实现的话,肯定会对消息系统的吞吐量和高可用有影响,所以最好还是由业务端自己处理消息重复的问题,这也是RocketMQ不解决消息重复的问题的原因。

三、总结

要实现顺序消费根据文中的方案就可以解决掉这个问题,但是在实际应用场景中肯定不会这么简单。跟其他朋友在聊这个话题的时候,他们还采用了其他的解决方案,比如有先后业务逻辑耦合的消息不通过MQ实现,通过业务代码实现,吞吐瓶颈可以通过多线程解决,当然体量过大还是需要MQ的。
由于RocketMQ并不保证消息不重复,如果你的业务需要保证严格的不重复消息,需要你自己在业务端去重,这样问题始终在可控范围内。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,684评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,143评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,214评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,788评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,796评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,665评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,027评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,679评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,346评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,664评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,766评论 1 331
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,412评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,015评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,974评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,073评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,501评论 2 343

推荐阅读更多精彩内容