rocketmq源码8-客户端-MQClientInstance

一 MQClientManager

  • 单例模式
// Single shared manager, created when the class is initialized.
private static MQClientManager instance = new MQClientManager();
/** Returns the shared {@code MQClientManager} instance. */
public static MQClientManager getInstance() {
    return instance;
}
  • 根据id,创建并缓存对应的MQClientInstance
// Cache of MQClientInstance objects keyed by clientId.
ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable

/**
 * Returns the client instance cached for the clientId derived from {@code clientConfig},
 * creating and caching a new one when none exists yet.
 *
 * @param clientConfig configuration used to build the clientId; a clone is handed to a new instance
 * @param rpcHook      hook passed through to a newly created instance
 * @return the cached instance, or the one another caller registered first, or the new one
 */
public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
    final String clientId = clientConfig.buildMQClientId();

    final MQClientInstance cached = this.factoryTable.get(clientId);
    if (cached != null) {
        return cached;
    }

    final MQClientInstance created = new MQClientInstance(
        clientConfig.cloneClientConfig(),
        this.factoryIndexGenerator.getAndIncrement(),
        clientId,
        rpcHook);

    // putIfAbsent decides the winner when two callers create an instance at the same time.
    final MQClientInstance registeredFirst = this.factoryTable.putIfAbsent(clientId, created);
    if (registeredFirst == null) {
        log.info("Created new MQClientInstance for clientId:[{}]", clientId);
        return created;
    }

    log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
    return registeredFirst;
}

二 MQClientInstance属性及实例化

  • 使用当前MQClientInstance的生产者,消费者,管理者的记录信息
// Producers registered on this client instance, keyed by group name.
private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();
// Consumers registered on this client instance, keyed by group name.
private final ConcurrentMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();
// Admin tools registered on this client instance, keyed by group name.
private final ConcurrentMap<String/* group */, MQAdminExtInner> adminExtTable = new ConcurrentHashMap<String, MQAdminExtInner>();
  • topic的路由信息
// Route data per topic, keyed by topic name.
ConcurrentMap<String/* Topic */, TopicRouteData> topicRouteTable = new ConcurrentHashMap<String, TopicRouteData>();

/** Route data of one topic (fields only; accessors are not shown in this excerpt). */
public class TopicRouteData extends RemotingSerializable {
    private String orderTopicConf;// configuration for ordered consumption
    private List<QueueData> queueDatas;// queue configuration of the topic on its brokers
    private List<BrokerData> brokerDatas;// brokers that host the topic
    private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;// filter servers per broker address
}

/** Queue configuration of a topic on one broker (fields only in this excerpt). */
public class QueueData implements Comparable<QueueData> {
    private String brokerName; // broker that hosts the topic
    private int readQueueNums;// number of read queues
    private int writeQueueNums;// number of write queues
    private int perm;// read/write permission setting
    private int topicSynFlag;// sync flag setting
}

/** Addresses of one named broker (fields only in this excerpt). */
public class BrokerData implements Comparable<BrokerData> {
    private String cluster;// name of the cluster the broker belongs to
    private String brokerName;// broker name
    private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;// master/slave node addresses keyed by brokerId
}
  • 全量broker信息
// Master/slave node addresses per broker name, keyed by brokerId.
ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable
// Broker version table: per broker name, a version number for each address.
ConcurrentMap<String/* Broker Name */, HashMap<String/* address */, Integer>> brokerVersionTable
  • 实例化
/**
 * Builds a client instance and wires up its collaborators: the Netty client configuration,
 * the request processor, the remoting API facade, the admin/pull/rebalance services,
 * the internal producer and the consumer statistics manager.
 *
 * @param clientConfig  client configuration; also supplies the callback thread count,
 *                      the TLS flag and the optional name server address
 * @param instanceIndex index assigned to this instance by the caller
 * @param clientId      identifier of this client instance
 * @param rpcHook       hook handed to the remoting API facade
 */
public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {
    // Configuration
    this.clientConfig = clientConfig;
    this.instanceIndex = instanceIndex;
    // Configuration of the remoting (Netty) client
    this.nettyClientConfig = new NettyClientConfig();
    this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());
    this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS());
    // Processor for requests arriving at this client
    this.clientRemotingProcessor = new ClientRemotingProcessor(this);
    // Facade that assembles requests and delegates to the remoting client
    this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);
    if (this.clientConfig.getNamesrvAddr() != null) {
        // Apply the user-specified name server address list before logging it.
        this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());
        log.info("user specified name server address: {}", this.clientConfig.getNamesrvAddr());
    }
    this.clientId = clientId;
    // Management interface for topics, queues and messages
    this.mQAdminImpl = new MQAdminImpl(this);
    // Service that processes pull requests on its own thread (used by push-style consumption)
    this.pullMessageService = new PullMessageService(this);
    // Service that periodically triggers consumer-side rebalancing
    this.rebalanceService = new RebalanceService(this);
    // Internal producer. Per the original author's note it sends failed or timed-out
    // messages back to the broker (sendMessageBack) so they are retried from the retry topic.
    this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);
    this.defaultMQProducer.resetClientConfig(clientConfig);
    // Statistics service
    this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);
    log.info("Created a new client Instance, InstanceIndex:{}, ClientID:{}, ClientConfig:{}, ClientVersion:{}, SerializerType:{}",
        this.instanceIndex,
        this.clientId,
        this.clientConfig,
        MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION), RemotingCommand.getSerializeTypeConfigInThisServer());
}

三 MQClientInstance初始化

  • start()
/**
 * Starts this client instance while holding its monitor.
 *
 * <p>From {@code CREATE_JUST} the state is first set to {@code START_FAILED}, the
 * sub-services are started in order, and only then is the state set to {@code RUNNING}.
 * Calls made in {@code RUNNING} or {@code SHUTDOWN_ALREADY} do nothing.
 *
 * @throws MQClientException if the instance is in {@code START_FAILED}, i.e. an earlier
 *                           start attempt did not complete
 */
public void start() throws MQClientException {

    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                // Assume failure until every step below has succeeded.
                this.serviceState = ServiceState.START_FAILED;
                // If not specified,looking address from name server
                if (null == this.clientConfig.getNamesrvAddr()) {
                    this.mQClientAPIImpl.fetchNameServerAddr();
                }
                // Start request-response channel
                this.mQClientAPIImpl.start();
                // Start various schedule tasks
                this.startScheduledTask();
                // Start pull service
                this.pullMessageService.start();
                // Start rebalance service
                this.rebalanceService.start();
                // Start push service
                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                log.info("the client factory [{}] start OK", this.clientId);
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
                break;
            case SHUTDOWN_ALREADY:
                break;
            case START_FAILED:
                throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
            default:
                break;
        }
    }
}

四 RebalanceService

/**
 * Service loop: until the service is stopped, waits for {@code waitInterval} and then
 * asks the client factory to rebalance.
 */
@Override
public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        // Wait before each round. Per the original author's note the default interval
        // is 20s and the wait is built on CountDownLatch2 — neither is visible here.
        this.waitForRunning(waitInterval);
        // Trigger one rebalance round
        this.mqClientFactory.doRebalance();
    }

    log.info(this.getServiceName() + " service end");
}
/**
 * Runs {@code doRebalance()} on every consumer registered in {@code consumerTable}.
 * A failure in one consumer is logged and does not stop the remaining ones.
 */
public void doRebalance() {
    for (MQConsumerInner consumer : this.consumerTable.values()) {
        if (consumer == null) {
            continue;
        }
        try {
            consumer.doRebalance();
        } catch (Throwable failure) {
            log.error("doRebalance exception", failure);
        }
    }
}

五 PullMessageService

  • push方式消费端使用
  • 存储拉消息的请求,异步处理请求拉取消息
    LinkedBlockingQueue<PullRequest> pullRequestQueue
  • pullRequestQueue的生产者之一:消费端负载均衡更新完成后,将PullRequest放入pullRequestQueue队列中
// Excerpt: the "..." lines mark code the article omitted (including the declaration of `changed`).
// For every assigned queue without a ProcessQueue yet, builds a PullRequest and dispatches it.
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
    final boolean isOrder) {
...
List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
// Walk the message queues assigned for the topic
for (MessageQueue mq : mqSet) {
    if (!this.processQueueTable.containsKey(mq)) {
        // No ProcessQueue is registered for this queue yet, so a PullRequest must be built for it
        if (isOrder && !this.lock(mq)) {
            // Ordered consumption has to lock the queue first; skip the queue when locking fails
            log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
            continue;
        }

        this.removeDirtyOffset(mq);
        ProcessQueue pq = new ProcessQueue();
        // Work out the offset to start pulling from
        long nextOffset = this.computePullFromWhere(mq);
        if (nextOffset >= 0) {
            ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
            if (pre != null) {// another thread registered this queue first
                log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
            } else {
                log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                PullRequest pullRequest = new PullRequest();
                // Consumer group
                pullRequest.setConsumerGroup(consumerGroup);
                // Offset of the next pull
                pullRequest.setNextOffset(nextOffset);
                // Message queue to pull from
                pullRequest.setMessageQueue(mq);
                // ProcessQueue registered above; per the original note it caches pulled
                // messages locally until the application consumes them
                pullRequest.setProcessQueue(pq);
                pullRequestList.add(pullRequest);
                changed = true;
            }
        } else {
            // A negative offset means the start position could not be determined
            log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
        }
    }
}

// Hand all newly built requests over for pulling
this.dispatchPullRequest(pullRequestList);
...
}

/**
 * Submits each pull request to the push consumer for immediate execution and logs it.
 *
 * @param pullRequestList requests built during rebalance; may be empty
 */
public void dispatchPullRequest(List<PullRequest> pullRequestList) {
    for (PullRequest pullRequest : pullRequestList) {
        // Hand the request to the consumer; per the original note it ends up in the blocking queue.
        this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
        log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);
    }
}
/**
 * Service loop: until the service is stopped, takes pull requests from
 * {@code pullRequestQueue} one at a time and executes them.
 */
@Override
public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            // Blocks until a PullRequest is available
            PullRequest pullRequest = this.pullRequestQueue.take();
            if (pullRequest != null) {
                this.pullMessage(pullRequest);
            }
        } catch (InterruptedException e) {
            // Deliberately ignored; the loop condition re-checks isStopped().
            // NOTE(review): the interrupt flag is not restored here.
        } catch (Exception e) {
            // Any other failure is logged and the loop keeps running
            log.error("Pull Message Service Run Method exception", e);
        }
    }

    log.info(this.getServiceName() + " service end");
}
/**
 * Looks up the consumer registered for the request's consumer group and lets it pull.
 * The request is dropped with a warning when no consumer matches.
 *
 * @param pullRequest request naming the consumer group to pull for
 */
private void pullMessage(final PullRequest pullRequest) {
    final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
    if (consumer == null) {
        log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
        return;
    }
    ((DefaultMQPushConsumerImpl) consumer).pullMessage(pullRequest);
}

六 MQAdminImpl

  • 创建topic
/**
 * Creates {@code newTopic} on the master node of every broker found in the route data of {@code key}.
 * Each broker gets up to five attempts. The call succeeds if at least one broker accepted the topic.
 *
 * @param key          topic whose route data is used to find the brokers
 * @param newTopic     name of the topic to create
 * @param queueNum     used for both the read and the write queue count
 * @param topicSysFlag system flag stored in the topic configuration
 * @throws MQClientException "create new topic failed", wrapping any failure raised inside
 *                           (including the exceptions thrown explicitly in the body)
 */
public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
    try {
        // Fetch the route data of `key` from the name server
        TopicRouteData topicRouteData = this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(key, timeoutMillis);
        // Brokers listed in that route data
        List<BrokerData> brokerDataList = topicRouteData.getBrokerDatas();
        if (brokerDataList != null && !brokerDataList.isEmpty()) {
            Collections.sort(brokerDataList);
            // True once any broker has created the topic
            boolean createOKAtLeastOnce = false;
            MQClientException exception = null;
            // NOTE(review): filled in below but never read within this method
            StringBuilder orderTopicString = new StringBuilder();
            // Visit every broker
            for (BrokerData brokerData : brokerDataList) {
                // Address of the broker's master node
                String addr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
                if (addr != null) {// build the topic configuration
                    TopicConfig topicConfig = new TopicConfig(newTopic);
                    topicConfig.setReadQueueNums(queueNum);// read queue count
                    topicConfig.setWriteQueueNums(queueNum);// write queue count
                    topicConfig.setTopicSysFlag(topicSysFlag);// topic system flag

                    boolean createOK = false;
                    for (int i = 0; i < 5; i++) {
                        // Ask the broker to create the topic; at most five attempts
                        try {
                            this.mQClientFactory.getMQClientAPIImpl()
                                .createTopic(addr, key, topicConfig, timeoutMillis);
                            createOK = true;
                            createOKAtLeastOnce = true;
                            break;
                        } catch (Exception e) {
                            // Only the failure of the last attempt is kept
                            if (4 == i) {
                                exception = new MQClientException("create topic to broker exception", e);
                            }
                        }
                    }

                    if (createOK) {
                        orderTopicString.append(brokerData.getBrokerName());
                        orderTopicString.append(":");
                        orderTopicString.append(queueNum);
                        orderTopicString.append(";");
                    }
                }
            }
            // One successful broker is enough; fail only when all of them failed
            if (exception != null && !createOKAtLeastOnce) {
                throw exception;
            }
        } else {
            throw new MQClientException("Not found broker, maybe key is wrong", null);
        }
    } catch (Exception e) {
        // Also catches the two MQClientExceptions thrown above and wraps them
        throw new MQClientException("create new topic failed", e);
    }
}
  • 查询生产路由
/**
 * Fetches the route data of {@code topic} from the name server and returns the message
 * queues a producer may publish to.
 *
 * @param topic topic to look up
 * @return the message queue list of the derived publish info
 * @throws MQClientException if the lookup fails, or if no usable publish info is obtained
 */
public List<MessageQueue> fetchPublishMessageQueues(String topic) throws MQClientException {
    try {
        final TopicRouteData route =
            this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(topic, timeoutMillis);
        // Conversion details live in topicRouteData2TopicPublishInfo. Per the original note it
        // handles ordered and non-ordered topic configuration differently.
        final TopicPublishInfo publishInfo =
            (route == null) ? null : MQClientInstance.topicRouteData2TopicPublishInfo(topic, route);
        if (publishInfo != null && publishInfo.ok()) {
            return publishInfo.getMessageQueueList();
        }
    } catch (Exception e) {
        throw new MQClientException("Can not find Message Queue for this topic, " + topic, e);
    }

    throw new MQClientException("Unknow why, Can not find Message Queue for this topic, " + topic, null);
}
  • 查询消费路由
/**
 * Fetches the route data of {@code topic} from the name server and returns the message
 * queues a consumer may subscribe to.
 *
 * @param topic topic to look up
 * @return the non-empty set of message queues derived from the route data
 * @throws MQClientException if the lookup fails, the derived set is empty, or no route data
 *                           is returned. The "empty" exception is raised inside the try
 *                           block and is therefore re-wrapped by the catch clause.
 */
public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException {
    try {
        final TopicRouteData route =
            this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(topic, timeoutMillis);
        if (route != null) {
            // Conversion details live in topicRouteData2TopicSubscribeInfo
            final Set<MessageQueue> queues = MQClientInstance.topicRouteData2TopicSubscribeInfo(topic, route);
            if (queues.isEmpty()) {
                throw new MQClientException("Can not find Message Queue for this topic, " + topic + " Namesrv return empty", null);
            }
            return queues;
        }
    } catch (Exception e) {
        throw new MQClientException(
            "Can not find Message Queue for this topic, " + topic + FAQUrl.suggestTodo(FAQUrl.MQLIST_NOT_EXIST),
            e);
    }

    throw new MQClientException("Unknow why, Can not find Message Queue for this topic, " + topic, null);
}
  • 查询偏移量
/**
 * Asks the broker that owns {@code mq} for the queue offset matching {@code timestamp}.
 * If the broker address is unknown, the topic route is refreshed once and the lookup retried.
 *
 * @param mq        queue to search in
 * @param timestamp timestamp to search for
 * @return the offset reported by the broker
 * @throws MQClientException if the broker address cannot be resolved or the broker call fails
 */
public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
    // Resolve the publish address of the queue's broker (per the original note, the master node)
    String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    if (brokerAddr == null) {
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
        brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    }

    if (brokerAddr == null) {
        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
    }

    try {
        // Request the offset for the given timestamp from the broker
        return this.mQClientFactory.getMQClientAPIImpl()
            .searchOffset(brokerAddr, mq.getTopic(), mq.getQueueId(), timestamp, timeoutMillis);
    } catch (Exception e) {
        throw new MQClientException("Invoke Broker[" + brokerAddr + "] exception", e);
    }
}
  • 查询消息详情
/**
 * Queries every broker of {@code topic} asynchronously for messages matching {@code key}
 * within {@code [begin, end]}, waits for the answers and merges them.
 *
 * @param topic     topic to search
 * @param key       message key, or the unique message id when {@code isUniqKey} is true
 * @param maxNum    maximum number of messages requested from each broker
 * @param begin     begin timestamp sent to the brokers
 * @param end       end timestamp sent to the brokers
 * @param isUniqKey whether {@code key} is a unique key; then at most one message is returned
 * @return the merged messages together with the largest index-last-update timestamp seen
 * @throws MQClientException    NO_MESSAGE when nothing matched; TOPIC_NOT_EXIST when no route
 *                              data or no broker address is available
 * @throws InterruptedException if interrupted while waiting for the broker answers
 */
protected QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end,
    boolean isUniqKey) throws MQClientException,
    InterruptedException {
    // Use the known route data of the topic; refresh it once when it is missing
    TopicRouteData topicRouteData = this.mQClientFactory.getAnExistTopicRouteData(topic);
    if (null == topicRouteData) {
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        topicRouteData = this.mQClientFactory.getAnExistTopicRouteData(topic);
    }

    if (topicRouteData != null) {
        // Collect one address for each broker that hosts the topic
        List<String> brokerAddrs = new LinkedList<String>();
        for (BrokerData brokerData : topicRouteData.getBrokerDatas()) {
            String addr = brokerData.selectBrokerAddr();
            if (addr != null) {
                brokerAddrs.add(addr);
            }
        }

        if (!brokerAddrs.isEmpty()) {
            // One count per broker; counted down when that broker's callback finishes
            final CountDownLatch countDownLatch = new CountDownLatch(brokerAddrs.size());
            // Results gathered by the callbacks
            final List<QueryResult> queryResultList = new LinkedList<QueryResult>();
            // Guards concurrent additions to queryResultList
            final ReadWriteLock lock = new ReentrantReadWriteLock(false);
            // Send one asynchronous query to each broker address
            for (String addr : brokerAddrs) {
                try {
                    QueryMessageRequestHeader requestHeader = new QueryMessageRequestHeader();
                    requestHeader.setTopic(topic);
                    requestHeader.setKey(key);
                    requestHeader.setMaxNum(maxNum);
                    requestHeader.setBeginTimestamp(begin);
                    requestHeader.setEndTimestamp(end);

                    this.mQClientFactory.getMQClientAPIImpl().queryMessage(addr, requestHeader, timeoutMillis * 3,
                        new InvokeCallback() {// callback for the asynchronous query response
                            @Override
                            public void operationComplete(ResponseFuture responseFuture) {
                                try {
                                    RemotingCommand response = responseFuture.getResponseCommand();
                                    if (response != null) {
                                        switch (response.getCode()) {
                                            case ResponseCode.SUCCESS: {
                                                QueryMessageResponseHeader responseHeader = null;
                                                try {
                                                    responseHeader =
                                                        (QueryMessageResponseHeader) response
                                                            .decodeCommandCustomHeader(QueryMessageResponseHeader.class);
                                                } catch (RemotingCommandException e) {
                                                    log.error("decodeCommandCustomHeader exception", e);
                                                    return;
                                                }
                                                // Decode the messages carried in the response body
                                                List<MessageExt> wrappers =
                                                    MessageDecoder.decodes(ByteBuffer.wrap(response.getBody()), true);

                                                QueryResult qr = new QueryResult(responseHeader.getIndexLastUpdateTimestamp(), wrappers);
                                                try {// store the result under the write lock
                                                    lock.writeLock().lock();
                                                    queryResultList.add(qr);
                                                } finally {
                                                    lock.writeLock().unlock();
                                                }
                                                break;
                                            }
                                            default:
                                                log.warn("getResponseCommand failed, {} {}", response.getCode(), response.getRemark());
                                                break;
                                        }
                                    } else {
                                        log.warn("getResponseCommand return null");
                                    }
                                } finally {// this broker's query is done, whatever the outcome
                                    countDownLatch.countDown();
                                }
                            }
                        }, isUniqKey);
                } catch (Exception e) {
                    // NOTE(review): the latch is not counted down on this path, so a failed
                    // send appears to make the await below run to its timeout — confirm.
                    log.warn("queryMessage exception", e);
                }

            }
            // Wait until every broker has answered or the timeout elapses
            boolean ok = countDownLatch.await(timeoutMillis * 4, TimeUnit.MILLISECONDS);
            if (!ok) {
                log.warn("queryMessage, maybe some broker failed");
            }

            // NOTE(review): queryResultList is read below without taking the lock
            long indexLastUpdateTimestamp = 0;
            List<MessageExt> messageList = new LinkedList<MessageExt>();
            for (QueryResult qr : queryResultList) {
                // Track the largest index-last-update timestamp across all results
                if (qr.getIndexLastUpdateTimestamp() > indexLastUpdateTimestamp) {
                    indexLastUpdateTimestamp = qr.getIndexLastUpdateTimestamp();
                }

                for (MessageExt msgExt : qr.getMessageList()) {
                    if (isUniqKey) {// unique key: keep a single message, the one with the smallest store timestamp
                        if (msgExt.getMsgId().equals(key)) {

                            if (messageList.size() > 0) {

                                // Replace the kept message only when this one was stored earlier
                                if (messageList.get(0).getStoreTimestamp() > msgExt.getStoreTimestamp()) {

                                    messageList.clear();
                                    messageList.add(msgExt);
                                }

                            } else {

                                messageList.add(msgExt);
                            }
                        } else {
                            log.warn("queryMessage by uniqKey, find message key not matched, maybe hash duplicate {}", msgExt.toString());
                        }
                    } else {// regular keys: keep the message when one of its keys equals the queried key
                        String keys = msgExt.getKeys();
                        if (keys != null) {
                            boolean matched = false;
                            String[] keyArray = keys.split(MessageConst.KEY_SEPARATOR);
                            if (keyArray != null) {
                                for (String k : keyArray) {
                                    if (key.equals(k)) {
                                        matched = true;
                                        break;
                                    }
                                }
                            }

                            if (matched) {
                                messageList.add(msgExt);
                            } else {
                                log.warn("queryMessage, find message key not matched, maybe hash duplicate {}", msgExt.toString());
                            }
                        }
                    }
                }
            }

            if (!messageList.isEmpty()) {
                return new QueryResult(indexLastUpdateTimestamp, messageList);
            } else {
                throw new MQClientException(ResponseCode.NO_MESSAGE, "query message by key finished, but no message.");
            }
        }
    }

    throw new MQClientException(ResponseCode.TOPIC_NOT_EXIST, "The topic[" + topic + "] not matched route info");
}

七 MQClientAPIImpl

7.1 请求码处理函数ClientRemotingProcessor

7.1.1 请求码处理分发函数

/**
 * Dispatches an incoming request to the handler matching its request code.
 *
 * @param ctx     channel context of the request
 * @param request incoming command
 * @return the handler's response, or {@code null} when the request code is not handled here
 * @throws RemotingCommandException propagated from the handlers
 */
public RemotingCommand processRequest(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    final RemotingCommand reply;
    switch (request.getCode()) {
        case RequestCode.CHECK_TRANSACTION_STATE:
            // Per the original note: checks the producer's transaction state asynchronously
            // and reports the resulting local transaction state back to the broker.
            reply = this.checkTransactionState(ctx, request);
            break;
        case RequestCode.NOTIFY_CONSUMER_IDS_CHANGED:
            reply = this.notifyConsumerIdsChanged(ctx, request);
            break;
        case RequestCode.RESET_CONSUMER_CLIENT_OFFSET:
            reply = this.resetOffset(ctx, request);
            break;
        case RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT:
            reply = this.getConsumeStatus(ctx, request);
            break;
        case RequestCode.GET_CONSUMER_RUNNING_INFO:
            reply = this.getConsumerRunningInfo(ctx, request);
            break;
        case RequestCode.CONSUME_MESSAGE_DIRECTLY:
            reply = this.consumeMessageDirectly(ctx, request);
            break;
        default:
            reply = null;
            break;
    }
    return reply;
}

7.2 代理接口

  • 和namesrv及broker通信的接口都从这里代理.

7.2.1 发送消息

  • 组装发送消息体,支持批量消息或单个消息
// Excerpt: builds the send request. The V2 header is used for "smart" sends and for batches;
// a batch gets SEND_BATCH_MESSAGE, any other smart send gets SEND_MESSAGE_V2.
RemotingCommand request = null;
        if (sendSmartMsg || msg instanceof MessageBatch) {
            SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
            request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
        } else {
            // Plain send with the original header
            request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
        }

        // The message body becomes the request body
        request.setBody(msg.getBody());
  • 支持异步,同步,单向发送消息接口
// Excerpt: picks the transport call for the requested communication mode.
switch (communicationMode) {
            case ONEWAY:
                // One-way send; nothing is returned to the caller
                this.remotingClient.invokeOneway(addr, request, timeoutMillis);
                return null;
            case ASYNC:
                // Counter passed along the asynchronous send and its retries
                final AtomicInteger times = new AtomicInteger();
                this.sendMessageAsync(addr, brokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance,
                    retryTimesWhenSendFailed, times, context, producer);
                return null;
            case SYNC:
                // Synchronous send; the result is returned directly
                return this.sendMessageSync(addr, brokerName, msg, timeoutMillis, request);
            default:
                assert false;
                break;
        }
  • 异步发送消息
/**
 * Sends {@code request} to {@code addr} asynchronously. The completion callback processes
 * the response, runs the after-send hook, notifies {@code sendCallback}, updates the
 * broker's fault item and delegates failures to {@code onExceptionImpl}.
 *
 * @param retryTimesWhenSendFailed passed to {@code onExceptionImpl} as its {@code timesTotal} argument
 * @param times                    attempt counter passed on to {@code onExceptionImpl}
 * @throws InterruptedException propagated from the remoting client
 * @throws RemotingException    propagated from the remoting client
 */
private void sendMessageAsync(
        final String addr,
        final String brokerName,
        final Message msg,
        final long timeoutMillis,
        final RemotingCommand request,
        final SendCallback sendCallback,
        final TopicPublishInfo topicPublishInfo,
        final MQClientInstance instance,
        final int retryTimesWhenSendFailed,
        final AtomicInteger times,
        final SendMessageContext context,
        final DefaultMQProducerImpl producer
    ) throws InterruptedException, RemotingException {
        this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {// callback for the asynchronous response
            @Override
            public void operationComplete(ResponseFuture responseFuture) {
                RemotingCommand response = responseFuture.getResponseCommand();
                if (null == sendCallback && response != null) {
                    // No user callback, but a response arrived
                    try {// process the response and run the after-send hook
                        SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response);
                        if (context != null && sendResult != null) {
                            context.setSendResult(sendResult);
                            context.getProducer().executeSendMessageHookAfter(context);
                        }
                    } catch (Throwable e) {
                        // Deliberately ignored: without a user callback there is nobody to notify
                    }
                    // Record the observed latency for this broker (not isolated)
                    producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
                    return;
                }

                if (response != null) {
                    try {// a response arrived and a user callback exists: notify it
                        SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response);
                        assert sendResult != null;
                        if (context != null) {
                            context.setSendResult(sendResult);
                            context.getProducer().executeSendMessageHookAfter(context);
                        }

                        try {
                            sendCallback.onSuccess(sendResult);
                        } catch (Throwable e) {
                            // Failures inside the user callback are ignored
                        }

                        producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
                    } catch (Exception e) {
                        // Processing the response failed: flag the broker and report without retry
                        producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
                        onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,
                            retryTimesWhenSendFailed, times, e, context, false, producer);
                    }
                } else {
                    // No response at all: flag the broker, then report with retry allowed
                    producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
                    if (!responseFuture.isSendRequestOK()) {
                        MQClientException ex = new MQClientException("send request failed", responseFuture.getCause());
                        onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,
                            retryTimesWhenSendFailed, times, ex, context, true, producer);
                    } else if (responseFuture.isTimeout()) {
                        MQClientException ex = new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms",
                            responseFuture.getCause());
                        onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,
                            retryTimesWhenSendFailed, times, ex, context, true, producer);
                    } else {
                        MQClientException ex = new MQClientException("unknow reseaon", responseFuture.getCause());
                        onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,
                            retryTimesWhenSendFailed, times, ex, context, true, producer);
                    }
                }
            }
        });
    }
  • 处理响应,解析响应报文
  • 处理异常
/**
 * Handles a failed asynchronous send. While retrying is allowed and attempts remain,
 * the request is re-sent (to a newly selected broker when publish info is available).
 * Otherwise the failure is reported to the after-send hook and to {@code sendCallback}.
 *
 * @param brokerName       broker the failed attempt was sent to
 * @param timesTotal       maximum number of retries
 * @param curTimes         shared attempt counter, incremented on every call
 * @param e                failure of the previous attempt
 * @param needRetry        whether this failure may be retried
 */
private void onExceptionImpl(final String brokerName,
    final Message msg,
    final long timeoutMillis,
    final RemotingCommand request,
    final SendCallback sendCallback,
    final TopicPublishInfo topicPublishInfo,
    final MQClientInstance instance,
    final int timesTotal,
    final AtomicInteger curTimes,
    final Exception e,
    final SendMessageContext context,
    final boolean needRetry,
    final DefaultMQProducerImpl producer
) {
    int tmp = curTimes.incrementAndGet();
    if (needRetry && tmp <= timesTotal) {// retry the send
        String retryBrokerName = brokerName;//by default, it will send to the same broker
        if (topicPublishInfo != null) { //select one message queue accordingly, in order to determine which broker to send
            MessageQueue mqChosen = producer.selectOneMessageQueue(topicPublishInfo, brokerName);
            retryBrokerName = mqChosen.getBrokerName();
        }
        String addr = instance.findBrokerAddressInPublish(retryBrokerName);
        log.info("async send msg by retry {} times. topic={}, brokerAddr={}, brokerName={}", tmp, msg.getTopic(), addr,
            retryBrokerName);
        try {
            // Give the retried request a fresh unique id so it is not confused with the failed attempt
            request.setOpaque(RemotingCommand.createNewRequestId());
            // Send the message asynchronously again
            sendMessageAsync(addr, retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance,
                timesTotal, curTimes, context, producer);
        } catch (InterruptedException e1) {
            onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
                context, false, producer);
        } catch (RemotingConnectException e1) {
            producer.updateFaultItem(brokerName, 3000, true);
            onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
                context, true, producer);
        } catch (RemotingTooMuchRequestException e1) {
            onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
                context, false, producer);
        } catch (RemotingException e1) {
            producer.updateFaultItem(brokerName, 3000, true);
            onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
                context, true, producer);
        }
    } else {
        // No retry: report the failure
        if (context != null) {
            context.setException(e);
            context.getProducer().executeSendMessageHookAfter(context);
        }

        try {// notify the user callback of the failure
            sendCallback.onException(e);
        } catch (Exception ignored) {
        }
    }
}

7.2.2 拉取消息

  • 同步拉取消息
/**
 * Pulls messages synchronously: sends {@code request} to {@code addr} through the
 * remoting client and converts the reply into a {@link PullResult}.
 *
 * @param addr          address the request is sent to
 * @param request       the pull request command
 * @param timeoutMillis timeout handed to the remoting client's synchronous invoke
 * @return whatever {@code processPullResponse} produces for the reply
 * @throws RemotingException    declared by this method; presumably raised by the remoting call
 * @throws InterruptedException declared by this method; presumably raised while waiting for the reply
 * @throws MQBrokerException    declared by this method; presumably raised while processing the reply
 */
private PullResult pullMessageSync(final String addr, final RemotingCommand request, final long timeoutMillis)
    throws RemotingException, InterruptedException, MQBrokerException {
    final RemotingCommand reply = this.remotingClient.invokeSync(addr, request, timeoutMillis);
    // Only checked when assertions are enabled (-ea).
    assert reply != null;
    // Per the original note: translate the reply's status code and return the pulled messages.
    return this.processPullResponse(reply);
}
  • 异步拉取消息
/**
 * Pulls messages asynchronously: sends {@code request} to {@code addr} and reports the
 * outcome to {@code pullCallback} once the remoting layer completes the operation.
 *
 * <p>When a response command is present it is converted with {@code processPullResponse}
 * and passed to {@code pullCallback.onSuccess}; any {@link Exception} thrown in that path
 * is forwarded to {@code pullCallback.onException}. When no response command is present,
 * {@code pullCallback.onException} receives an {@code MQClientException} describing
 * whether the send failed, the wait timed out, or the reason is unknown.
 *
 * @param addr          address the request is sent to
 * @param request       the pull request command
 * @param timeoutMillis timeout handed to the remoting client's asynchronous invoke
 * @param pullCallback  receiver of the pull result or of the failure
 * @throws RemotingException    declared by this method; presumably raised by the remoting call
 * @throws InterruptedException declared by this method; presumably raised by the remoting call
 */
private void pullMessageAsync(
    final String addr,
    final RemotingCommand request,
    final long timeoutMillis,
    final PullCallback pullCallback
) throws RemotingException, InterruptedException {
    // Wrap the caller's callback in the remoting layer's completion callback.
    final InvokeCallback completionHandler = new InvokeCallback() {
        @Override
        public void operationComplete(ResponseFuture responseFuture) {
            final RemotingCommand reply = responseFuture.getResponseCommand();

            if (reply == null) {
                // No response command: work out why and report it through the exception callback.
                final String reason;
                if (!responseFuture.isSendRequestOK()) {
                    reason = "send request failed to " + addr + ". Request: " + request;
                } else if (responseFuture.isTimeout()) {
                    reason = "wait response from " + addr + " timeout :" + responseFuture.getTimeoutMillis() + "ms" + ". Request: " + request;
                } else {
                    reason = "unknown reason. addr: " + addr + ", timeoutMillis: " + timeoutMillis + ". Request: " + request;
                }
                pullCallback.onException(new MQClientException(reason, responseFuture.getCause()));
                return;
            }

            try {
                // Convert the reply into a pull result.
                final PullResult converted = MQClientAPIImpl.this.processPullResponse(reply);
                // Only checked when assertions are enabled (-ea).
                assert converted != null;
                // Hand the result to the caller-supplied callback (per the original note,
                // push-mode consumers process their message snapshot cache there).
                pullCallback.onSuccess(converted);
            } catch (Exception e) {
                pullCallback.onException(e);
            }
        }
    };

    this.remotingClient.invokeAsync(addr, request, timeoutMillis, completionHandler);
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,544评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,430评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,764评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,193评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,216评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,182评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,063评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,917评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,329评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,543评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,722评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,425评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,019评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,671评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,825评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,729评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,614评论 2 353

推荐阅读更多精彩内容