(六)生产者是如何发送消息的---选择队列(延迟故障),以及重试机制

在前面获取到路由消息后,后面要做的,就是选择合适的队列,通过队列找到合适的broker,将消息发送到broker中。

还是先来看看入口

public class DefaultMQProducerImpl implements MQProducerInner {
    private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        this.makeSureStateOK();
        Validators.checkMessage(msg, this.defaultMQProducer);
        final long invokeID = random.nextLong();
        记录开始时间
        long beginTimestampFirst = System.currentTimeMillis();
        上次的开始时间默认为开始时间
        long beginTimestampPrev = beginTimestampFirst;
        结束时间默认为开始时间
        long endTimestamp = beginTimestampFirst;
        先获取topic对应的路由信息
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            boolean callTimeout = false;
            MessageQueue mq = null;
            Exception exception = null;
            SendResult sendResult = null;
            根据模式,同步还是异步,决定发送失败重试的总次数
            int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
            int times = 0;
            String[] brokersSent = new String[timesTotal];
            循环发送,成功则可以跳出循环
            for (; times < timesTotal; times++) {
                String lastBrokerName = null == mq ? null : mq.getBrokerName();
                选择合适的队列。。。这里是重点
                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                队列不为空进行发送
                if (mqSelected != null) {
                    mq = mqSelected;
                    brokersSent[times] = mq.getBrokerName();
                    try {
                        记录上次的开始时间
                        beginTimestampPrev = System.currentTimeMillis();
                        costTime为已经耗费的总时间,如果已经规定时间内,还没有发送成功,就跳出重试
                        long costTime = beginTimestampPrev - beginTimestampFirst;
                        if (timeout < costTime) {
                            callTimeout = true;
                            break;
                        }
                        进行发送
                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                        endTimestamp = System.currentTimeMillis();
                        //endTimestamp - beginTimestampPrev=发送消息所耗费的时间
                        根据发送所耗费的时间决定是不是要将该broker加入到故障列表中
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        switch (communicationMode) {
                            case ASYNC:
                                异步什么都不做
                                return null;
                            case ONEWAY:
                                oneWay什么都不做
                                return null;
                            case SYNC:
                                同步发送可以根据返回状态决定是否重试
                                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                    if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                        continue;
                                    }
                                }
                                发送成功则直接结束方法
                                return sendResult;
                            default:
                                break;
                        }
                    } catch (RemotingException e) {
                        出现异常则加入故障列表,同时进行重试
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        continue;
                    } catch (MQClientException e) {
                        出现异常则加入故障列表,同时进行重试
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        exception = e;
                        continue;
                    } catch (MQBrokerException e) {
                        出现异常则加入故障列表,同时进行重试
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        exception = e;
                        switch (e.getResponseCode()) {
                            case ResponseCode.TOPIC_NOT_EXIST:
                            case ResponseCode.SERVICE_NOT_AVAILABLE:
                            case ResponseCode.SYSTEM_ERROR:
                            case ResponseCode.NO_PERMISSION:
                            case ResponseCode.NO_BUYER_ID:
                            case ResponseCode.NOT_IN_CURRENT_UNIT:
                                continue;
                            default:
                                if (sendResult != null) {
                                    return sendResult;
                                }

                                throw e;
                        }
                    } catch (InterruptedException e) {
                        出现异常则加入故障列表,同时进行重试
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        throw e;
                    }
                } else {
                    break;
                }
            }

            if (sendResult != null) {
                return sendResult;
            }
            ....省略代码
    }

}

上面的代码很长一大段。
但是核心内容还是,先根据topic获取对应的路由数据,然后从路由数据中找到合适的队列。
本次讲的还是如何找到合适的队列。所以其他内容就先不太关注

                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);

上面这段代码是选择队列的核心。

下面还是先看看一些需要用到的类。
LatencyFaultTolerance,延迟故障接口
MQFaultStrategy,延迟故障策略

使用到的类

LatencyFaultTolerance

先来看看结构

public interface LatencyFaultTolerance<T> {
    更新故障项
    void updateFaultItem(final T name, final long currentLatency, final long notAvailableDuration);
    判断某个故障项,是否已经可用
    boolean isAvailable(final T name);
    移除某个故障项
    void remove(final T name);
    选出故障项
    T pickOneAtLeast();
}

这个接口比较简单,其实只有几个方法。
具体的功能如注释写的,这个接口,其实就是用于管理故障项的

MQFaultStrategy

MQ对于故障的策略

public class MQFaultStrategy {
    延迟故障的具体实现
    private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();
    默认是不开启延迟故障
    private boolean sendLatencyFaultEnable = false;
    花费时间的列表,从50毫秒到15000毫秒
    private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    失效时间列表,从0到600000毫秒,
    举个例子,如果发送到某个Broker用了3000毫秒,那么该broker就会被加入到延迟故障的列表中,失效时间为180000毫秒,即,只有180000毫秒后,这个Broker才可用
    private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
    选择队列
    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) ...
    更新故障项
    public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) 
}

MQFaultStrategy 这个类的具体功能便是对延迟故障策略的增强。
最核心的还是,根据发送消息的时长,决定是否将某个broker设置为某个时间片段内不可用。

接下来直接看看选择队列的方法

MQFaultStrategy是如何选择队列的

public class DefaultMQProducerImpl implements MQProducerInner {
    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
    }
}

最后看到还是MQFaultStrategy 中实现的方法。
代码如下

public class MQFaultStrategy {
    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        如果开启了延迟故障
        if (this.sendLatencyFaultEnable) {
            try {
                这个index,每次选择一个队列,tpInfo中的ThreadLocalIndex都会加1,意味着,每个线程去获取队列,其实都是负载均衡的。
                int index = tpInfo.getSendWhichQueue().getAndIncrement();
                与队列的长度取模,根据最后的pos取一个队列
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    判断取到的队列的broker是否故障中,如果不是故障中,就返回即可
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
                        return mq;
                }
                如果所有的队列都是故障中的话,那么就从故障列表取出一个Broker即可(待会再看实现)
                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                获取这个broker的可写队列数,如果该Broker没有可写的队列,则返回-1
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                if (writeQueueNums > 0) {
                    再次选择一次队列,通过与队列的长度取模确定队列的位置
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        mq.setBrokerName(notBestBroker);
                        mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                    }
                    return mq;
                } else {
                    没有可写的队列,直接从故障列表移除
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }
            如果故障列表中也没有可写的队列,则直接从tpInfo中获取一个
            return tpInfo.selectOneMessageQueue();
        }
        没有开启延迟故障,直接从TopicPublishInfo通过取模的方式获取队列即可,如果LastBrokerName不为空,则需要过滤掉brokerName=lastBrokerName的队列
        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }

}

从上面的代码逻辑来看分为俩部分。
A.开启延迟故障
1.还是从TopicPushlishInfo中,通过取模来遍历队列。
2.根据第一步获取到的队列,通过判断是否在故障表中,决定是否要使用该队列
3.如果在第2步中,找到了可以使用的队列(不在故障列表,或者已经过了故障时间),则直接返回,否则进行第四步
4.说明没有找到非故障的队列,这个时候只能从故障列表中获取一个队列,若果该队列对应的broker对应的可写队列数大于0,则直接返回,若没有可写的队列数,将该broker从故障列表中移除。如果故障列表也找不到可以用的队列,则进行第5步
5.直接通过取模的方式,从TopicPushlishInfo获取队列
B.没有开启延迟故障
1.没有开启延迟故障,直接从TopicPublishInfo通过取模的方式获取队列即可,如果LastBrokerName不为空,则需要过滤掉brokerName=lastBrokerName的队列

通过上面就知道延迟故障的开启模式是如何选择队列的。

那么MQFaultStrategy是如何判断一个BrokerName是否处于故障中?

public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String> {
    @Override
    public boolean isAvailable(final String name) {
        final FaultItem faultItem = this.faultItemTable.get(name);
        if (faultItem != null) {
            return faultItem.isAvailable();
        }
        return true;
    }
    故障项
    class FaultItem implements Comparable<FaultItem> {
        private final String name;故障的brokerName
        private volatile long currentLatency;其实这里应该是消息发送到该broker所耗费的时间
        private volatile long startTimestamp;这里则是broker的失效时间戳(在这时间之前,该broker都是不可用的)
            public boolean isAvailable() {
            return (System.currentTimeMillis() - startTimestamp) >= 0;
        }
    }
}

那么MQFaultStrategy是如何将Broker加入到故障项中?

public class MQFaultStrategy {
    private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
        开启延迟故障
        if (this.sendLatencyFaultEnable) {
            计算出故障时间,隔离默认是30秒,不隔离则根据性能来选择故障时间
            long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
            this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
        }
    }
    

    private long computeNotAvailableDuration(final long currentLatency) {
        其实就是根据性能来选择相应的故障时间,性能越差意味着currentLatency越大,那么对应的notAvailableDuration (故障时长)也就越大。
        因为某种意义上来说,肯定是需要规避性能差的broker的使用次数。
        for (int i = latencyMax.length - 1; i >= 0; i--) {
            if (currentLatency >= latencyMax[i])
                return this.notAvailableDuration[i];
        }
        return 0;
    }
}

从上面代码看,是根据实际耗费的时间,来决定延时的时长,当currentLatency<550毫秒的时候,是不会被列为故障项的。

大概的流程图如下。


发送消息的重试机制

延迟故障的优点:

因为队列数据默认是按borkerName进行排序的,即如果某个Broker挂了,那么在队列选择的时候,可能连续好几个队列的选择都是失效的,最终导致连续发送几次消息还是失败,因为其实连续几次都是发到一个broker中。导致超时发送失效,另外也浪费资源。
倘若开启了延迟故障,对于发送时长比较长或者已经故障(因为网络通信或者各种原因宕机)的broker,可以采取让他们暂时故障的方式,可以有效的规避开性能较差,或者是已经故障的broker,直接找到最优的Broker进行发送。
Broker如果宕机的话,生产者主动去从注册中心去拉去对应的路由数据也需要30秒的时候,每30秒进行一次路由数据同步,倘若broker宕机,但是生产者还没有去主动的获取路由数据,延迟故障就可以发挥相应的作用了,起码可以避免无用的重试,另外在后面选择的时候,也可以优先选出可用的broker来,进一步节省资源。

重试机制

异步与OneWay只会尝试一次
而同步会重试3次

超时机制

重复发送的时候,会对比时间是否已经用完,如果用完就会抛异常。

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容