有哪些Java程序员必知必会的RocketMQ消息类型

一、普通消息

普通消息也叫做无序消息,简单来说就是没有顺序的消息,producer 只管发送消息,consumer 只管接收消息,至于消息和消息之间的顺序并没有保证,可能先发送的消息先消费,也可能先发送的消息后消费。

举个简单例子,producer 依次发送 order id 为 1、2、3 的消息到 broker,consumer 接到的消息顺序有可能是 1、2、3,也有可能是 2、1、3 等情况,这就是普通消息。

因为不需要保证消息的顺序,所以消息可以大规模并发地发送和消费,吞吐量很高,适合大部分场景。

代码示例:

生产者

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        //声明并初始化一个producer
        //需要一个producer group名字作为构造方法的参数,这里为concurrent_producer
        DefaultMQProducer producer = new DefaultMQProducer("concurrent_producer");
        //设置NameServer地址,此处应改为实际NameServer地址,
        多个地址之间用;分隔
        //NameServer的地址必须有,但是也可以通过环境变量的
        方式设置,不一定非得写死在代码里
        producer.setNamesrvAddr("10.1.54.121:9876;10.1.54.
122 :9876");
        //调用start()方法启动一个producer实例
        producer.start();
        //发送10条消息到Topic为TopicTest,tag为TagA,消息内容为“Hello RocketMQ”拼接上i的值
        for (int i = 0; i < 10; i++) {
            try {
                Message msg = new Message("TopicTestConcurrent",// topic
                "TagA",// tag
                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)// body
                );
                //调用producer的send()方法发送消息
                //这里调用的是同步的方式,所以会有返回结果,
                同时默认发送的也是普通消息
                SendResult sendResult = producer.send(msg);
                //打印返回结果,可以看到消息发送的状态以及一些相关信息
                System.out.println(sendResult);
            }
            catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }
        //发送完消息之后,调用shutdown()方法关闭producer
        producer.shutdown();
    }
}

消费者

public class Consumer {
    public static void main(String[] args) throws
    InterruptedException, MQClientException
    //声明并初始化一个consumer
    //需要一个consumer group名字作为构造方法的参数,
    这里为concurrent_consumer
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("concurrent_consumer");
    //同样也要设置NameServer地址
    consumer.setNamesrvAddr("10.1.54.121:9876;10.1 .54 .122 : 9876");
    //这里设置的是一个consumer的消费策略
    //CONSUME_FROM_LAST_OFFSET
    默认策略,从该队列最尾开始消费,即跳过历史消息
    //CONSUME_FROM_FIRST_OFFSET
    从队列最开始开始消费,即历史消息(还储存在broker的)
    全部消费一遍
    //CONSUME_FROM_TIMESTAMP
    从某个时间点开始消费,和setConsumeTimestamp()配合使用,
    默认是半个小时以前
    consumer.setConsumeFromWhere(ConsumeFromWhere.
    CONSUME_FROM_FIRST_OFFSET);
    //设置consumer所订阅的Topic和Tag,*代表全部的Tag
    consumer.subscribe("TopicTestConcurrent", "*");
    //设置一个Listener,主要进行消息的逻辑处理
    //注意这里使用的是MessageListenerConcurrently这个接口
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
        ConsumeConcurrentlyContext context) {
            System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
            //返回消费状态
            //CONSUME_SUCCESS 消费成功
            //RECONSUME_LATER 消费失败,需要稍后重新消费
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    }
    );
    //调用start()方法启动consumer
    consumer.start();
    System.out.println("Consumer Started.");
}
}

二、有序消息

有序消息就是按照一定的先后顺序的消息类型。

举个例子来说,producer 依次发送 order id 为 1、2、3 的消息到 broker,consumer 接到的消息顺序也就是 1、2、3 ,而不会出现普通消息那样的 2、1、3 等情况。

那么有序消息是如何保证的呢?我们都知道消息首先由 producer 到 broker,再从 broker 到 consumer,分这两步走。那么要保证消息的有序,势必这两步都是要保证有序的,即要保证消息是按有序发送到 broker,broker 也是有序将消息投递给 consumer,两个条件必须同时满足,缺一不可。

进一步还可以将有序消息分成

  1. 全局有序消息
  2. 局部有序消息

之前我们讲过,topic 只是消息的逻辑分类,内部实现其实是由 queue 组成。当 producer 把消息发送到某个 topic 时,默认是会消息发送到具体的 queue 上。

1. 全局有序

举个例子,producer 发送 order id 为 1、2、3、4 的四条消息到 topicA 上,假设 topicA 的 queue 数为 3 个(queue0、queue1、queue2),那么消息的分布可能就是这种情况,id 为 1 的在 queue0,id 为 2 的在 queue1,id 为 3 的在 queue2,id 为 4 的在 queue0。同样的,consumer 消费时也是按 queue 去消费,这时候就可能出现先消费 1、4,再消费 2、3,和我们的预期不符。那么我们如何实现 1、2、3、4 的消费顺序呢?道理其实很简单,只需要把订单 topic 的 queue 数改为 1,如此一来,只要 producer 按照 1、2、3、4 的顺序去发送消息,那么 consumer 自然也就按照 1、2、3、4 的顺序去消费,这就是全局有序消息。

由于一个 topic 只有一个 queue ,即使我们有多个 producer 实例和 consumer 实例也很难提高消息吞吐量。就好比过独木桥,大家只能一个挨着一个过去,效率低下。

那么有没有吞吐量和有序之间折中的方案呢?其实是有的,就是局部有序消息。

2. 局部有序

我们知道订单消息可以再细分为订单创建、订单付款、订单完成等消息,这些消息都有相同的 order id。同时,也只有按照订单创建、订单付款、订单完成的顺序去消费才符合业务逻辑。但是不同 order id 的消息是可以并行的,不会影响到业务。这时候就常见做法就是将 order id 进行处理,将 order id 相同的消息发送到 topicB 的同一个 queue,假设我们 topicB 有 2 个 queue,那么我们可以简单的对 id 取余,奇数的发往 queue0,偶数的发往 queue1,消费者按照 queue 去消费时,就能保证 queue0 里面的消息有序消费,queue1 里面的消息有序消费。

由于一个 topic 可以有多个 queue,所以在性能比全局有序高得多。假设 queue 数是 n,理论上性能就是全局有序的 n 倍,当然 consumer 也要跟着增加才行。在实际情况中,这种局部有序消息是会比全局有序消息用的更多。

示例代码:

生产者

public class Producer {
    public static void main(String[] args) throws UnsupportedEncodingException {
        try {
            // 声明并初始化一个producer
            // 需要一个producer group名字作为构造方法的参数,这里为ordered_producer
            DefaultMQProducer orderedProducer = new DefaultMQProducer("ordered_producer");
            // 设置NameServer地址,此处应改为实际NameServer地址,
            多个地址之间用;分隔
            //NameServer的地址必须有,但是也可以通过环境变量的方式设置,
            不一定非得写死在代码里
            orderedProducer.setNamesrvAddr("10.1.54.121:
9876;10.1.54.122:9876");
            // 调用start()方法启动一个producer实例
            orderedProducer.start();
            // 自定义一个tag数组
            String[] tags = new String[]{"TagA", "TagB",
            "TagC", "TagD", "TagE"};
            // 发送10条消息到Topic为TopicTestOrdered,tag
            为tags数组按顺序取值,
            // key值为“KEY”拼接上i的值,消息内容为
            “Hello RocketMQ”拼接上i的值
            for (int i = 0; i < 10; i++) {
                int orderId = i % 10;
                Message msg =
                new Message("TopicTestOrdered", tags[i % tags.
                length], "KEY" + i,
                ("Hello RocketMQ " + i).getBytes(RemotingHelper.
                DEFAULT_CHARSET));
                SendResult sendResult = orderedProducer.send(msg, new MessageQueueSelector() {
                    // 选择发送消息的队列
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        // arg的值其实就是orderId
                        Integer id = (Integer) arg;
                        // mqs是队列集合,也就是topic所对应的所有队列
                        int index = id % mqs.size();
                        // 这里根据前面的id对队列集合大小求余来返回所对应的队列
                        return mqs.get(index);
                    }
                }
                , orderId);
                System.out.println(sendResult);
            }
            orderedProducer.shutdown();
        }
        catch (MQClientException e) {
            e.printStackTrace();
        }
        catch (RemotingException e) {
            e.printStackTrace();
        }
        catch (MQBrokerException e) {
            e.printStackTrace();
        }
        catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

至于是要实现全局有序,还是局部有序,在此示例代码中,就取决于 TopicTestOrdered 这个 Topic 的队列数了。

消费者

public class Consumer {
    public static void main(String[] args) throws
    MQClientException {
        //声明并初始化一个consumer
        //需要一个consumer group名字作为构造方法的参数,
        这里为concurrent_consumer
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ordered_consumer");
        //同样也要设置NameServer地址
        consumer.setNamesrvAddr("10.1.54.121:9876;10.1.54 .122:9876");
        //这里设置的是一个consumer的消费策略
        //CONSUME_FROM_LAST_OFFSET 默认策略,从该队列最尾开始消费,
        即跳过历史消息
        //CONSUME_FROM_FIRST_OFFSET 从队列最开始开始消费,
        即历史消息(还储存在broker的)全部消费一遍
        //CONSUME_FROM_TIMESTAMP 从某个时间点开始消费,
        和setConsumeTimestamp()配合使用,默认是半个小时以前
        consumer.setConsumeFromWhere(ConsumeFromWhere.
        CONSUME_FROM_FIRST_OFFSET);
        //设置consumer所订阅的Topic和Tag
        consumer.subscribe("TopicTestOrdered", "TagA || TagC || TagD");
        //设置一个Listener,主要进行消息的逻辑处理
        //注意这里使用的是MessageListenerOrderly这个接口
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage ( List <MessageExt> msgs, ConsumeOrderlyContext context) {
                System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
                //返回消费状态
                //SUCCESS 消费成功
                //SUSPEND_CURRENT_QUEUE_A_MOMENT 消费失败,暂停当前队列的消费
                return ConsumeOrderlyStatus.SUCCESS;
            }
        }
        );
        //调用start()方法启动consumer
        consumer.start();
        System.out.println("Consumer Started.");
    }
}

三、延时消息

延时消息,简单来说就是当 producer 将消息发送到 broker 后,会延时一定时间后才投递给 consumer 进行消费。

RcoketMQ的延时等级为:1s,5s,10s,30s,1m,2m,3m,4m,5m,6m,7m,8m,9m,10m,20m,30m,1h,2h。level=0,表示不延时。level=1,表示 1 级延时,对应延时 1s。level=2 表示 2 级延时,对应5s,以此类推。

这种消息一般适用于消息生产和消费之间有时间窗口要求的场景。比如说我们网购时,下单之后是有一个支付时间,超过这个时间未支付,系统就应该自动关闭该笔订单。那么在订单创建的时候就会就需要发送一条延时消息(延时15分钟)后投递给 consumer,consumer 接收消息后再对订单的支付状态进行判断是否关闭订单。

设置延时非常简单,只需要在Message设置对应的延时级别即可

Message msg = new Message("TopicTest",// topic
                        "TagA",// tag
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)// body
                );
                // 这里设置需要延时的等级即可
                msg.setDelayTimeLevel(3);
                SendResult sendResult = producer.send(msg);

# 链接 Java程序员福利"常用资料分享"

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,634评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,951评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,427评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,770评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,835评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,799评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,768评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,544评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,979评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,271评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,427评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,121评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,756评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,375评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,579评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,410评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,315评论 2 352

推荐阅读更多精彩内容