在前面获取到路由消息后,后面要做的,就是选择合适的队列,通过队列找到合适的broker,将消息发送到broker中。
还是先来看看入口
public class DefaultMQProducerImpl implements MQProducerInner {
private SendResult sendDefaultImpl(
Message msg,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
this.makeSureStateOK();
Validators.checkMessage(msg, this.defaultMQProducer);
final long invokeID = random.nextLong();
记录开始时间
long beginTimestampFirst = System.currentTimeMillis();
上次的开始时间默认为开始时间
long beginTimestampPrev = beginTimestampFirst;
结束时间默认为开始时间
long endTimestamp = beginTimestampFirst;
先获取topic对应的路由信息
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
boolean callTimeout = false;
MessageQueue mq = null;
Exception exception = null;
SendResult sendResult = null;
根据模式,同步还是异步,决定发送失败重试的总次数
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
int times = 0;
String[] brokersSent = new String[timesTotal];
循环发送,成功则可以跳出循环
for (; times < timesTotal; times++) {
String lastBrokerName = null == mq ? null : mq.getBrokerName();
选择合适的队列。。。这里是重点
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
队列不为空进行发送
if (mqSelected != null) {
mq = mqSelected;
brokersSent[times] = mq.getBrokerName();
try {
记录上次的开始时间
beginTimestampPrev = System.currentTimeMillis();
costTime为已经耗费的总时间,如果已经规定时间内,还没有发送成功,就跳出重试
long costTime = beginTimestampPrev - beginTimestampFirst;
if (timeout < costTime) {
callTimeout = true;
break;
}
进行发送
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
endTimestamp = System.currentTimeMillis();
//endTimestamp - beginTimestampPrev=发送消息所耗费的时间
根据发送所耗费的时间决定是不是要将该broker加入到故障列表中
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
switch (communicationMode) {
case ASYNC:
异步什么都不做
return null;
case ONEWAY:
oneWay什么都不做
return null;
case SYNC:
同步发送可以根据返回状态决定是否重试
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
continue;
}
}
发送成功则直接结束方法
return sendResult;
default:
break;
}
} catch (RemotingException e) {
出现异常则加入故障列表,同时进行重试
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
continue;
} catch (MQClientException e) {
出现异常则加入故障列表,同时进行重试
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
exception = e;
continue;
} catch (MQBrokerException e) {
出现异常则加入故障列表,同时进行重试
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
exception = e;
switch (e.getResponseCode()) {
case ResponseCode.TOPIC_NOT_EXIST:
case ResponseCode.SERVICE_NOT_AVAILABLE:
case ResponseCode.SYSTEM_ERROR:
case ResponseCode.NO_PERMISSION:
case ResponseCode.NO_BUYER_ID:
case ResponseCode.NOT_IN_CURRENT_UNIT:
continue;
default:
if (sendResult != null) {
return sendResult;
}
throw e;
}
} catch (InterruptedException e) {
出现异常则加入故障列表,同时进行重试
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
throw e;
}
} else {
break;
}
}
if (sendResult != null) {
return sendResult;
}
....省略代码
}
}
上面的代码很长一大段。
但是核心内容还是,先根据topic获取对应的路由数据,然后从路由数据中找到合适的队列。
本次讲的还是如何找到合适的队列。所以其他内容就先不太关注
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
上面这段代码是选择队列的核心。
下面还是先看看一些需要用到的类。
LatencyFaultTolerance,延迟故障接口
MQFaultStrategy,延迟故障策略
使用到的类
LatencyFaultTolerance
先来看看结构
public interface LatencyFaultTolerance<T> {
更新故障项
void updateFaultItem(final T name, final long currentLatency, final long notAvailableDuration);
判断某个故障项,是否已经可用
boolean isAvailable(final T name);
移除某个故障项
void remove(final T name);
选出故障项
T pickOneAtLeast();
}
这个接口比较简单,其实只有几个方法。
具体的功能如注释写的,这个接口,其实就是用于管理故障项的
MQFaultStrategy
MQ对于故障的策略
public class MQFaultStrategy {
延迟故障的具体实现
private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();
默认是不开启延迟故障
private boolean sendLatencyFaultEnable = false;
花费时间的列表,从50毫秒到15000毫秒
private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
失效时间列表,从0到600000毫秒,
举个例子,如果发送到某个Broker用了3000毫秒,那么该broker就会被加入到延迟故障的列表中,失效时间为180000毫秒,即,只有180000毫秒后,这个Broker才可用
private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
选择队列
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) ...
更新故障项
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation)
}
MQFaultStrategy 这个类的具体功能便是对延迟故障策略的增强。
最核心的还是,根据发送消息的时长,决定是否将某个broker设置为某个时间片段内不可用。
接下来直接看看选择队列的方法
MQFaultStrategy是如何选择队列的
public class DefaultMQProducerImpl implements MQProducerInner {
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
}
}
最后看到还是MQFaultStrategy 中实现的方法。
代码如下
public class MQFaultStrategy {
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
如果开启了延迟故障
if (this.sendLatencyFaultEnable) {
try {
这个index,每次选择一个队列,tpInfo中的ThreadLocalIndex都会加1,意味着,每个线程去获取队列,其实都是负载均衡的。
int index = tpInfo.getSendWhichQueue().getAndIncrement();
与队列的长度取模,根据最后的pos取一个队列
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
if (pos < 0)
pos = 0;
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
判断取到的队列的broker是否故障中,如果不是故障中,就返回即可
if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
return mq;
}
如果所有的队列都是故障中的话,那么就从故障列表取出一个Broker即可(待会再看实现)
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
获取这个broker的可写队列数,如果该Broker没有可写的队列,则返回-1
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
if (writeQueueNums > 0) {
再次选择一次队列,通过与队列的长度取模确定队列的位置
final MessageQueue mq = tpInfo.selectOneMessageQueue();
if (notBestBroker != null) {
mq.setBrokerName(notBestBroker);
mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
}
return mq;
} else {
没有可写的队列,直接从故障列表移除
latencyFaultTolerance.remove(notBestBroker);
}
} catch (Exception e) {
log.error("Error occurred when selecting message queue", e);
}
如果故障列表中也没有可写的队列,则直接从tpInfo中获取一个
return tpInfo.selectOneMessageQueue();
}
没有开启延迟故障,直接从TopicPublishInfo通过取模的方式获取队列即可,如果LastBrokerName不为空,则需要过滤掉brokerName=lastBrokerName的队列
return tpInfo.selectOneMessageQueue(lastBrokerName);
}
}
从上面的代码逻辑来看分为俩部分。
A.开启延迟故障
1.还是从TopicPushlishInfo中,通过取模来遍历队列。
2.根据第一步获取到的队列,通过判断是否在故障表中,决定是否要使用该队列
3.如果在第2步中,找到了可以使用的队列(不在故障列表,或者已经过了故障时间),则直接返回,否则进行第四步
4.说明没有找到非故障的队列,这个时候只能从故障列表中获取一个队列,若果该队列对应的broker对应的可写队列数大于0,则直接返回,若没有可写的队列数,将该broker从故障列表中移除。如果故障列表也找不到可以用的队列,则进行第5步
5.直接通过取模的方式,从TopicPushlishInfo获取队列
B.没有开启延迟故障
1.没有开启延迟故障,直接从TopicPublishInfo通过取模的方式获取队列即可,如果LastBrokerName不为空,则需要过滤掉brokerName=lastBrokerName的队列
通过上面就知道延迟故障的开启模式是如何选择队列的。
那么MQFaultStrategy是如何判断一个BrokerName是否处于故障中?
public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String> {
@Override
public boolean isAvailable(final String name) {
final FaultItem faultItem = this.faultItemTable.get(name);
if (faultItem != null) {
return faultItem.isAvailable();
}
return true;
}
故障项
class FaultItem implements Comparable<FaultItem> {
private final String name;故障的brokerName
private volatile long currentLatency;其实这里应该是消息发送到该broker所耗费的时间
private volatile long startTimestamp;这里则是broker的失效时间戳(在这时间之前,该broker都是不可用的)
public boolean isAvailable() {
return (System.currentTimeMillis() - startTimestamp) >= 0;
}
}
}
那么MQFaultStrategy是如何将Broker加入到故障项中?
public class MQFaultStrategy {
private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
开启延迟故障
if (this.sendLatencyFaultEnable) {
计算出故障时间,隔离默认是30秒,不隔离则根据性能来选择故障时间
long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
}
}
private long computeNotAvailableDuration(final long currentLatency) {
其实就是根据性能来选择相应的故障时间,性能越差意味着currentLatency越大,那么对应的notAvailableDuration (故障时长)也就越大。
因为某种意义上来说,肯定是需要规避性能差的broker的使用次数。
for (int i = latencyMax.length - 1; i >= 0; i--) {
if (currentLatency >= latencyMax[i])
return this.notAvailableDuration[i];
}
return 0;
}
}
从上面代码看,是根据实际耗费的时间,来决定延时的时长,当currentLatency<550毫秒的时候,是不会被列为故障项的。
大概的流程图如下。
延迟故障的优点:
因为队列数据默认是按borkerName进行排序的,即如果某个Broker挂了,那么在队列选择的时候,可能连续好几个队列的选择都是失效的,最终导致连续发送几次消息还是失败,因为其实连续几次都是发到一个broker中。导致超时发送失效,另外也浪费资源。
倘若开启了延迟故障,对于发送时长比较长或者已经故障(因为网络通信或者各种原因宕机)的broker,可以采取让他们暂时故障的方式,可以有效的规避开性能较差,或者是已经故障的broker,直接找到最优的Broker进行发送。
Broker如果宕机的话,生产者主动去从注册中心去拉去对应的路由数据也需要30秒的时候,每30秒进行一次路由数据同步,倘若broker宕机,但是生产者还没有去主动的获取路由数据,延迟故障就可以发挥相应的作用了,起码可以避免无用的重试,另外在后面选择的时候,也可以优先选出可用的broker来,进一步节省资源。
重试机制
异步与OneWay只会尝试一次
而同步会重试3次
超时机制
重复发送的时候,会对比时间是否已经用完,如果用完就会抛异常。