消息发送流程的主要步骤:验证消息、查找路由、消息发送(包含异常处理机制)。
(1)消息长度验证
消息发送之前,首先确保生产者处于运行状态,然后验证消息是否符合相应的规范。具体的规范要求是:主题名称、消息体不能为空,消息长度不能等于 0,且默认不能超过允许发送消息的最大长度 4M(maxMessageSize = 1024 * 1024 * 4)。
(2)查找主题路由信息
消息发送之前,首先需要获取主题的路由信息,只有获取了这些信息,我们才知道消息要发送到具体的 Broker 节点。
在发送消息时,会根据消息的 Topic 名称从 Name Server 中获取 Broker 相关的地址信息:TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
(3)由于一个Broker中对于每个Topic会有多个MessageQueue,生产者默认选择MessageQueue的策略为轮询:MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, info);
(4)选择了MessageQueue后,会根据MessageQueue中的Broker信息调用 sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout); 来发送消息。
/**
 * Sends a message: validates it, resolves the topic's publish info, then picks a
 * message queue and hands the message to {@code sendKernelImpl}, retrying on
 * selected failures.
 *
 * <p>Only {@code SYNC} sends are retried: the attempt budget is
 * {@code 1 + getRetryTimesWhenSendFailed()} for {@code SYNC} and {@code 1} otherwise.
 * Each attempt passes the broker name of the previously used queue to
 * {@code selectOneMessageQueue}.
 *
 * @param msg               the message to send
 * @param communicationMode SYNC, ASYNC or ONEWAY
 * @param sendCallback      callback forwarded unchanged to {@code sendKernelImpl}
 * @param timeout           timeout forwarded unchanged to {@code sendKernelImpl} on every attempt
 * @return {@code null} for ASYNC and ONEWAY; for SYNC, the broker's {@code SendResult}
 *         (possibly a non-{@code SEND_OK} result from the last attempt)
 * @throws MQClientException    if no route info is available for the topic (code 10005),
 *                              no name server address is configured (code 10004), or all
 *                              attempts failed without producing a result
 * @throws MQBrokerException    if the broker rejects the message with a response code that
 *                              is not in the retry list and no earlier result exists
 * @throws InterruptedException rethrown as-is when the send is interrupted
 * @throws RemotingException    declared by the signature; remoting failures raised by
 *                              {@code sendKernelImpl} are caught and retried here
 */
private SendResult sendDefaultImpl(Message msg, CommunicationMode communicationMode, SendCallback sendCallback, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    this.makeSureStateOK();
    Validators.checkMessage(msg, this.defaultMQProducer);

    long invokeID = this.random.nextLong();
    long beginTimestampFirst = System.currentTimeMillis();
    long beginTimestampPrev = beginTimestampFirst;

    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo == null || !topicPublishInfo.ok()) {
        // No usable route: tell apart "name servers known but topic unknown" from
        // "no name server configured at all".
        // NOTE(review): 10005 / 10004 look like inlined client error-code constants — confirm.
        List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
        if (null != nsList && !nsList.isEmpty()) {
            throw (new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"), (Throwable) null)).setResponseCode(10005);
        }
        throw (new MQClientException("No name server address, please set it." + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"), (Throwable) null)).setResponseCode(10004);
    }

    MessageQueue mq = null;
    Exception exception = null;
    SendResult sendResult = null;
    int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
    String[] brokersSent = new String[timesTotal];

    // 'times' is declared outside the loop because the failure message below reports it.
    int times = 0;
    for (; times < timesTotal; times++) {
        String lastBrokerName = null == mq ? null : mq.getBrokerName();
        MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
        if (mqSelected == null) {
            // Nothing to send to; fall through to the result/failure handling.
            break;
        }
        mq = mqSelected;
        brokersSent[times] = mq.getBrokerName();

        try {
            beginTimestampPrev = System.currentTimeMillis();
            sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
            long endTimestamp = System.currentTimeMillis();
            this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);

            switch (communicationMode) {
                case ASYNC:
                case ONEWAY:
                    return null;
                case SYNC:
                    if (sendResult.getSendStatus() == SendStatus.SEND_OK || !this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                        return sendResult;
                    }
                    // Not SEND_OK and retry-on-another-broker is enabled: try again.
                    break;
                default:
                    break;
            }
        } catch (RemotingException e) {
            this.recordRetryableSendFailure(invokeID, mq, beginTimestampPrev, msg, e);
            exception = e;
        } catch (MQClientException e) {
            this.recordRetryableSendFailure(invokeID, mq, beginTimestampPrev, msg, e);
            exception = e;
        } catch (MQBrokerException e) {
            this.recordRetryableSendFailure(invokeID, mq, beginTimestampPrev, msg, e);
            exception = e;
            // NOTE(review): these literals look like inlined broker response-code constants
            // (presumably SYSTEM_ERROR, SERVICE_NOT_AVAILABLE, NO_PERMISSION, TOPIC_NOT_EXIST,
            // NO_BUYER_ID, NOT_IN_CURRENT_UNIT) — confirm against the RocketMQ version in use.
            switch (e.getResponseCode()) {
                case 1:
                case 14:
                case 16:
                case 17:
                case 204:
                case 205:
                    // Retryable broker error: move on to the next attempt.
                    continue;
                default:
                    // Non-retryable: prefer a result from an earlier attempt over failing.
                    if (sendResult != null) {
                        return sendResult;
                    }
                    throw e;
            }
        } catch (InterruptedException e) {
            long endTimestamp = System.currentTimeMillis();
            // Interruption is reported with the same flag value (false) as a successful send.
            this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
            this.log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
            this.log.warn(msg.toString());
            throw e;
        }
    }

    // Attempts exhausted (or no queue could be selected).
    if (sendResult != null) {
        return sendResult;
    }

    String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s", times, System.currentTimeMillis() - beginTimestampFirst, msg.getTopic(), Arrays.toString(brokersSent));
    info = info + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/");
    MQClientException mqClientException = new MQClientException(info, exception);
    // Map the last failure onto a client-side response code.
    // NOTE(review): 10001-10003 look like inlined client error-code constants — confirm.
    if (exception instanceof MQBrokerException) {
        mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
    } else if (exception instanceof RemotingConnectException) {
        mqClientException.setResponseCode(10001);
    } else if (exception instanceof RemotingTimeoutException) {
        mqClientException.setResponseCode(10002);
    } else if (exception instanceof MQClientException) {
        mqClientException.setResponseCode(10003);
    }
    throw mqClientException;
}

/**
 * Records a failed send attempt that may be retried: reports the elapsed time for the
 * broker via {@code updateFaultItem} with the flag set to {@code true}, then logs the
 * failure and the message at warn level.
 */
private void recordRetryableSendFailure(long invokeID, MessageQueue mq, long beginTimestamp, Message msg, Exception e) {
    long endTimestamp = System.currentTimeMillis();
    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestamp, true);
    this.log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestamp, mq), e);
    this.log.warn(msg.toString());
}