RocketMQ的顺序消息

顺序消息指的是生产者投递消息的顺序与消息消费的顺序一致。某个Topic上所有消息都是顺序的称为全局顺序消息,如果是具有某个相同业务ID的一组消息保证其顺序性则成为部分顺序消息。

并发消费与顺序消费

当使用DefaultMQPushConsumer做消息消费时,需要通过registerMessageListener(MessageListener)方法来注册消息的监听器来编写如何处理消息的业务逻辑。MessageListener接口有两个子接口MessageListenerConcurrentlyMessageListenerOrderly

  • 并发消费,也就是MessageListenerConcurrently 不保证消息顺序,消费的顺序跟生产者投递顺序无关,消息消费最大并发度为消费者线程池的大小。
  • 全局顺序消费,只能指定该Topic只有一个queue
  • 部分顺序消费,生产投递时确保具有相同业务ID的消息发往相同的queue,消费侧多线程去从该Topic下的多个queue拉取进行消费,但MessageListenerOrderly会使用锁来确保多线程从同一个queue的消费处理是串行的。

实际场景中更多的是上面所说的部分顺序消费需求,下面来看看如何实现。

部分顺序代码实现

上文中提到了,部分顺序消息就是需要生产端与消费端配合,生产投递时使用MessageQueueSelector将相同业务ID的消息投递到同一个queue里,消费端使用MessageListenerOrderly使得多个消费者线程能够串行顺序的从queue中消费。部分顺序消息下、topic消息消费的最大并行度取决于该topic有多少个queue,每个queue的消费在多个线程中是串行的。

例子代码:

生产端:

/**
 * 顺序消息投递
 * 按照EventMessage中的msgId作为业务ID,
 * 业务ID对topic下queue数量取模选定投递哪个queue
 * 确保相同业务ID的消息都投递到同一个queue,从而使得该一组消息有序性
 * */
@Slf4j
@Component
public class OrderlyMsgProducer {
    
    @Value("${rocketmq.url}")
    private String mqurl;
    
    @Value("${rocketmq.accessKey}")
    private String accessKey;
    
    @Value("${rocketmq.secretKey}")
    private String secretKey;
    
    @Value("${rocketmq.producergroup.name}")
    private String producerGroupName;
    
    private DefaultMQProducer producer;
    
    public SendResult publishOrderly(EventMessage eventMsg) {
        try {
            Message msg = new Message(eventMsg.getTopic(), eventMsg.getTag(), eventMsg.getMsgId(), JSON.toJSONString(eventMsg).getBytes("utf-8"));
            SendResult result = producer.send(msg, 
                    new MessageQueueSelector() {

                        @Override
                        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                            for(MessageQueue queue : mqs) {
                                log.info(queue.toString());
                            }
                            log.info("Topic {} 共有{}个MessageQueue", eventMsg.getTopic(), mqs.size());
                            int id = Integer.parseInt((String)arg);
                            int mqNum = id % mqs.size();
                            log.info("业务ID={}消息投递到{}号Queue", id, mqNum);
                            return mqs.get(mqNum);
                        }}, 
                    eventMsg.getMsgId());
            return result;
        } catch (Exception e) {
            log.error("顺序消息发送失败" + e.getMessage(), e);
            e.printStackTrace();
        }
        return null;
    }
    
    @PostConstruct
    public void init() {
        producer = new DefaultMQProducer(producerGroupName + "-orderly", getAclRPCHook());
        producer.setNamesrvAddr(mqurl);
        try {
            producer.start();
            log.info("RocketMQ客户端顺序producer初始化...");
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }
    
    @PreDestroy
    public void shutdown() {
        if(producer != null)
            producer.shutdown();
    }
    
    private RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
    }
}

消费端:

@Slf4j
@Component
public class OrderlyConsumer {

    @Value("${rocketmq.url}")
    private String mqurl;

    @Value("${rocketmq.accessKey}")
    private String accessKey;

    @Value("${rocketmq.secretKey}")
    private String secretKey;

    @Value("${rocketmq.consumeThreadCorePoolSize:20}")
    private int consumeThreadCorePoolSize;

    @PostConstruct
    public void init() {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testapp-orderlyconsumer-group", getAclRPCHook(),
                new AllocateMessageQueueAveragely());
        try {
            consumer.setNamesrvAddr(mqurl);
            consumer.setConsumeThreadMin(consumeThreadCorePoolSize);
            consumer.setConsumeThreadMax(consumeThreadCorePoolSize);
            consumer.setPullBatchSize(32); // 一次长轮询最多从mq里拿多少个消息,默认32
            consumer.setConsumeMessageBatchMaxSize(1);// 批量最多消费的消息、也即List<MessageExt> msgs的大小,默认1
            consumer.subscribe("test-orderly-topic", "douchuzi");
            
            //指定消息在同一个queue上串行顺序消费
            consumer.registerMessageListener(new MessageListenerOrderly() {

                @Override
                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                    MessageExt msgExt = msgs.get(0);
                    String msgContent;
                    try {
                        msgContent = new String(msgExt.getBody(), "UTF-8");
                        log.info("收到消息{}", msgContent);
                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                    }
                    return ConsumeOrderlyStatus.SUCCESS;
                }
            });

            consumer.start();
        } catch (Exception e) {
            log.error("注册消费者出错" + e.getMessage(), e);

        }
    }

    // Access Control List控制
    private RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
    }
}

测试:

for(int i=0; i<10; i++) {
    EventMessage eventMsg = new EventMessage();
    eventMsg.setTopic("test-orderly-topic");
    eventMsg.setTag("douchuzi");
    eventMsg.setMsgId("456");
    eventMsg.setProducerGroup("testapp-producer-group-orderly");
    eventMsg.setMsgBody("+" + i);
    eventMsg.setPublishTime(LocalDateTime.now().toString());
    
    SendResult result = orderlyMsgProducer.publishOrderly(eventMsg);
    
    log.info("消息以发送,消息ID:{}, 消息发送状态:{}", result.getMsgId(), result.getSendStatus());
}

执行结果:

2022-02-02 22:08:31.149  INFO 13256 --- [nio-8080-exec-2] c.w.c.orderly.OrderlyMsgProducer         : MessageQueue [topic=test-orderly-topic, brokerName=broker-a, queueId=0]
2022-02-02 22:08:31.150  INFO 13256 --- [nio-8080-exec-2] c.w.c.orderly.OrderlyMsgProducer         : MessageQueue [topic=test-orderly-topic, brokerName=broker-a, queueId=1]
2022-02-02 22:08:31.151  INFO 13256 --- [nio-8080-exec-2] c.w.c.orderly.OrderlyMsgProducer         : MessageQueue [topic=test-orderly-topic, brokerName=broker-a, queueId=2]
2022-02-02 22:08:31.151  INFO 13256 --- [nio-8080-exec-2] c.w.c.orderly.OrderlyMsgProducer         : MessageQueue [topic=test-orderly-topic, brokerName=broker-a, queueId=3]
2022-02-02 22:08:31.152  INFO 13256 --- [nio-8080-exec-2] c.w.c.orderly.OrderlyMsgProducer         : Topic test-orderly-topic 共有4个MessageQueue
2022-02-02 22:08:31.152  INFO 13256 --- [nio-8080-exec-2] c.w.c.orderly.OrderlyMsgProducer         : 业务ID=456消息投递到0号Queue
2022-02-02 22:08:33.543  INFO 13256 --- [nio-8080-exec-2] com.wangan.controller.WanganController   : 消息以发送,消息ID:7F00000133C84E0E2F2A09E6B0BE0000, 消息发送状态:SEND_OK
。。。。。。
。。。。。。
2022-02-02 22:08:33.757  INFO 13256 --- [nio-8080-exec-2] c.w.c.orderly.OrderlyMsgProducer         : MessageQueue [topic=test-orderly-topic, brokerName=broker-a, queueId=0]
2022-02-02 22:08:33.757  INFO 13256 --- [nio-8080-exec-2] c.w.c.orderly.OrderlyMsgProducer         : MessageQueue [topic=test-orderly-topic, brokerName=broker-a, queueId=1]
2022-02-02 22:08:33.757  INFO 13256 --- [nio-8080-exec-2] c.w.c.orderly.OrderlyMsgProducer         : MessageQueue [topic=test-orderly-topic, brokerName=broker-a, queueId=2]
2022-02-02 22:08:33.757  INFO 13256 --- [nio-8080-exec-2] c.w.c.orderly.OrderlyMsgProducer         : MessageQueue [topic=test-orderly-topic, brokerName=broker-a, queueId=3]
2022-02-02 22:08:33.758  INFO 13256 --- [nio-8080-exec-2] c.w.c.orderly.OrderlyMsgProducer         : Topic test-orderly-topic 共有4个MessageQueue
2022-02-02 22:08:33.758  INFO 13256 --- [nio-8080-exec-2] c.w.c.orderly.OrderlyMsgProducer         : 业务ID=456消息投递到0号Queue
2022-02-02 22:08:33.774  INFO 13256 --- [nio-8080-exec-2] com.wangan.controller.WanganController   : 消息以发送,消息ID:7F00000133C84E0E2F2A09E6B1DE0009, 消息发送状态:SEND_OK
2022-02-02 22:08:51.130  INFO 13256 --- [MessageThread_1] c.w.controller.orderly.OrderlyConsumer   : 收到消息{"msgBody":"+0","msgId":"456","producerGroup":"testapp-producer-group-orderly","publishTime":"2022-02-02T22:08:31.088","tag":"douchuzi","topic":"test-orderly-topic"}
2022-02-02 22:08:51.137  INFO 13256 --- [MessageThread_1] c.w.controller.orderly.OrderlyConsumer   : 收到消息{"msgBody":"+1","msgId":"456","producerGroup":"testapp-producer-group-orderly","publishTime":"2022-02-02T22:08:33.544","tag":"douchuzi","topic":"test-orderly-topic"}
2022-02-02 22:08:51.137  INFO 13256 --- [MessageThread_1] c.w.controller.orderly.OrderlyConsumer   : 收到消息{"msgBody":"+2","msgId":"456","producerGroup":"testapp-producer-group-orderly","publishTime":"2022-02-02T22:08:33.576","tag":"douchuzi","topic":"test-orderly-topic"}
2022-02-02 22:08:51.137  INFO 13256 --- [MessageThread_1] c.w.controller.orderly.OrderlyConsumer   : 收到消息{"msgBody":"+3","msgId":"456","producerGroup":"testapp-producer-group-orderly","publishTime":"2022-02-02T22:08:33.597","tag":"douchuzi","topic":"test-orderly-topic"}
2022-02-02 22:08:51.137  INFO 13256 --- [MessageThread_1] c.w.controller.orderly.OrderlyConsumer   : 收到消息{"msgBody":"+4","msgId":"456","producerGroup":"testapp-producer-group-orderly","publishTime":"2022-02-02T22:08:33.628","tag":"douchuzi","topic":"test-orderly-topic"}
2022-02-02 22:08:51.137  INFO 13256 --- [MessageThread_1] c.w.controller.orderly.OrderlyConsumer   : 收到消息{"msgBody":"+5","msgId":"456","producerGroup":"testapp-producer-group-orderly","publishTime":"2022-02-02T22:08:33.658","tag":"douchuzi","topic":"test-orderly-topic"}
2022-02-02 22:08:51.137  INFO 13256 --- [MessageThread_1] c.w.controller.orderly.OrderlyConsumer   : 收到消息{"msgBody":"+6","msgId":"456","producerGroup":"testapp-producer-group-orderly","publishTime":"2022-02-02T22:08:33.681","tag":"douchuzi","topic":"test-orderly-topic"}
2022-02-02 22:08:51.137  INFO 13256 --- [MessageThread_1] c.w.controller.orderly.OrderlyConsumer   : 收到消息{"msgBody":"+7","msgId":"456","producerGroup":"testapp-producer-group-orderly","publishTime":"2022-02-02T22:08:33.704","tag":"douchuzi","topic":"test-orderly-topic"}
2022-02-02 22:08:51.137  INFO 13256 --- [MessageThread_1] c.w.controller.orderly.OrderlyConsumer   : 收到消息{"msgBody":"+8","msgId":"456","producerGroup":"testapp-producer-group-orderly","publishTime":"2022-02-02T22:08:33.727","tag":"douchuzi","topic":"test-orderly-topic"}
2022-02-02 22:08:51.137  INFO 13256 --- [MessageThread_1] c.w.controller.orderly.OrderlyConsumer   : 收到消息{"msgBody":"+9","msgId":"456","producerGroup":"testapp-producer-group-orderly","publishTime":"2022-02-02T22:08:33.756","tag":"douchuzi","topic":"test-orderly-topic"}

可见连续投递的10条消息最后在消费端是按照顺序消费处理的。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,294评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,493评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,790评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,595评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,718评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,906评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,053评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,797评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,250评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,570评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,711评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,388评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,018评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,796评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,023评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,461评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,595评论 2 350

推荐阅读更多精彩内容

  • 1、什么是顺序消息 顺序消息(FIFO 消息)是 MQ 提供的一种严格按照顺序进行发布和消费的消息类型。顺序消息由...
    冰河winner阅读 2,928评论 0 2
  • 背景: 业务使用 RocketMQ 的场景增多,但是有一些消息状态依赖的场景没有考虑顺序正确的使用RocketMQ...
    c934阅读 3,021评论 0 8
  • 顺序消息 之前我本地使用的client版本是3.6.2的,但是公司服务器上安得是3.2.6的版本。导致我测试顺序消...
    运维开发笔记阅读 2,246评论 0 3
  • RocketMQ是一款 分布式、队列模型的消息中间件,由阿里巴巴团队研发,借鉴参考了JMS规范的MQ实现,更参考了...
    微笑达人_4b5d阅读 2,872评论 0 0
  • 阿里help:https://help.aliyun.com/document_detail/49319.html...
    多关心老人阅读 1,702评论 0 0