RocketMQ-基础使用(二)

前置文章:
RocketMQ-基础使用(一),该文主要涉及MQ基础、RocketMQ安装&集群搭建、RocketMQ监控平台。

官方基础使用样例,很多基础内容其实官方文档都有很详细的说明。日常使用如果时间充足,还是推荐查看官方文档。学习官方文档是一个良好的习惯。

零、本文纲要

一、RocketMQ-基础使用

  1. 前置文章基础指令

二、RocketMQ-发送消息

  1. 发送同步消息
  2. 发送异步消息
  3. 发送单向消息

三、RocketMQ-接收消息

  1. 消息接收
  2. 消息接收-负载均衡【默认】
  3. 消息接收-广播模式

四、RocketMQ-消息类型

  1. 顺序消息
  2. 延迟消息
  3. 批量消息
  4. 过滤消息
  5. 事务消息

一、RocketMQ-基础使用

0. 前置文章基础指令

Ⅰ 启动RocketMQ的基础指令

# Start Name Server
nohup sh bin/mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log

# Start Broker
nohup sh bin/mqbroker -n localhost:9876 &
tail -f ~/logs/rocketmqlogs/broker.log 

指定自定义配置文件启动nohup sh bin/mqbroker -n localhost:9876 -c conf/broker.conf &

Ⅱ 关闭RocketMQ的基础指令

# Shutdown Servers
sh bin/mqshutdown broker
sh bin/mqshutdown namesrv

二、RocketMQ-发送消息

发送同步消息 / 发送异步消息 / 发送单向消息

1. 发送同步消息

这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。

  • ① 基础依赖
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.5.0</version>
</dependency>
  • ② 同步消息代码
/**
 * 发送同步消息
 */
public class SyncProducer {

    public static void main(String[] args) throws Exception {
        //1.创建消息生产者producer,并制定生产者组名
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        //2.指定Nameserver地址,多个NameServer则用“;”隔开
        producer.setNamesrvAddr("192.168.253.128:9876");
        //3.启动producer
        producer.start();

        for (int i = 0; i < 10; i++) {
            //4.创建消息对象,指定主题Topic、Tag和消息体
            /**
             * 参数一:消息主题Topic
             * 参数二:消息Tag
             * 参数三:消息内容
             */
            Message msg = new Message("base", "tag1", ("Hello RocketMQ [" + i + "]").getBytes());
            //5.发送消息
            SendResult result = producer.send(msg);
            //发送状态
            SendStatus status = result.getSendStatus();
            String msgId = result.getMsgId();
            int queueId = result.getMessageQueue().getQueueId();
            System.out.printf("发送状态:%s,消息ID:%s,队列:%d%n", status, msgId, queueId);
            //线程睡1秒
            Thread.sleep(1000);
        }

        //6.关闭生产者producer
        producer.shutdown();
    }
}

截取控制台输出
发送状态:SEND_OK,消息ID:C0A8026AC05818B4AAC28D428B270000,队列:3

2. 发送异步消息

异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。

/**
 * 发送异步消息
 */
public class AsyncProducer {

    public static void main(String[] args) throws Exception {
        //1.创建消息生产者producer,并制定生产者组名
        DefaultMQProducer producer = new DefaultMQProducer("group2");
        //2.指定Nameserver地址,多个NameServer则用“;”隔开
        producer.setNamesrvAddr("192.168.253.128:9876");
        //3.启动producer
        producer.start();

        for (int i = 0; i < 10; i++) {
            //4.创建消息对象,指定主题Topic、Tag和消息体
            /**
             * 参数一:消息主题Topic
             * 参数二:消息Tag
             * 参数三:消息内容
             */
            Message msg = new Message("base", "tag2", ("AsyncMsg [" + i + "]").getBytes());
            //5.发送异步消息
            producer.send(msg, new SendCallback() {
                /**
                 * 发送成功的回调函数
                 * @param sendResult 发送结果
                 */
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println("发送结果:" + sendResult);
                }

                /**
                 * 发送失败的回调函数
                 * @param throwable 发送异常
                 */
                @Override
                public void onException(Throwable throwable) {
                    System.out.println("发送异常:" + throwable);
                }
            });
            //线程睡1秒
            Thread.sleep(1000);
        }

        //6.关闭生产者producer
        producer.shutdown();
    }
}

与同步消息的不同之处在于通过回调函数来获取发送结果。

3. 发送单向消息

这种方式主要用在不特别关心发送结果的场景,比如:日志发送。

/**
 * 发送单向消息
 */
public class OneWayProducer {

    public static void main(String[] args) throws Exception, MQBrokerException {
        //1.创建消息生产者producer,并制定生产者组名
        DefaultMQProducer producer = new DefaultMQProducer("group3");
        //2.指定Nameserver地址,多个NameServer则用“;”隔开
        producer.setNamesrvAddr("192.168.253.128:9876");
        //3.启动producer
        producer.start();

        for (int i = 0; i < 10; i++) {
            //4.创建消息对象,指定主题Topic、Tag和消息体
            /**
             * 参数一:消息主题Topic
             * 参数二:消息Tag
             * 参数三:消息内容
             */
            Message msg = new Message("base", "tag3", ("OneWayMsg [" + i + "]").getBytes());
            //5.发送单向消息
            producer.send(msg);
            //线程睡1秒
            Thread.sleep(1000);
        }

        //6.关闭生产者producer
        producer.shutdown();
    }
}

三、RocketMQ-接收消息

1. 消息接收

/**
 * 消息的接受者
 */
public class Consumer {

    public static void main(String[] args) throws Exception {
        //1.创建消费者Consumer,制定消费者组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group3");
        //2.指定Nameserver地址
        consumer.setNamesrvAddr("192.168.253.128:9876");
        //3.订阅主题Topic和Tag
        consumer.subscribe("base", "tag1");
        //设定消费模式:负载均衡|广播模式

        //4.设置回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            //接收消息内容
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,
                                                            ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                System.out.println(list);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        //5.启动消费者consumer
        consumer.start();
    }
}

2. 消息接收-负载均衡【默认】

消费者采用负载均衡方式消费消息,多个消费者共同消费队列消息,每个消费者处理的消息不同。【默认的消息消费方式】

consumer.setMessageModel(MessageModel.CLUSTERING);

3. 消息接收-广播模式

消费者采用广播的方式消费消息,每个消费者消费的消息都是相同的。

consumer.setMessageModel(MessageModel.BROADCASTING);

四、RocketMQ-消息类型

1. 顺序消息

  • ① 基础分析

假定一个订单的顺序流程是:创建、付款、推送、完成。有张三、李四两人进行订单业务。

a、全局有序:
张三所有消息消费完,再消费李四消息,且内部有序;
一个Borker,一个MessageQueue;

b、局部有序:
只要保证各自消息内部的有序消费,交替消费两者的消息是可以的;
一个Borker,多个MessageQueue,一个MessageQueue对应一个订单。

所以,一般仅需保证局部有序即可。
实现方式:同一个用户的一个业务消息放到同一个队列,比如:订单号相同的消息进同一个队列。

  • ② 代码实现

Ⅰ 消息生产者producer的核心代码

/**
 * 参数一: 消息对象
 * 参数二: 消息队列选择器 MessageQueueSelector
 * 参数三: 选择队列业务标识,此处为订单ID
 */
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
    /**
     *
     * @param list      消息队列
     * @param message   消息对象
     * @param o         业务标识的参数
     * @return 消息队列
     */
    @Override
    public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
        long orderId = (long) o;
        long index = orderId % list.size(); //订单ID一致,则取模结果一致,最终选择的队列一致
        return list.get((int) index);
    }
}, order.getOrderId());

Ⅱ 消息消费者consumer的核心代码

此处是通过有序消息监听MessageListenerOrderly来实现的

//4.注册消息监听器
consumer.registerMessageListener(new MessageListenerOrderly() {
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
        for (MessageExt messageExt : list) {
            System.out.println("线程名称:" + Thread.currentThread().getName() + " → " +
                    "消费消息:" + new String(messageExt.getBody()));
        }
        return ConsumeOrderlyStatus.SUCCESS;
    }
});

2. 延迟消息

使用场景:比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。

延迟消息使用限制:

// org/apache/rocketmq/store/config/MessageStoreConfig.java
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

Ⅰ 消息生产者producer的核心代码

msg.setDelayTimeLevel(2);

Ⅱ 消息消费者consumer的核心代码

无需调整。

注意:受限于网络情况,实际的延迟往往大于设置的延迟。

3. 批量消息

批量发送消息能显著提高传递小消息的性能。

限制:
a、相同的topic;
b、相同的waitStoreMsgOK;
c、不能是延时消息;
d、总大小不应超过4MB。

Ⅰ 消息生产者producer的核心代码

List<Message> messageList = new ArrayList<>();

Ⅱ 消息消费者consumer的核心代码

无需调整。

4. 过滤消息

一般过滤消息可通过 TAG / SQL92标准 来进行过滤

Ⅰ 消息生产者producer的核心代码

//方式一:通过Tag过滤的使用方法,消息发送方不做调整
//...

//方式二:通过sql过滤的使用方法,使用putUserProperty设置一些消息属性
msg.putUserProperty("a", String.valueOf(i));

Ⅱ 消息消费者consumer的核心代码

//方式一:通过Tag过滤的使用方法,consumer使用" || "分隔订阅不同的Tag即可
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");

//方式二:通过sql过滤的使用方法,通过MessageSelector消息选择器的bySql方法过滤消息
consumer.subscribe("topic_sql_filter", MessageSelector.bySql("num > 5"));

方式二如果报错:The broker does not support consumer to filter message by SQL92
则需要在我们对应的Broker配置文件内做调整,添加enablePropertyFilter=true,重启服务即可生效。

关于SQL92基础语法,RocketMQ只定义了一些基本语法来支持这个特性:
数值比较,比如:>,>=,<,<=,BETWEEN,=;
字符比较,比如:=,<>,IN;
IS NULL 或者 IS NOT NULL;
逻辑符号 AND,OR,NOT;

常量支持类型为:
数值,比如:123,3.1415;
字符,比如:'abc',必须用单引号包裹起来;
NULL,特殊的常量
布尔值,TRUE 或 FALSE

注意:只有使用push模式的消费者才能用使用SQL92标准的sql语句。

5. 事务消息

在【分布式事务-可靠消息最终一致性】的解决方案内使用的就是事务消息。

事务消息流程:正常事务消息的发送及提交,以及事务消息的补偿【事务状态回查】;

事务状态:
LocalTransactionState.COMMIT_MESSAGE 提交状态 允许消费消息;
LocalTransactionState.ROLLBACK_MESSAGE 回滚状态 删除消息,不允许被消费;
LocalTransactionState.UNKNOW 中间状态 需要回查事务。

Ⅰ 消息生产者producer的核心代码

/**
 * 发送同步消息
 */
public class Producer {

    public static void main(String[] args) throws Exception {
        //1.创建消息生产者producer,并制定生产者组名
        TransactionMQProducer producer = new TransactionMQProducer("group1");
        //2.指定Nameserver地址,多个NameServer则用“;”隔开
        producer.setNamesrvAddr("192.168.253.128:9876");
        //3.设置消息事务的监听器
        producer.setTransactionListener(new TransactionListener() {
            /**
             * 在该方法中执行本地的事务
             * @param message 消息
             * @param o
             * @return 事务状态
             */
            @Override
            public LocalTransactionState executeLocalTransaction(Message message, Object o) {
                String messageTags = message.getTags();
                if (StringUtils.equals("TagA", messageTags)) {
                    return LocalTransactionState.COMMIT_MESSAGE;
                } else if (StringUtils.equals("TagB", messageTags)) {
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                } else {
                    return LocalTransactionState.UNKNOW;
                }
            }

            /**
             * 该方法进行MQ事务状态的回查
             * @param messageExt 消息
             * @return 事务状态
             */
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
                System.out.println("消息的Tag:" + messageExt.getTags());
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });

        String[] tags = new String[]{"TagA", "TagB", "TagC"};

        //4.启动producer
        producer.start();
        for (int i = 0; i < 3; i++) {
            //5.创建消息对象,指定Topic、Tag、消息体
            Message msg = new Message("topic_transaction", tags[i],
                    (tags[i] + " Hello transactionMsg " + i).getBytes(StandardCharsets.UTF_8));
            //6.发送消息
            SendResult sendResult = producer.sendMessageInTransaction(msg, null);
            System.out.println("发送结果:" + sendResult);
            Thread.sleep(1000);
        }
        //7.关闭生产者producer,此处需要回查,所以不关闭
        //producer.shutdown();
    }
}

Ⅱ 消息消费者consumer的核心代码

/**
 * 消息的接受者
 */
public class Consumer {

    public static void main(String[] args) throws Exception {
        //1.创建消费者Consumer,制定消费者组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        //2.指定Nameserver地址
        consumer.setNamesrvAddr("192.168.253.128:9876");
        //3.订阅主题Topic和Tag
        consumer.subscribe("topic_transaction", "*");
        //4.注册消息监听器
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,
                                                            ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt messageExt : list) {
                    System.out.println("消费消息:" + new String(messageExt.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //5.启动消费者consumer
        consumer.start();
        System.out.println("消费者启动了...");
    }
}

注意:事务消息不支持延时消息和批量消息。

五、结尾

以上即为RocketMQ-基础使用(二)的全部内容,感谢阅读。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,753评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,668评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,090评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,010评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,054评论 6 395
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,806评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,484评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,380评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,873评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,021评论 3 338
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,158评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,838评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,499评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,044评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,159评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,449评论 3 374
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,136评论 2 356

推荐阅读更多精彩内容