RocketMQ消息发送 深入剖析

消息发送流程的主要步骤:验证消息、查找路由、消息发送(包含异常处理机制)。

(1)消息长度验证

消息发送之前,首先确保生产者处于运行状态,然后验证消息是否符合相应的规范。具体的规范要求是:主题名称、消息体不能为空,消息长度不能等于 0,且默认不能超过允许发送消息的最大长度 4MB(maxMessageSize = 1024 * 1024 * 4)。

(2)查找主题路由信息

消息发送之前,首先需要获取主题的路由信息,只有获取了这些信息,我们才知道消息要发送到哪个具体的 Broker 节点。

发送消息时,会根据消息的 Topic 名称从 NameServer 获取 Broker 相关的地址信息:TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());

(3)选择消息队列:由于一个 Broker 中每个 Topic 会有多个 MessageQueue,生产者默认采用轮询策略选择 MessageQueue(info 为上一次选中的 Broker 名称,首次发送时为 null):MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, info);

(4)消息发送:选择了 MessageQueue 后,会根据 MessageQueue 中的 Broker 信息调用 sendKernelImpl 发送消息:sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);

   /**
    * Core send routine: validates the message, resolves the topic's publish
    * info, then picks a queue and sends, retrying on selected failures.
    *
    * <p>Flow, as implemented below:
    * <ol>
    *   <li>{@code makeSureStateOK()} and {@code Validators.checkMessage(...)} run first.</li>
    *   <li>The topic's {@code TopicPublishInfo} is looked up; if it is missing or not
    *       {@code ok()}, an {@code MQClientException} is thrown (code 10004 when the
    *       name-server address list is null/empty, otherwise 10005).</li>
    *   <li>Up to {@code 1 + getRetryTimesWhenSendFailed()} attempts are made in
    *       {@code SYNC} mode, and exactly one attempt in any other mode. Each attempt
    *       selects a queue (passing the broker name used by the previous attempt, or
    *       {@code null} on the first one) and calls {@code sendKernelImpl}.</li>
    *   <li>Every attempt reports its elapsed time through {@code updateFaultItem}.</li>
    * </ol>
    *
    * @param msg               message to send
    * @param communicationMode SYNC, ASYNC or ONEWAY; only SYNC is retried here
    * @param sendCallback      forwarded unchanged to {@code sendKernelImpl}
    * @param timeout           forwarded unchanged to {@code sendKernelImpl}
    * @return {@code null} for ASYNC and ONEWAY; for SYNC, the {@code SendResult} of the
    *         accepted attempt, or the last non-null result once attempts are exhausted
    * @throws MQClientException    if no route info is available, or if every attempt
    *                              failed without producing a {@code SendResult}
    * @throws RemotingException    declared by the signature; remoting failures raised by
    *                              {@code sendKernelImpl} are caught and retried here
    * @throws MQBrokerException    rethrown when the broker's response code is not one of
    *                              the retried codes and no {@code SendResult} is held
    * @throws InterruptedException rethrown immediately after logging
    */
   private SendResult sendDefaultImpl(Message msg, CommunicationMode communicationMode, SendCallback sendCallback, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        this.makeSureStateOK();
        Validators.checkMessage(msg, this.defaultMQProducer);
        long invokeID = this.random.nextLong();
        long firstAttemptStart = System.currentTimeMillis();
        long attemptStart = firstAttemptStart;
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());

        // Guard clause: without usable route info there is nothing to send to.
        if (topicPublishInfo == null || !topicPublishInfo.ok()) {
            List<String> nameServers = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
            if (null == nameServers || nameServers.isEmpty()) {
                throw (new MQClientException("No name server address, please set it." + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"), (Throwable)null)).setResponseCode(10004);
            }
            throw (new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"), (Throwable)null)).setResponseCode(10005);
        }

        MessageQueue mq = null;
        Exception lastFailure = null;
        SendResult sendResult = null;
        // Only synchronous sends are retried by this method.
        int maxAttempts = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
        String[] brokersSent = new String[maxAttempts];

        int attempt = 0;
        for (; attempt < maxAttempts; attempt++) {
            // Hand the previously used broker name to the selector (null on the first attempt).
            String lastBrokerName = null == mq ? null : mq.getBrokerName();
            MessageQueue candidate = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
            if (candidate == null) {
                // No queue available: stop trying and fall through to the failure report.
                break;
            }

            mq = candidate;
            brokersSent[attempt] = candidate.getBrokerName();

            try {
                attemptStart = System.currentTimeMillis();
                sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
                long finishedAt = System.currentTimeMillis();
                this.updateFaultItem(mq.getBrokerName(), finishedAt - attemptStart, false);
                switch (communicationMode) {
                case ASYNC:
                case ONEWAY:
                    return null;
                case SYNC:
                    // Accept the result unless it is not SEND_OK and retrying another broker is enabled.
                    if (sendResult.getSendStatus() == SendStatus.SEND_OK || !this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                        return sendResult;
                    }
                    break;
                }
            } catch (RemotingException | MQClientException sendError) {
                // Both failure kinds are handled identically: record the fault, log, remember, retry.
                long failedAt = System.currentTimeMillis();
                this.updateFaultItem(candidate.getBrokerName(), failedAt - attemptStart, true);
                this.log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, failedAt - attemptStart, candidate), sendError);
                this.log.warn(msg.toString());
                lastFailure = sendError;
            } catch (MQBrokerException brokerError) {
                long failedAt = System.currentTimeMillis();
                this.updateFaultItem(candidate.getBrokerName(), failedAt - attemptStart, true);
                this.log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, failedAt - attemptStart, candidate), brokerError);
                this.log.warn(msg.toString());
                lastFailure = brokerError;

                // Response codes that trigger another attempt.
                // NOTE(review): these literals presumably mirror named broker response-code
                // constants inlined by the decompiler - confirm against the project's constants.
                int code = brokerError.getResponseCode();
                boolean retryable = code == 1 || code == 14 || code == 16 || code == 17 || code == 204 || code == 205;
                if (!retryable) {
                    // Prefer a result held from an earlier attempt over surfacing the exception.
                    if (sendResult != null) {
                        return sendResult;
                    }
                    throw brokerError;
                }
            } catch (InterruptedException interrupted) {
                long failedAt = System.currentTimeMillis();
                // Interruption is not counted against the broker (fault flag stays false).
                this.updateFaultItem(candidate.getBrokerName(), failedAt - attemptStart, false);
                this.log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, failedAt - attemptStart, candidate), interrupted);
                this.log.warn(msg.toString());
                // NOTE(review): the exception and the message are logged a second time below;
                // kept as-is to preserve the existing log output.
                this.log.warn("sendKernelImpl exception", interrupted);
                this.log.warn(msg.toString());
                throw interrupted;
            }
        }

        // Attempts exhausted, or no queue could be selected.
        if (sendResult != null) {
            return sendResult;
        }

        String failureInfo = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s", attempt, System.currentTimeMillis() - firstAttemptStart, msg.getTopic(), Arrays.toString(brokersSent));
        failureInfo = failureInfo + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/");
        MQClientException sendFailed = new MQClientException(failureInfo, (Throwable)lastFailure);
        // Tag the wrapper with a response code derived from the last recorded failure.
        if (lastFailure instanceof MQBrokerException) {
            sendFailed.setResponseCode(((MQBrokerException)lastFailure).getResponseCode());
        } else if (lastFailure instanceof RemotingConnectException) {
            sendFailed.setResponseCode(10001);
        } else if (lastFailure instanceof RemotingTimeoutException) {
            sendFailed.setResponseCode(10002);
        } else if (lastFailure instanceof MQClientException) {
            sendFailed.setResponseCode(10003);
        }

        throw sendFailed;
    }
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容