RocketMQ源码(一):NameServer的启动
RocketMQ源码(二):broker的启动(一)
RocketMQ源码(三):broker的启动(二)
RocketMQ源码(四):producer的启动
RocketMQ源码(六):broker接收消息
RocketMQ源码(七):consumer的启动
RocketMQ源码(八):consumer消息拉取(一)
RocketMQ源码(九):consumer消息拉取(二)
消息发送的整体流程
从上面可以看到一条消息从send方法开始,如何流向commitLog逻辑文件中的,然后通过定时任务持久化到磁盘上。接下来两篇文章会详细的分析这整个过程
还是以RocketMQ源码(四)中的示例代码为例,来分析下消息发送过程。
首先进入的就是DefaultMQProducer的send方法
public SendResult send(
Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
Validators.checkMessage(msg, this);
msg.setTopic(withNamespace(msg.getTopic()));
return this.defaultMQProducerImpl.send(msg);
}
public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer)
throws MQClientException {
if (null == msg) {
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");
}
// 校验topic名称是否符合要求
Validators.checkTopic(msg.getTopic());
// topic是否不允许发送消息
Validators.isNotAllowedSendTopic(msg.getTopic());
// body
if (null == msg.getBody()) {
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");
}
if (0 == msg.getBody().length) {
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");
}
if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
"the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());
}
}
除了消息的校验,就是直接调用defaultMQProducerImpl的send方法
public SendResult send(
Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return send(msg, this.defaultMQProducer.getSendMsgTimeout());
}
public SendResult send(Message msg,
long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}
这里使用的是CommunicationMode.SYNC,同步发送消息的方式,后续的逻辑中也会主要分析同步消息的发送逻辑
private SendResult sendDefaultImpl(
Message msg,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
// 校验 Producer 处于运行状态
this.makeSureStateOK();
// 校验消息
Validators.checkMessage(msg, this.defaultMQProducer);
// 调用编号;用于下面打印日志,标记为同一次发送消息
final long invokeID = random.nextLong();
long beginTimestampFirst = System.currentTimeMillis();
long beginTimestampPrev = beginTimestampFirst;
long endTimestamp = beginTimestampFirst;
// 获取 Topic路由信息
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
boolean callTimeout = false;
// 最后选择消息要发送到的队列
MessageQueue mq = null;
Exception exception = null;
// 最后一次发送结果
SendResult sendResult = null;
// 同步多次调用
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
// 第几次发送
int times = 0;
// 存储每次发送消息选择的broker名
String[] brokersSent = new String[timesTotal];
// 循环调用发送消息,直到成功
for (; times < timesTotal; times++) {
String lastBrokerName = null == mq ? null : mq.getBrokerName();
// 轮询的选择一个MessageQueue
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
if (mqSelected != null) {
mq = mqSelected;
brokersSent[times] = mq.getBrokerName();
try {
beginTimestampPrev = System.currentTimeMillis();
if (times > 0) {
//Reset topic with namespace during resend.
msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
}
long costTime = beginTimestampPrev - beginTimestampFirst;
if (timeout < costTime) {
callTimeout = true;
break;
}
// 调用发送消息核心方法
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
endTimestamp = System.currentTimeMillis();
// 更新Broker可用性信息
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
switch (communicationMode) {
case ASYNC:
return null;
case ONEWAY:
return null;
case SYNC:
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
// 同步发送失败 && 配置存储异常时重新发送开关 时,进行重试
if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
continue;
}
}
return sendResult;
default:
break;
}
} catch (RemotingException e) { // 打印异常,更新Broker可用性信息,更新继续循环
endTimestamp = System.currentTimeMillis();
// 更新失败策略,主要用于规避发生故障的 broker,下文会详细介绍
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
continue;
} catch (MQClientException e) { // 打印异常,更新Broker可用性信息,继续循环
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
continue;
} catch (MQBrokerException e) { // 打印异常,更新Broker可用性信息,部分情况下的异常,直接返回,结束循环
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
switch (e.getResponseCode()) {
// 如下异常continue,进行发送消息重试
case ResponseCode.TOPIC_NOT_EXIST:
case ResponseCode.SERVICE_NOT_AVAILABLE:
case ResponseCode.SYSTEM_ERROR:
case ResponseCode.NO_PERMISSION:
case ResponseCode.NO_BUYER_ID:
case ResponseCode.NOT_IN_CURRENT_UNIT:
continue;
default:
// 如果有发送结果,进行返回,否则,抛出异常;
if (sendResult != null) {
return sendResult;
}
throw e;
}
} catch (InterruptedException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
log.warn("sendKernelImpl exception", e);
log.warn(msg.toString());
throw e;
}
} else {
break;
}
}
// 返回发送结果
if (sendResult != null) {
return sendResult;
}
String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
times,
System.currentTimeMillis() - beginTimestampFirst,
msg.getTopic(),
Arrays.toString(brokersSent));
info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);
MQClientException mqClientException = new MQClientException(info, exception);
// 根据不同情况,抛出不同的异常
if (callTimeout) {
throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
}
if (exception instanceof MQBrokerException) {
mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
} else if (exception instanceof RemotingConnectException) {
mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
} else if (exception instanceof RemotingTimeoutException) {
mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
} else if (exception instanceof MQClientException) {
mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
}
throw mqClientException;
}
// Namesrv找不到异常
validateNameServerSetting();
// 消息路由找不到异常
throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}
这个方法很长,拆解下来大概就是下面几个部分
- 消息校验
- 根据topic获取topicPublishInfo,从topicPublishInfo中在重试次数允许的情况下选择一个MessageQueue用于发送消息
- 发送过程中的异常处理
- 发送结束后的结果返回以及异常处理
这里挑4个比较重要的方法继续分析
1.tryToFindTopicPublishInfo
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
// 从本地缓存获取
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
// 当无可用的 Topic信息时,从NameSrv获取
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
// 尝试从NameSrv获取配置信息并更新本地缓存配置
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}
// 若获取的 Topic发布信息时候可用,则返回
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;
} else {
// 获取 TBW102 对应的 Topic信息。
// 这里实际获取到的就是允许自动创建topic的broker
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}
这里就是从NameServer获取topic的路由信息。
值得一提的是,在Broker启动时,如果Broker允许自动创建topic,就会向NameSrv创建一个topic=TBW102的路由信息。当Provider向NameSrv查询某个topic信息,查询不到的时候,就会查询topic=TBW102的Broker的信息。
2.selectOneMessageQueue
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
}
public class MQFaultStrategy {
private final static InternalLogger log = ClientLogger.getLog();
/**
* 延迟故障容错,维护每个Broker的发送消息的延迟
*/
private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();
/**
* 发送消息延迟容错开关
*/
private boolean sendLatencyFaultEnable = false;
// 最大延迟时间数值,在消息发送之前,先记录当前时间(start),然后消息发送成功或失败时记录当前时间(end),
// (end-start)代表一次消息延迟时间,发送错误时,updateFaultItem 中 isolation 为 true,
// 与 latencyMax 中值进行比较时得值为 30s,也就时该 broke r在接下来得 180000L,也就时3分钟内不提供服务,等待该 Broker 的恢复。
private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
// 根据 Topic发布信息 选择一个消息队列
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
// 是否开启消息失败延迟规避机制,该值在消息发送者那里可以设置。
// 如果该值为false,直接从 topic 的所有队列中选择下一个,而不考虑该消息队列是否可用(比如Broker挂掉)。
// sendLatencyFaultEnable 为 true时,能有效的避免消息发送失败
if (this.sendLatencyFaultEnable) {
try {
// 使用了本地线程变量 ThreadLocal 保存上一次发送的消息队列下标,消息发送使用轮询机制获取下一个发送消息队列。
int index = tpInfo.getSendWhichQueue().getAndIncrement();
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
if (pos < 0)
pos = 0;
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
// 选择 broker 状态正常的队列
if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
return mq;
}
// 选择一个相对好的broker,并获得其对应的一个消息队列,不考虑该队列的可用性
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
if (writeQueueNums > 0) {
final MessageQueue mq = tpInfo.selectOneMessageQueue();
if (notBestBroker != null) {
mq.setBrokerName(notBestBroker);
mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
}
return mq;
} else {
latencyFaultTolerance.remove(notBestBroker);
}
} catch (Exception e) {
log.error("Error occurred when selecting message queue", e);
}
// 选择一个消息队列,不考虑队列的可用性
return tpInfo.selectOneMessageQueue();
}
// 获得 lastBrokerName 对应的一个消息队列,不考虑该队列的可用性
return tpInfo.selectOneMessageQueue(lastBrokerName);
}
// 更新延迟容错信息
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
if (this.sendLatencyFaultEnable) {
long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
}
}
// 计算延迟对应的不可用时间
private long computeNotAvailableDuration(final long currentLatency) {
for (int i = latencyMax.length - 1; i >= 0; i--) {
if (currentLatency >= latencyMax[i])
return this.notAvailableDuration[i];
}
return 0;
}
}
这里出现了一个MQFaultStrategy工具类,可以看到的是它主要是通过两个数组,当某个broker发送消息延迟达到多少时,就需要让这个broker暂停使用一段时间,对应的broker的延迟信息时通过 LatencyFaultTolerance 这个类来记录的
public interface LatencyFaultTolerance<T> {
/**
* 更新对应的延迟和不可用时长
*/
void updateFaultItem(final T name, final long currentLatency, final long notAvailableDuration);
/**
* 对象是否可用
*/
boolean isAvailable(final T name);
/**
* 移除对象
*/
void remove(final T name);
/**
* 获取一个对象
*/
T pickOneAtLeast();
}
接下来看看实现类,LatencyFaultToleranceImpl
public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String> {
// 对象故障信息Table
private final ConcurrentHashMap<String, FaultItem> faultItemTable = new ConcurrentHashMap<String, FaultItem>(16);
// 对象选择Index
private final ThreadLocalIndex whichItemWorst = new ThreadLocalIndex();
@Override
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
FaultItem old = this.faultItemTable.get(name);
if (null == old) {
// 创建对象
final FaultItem faultItem = new FaultItem(name);
faultItem.setCurrentLatency(currentLatency);
faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
// 更新对象
old = this.faultItemTable.putIfAbsent(name, faultItem);
if (old != null) {
old.setCurrentLatency(currentLatency);
old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
}
} else { // 更新对象
old.setCurrentLatency(currentLatency);
old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
}
}
@Override
public boolean isAvailable(final String name) {
final FaultItem faultItem = this.faultItemTable.get(name);
if (faultItem != null) {
return faultItem.isAvailable();
}
return true;
}
@Override
public void remove(final String name) {
this.faultItemTable.remove(name);
}
@Override
public String pickOneAtLeast() {
// 创建数组
final Enumeration<FaultItem> elements = this.faultItemTable.elements();
List<FaultItem> tmpList = new LinkedList<FaultItem>();
while (elements.hasMoreElements()) {
final FaultItem faultItem = elements.nextElement();
tmpList.add(faultItem);
}
if (!tmpList.isEmpty()) {
// 打乱 + 排序。
Collections.shuffle(tmpList);
Collections.sort(tmpList);
// 选择顺序在前一半的对象
final int half = tmpList.size() / 2;
if (half <= 0) {
return tmpList.get(0).getName();
} else {
final int i = this.whichItemWorst.getAndIncrement() % half;
return tmpList.get(i).getName();
}
}
return null;
}
class FaultItem implements Comparable<FaultItem> {
// 对象名
private final String name;
// 延迟
private volatile long currentLatency;
// 开始可用时间
private volatile long startTimestamp;
public FaultItem(final String name) {
this.name = name;
}
/**
* 比较对象
* 可用性 > 延迟 > 开始可用时间
*/
@Override
public int compareTo(final FaultItem other) {
if (this.isAvailable() != other.isAvailable()) {
if (this.isAvailable())
return -1;
if (other.isAvailable())
return 1;
}
if (this.currentLatency < other.currentLatency)
return -1;
else if (this.currentLatency > other.currentLatency) {
return 1;
}
if (this.startTimestamp < other.startTimestamp)
return -1;
else if (this.startTimestamp > other.startTimestamp) {
return 1;
}
return 0;
}
public boolean isAvailable() {
// 计算出来的延迟值+加上本次消息的延迟值,设置 为FaultItem 的 startTimestamp,
// 表示当前时间必须大于该 startTimestamp 时,该 broker 才重新参与 MessageQueue 的负载
return (System.currentTimeMillis() - startTimestamp) >= 0;
}
}
}
可以看到这里主要是维护了一个faultItemTable,key是brokerName,value是FaultItem对象。
FaultItem类是用来记录对应broker的故障等级,并且根据故障等级设置它的恢复时间。而故障等级的更新主要是当向这个broker发送信息后,根据发送耗时来判断的,由MQFaultStrategy 来调用LatencyFaultTolerance的方法进行更新。
3.sendKernelImpl
private SendResult sendKernelImpl(final Message msg,
final MessageQueue mq,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis();
// 获取 broker地址
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {
tryToFindTopicPublishInfo(mq.getTopic());
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
}
SendMessageContext context = null;
if (brokerAddr != null) {
// 是否使用broker vip通道,即在当前调用地址的port上加2。
// broker会开启两个端口对外服务,服务内容几乎一样。VIP端口不支持消息拉取
brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
// 记录消息内容。下面逻辑可能改变消息内容,例如消息压缩。
byte[] prevBody = msg.getBody();
try {
// 设置唯一编号
if (!(msg instanceof MessageBatch)) {
MessageClientIDSetter.setUniqID(msg);
}
// 根据实例id
boolean topicWithNamespace = false;
if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
topicWithNamespace = true;
}
// 消息压缩
int sysFlag = 0;
boolean msgBodyCompressed = false;
if (this.tryToCompressMessage(msg)) {
sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
msgBodyCompressed = true;
}
// 事务
final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
}
// hook:发送消息校验
if (hasCheckForbiddenHook()) {
CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
checkForbiddenContext.setCommunicationMode(communicationMode);
checkForbiddenContext.setBrokerAddr(brokerAddr);
checkForbiddenContext.setMessage(msg);
checkForbiddenContext.setMq(mq);
checkForbiddenContext.setUnitMode(this.isUnitMode());
this.executeCheckForbiddenHook(checkForbiddenContext);
}
// hook:发送消息前逻辑
if (this.hasSendMessageHook()) {
context = new SendMessageContext();
context.setProducer(this);
context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
context.setCommunicationMode(communicationMode);
context.setBornHost(this.defaultMQProducer.getClientIP());
context.setBrokerAddr(brokerAddr);
context.setMessage(msg);
context.setMq(mq);
context.setNamespace(this.defaultMQProducer.getNamespace());
String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (isTrans != null && isTrans.equals("true")) {
context.setMsgType(MessageType.Trans_Msg_Half);
}
if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
context.setMsgType(MessageType.Delay_Msg);
}
this.executeSendMessageHookBefore(context);
}
// 构建发送消息请求
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTopic(msg.getTopic());
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setSysFlag(sysFlag);
requestHeader.setBornTimestamp(System.currentTimeMillis());
requestHeader.setFlag(msg.getFlag());
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
requestHeader.setBatch(msg instanceof MessageBatch);
// 消息重试Topic,设置当前重试消费次数和最大重试消费次数
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
if (reconsumeTimes != null) {
requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
}
String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
if (maxReconsumeTimes != null) {
requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
}
}
// 发送消息
SendResult sendResult = null;
switch (communicationMode) {
case ASYNC:
Message tmpMessage = msg;
boolean messageCloned = false;
if (msgBodyCompressed) {
//If msg body was compressed, msgbody should be reset using prevBody.
//Clone new message using commpressed message body and recover origin massage.
//Fix bug:https://github.com/apache/rocketmq-externals/issues/66
tmpMessage = MessageAccessor.cloneMessage(msg);
messageCloned = true;
msg.setBody(prevBody);
}
if (topicWithNamespace) {
if (!messageCloned) {
tmpMessage = MessageAccessor.cloneMessage(msg);
messageCloned = true;
}
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
}
long costTimeAsync = System.currentTimeMillis() - beginStartTime;
if (timeout < costTimeAsync) {
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
}
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.getBrokerName(),
tmpMessage,
requestHeader,
timeout - costTimeAsync,
communicationMode,
sendCallback,
topicPublishInfo,
this.mQClientFactory,
this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
context,
this);
break;
case ONEWAY:
case SYNC:
long costTimeSync = System.currentTimeMillis() - beginStartTime;
if (timeout < costTimeSync) {
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
}
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.getBrokerName(),
msg,
requestHeader,
timeout - costTimeSync,
communicationMode,
context,
this);
break;
default:
assert false;
break;
}
// hook:发送消息后逻辑
if (this.hasSendMessageHook()) {
context.setSendResult(sendResult);
this.executeSendMessageHookAfter(context);
}
// 返回发送结果
return sendResult;
} catch (RemotingException e) {
if (this.hasSendMessageHook()) {
context.setException(e);
this.executeSendMessageHookAfter(context);
}
throw e;
} catch (MQBrokerException e) {
if (this.hasSendMessageHook()) {
context.setException(e);
this.executeSendMessageHookAfter(context);
}
throw e;
} catch (InterruptedException e) {
if (this.hasSendMessageHook()) {
context.setException(e);
this.executeSendMessageHookAfter(context);
}
throw e;
} finally {
msg.setBody(prevBody);
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
}
}
// broker为空抛出异常
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
方法很长,总的来说就是做了几个事情
- 获取broker地址信息
- 组装requestHeader,并且执行发送消息前的钩子函数
- 发送消息
- 执行消息发送后的钩子函数,返回发送结果
接下来继续看同步消息的发送逻辑
public SendResult sendMessage(
final String addr,// broker地址
final String brokerName,
final Message msg,
final SendMessageRequestHeader requestHeader,
final long timeoutMillis,// 剩下的超时时间
final CommunicationMode communicationMode,
final SendCallback sendCallback,// 发送消息回调函数
final TopicPublishInfo topicPublishInfo,// topic路由信息
final MQClientInstance instance,// 通信实例
final int retryTimesWhenSendFailed,// 发送失败的重试次数
final SendMessageContext context,// 发送消息的上下文信息
final DefaultMQProducerImpl producer
) throws RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis();
RemotingCommand request = null;
String msgType = msg.getProperty(MessageConst.PROPERTY_MESSAGE_TYPE);
boolean isReply = msgType != null && msgType.equals(MixAll.REPLY_MESSAGE_FLAG);
// SendMessageRequestHeaderV2 和 V1拥有相同的属性,不过属性名更短,消息体更加简洁
if (isReply) {
if (sendSmartMsg) {
SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE_V2, requestHeaderV2);
} else {
request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE, requestHeader);
}
} else {
if (sendSmartMsg || msg instanceof MessageBatch) {
SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
} else {
request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
}
}
request.setBody(msg.getBody());
// 按照发送方式(单向、同步、异步),调用不同的发送函数
switch (communicationMode) {
// 单向:只管发送,不管是否发送成功
case ONEWAY:
this.remotingClient.invokeOneway(addr, request, timeoutMillis);
return null;
// 异步:发送后直接返回,在回调函数中等待发送结果
case ASYNC:
final AtomicInteger times = new AtomicInteger();
long costTimeAsync = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTimeAsync) {
throw new RemotingTooMuchRequestException("sendMessage call timeout");
}
this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, context, producer);
return null;
// 同步:阻塞至拿到发送结果
case SYNC:
long costTimeSync = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTimeSync) {
throw new RemotingTooMuchRequestException("sendMessage call timeout");
}
return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
default:
assert false;
break;
}
return null;
}
这里就比较简单了,继续看sendMessageSync
private SendResult sendMessageSync(
final String addr,
final String brokerName,
final Message msg,
final long timeoutMillis,
final RemotingCommand request
) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
assert response != null;
return this.processSendResponse(brokerName, msg, response,addr);
}
这里就是消息的发送和response的结果解析
至此消息发送的第一阶段就结束了,接下来第二阶段就是broker拿到请求后是如何去存储消息的