rocketmq客户端消费流程

rocketmq客户端消费流程

只关注于集群模式下并发消费的push模式

组件概述

DefaultMQPushConsumerImpl
  • 负载均衡实现 RebalanceImpl
  • 拉取消息. PullAPIWrapper
  • 消费进度存储 OffsetStore
  • 消费服务 ConsumeMessageService
  • MQClientInstance 客户端核心实现
MQClientInstance
  • netty 客户端 业务线程池和回调线程池隔离
  • 定时任务
  • 负载均衡调度 RebalanceService
  • 拉消息任务调度 pullMessageService
  • 内部生产者 defaultMQProducer

MQClientInstance 和 消费者为一对多关系。使用InstanceName相同的生产者消费者都使用同一个MQClientInstance。

启动 DefaultMQPushConsumerImpl.start()

  1. 生成InstanceName,如果用户未设置则为pid。

  2. 创建 MQClientInstance,使用InstanceName相同的生产者消费者都使用同一个MQClientInstance。MQClientInstance是客户端的核心。

    就是说一个MQClientInstance下会与多个消费者。MQClientInstance统一调度他们。

this.mQClientFactory =
MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer,this.rpcHook);

//后面会将消费者注册到mQClientFactory,让mQClientFactory有所有同一InstanceName消费者的引用。
boolean registerOK =     mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
         
  1. 为负载均衡实现rebalanceImpl 赋值
  2. 创建PullAPIWrapper 负责拉取消息
  3. 根据消费模式创建 OffsetStore
   switch (this.defaultMQPushConsumer.getMessageModel()) {
               case BROADCASTING://广播存储在本地
                   this.offsetStore =
                           new LocalFileOffsetStore(this.mQClientFactory,
                               this.defaultMQPushConsumer.getConsumerGroup());
                   break;
               case CLUSTERING://集群进度存储在远程
                   this.offsetStore =
                           new RemoteBrokerOffsetStore(this.mQClientFactory,
                               this.defaultMQPushConsumer.getConsumerGroup());
                   break;
               default:
                   break;
               }

​ OffsetStore 负责读取消费进度和同步消费进度

  1. 根据消费模式创建ConsumeMessageService 并启动

    并发消费不启动线程。

    顺序消费下启动定时任务,会调用消费者的RebalanceImpl的lockAll 方法。向broker发生请求锁住分配给他的队列。

                    if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
    
                this.consumeOrderly = true;
                this.consumeMessageService =
                        new ConsumeMessageOrderlyService(this,
                            (MessageListenerOrderly) this.getMessageListenerInner());
            }
            else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently)
            {
                this.consumeOrderly = false;
                this.consumeMessageService =
                        new ConsumeMessageConcurrentlyService(this,
                            (MessageListenerConcurrently) this.getMessageListenerInner());
            }
  1. 启动MQClientInstance,多消费者引用同一个MQClientInstanceMQClientInstance只会启动一次

    mQClientFactory.start();
    
  2. 初始化

    //向nameser 拉取所关心的topic的路由信息  
    this.updateTopicSubscribeInfoWhenSubscriptionChanged();
    //向所有路由信息里的所有broker发送心跳
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    //唤醒mQClientFactory的负责均衡服务,
    this.mQClientFactory.rebalanceImmediately();
    

启动 MQClientInstance.start()

一个MQClientInstance 只会启动一次。

1.启动netty 客户端

this.mQClientAPIImpl.start();//内置netty客户端

2.启动定时任务

this.startScheduledTask();//会启动5个定时任务
  • 从远程获取nameServer地址 发生变动时可以更新本地nameServer

远程地址被写死,暂时没有用。

MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
  • 更新topic路由信息,topic路由发送变动时可以感知

    MQClientInstance.this.updateTopicRouteInfoFromNameServer();
    
  • 更新消费进度到broker 最终调用 DefaultMQPushConsumerImpl.offsetStore.persistAll

这里可以看出更新消费进度是异步的,这也是出现重复消息的原因之一

MQClientInstance.this.persistAllConsumerOffset();
  • 向broker发送心跳
MQClientInstance.this.cleanOfflineBroker();//清理下线的broker
MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();//发送心跳
  • 动态调整线程池 根据DefaultMQPushConsumer 的 adjustThreadPoolNumsThreshold 参数和消息在消费者内部的堆积调整
MQClientInstance.this.adjustThreadPool(); 
  1. 启动调度服务
//拉消息线程
this.pullMessageService.start();
//Start rebalance service
//重负载线程
this.rebalanceService.start();
//Start push service 内部生产者用于消费失败时,发送重试消息
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);

拉消息流程

拉消息的流程是先从负载均衡开始的。MQClientInstance的rebalanceService启动后会定时调用,所有消费者的doRebalance 方法。间隔10s

        private static long WaitInterval = 1000 * 10;//间隔10s    
        @Override
    public void run() {
        log.info(this.getServiceName() + " service started");

        while (!this.isStoped()) {
            this.waitForRunning(WaitInterval);//等待
            this.mqClientFactory.doRebalance();
        }

        log.info(this.getServiceName() + " service end");
    }

    public void doRebalance() {
      //调用所有消费者的doRebalance
        for (String group : this.consumerTable.keySet()) {//consumerTable 消费者引用
            MQConsumerInner impl = this.consumerTable.get(group);
            if (impl != null) {
                try {
                    impl.doRebalance();
                } catch (Exception e) {
                    log.error("doRebalance exception", e);
                }
            }
        }
    }
        //消费者最终会调用自己的负载均衡实现的doRebalance方法
        @Override
    public void doRebalance() {
        if (this.rebalanceImpl != null) { //消费者调用自己的rebalanceImpl
            this.rebalanceImpl.doRebalance();
        }
    }

负载均衡实现

先拿到topic路由信息,然后循环对topic做负载

public void doRebalance() {
    //取得关心等待topic
    Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
    if (subTable != null) {
        for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
            final String topic = entry.getKey();
            try {
                //对topic做负载
                this.rebalanceByTopic(topic);
            } catch (Exception e) {
                if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    log.warn("rebalanceByTopic Exception", e);
                }
            }
        }
    }
    //当topic变动时,移除多余topic对应的ProcessQueue
    this.truncateMessageQueueNotMyTopic();
}

负载分集群和广播模式,广播模式不讨论

在rocketmq中一个topic有多个队列。负载均衡就是将队列合理的分配给一个消费组的所有消费者。

有多种分配算法,继承AllocateMessageQueueStrategy,默认为AllocateMessageQueueAveragely

//先获取负载所需要的参数
//topic对应的所有队列
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
//topic对应的所有客户端
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);

然后调用,返回的list就是分配给当前消费者的队列

   public List<MessageQueue> allocate(//
                                       final String consumerGroup,//
                                       final String currentCID,//
                                       final List<MessageQueue> mqAll,//
                                       final List<String> cidAll//
    );

而区分不同客户端的cidAll 就是每个客户端的ip@InstanceName ,使用同一ip下不能有相同的InstanceName。

比如AllocateMessageQueueAveragely有这一行

//取自己在客户端集合的下标,如果两个客户端InstanceName相同,那么index都一样,分配的队列也相同
int index = cidAll.indexOf(currentCID);

而这个负载算法是没有同步和校验等操作的,不同客户端不会进行通信。客户端不知道别人分配了哪些队列。全靠“自觉”,同一组内都使用同一策略那么分配是合理的,如果同一组内使用不同策略,队列的分配就会发生混乱。

拉取任务

rocketmq为每个分配给它的队列生成一个 拉取任务 ProcessQueue

将其存储在PullMessageService 的pullRequestQueue中,这是一个LinkedBlockingQueue

PullMessageService 启动后会从堵塞队列中取出拉取任务,然后进行消息的拉取。

分配队列完成后

 //返回队列是否发生了变化
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet);
                

private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet) {//mqSet 分配给当前消费者的队列
        boolean changed = false;
                //存储上次分配的队列和对应的ProcessQueue拉取任务 
        //processQueueTable 是ConcurrentHashMap
        Iterator<Entry<MessageQueue, ProcessQueue>> it =
          this.processQueueTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<MessageQueue, ProcessQueue> next = it.next();
            MessageQueue mq = next.getKey();
            ProcessQueue pq = next.getValue();

            if (mq.getTopic().equals(topic)) {//topic 是否相等
                if (!mqSet.contains(mq)) { //上次分配队列,这次没分配给我
                    pq.setDropped(true);//禁用拉取任务 修改dropped属性。是volatile变量
                    //移除OffsetStore中存储的队列进度,移除前先提交进度
                    if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                        it.remove();
                        changed = true;
                        log.info("doRebalance, {}, remove unnecessary mq, {}", 
                                 consumerGroup, mq);
                    }
                }
         
               //据上次拉取间隔 120000ms 也移除它
                else if (pq.isPullExpired()) {
                    switch (this.consumeType()) {
                        case CONSUME_ACTIVELY:
                            break;
                        case CONSUME_PASSIVELY:
                            pq.setDropped(true);
                            if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                                it.remove();
                                changed = true;
                                log.error(
                                        "[BUG]doRebalance, {}, remove unnecessary mq, {},
                                  because pull is pause, so try to fixed it",
                                        consumerGroup, mq);
                            }
                            break;
                        default:
                            break;
                    }
                }
            }
        }
                //新队列 处理
        List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
        for (MessageQueue mq : mqSet) {
            if (!this.processQueueTable.containsKey(mq)) {
                //生成拉取任务
                PullRequest pullRequest = new PullRequest();
                pullRequest.setConsumerGroup(consumerGroup);
                pullRequest.setMessageQueue(mq);
                pullRequest.setProcessQueue(new ProcessQueue());
                                //计算下次拉取的偏移
                long nextOffset = this.computePullFromWhere(mq);
                if (nextOffset >= 0) {
                    pullRequest.setNextOffset(nextOffset);
                    pullRequestList.add(pullRequest);
                    changed = true;
                    //记录下 用于下次对比
                    this.processQueueTable.put(mq, pullRequest.getProcessQueue());
                    log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                } else {
                    log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
                }
            }
        }
                //将拉取任务压入堵塞队列
        //最终调用 
        //PullMessageService.executePullRequestImmediately 
        //this.pullRequestQueue.put(pullRequest);
        this.dispatchPullRequest(pullRequestList);
        return changed;
    }
拉取消息

现在知道一个队列对应一个拉取任务ProcessQueue,存放在堵塞队列中,如果禁用了会将dropped属性修改为true。

谁来执行拉取呢,MQClientInstance.PullMessageService。

PullMessageService 启动后从堵塞队列取出拉取任务,找到对应的组调用pullMessage

PullMessageService 为单线程,所有拉取消息时为单线程拉取

@Override
public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStoped()) {
          //从堵塞队列中取出1
            PullRequest pullRequest = this.pullRequestQueue.take();
            if (pullRequest != null) {
                this.pullMessage(pullRequest);
            }
    }

    log.info(this.getServiceName() + " service end");
}

  private void pullMessage(final PullRequest pullRequest) {
        //找到对应的组调用pullMessage
        final MQConsumerInner consumer = 
          this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
        if (consumer != null) {
            DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
            //调用消费者的pullMessage,最终调用pullAPIWrapper.pullKernelImpl
            impl.pullMessage(pullRequest);
        }
    }
DefaultMQPushConsumerImpl.pullMessage

先进行限流等检查,如果不能通过会调用executePullRequestLater() 将任务放回队列,下次消费。

public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
    //提交到定时任务中
    this.scheduledExecutorService.schedule(new Runnable() {
        @Override
        public void run() {//待会在放入队列
            PullMessageService.this.executePullRequestImmediately(pullRequest);
        }
    }, timeDelay, TimeUnit.MILLISECONDS);
}

也会检查是否禁用。正常的任务拉取完成会放回队列,等待下次拉取。

final ProcessQueue processQueue = pullRequest.getProcessQueue();
if (processQueue.isDropped()) {//检查dropped属性。volatile修饰
    log.info("the pull request[{}] is droped.", pullRequest.toString());
   //被禁用直接抛弃 没被禁用的用完会放回队列
    return;
}

都完成后创建一个回调函数 PullCallback,然后异步拉取

因为网络层是netty,所以其实所有请求都是异步。同步的操作只是做了异步转同步而已。

this.pullAPIWrapper.pullKernelImpl(//
    pullRequest.getMessageQueue(), // 1
    subExpression, // 2
    subscriptionData.getSubVersion(), // 3
    pullRequest.getNextOffset(), // 4
    this.defaultMQPushConsumer.getPullBatchSize(), // 5
    sysFlag, // 6
    commitOffsetValue,// 7
    BrokerSuspendMaxTimeMillis, // 8
    ConsumerTimeoutMillisWhenSuspend, // 9
    CommunicationMode.ASYNC, // 10
    pullCallback// 11
    );

请求成功后触发回调函数。主要看 case FOUND,就可以了。其他代表没有新消息,偏移量不对等

//这里有一个mq自己实现的性能统计。我们在外部也可以拿到
consumer.getDefaultMQPushConsumerImpl().getConsumerStatsManager()
PullCallback pullCallback = new PullCallback() {
    @Override
    public void onSuccess(PullResult pullResult) {
        if (pullResult != null) {
            pullResult =
                    DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(
                        pullRequest.getMessageQueue(), pullResult, subscriptionData);

            switch (pullResult.getPullStatus()) {
            case FOUND:
                long prevRequestOffset = pullRequest.getNextOffset();
                pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                long pullRT = System.currentTimeMillis() - beginTimestamp;
                //性能统计
                DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(
                    pullRequest.getConsumerGroup(),         
                  pullRequest.getMessageQueue().getTopic(), pullRT);
                long firstMsgOffset = Long.MAX_VALUE;
                if (pullResult.getMsgFoundList() == null || 
                    pullResult.getMsgFoundList().isEmpty()) {
                  //空消息,放回队列
                  DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                }
                else {
                    firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
                  //性能统计
                    DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(
                        pullRequest.getConsumerGroup(), 
                        pullRequest.getMessageQueue().getTopic(),
                        pullResult.getMsgFoundList().size());
                    boolean dispathToConsume = 
                      processQueue.putMessage(pullResult.getMsgFoundList());
               //开始消费       
               DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(//
                        pullResult.getMsgFoundList(), //
                        processQueue, //
                        pullRequest.getMessageQueue(), //
                        dispathToConsume);
            case NO_NEW_MSG:
            case NO_MATCHED_MSG:
            case OFFSET_ILLEGAL:
            default:
                break;
            }
        }
    }
开始消费

这里有一个分批消费的逻辑,根据consumeMessageBatchMaxSize拆分

取决于这个参数private int consumeMessageBatchMaxSize = 1;

如果设置大于1那么这批消息消费时只能全部成功或者全部失败

final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
if (msgs.size() <= consumeBatchSize) {
    ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
    this.consumeExecutor.submit(consumeRequest);
}
else {
    for (int total = 0; total < msgs.size();) {
        List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
        for (int i = 0; i < consumeBatchSize; i++, total++) {
            if (total < msgs.size()) {
                msgThis.add(msgs.get(total));
            }
            else {
                break;
            }
        }
                //创建一个消费job
        ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue,
                                                           messageQueue);
        //提交到线程池
        this.consumeExecutor.submit(consumeRequest);
    }
}
//ConsumeRequest 是Runnable的实现
ConsumeRequest implements Runnable 

ConsumeRequest的run方法

@Override
public void run() {
   //又进行了队列禁用的校验
    if (this.processQueue.isDropped()) {
        log.info("the message queue not be able to consume, because it's dropped {}",
            this.messageQueue);
        return;
    }
        //用户的消费listener 实现
    MessageListenerConcurrently listener =    
    ConsumeMessageConcurrentlyService.this.messageListener;
    //创建Context
    ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
    ConsumeConcurrentlyStatus status = null;
    //这个Context 用于hook,在4.5的消息追踪中是借助此hook和Context实现的
    ConsumeMessageContext consumeMessageContext = null;
    if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
        consumeMessageContext = new ConsumeMessageContext();
        consumeMessageContext
            .setConsumerGroup(ConsumeMessageConcurrentlyService.this.defaultMQPushConsumer
                .getConsumerGroup());
        consumeMessageContext.setMq(messageQueue);
        consumeMessageContext.setMsgList(msgs);
        consumeMessageContext.setSuccess(false);
        //调用hook:ConsumeMessageHook
        ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl
            .executeHookBefore(consumeMessageContext);
    }

    long beginTimestamp = System.currentTimeMillis();

    try {
        //将重试消息的topic替换为原来的topic
        ConsumeMessageConcurrentlyService.this.resetRetryTopic(msgs);
        //调用用户方法
        status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
    }
    catch (Throwable e) {
        log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",//
            RemotingHelper.exceptionSimpleDesc(e),//
            ConsumeMessageConcurrentlyService.this.consumerGroup,//
            msgs,//
            messageQueue);
    }

    long consumeRT = System.currentTimeMillis() - beginTimestamp;

    if (null == status) {//返回null 或者异常设置为失败
        log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",//
            ConsumeMessageConcurrentlyService.this.consumerGroup,//
            msgs,//
            messageQueue);
        status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }
    
    // add by fuhaining@yolo24.com
    if (consumeMessageLog.isInfoEnabled()) {
        StringBuilder keys = new StringBuilder();
        for (MessageExt msg : msgs) {
            keys.append(msg.getMsgId()).append(",");
        }
        consumeMessageLog.info("concurrently - " + status.name() + " : " + 
                               keys.deleteCharAt(keys.length() - 1).toString());
    }

    if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
        consumeMessageContext.setStatus(status.toString());
        consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == 
                                         status);
       //调用hook
        ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl
            .executeHookAfter(consumeMessageContext);
    }

    ConsumeMessageConcurrentlyService.this.getConsumerStatsManager().incConsumeRT(
        ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(),
      consumeRT);
    //再次校验
    if (!processQueue.isDropped()) {
        //处理结果
        ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
    }
    else {
        log.warn(
            "processQueue is dropped without process consume result. messageQueue={}, 
          msgTreeMap={}, msgs={}",
            new Object[] { messageQueue, processQueue.getMsgTreeMap(), msgs });
    }
}
重试消息
//ConsumeMessageConcurrentlyService.processConsumeResult方法   
//在前面会进行性能统计

        switch (this.defaultMQPushConsumer.getMessageModel()) {
    case BROADCASTING://广播略过
        break;
    case CLUSTERING:
        List<MessageExt> msgBackFailed = new ArrayList<MessageExt>
          (consumeRequest.getMsgs().size());
        for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
            MessageExt msg = consumeRequest.getMsgs().get(i);
            //发送重试消息
            boolean result = this.sendMessageBack(msg, context);
            if (!result) {
                msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                msgBackFailed.add(msg);
            }
        }

        if (!msgBackFailed.isEmpty()) {
            consumeRequest.getMsgs().removeAll(msgBackFailed);
                        //发送失败的进定时任务,重试
            this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(),
                consumeRequest.getMessageQueue());
        }
        break;
    default:
        break;
    }

    long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
    if (offset >= 0) { 
   // 将消费进度提交到OffsetStore
   // OffsetStore 只会将进度记下,由前面说的定时任务同步给broker
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
    }
}

将要重试的消息发会broker。只是把原来的id发回去。broker在会根据id读取原来消息的消息体

生成重试消息

public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)
        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
    try {
        //消息原来存哪,发会到哪
        String brokerAddr =(null != brokerName) ? 
          this.mQClientFactory.findBrokerAddressInPublish(brokerName)
          : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());

        this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,
          this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000);
    }
    catch (Exception e) {
      
     //如果发送失败,使用内部生产者发送
      this.mQClientFactory.getDefaultMQProducer().send(newMsg);
    }
  
 //  consumerSendMessageBack  方法
  
        ConsumerSendMsgBackRequestHeader requestHeader = new
            ConsumerSendMsgBackRequestHeader();
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader);
        requestHeader.setGroup(consumerGroupWithProjectGroup);
            //原来的topic
        requestHeader.setOriginTopic(msg.getTopic());
            //原消息的偏移
        requestHeader.setOffset(msg.getCommitLogOffset());
          //重试级别
        requestHeader.setDelayLevel(delayLevel);
            //记录原来的id
        requestHeader.setOriginMsgId(msg.getMsgId());
            //通过netty发送
        RemotingCommand response = this.remotingClient.invokeSync(addr, 
        request, timeoutMillis);
        assert response != null;
        switch (response.getCode()) {
        case ResponseCode.SUCCESS: {
            return;
        }
        default:
            break;
        }

流程总结

  1. MQClientInstancerebalanceService 线程启动。定时调用消费者的负载均衡实现RebalanceImpldoRebalance方法。

  2. RebalanceImpl根据负载策略AllocateMessageQueueStrategy计算属于自己的队列

  3. 根据队列的变化,生成新的拉取任务 ProcessQueue 或者将原来的ProcessQueue禁用

  4. 将新的 ProcessQueue放入MQClientInstancePullMessageServicepullRequestQueue这是一个LinkedBlockingQueue

  5. PullMessageService的线程会从队列中取出,然后调用对应消费者的PullAPIWrapperpullKernelImpl方法发送请求拉取

  6. 拉取为异步,在回调中将消息封装成ConsumeMessageConcurrentlyService.ConsumeRequest任务提交到ConsumeMessageConcurrentlyService的线程池ScheduledExecutorService

  7. 最终调用用户的实现进行消费

  8. 将消费失败消息发回broker生成重试消息

  9. 消费成功将进度写入消费者的OffsetStore 定时回写broker

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,402评论 6 499
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,377评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,483评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,165评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,176评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,146评论 1 297
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,032评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,896评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,311评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,536评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,696评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,413评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,008评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,659评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,815评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,698评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,592评论 2 353

推荐阅读更多精彩内容