RocketMQ系列3-消息发送流程

概念

1、消息发送方式

Rocketmq提供三种方式可以发送普通消息:同步、异步、和单向发送。

  • 同步:发送方发送消息后,收到服务端响应后才发送下一条消息
  • 异步:发送一条消息后,不等服务端返回就可以继续发送消息或者后续任务处理。发送方通过回调接口接收服务端响应,并处理响应结果。
  • OneWay:发送方发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不需要应答。

发送方式对比:发送吞吐量,单向>异步>同步。但单向发送可靠性差存在丢失消息可能,选型根据实际需求确定。

2、消息类型

消息客户端提供多种SDK: 普通、顺序、事务、延时消息

  • 普通消息:MQ生产者客户端对象是线程安全的,可以在多线程之间共享使用。同时也可以用多线程并发发送消息可以增加消息TPS,一般项目中创建一个Peoducer实例就好。
 DefaultMQProducer producer = new DefaultMQProducer("arch-rocketmq");
 producer.setNamesrvAddr("localhost:9876");
 producer.start();   
 try {
     Message msg = new Message("mq-test4","*", ("Hello RocketMQ").getBytes("UTF-8"));
     SendResult sendResult = producer.send(msg);
     System.out.printf("%s%n", sendResult);
 }  
  • 顺序消息:提供一种严格按照顺序来发送和消费的消息类型。指定一个topic后,所有消息Message根据sharding key进行分区,相同key的消息在同一个分区内完全按照FIFO进行发送和消费。
MQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.start();
for (int i = 0; i < 100; i++) {
    int orderId = i % 10;
    Message msg = new Message("Topic-test", "*", "KEY" + i,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
    SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
        @Override
        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
           Integer id = (Integer) arg;
          int index = id % mqs.size();
          return mqs.get(index);
       }
    }, orderId);
    System.out.printf("%s%n", sendResult);
}
  • 延时消息:用于指定消息发送到消息队列后,延时一段时间才会被客户端进行消费,使用于任务延时场景。开源项目支持18个延迟级别 delayTimeLevel: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h。
 DefaultMQProducer producer = new DefaultMQProducer("arch-rocketmq");
 producer.setNamesrvAddr("localhost:9876");
 producer.start();   
 try {
     Message msg = new Message("mq-test4","*", ("Hello RocketMQ").getBytes("UTF-8"));
     msg.setDelayTimeLevel(3);  // 延迟10s
     SendResult sendResult = producer.send(msg);
     System.out.printf("%s%n", sendResult);
 } catch (Exception e) { }
  • 事务消息:RocketMQ提供类似XA分布式事务能力(XA则是一种分布式事务协议,包含事务管理器和本地资源两部分),主要解决了消息发送和数据库事务带来不一致问题。整个事务消息交互如下图
image.jpeg
  1. 应用程序在事务内完成落库,同步调用RocketMQ消息发送,发送状态为Prepare并设置事件监听。
  2. RocketMQ侧收到消息后,将消息存储在RMQ_SYS_Trans_Half_topic消息消费队列,这样消费不会被立即消费到。
  3. RocketMQ则开启定时任务,消息RMQ_SYS_Trans_Half_topic向发送端发起消息事务状态回查,应用程序根据事务状态回馈服务器(提交、回滚)如果提交或回滚,消息服务器则提交或回滚消息。RocketMQ允许设置消息回查间隔和回查次数。超过则回滚消息。
 // TransactionListenerImpl需要实现TransactionListener 
TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer("my_group_name");
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            public Thread newThread(Runnable r) { new Thread(r);}
});
producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
     try {
           Message msg = new Message("TopicTest1234", tags[i % tags.length], "KEY" + i, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
          SendResult sendResult = producer.sendMessageInTransaction(msg, null);
          System.out.printf("%s%n", sendResult);
  } catch (MQClientException | UnsupportedEncodingException e) {
          e.printStackTrace();
  }
}

3、消息数据结构

Message配置

字段名 默认值 说明
topic 必填 线下环境不需要申请,线上环境需要申请工单才能使用
body 必填 进行序列化转化为二进制
tags 为每个消息设置tag可做消息过滤
keys 代表这条消息的业务关键词,尽可能保证Key唯一
DelayTimeLevel 消息延时级别,开源RocketMQ支持18个级别的延迟1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h v

客户端公共配置

字段名 默认值 说明
nameServer Name Server 地址列表,多个 NameServer 地址用分号 隔开
clientIP 本机IP 客户端本机 IP 地址,某些机器会发生无法识别客户端 IP 地址情况,需要应用在代码中强制指定
instanceName false 客户端实例名称
clientCallbackExecutorThreads 131072 客户端限制的消息大小,超过报错,同时 服务端也会限制
pollNameServerInteval 4 发送消息时,自动创建服务器不存在的topic默认创建队列数
heartbeatBrokerInterval 30000 向 Broker 发送心跳间隔时间,单位毫秒
persistConsumerOffsetInterval 5000 持久化 Consumer 消费进度间隔时间,单位毫秒

Producer配置

字段名 默认值 说明
producerGroup DEFAULT_PRODUCER Producer 组名,多个 Producer 如果属于一 个应用,发送同样的消息,则应该将它们 归为同一组
sendMsgTimeout 10000 发送消息超时时间,单位毫秒
retryAnotherBrokerWhenNotStoreOK false 如果发送消息返回 sendResult,但是 sendStatus!=SEND_OK,是否重试发送
maxMessageSize 131072 客户端限制的消息大小,超过报错,同时 服务端也会限制
defaultTopicQueueNums 4 发送消息时,自动创建服务器不存在的topic默认创建队列数

消息发送原理解刨

1、 发送主要包括三个核心流程:

  • 获取topic路由信息TopicPublishInfo(路由信息在启动时就已经加载过,在系列一种已经讲解过)
  • 根据topic的路由信息选择一个MessageQueue(明确往哪个broker发送)
  • 发送消息,成功则返回,失败则更新规避策略,同时进行重试发送默认重试次数是3次通过times进行计数。如果发生消息重试会导致消息发送重复,例如发送消息实际上Broker接收到消息,但客户端接受结果超时会重发消息,所有消费端需要保证幂等性。
private SendResult sendDefaultImpl( Message msg,final CommunicationMode communicationMode,final SendCallback sendCallback,final long timeout){
       TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
       if (topicPublishInfo != null && topicPublishInfo.ok()) {
           int times = 0;
           for (; times < timesTotal; times++) {
               String lastBrokerName = null == mq ? null : mq.getBrokerName();
               MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
               if (mqSelected != null) {
                   mq = mqSelected;
                   brokersSent[times] = mq.getBrokerName();
                   try {
                       beginTimestampPrev = System.currentTimeMillis();
                       if (times > 0) {msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));}
                       long costTime = beginTimestampPrev - beginTimestampFirst;
                       if (timeout < costTime) {callTimeout = true;break;}
                       sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                       endTimestamp = System.currentTimeMillis();
                       this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                       switch (communicationMode) {
                           case ASYNC: return null;
                           case ONEWAY: return null;
                           case SYNC:
                               if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                   if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                       continue;}}
                               return sendResult;
                           default: break;
                       }
                   } catch (Exception e) {
                       endTimestamp = System.currentTimeMillis();
                       this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);                       
                       exception = e;
                       continue;
                   }  
           if (sendResult != null) {
               return sendResult;
           }
}

2、 获取topic路由信息

2.1. 一个topic分布在多个Broker,一个Broker包含多个Queue(brokerName、读队列个数、写队列个数、权限、同步或异步);从缓存中获取topic路由信息;没有则从namesrv获取;没有使用默认topic获取路由配置信息。

2.2. 从NameServer获取配置信息,使用ReentrantLock,设置超时3s;基于Netty从namesrv获取配置信息;然后更新topic本地缓存,需要同步更新发送者和消费者的topic缓存

ConcurrentMap<String/* Topic */, TopicRouteData> topicRouteTable = new ConcurrentHashMap<String, TopicRouteData>();
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
    this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
    this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
    topicPublishInfo = this.topicPublishInfoTable.get(topic);
}

3、 根据topic负载均衡算法选择一个MessageQueue

1. 是否开启消息失败延迟规避机制

2. 本地变量ThreadLocal 保存上一次发送的消息队列下标,消息发送使用轮询机制获取下一个发送消息队列。同时topic发送有异常延迟,确保选中的消息队列所在broker正常

3. 当前消息队列是否可用

发送消息延迟机制;MQFaultStrategy(latencyMax最大延迟时间 end-start为消息延迟时间,如果失败 则将这个broker的isolation为true,同时这个broker在5分钟内不提供服务等待回复)

if (this.sendLatencyFaultEnable) {
    try {
       int index = tpInfo.getSendWhichQueue().getAndIncrement();
       for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
           int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
           if (pos < 0)  pos = 0;
           MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
           if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {   // 判断broker是否被规避
               if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                   return mq;
           }
       }
       final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();  //选择
       int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
       if (writeQueueNums > 0) {
       final MessageQueue mq = tpInfo.selectOneMessageQueue();
       if (notBestBroker != null) {
           mq.setBrokerName(notBestBroker);
           mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
       }
       return mq;  
   }          
}
return tpInfo.selectOneMessageQueue(lastBrokerName);

4、故障延迟机制FaultItem

开启故障延迟则会构造FaultItem记录,在某一时刻前都当做故障(brokeName、发送消息异常时间点、这个时间点都为故障)

4.1.首先选择一个broker==lastBrokerName并且可用的一个队列(也就是该队列并没有因为延迟过长而被加进了延迟容错对象latencyFaultTolerance 中)

4.2.如果第一步中没有找到合适的队列,此时舍弃broker==lastBrokerName这个条件,选择一个相对较好的broker来发送

4.3. 选择一个队列来发送,一般都是取模方式来获取

也就是当Producer发送消息时间过长,逻辑上N秒内Broker不可用。例如发送时间超过15000ms,broker则在60000ms内不可用

private final ConcurrentHashMap<String, FaultItem> faultItemTable = new ConcurrentHashMap<String, FaultItem>(16);
private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
class FaultItem implements Comparable<FaultItem> {
    private final String name;
    private volatile long currentLatency;
    private volatile long startTimestamp;
}
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
    FaultItem old = this.faultItemTable.get(name);
    if (null == old) {
        final FaultItem faultItem = new FaultItem(name);
        faultItem.setCurrentLatency(currentLatency);
        faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);

        old = this.faultItemTable.putIfAbsent(name, faultItem);
        if (old != null) {
            old.setCurrentLatency(currentLatency);
            old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
        }
    }
}

5、发送消息

该方法是消息发送核心方法,已经明确往哪个Broker发送消息了,里面设置到消息校验、消息发送前和发送后做的事情(消息轨迹就是在这里处理的后续文章会分析)、构建请求消息体最终调用remotingClient.invoke()并完成netty的网络请求(具体就是创建channel并将数据写入writeAndFlush,通过ChannelFutureListener进行监听返回结果)。

private SendResult sendKernelImpl(Message msg, MessageQueue mq, CommunicationMode communicationMode, SendCallback sendCallback, TopicPublishInfo topicPublishInfo, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        long beginStartTime = System.currentTimeMillis();
        String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        SendMessageContext context = null;
        if (brokerAddr != null) {
            // 是否使用broker vip通道。broker会开启两个端口对外服务。
            brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
            byte[] prevBody = msg.getBody();
            SendResult var38;
            try {
                if (!(msg instanceof MessageBatch)) {  //批量消息
                    MessageClientIDSetter.setUniqID(msg);
                }
                if (this.hasCheckForbiddenHook()) {  // 发送消息校验
                    // ....
                }
                if (this.hasSendMessageHook()) {
                     //... 构造发送消息前 上下文
                    this.executeSendMessageHookBefore(context);
                }
                SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
                // 发送前请求参数构造
                if (requestHeader.getTopic().startsWith("%RETRY%")) {
                     // 判断是否是消息重试
                }
                SendResult sendResult = null;
                switch(communicationMode) {
                case ASYNC:
                    Message tmpMessage = msg;
                    boolean messageCloned = false;
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr, mq.getBrokerName(), tmpMessage, requestHeader, timeout - costTimeAsync, communicationMode, sendCallback, topicPublishInfo, this.mQClientFactory, this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(), context, this);
                    break;
                case ONEWAY:
                case SYNC:                
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr, mq.getBrokerName(), msg, requestHeader, timeout - costTimeSync, communicationMode, context, this);
                    break;
                default: assert false;
                }
                // 发送消息后执行
                if (this.hasSendMessageHook()) {
                    context.setSendResult(sendResult);
                    this.executeSendMessageHookAfter(context);
                }
                var38 = sendResult;
            } catch (RemotingException var30) {
                if (this.hasSendMessageHook()) {
                    context.setException(var30);
                    this.executeSendMessageHookAfter(context);
                }
                throw var30;
            }  finally {
                msg.setBody(prevBody);
                msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
            }
            return var38;
        }  
    }

常见问题

NameServer挂了

如果Namesrv挂了,当新加入的生产消费则获取不到topic路由信息会报MQExecption;如果生产消费缓存了生产者有缓存 Topic 的路由信息,如果NameServer 全部挂掉,并且,此时依然可以发送消息。

Broker挂机

消息生产者每隔30s从nameser获取存活的broker,broker每隔30s向nameser发送存活情况。

如果Broker挂了分两种情况:

  • sendLatencyFaultEnable:使用了故障延迟机制,通过获取一个MessageQueue发送失败,Broker 进行标记,标记该 Broker 在未来的某段时间内不会被选择到,默认为(5分钟,不可改变)

  • 不启用sendLatencyFaultEnable : procuder 每次发送消息,会采取轮询机制取下一个 MessageQueue,由于可能该 Message 所在的Broker挂掉,会抛出异常。因为一个 Broker 默认为一个 topic 分配4个 messageQueue,由于默认只重试2次

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,047评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,807评论 3 386
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,501评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,839评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,951评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,117评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,188评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,929评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,372评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,679评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,837评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,536评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,168评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,886评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,129评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,665评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,739评论 2 351

推荐阅读更多精彩内容