RocketMQ是通过MQFaultStrategy的selectOneMessageQueue方法来选择发送队列的
MQFaultStrategy
我们先来看下MQFaultStrategy中重要的属性
//延迟容错对象,维护延迟Brokers的信息
//key:brokerName
private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();
//延迟容错开关
private boolean sendLatencyFaultEnable = false;
//延迟级别数组
private long[] latencyMax = { 50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L };
//不可用时长数组
private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
MQFaultStrategy中最重要的属性是latencyFaultTolerance,它维护了那些消息发送延迟较高的brokers的信息,同时延迟的时间长短对应了延迟级别latencyMax 和时长notAvailableDuration ,sendLatencyFaultEnable 控制了是否开启发送消息延迟功能。
来看主方法
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
//判断是否开启了开关
if (this.sendLatencyFaultEnable) {
try {
//获取一个可用的并且brokerName=lastBrokerName的消息队列
int index = tpInfo.getSendWhichQueue().getAndIncrement();
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
if (pos < 0)
pos = 0;
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
return mq;
}
}
//选择一个相对好的broker,不考虑可用性的消息队列
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
if (writeQueueNums > 0) {
final MessageQueue mq = tpInfo.selectOneMessageQueue();
if (notBestBroker != null) {
mq.setBrokerName(notBestBroker);
mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
}
return mq;
} else {
latencyFaultTolerance.remove(notBestBroker);
}
} catch (Exception e) {
log.error("Error occurred when selecting message queue", e);
}
//随机选择一个消息队列
return tpInfo.selectOneMessageQueue();
}
//获得 lastBrokerName 对应的一个消息队列,不考虑该队列的可用性
return tpInfo.selectOneMessageQueue(lastBrokerName);
}
我们来看开启了延迟容错的逻辑:
1.首先选择一个broker==lastBrokerName并且可用的一个队列(也就是该队列并没有因为延迟过长而被加进了延迟容错对象latencyFaultTolerance 中)
2.如果第一步中没有找到合适的队列,此时舍弃broker==lastBrokerName这个条件,选择一个相对较好的broker来发送
3.随机选择一个队列来发送
LatencyFaultToleranceImpl
selectOneMessageQueue选择队列的基本逻辑我们已经了解了,现在来具体看下LatencyFaultToleranceImpl是怎么来维护这些broker的可用性和延迟的呢?
主要属性faultItemTable 和内部类FaultItem
private final ConcurrentHashMap<String, FaultItem> faultItemTable = new ConcurrentHashMap<String, FaultItem>(16);
class FaultItem implements Comparable<FaultItem> {
private final String name;
private volatile long currentLatency;
private volatile long startTimestamp;
public FaultItem(final String name) {
this.name = name;
}
}
顾名思义这是一个延迟对象List,key为broker,value为FaultItem,FaultItem中存储了该broker的name,延迟界别和延迟开始的时间。
判断队列可用性方法如下:
public boolean isAvailable(final String name) {
final FaultItem faultItem = this.faultItemTable.get(name);
if (faultItem != null) {
return faultItem.isAvailable();
}
return true;
}
如果faultItem 中不存在该broker,返回true,当存在时,还需判断isAvailable
public boolean isAvailable() {
return (System.currentTimeMillis() - startTimestamp) >= 0;
}
如果延迟时间已过也返回true。
updateFaultItem
选择完队列后,执行发送步骤
//发送start时间
beginTimestampPrev = System.currentTimeMillis();
//发送
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
//发送结束时间
endTimestamp = System.currentTimeMillis();
//更新broker的延迟情况
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
我们可以看到这里计算了某个broker的发送时间,然后根据这个时间去更新FaultItem
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
if (this.sendLatencyFaultEnable) {
long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
}
}
private long computeNotAvailableDuration(final long currentLatency) {
for (int i = latencyMax.length - 1; i >= 0; i--) {
if (currentLatency >= latencyMax[i])
return this.notAvailableDuration[i];
}
return 0;
}
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
FaultItem old = this.faultItemTable.get(name);
if (null == old) {
final FaultItem faultItem = new FaultItem(name);
faultItem.setCurrentLatency(currentLatency);
faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
old = this.faultItemTable.putIfAbsent(name, faultItem);
if (old != null) {
old.setCurrentLatency(currentLatency);
old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
}
} else {
old.setCurrentLatency(currentLatency);
old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
}
}
这里根据延迟时间对比MQFaultStrategy中的延迟级别数组latencyMax 不可用时长数组notAvailableDuration 来将该broker加进faultItemTable中。
总结
1.所有的broker延迟信息都会被记录
2.发送消息时会选择延迟最低的broker来发送,提高效率
3.broker延迟过高会自动减少它的消息分配,充分发挥所有服务器的能力