RocketMQ 消息的类型

参考资料

阿里云官方英文、最新的Demo和Guidence
http://rocketmq.apache.org/docs/transaction-example/

阿里云的帮助文档啊,超级详细而且有Demo
https://help.aliyun.com/document_detail/29551.html

阿里云在github上的Demos(包括整合Spring 和简单TCP的形式)
https://github.com/AliwareMQ/mq-demo

消息的类型

http://rocketmq.apache.org/docs/simple-example/

按照发送的特点分:
https://help.aliyun.com/document_detail/29547.html?spm=a2c4g.11186623.2.13.496e2379jnaAlt

  1. 同步消息
  2. 异步消息
  3. 单向消息

按照使用功能特点分:

  1. 普通消息(订阅)
  2. 顺序消息
  3. 广播消息
  4. 延时消息
  5. 批量消息
  6. 事务消息

同步发送(可靠)

  1. 同步发送,线程阻塞,投递completes阻塞结束
  2. 如果发送失败,会在默认的超时时间3秒内进行重试,最多重试2次
  3. 投递completes不代表投递成功,要check SendResult.sendStatus来判断是否投递成功
//Send message in synchronous mode. This method returns only when the sending procedure totally completes.
SendResult sendResult = producer.send(msg);
  1. SendResult里面有发送状态的枚举:SendStatus,同步的消息投递有一个状态返回值的
public enum SendStatus {
    SEND_OK,
    FLUSH_DISK_TIMEOUT,
    FLUSH_SLAVE_TIMEOUT,
    SLAVE_NOT_AVAILABLE,
}
  1. retry的实现原理:只有ack的SendStatus=SEND_OK才会停止retry
 int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
  for (; times < timesTotal; times++) {
        trySend();
        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
        switch (communicationMode) {
                case ASYNC:
                    return null;
                case ONEWAY:
                    return null;
                case SYNC:
                    if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                        if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                            continue;
                        }
                    }
                    return sendResult;
                default:
                    break;
            }
    }

注意事项:发送同步消息且Ack为SEND_OK,只代表该消息成功的写入了MQ当中,并不代表该消息成功的被Consumer消费了

异步发送(可靠)

  1. 异步调用的话,当前线程一定要等待异步线程回调结束再关闭producer啊,因为是异步的,不会阻塞,提前关闭producer会导致未回调链接就断开了
  2. 异步消息不retry,投递失败回调onException()方法,只有同步消息才会retry,源码参考 DefaultMQProducerImpl.class
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
  1. 异步发送一般用于链路耗时较长,对 RT 响应时间较为敏感的业务场景,例如用户视频上传后通知启动转码服务,转码完成后通知推送转码结果等。
    producer.send(msg, new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            System.out.printf("%-10d OK %s %n", index,sendResult.getMsgId());
        }
        @Override
        public void onException(Throwable e) {
            System.out.printf("%-10d Exception %s %n", index, e);
            e.printStackTrace();
        }
    });               

Oneway(不可靠,类似于UPD)

  1. 消息不可靠,性能高,只负责往服务器发送一条消息,不会重试也不关心是否发送成功
  2. 此方式发送消息的过程耗时非常短,一般在微秒级别
    for (int i = 0; i < 50; i++) {
         Message msg = new Message("TopicTest2" ,"TagA" ,("Hello RocketMQ " +i).getBytes(RemotingHelper.DEFAULT_CHARSET));
         producer.sendOneway(msg);
    }

延迟发送

艿艿的博客
https://blog.csdn.net/github_38592071/article/details/72230984
延迟的机制是在 服务端实现的,也就是Broker收到了消息,但是经过一段时间以后才发送
服务器按照1-N定义了如下级别: “1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”;若要发送定时消息,在应用层初始化Message消息对象之后,调用Message.setDelayTimeLevel(int level)方法来设置延迟级别,按照序列取相应的延迟级别,例如level=2,则延迟为5s

 msg.setDelayTimeLevel(2);
 SendResult sendResult = producer.send(msg);

实现原理:

  1. 发送消息的时候如果消息设置了DelayTimeLevel,那么该消息会被丢到ScheduleMessageService.SCHEDULE_TOPIC这个Topic里面
  2. 根据DelayTimeLevel选择对应的queue
  3. 再把真实的topic和queue信息封装起来,set到msg里面
  4. 然后每个SCHEDULE_TOPIC_XXXX的每个DelayTimeLevelQueue,有定时任务去刷新,是否有待投递的消息
  5. 每 10s 定时持久化发送进度

源码参考 :rocketmq-store.4.4.0.jar
CommitLog.putMessage()

  1. 存储延迟消息
if (msg.getDelayTimeLevel() > 0) {
    topic = ScheduleMessageService.SCHEDULE_TOPIC;
    queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

    // Backup real topic, queueId
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
    msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

    msg.setTopic(topic);
    msg.setQueueId(queueId);
 }

result = mappedFile.appendMessage(msg, this.appendMessageCallback);
  1. MQ对SCHEDULE_TOPIC_XXXX 每条消费队列对应单独一个定时任务进行轮询,发送 到达投递时间【计划消费时间】 的消息。
    ScheduleMessageService.java类里面有个内部类 DeliverDelayedMessageTimerTask,定时执行check待执行消息的功能,每个类有delayLevel、offset2个变量
  class DeliverDelayedMessageTimerTask extends TimerTask {
        private final int delayLevel;
        private final long offset;

        public DeliverDelayedMessageTimerTask(int delayLevel, long offset) {
            this.delayLevel = delayLevel;
            this.offset = offset;
        }

        @Override
        public void run() {
                this.executeOnTimeup();
        }
}

批量发送

  • 需要rocketmq的版本要高,4.2以上是支持的,4.1不知道,4.0不支持
  • 提升性能,建议一次消息的大小不超过1M,超大的话,官网有方法Split
  • Messages of the same batch should have: same topic, same waitStoreMsgOK and no schedule support.
    String topic = "TopicTest";
    List<Message> messages = new ArrayList<>();
    messages.add(new Message(topic, "TagA", "Order1", "Hello world 0".getBytes()));
    messages.add(new Message(topic, "TagA", "Order2", "Hello world 1".getBytes()));
    messages.add(new Message(topic, "TagA", "Order3", "Hello world 2".getBytes()));
    producer.send(messages);

广播、集群订阅(只需要对接收端做控制即可)

通过在Consumer做配置,实现广播消息,或者集群订阅消息(默认)

consumer.setMessageModel(MessageModel.CLUSTERING); //默认
consumer.setMessageModel(MessageModel.BROADCASTING);//广播模式
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,222评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,455评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,720评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,568评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,696评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,879评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,028评论 3 409
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,773评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,220评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,550评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,697评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,360评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,002评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,782评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,010评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,433评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,587评论 2 350