RocketMQ Producer指定MessageQueue

背景描述

一个公司有100个门店,每个门店产生的订单都进入消息队列,Consumer 读取并处理。
如果其中某个门店的订单量特别大,这就会造成资源独占,Consumer 一直在处理这个门店的订单,其他门店的订单就得延迟较长时间才能被处理。

解决思路

给 Topic 设置100个 MessageQueue,把每个门店的订单写入一个 MessageQueue,consumer 默认是采取循环的方式逐个读取Topic中的每个MessageQueue,这样,即使某个店的订单量很大,也是这个店对应的 MessageQueue 消息量增大,不会造成其他店等待时间变长。

示例代码

public void selectorProducer() {
    DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
    producer.setNamesrvAddr(namesrvAddr);
    try {
        producer.start();

        for (int i = 0; i < 20; i++) {
            int orderId = i % 10;
            Message message = new Message("TopicTest", "push",
                    ("发送消息----" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult result = producer.send(message, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    Integer id = (Integer) arg;
                    int index = id % mqs.size();
                    return mqs.get(index);
                }
            }, orderId);
            System.out.println("发送响应:MsgId:" + result.getMsgId() + ",发送状态:" + result.getSendStatus());
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        producer.shutdown();
    }
}
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

  • 姓名:周小蓬 16019110037 转载自:http://blog.csdn.net/YChenFeng/art...
    aeytifiw阅读 34,929评论 13 425
  • Kafka简介 Kafka是一种分布式的,基于发布/订阅的消息系统。主要设计目标如下: 以时间复杂度为O(1)的方...
    Alukar阅读 3,160评论 0 43
  • RocketMQ是一款分布式、队列模型的消息中间件,具有以下特点: 能够保证严格的消息顺序 提供丰富的消息拉取模式...
    AI乔治阅读 2,219评论 2 5
  • 本文转载自http://dataunion.org/?p=9307 背景介绍Kafka简介Kafka是一种分布式的...
    Bottle丶Fish阅读 5,593评论 0 34
  • 端午节,因为有事要处理,所以没有回老家。看到微信圈里大家都晒出在回家的路上,或者是与父母团聚的照片,满满的都是幸福...
    谷雨听蝶阅读 288评论 1 1

友情链接更多精彩内容