RocketMQ基础篇 Consumer消费消息

消费消息逻辑

消费消息逻辑主要分为三个模块

  • Rebalance
  • 拉取消息
  • 消费消息

Rebalance

RocketMQ基础篇 Consumer消费消息 流程图.png
// RebalanceImpl
public void doRebalance(final boolean isOrder) {
  Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
  if (subTable != null) {
    // 遍历每个主题的队列
    // subTable 会在 DefaultMQPushConsumerImpl 的 subscribe 和 unsubscribe 时修改
    for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
      final String topic = entry.getKey();
      try {
        // 对队列进行重新负载
        this.rebalanceByTopic(topic, isOrder);
      } catch (Throwable e) {
        if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
          log.warn("rebalanceByTopic Exception", e);
        }
      }
    }
  }

  this.truncateMessageQueueNotMyTopic();
}
private void rebalanceByTopic(final String topic, final boolean isOrder) {
  switch (messageModel) {
    case BROADCASTING: {
      Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
      if (mqSet != null) {
        boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
        if (changed) {
          this.messageQueueChanged(topic, mqSet, mqSet);
          log.info("messageQueueChanged {} {} {} {}",
                   consumerGroup,
                   topic,
                   mqSet,
                   mqSet);
        }
      } else {
        log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
      }
      break;
    }
    case CLUSTERING: {
      // topicSubscribeInfoTable topic订阅信息缓存表
      Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
      // 发送请求到broker获取topic下该消费组内当前所有的消费者客户端id
      List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
      if (null == mqSet) {
        if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
          log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
        }
      }

      if (null == cidAll) {
        log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
      }

      if (mqSet != null && cidAll != null) {
        List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
        mqAll.addAll(mqSet);

        // 排序保证了同一个消费组内消费者看到的视图保持一致,确保同一个消费队列不会被多个消费者分配
        Collections.sort(mqAll);
        Collections.sort(cidAll);

        // 分配算法 (尽量使用前两种)
        // 默认有5种 1)平均分配 2)平均轮询分配 3)一致性hash
        // 4)根据配置 为每一个消费者配置固定的消息队列 5)根据broker部署机房名,对每个消费者负责不同的broker上的队列
        // 但是如果消费者数目大于消息队列数量,则会有些消费者无法消费消息
        AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

        // 当前消费者分配到的队列
        List<MessageQueue> allocateResult = null;
        try {
          allocateResult = strategy.allocate(
            this.consumerGroup,
            this.mQClientFactory.getClientId(),
            mqAll,
            cidAll);
        } catch (Throwable e) {
          log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
                    e);
          return;
        }

        Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
        if (allocateResult != null) {
          allocateResultSet.addAll(allocateResult);
        }

        // 更新消息消费队列,如果是新增的消息消费队列,则会创建一个消息拉取请求并立即执行拉取
        boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
        if (changed) {
          log.info(
            "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
            strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
            allocateResultSet.size(), allocateResultSet);
          this.messageQueueChanged(topic, mqSet, allocateResultSet);
        }
      }
      break;
    }
    default:
      break;
  }
}

private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
                                                   final boolean isOrder) {
  boolean changed = false;

  Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
  while (it.hasNext()) {
    Entry<MessageQueue, ProcessQueue> next = it.next();
    MessageQueue mq = next.getKey();
    ProcessQueue pq = next.getValue();

    if (mq.getTopic().equals(topic)) {
      // 当前分配到的队列中不包含原先的队列(说明当前队列被分配给了其他消费者)
      if (!mqSet.contains(mq)) {
        // 丢弃 processQueue
        pq.setDropped(true);
        // 移除当前消息队列
        if (this.removeUnnecessaryMessageQueue(mq, pq)) {
          it.remove();
          changed = true;
          log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
        }
      } else if (pq.isPullExpired()) {
        switch (this.consumeType()) {
          case CONSUME_ACTIVELY:
            break;
          case CONSUME_PASSIVELY:
            pq.setDropped(true);
            if (this.removeUnnecessaryMessageQueue(mq, pq)) {
              it.remove();
              changed = true;
              log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
                        consumerGroup, mq);
            }
            break;
          default:
            break;
        }
      }
    }
  }

  List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
  for (MessageQueue mq : mqSet) {
    // 消息消费队列缓存中不存在当前队列 本次分配新增的队列
    if (!this.processQueueTable.containsKey(mq)) {
      // 向broker发起锁定队列请求 (向broker端请求锁定MessageQueue,同时在本地锁定对应的ProcessQueue)
      if (isOrder && !this.lock(mq)) {
        log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
        // 加锁失败,跳过,等待下一次队列重新负载时再尝试加锁
        continue;
      }

      // 从内存中移除该消息队列的消费进度
      this.removeDirtyOffset(mq);
      ProcessQueue pq = new ProcessQueue();

      long nextOffset = -1L;
      try {
        nextOffset = this.computePullFromWhereWithException(mq);
      } catch (Exception e) {
        log.info("doRebalance, {}, compute offset failed, {}", consumerGroup, mq);
        continue;
      }

      if (nextOffset >= 0) {
        ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
        if (pre != null) {
          log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
        } else {
          // 首次添加,构建拉取消息的请求
          log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
          PullRequest pullRequest = new PullRequest();
          pullRequest.setConsumerGroup(consumerGroup);
          pullRequest.setNextOffset(nextOffset);
          pullRequest.setMessageQueue(mq);
          pullRequest.setProcessQueue(pq);
          pullRequestList.add(pullRequest);
          changed = true;
        }
      } else {
        log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
      }
    }
  }

  // 立即拉取消息(对新增的队列)
  this.dispatchPullRequest(pullRequestList);

  return changed;
}

集群模式下消息负载的步骤

由流程图和代码,我们可以得知,集群模式下消息负载主要有以下几个步骤:

  1. 从Broker获取订阅当前Topic的消费者列表
  2. 根据具体的策略进行负载均衡
  3. 对当前消费者分配到的队列进行处理
    1. 原来有,现在没有:丢弃对应的消息处理队列(ProcessQueue)
    2. 原来没有,现在有:添加消息处理队列(ProcessQueue),如果是第一次新增,还会创建一个消息拉取请求

Rebalance的策略

RocketMQ基础篇 Consumer消费消息 Rebalance策略.png

拉取消息

RocketMQ基础篇 Consumer消费消息 拉取消息流程图.png

拉取消息的代码太多了,就不再这里贴出来了。

拉取消息的大致流程

这里说一下大致流程,然后有几个需要注意的地方
流程:在我们Rebalance第一次添加负责的队列和后续拉取消息后,都会再提交一个拉取请求到拉取请求队列(pullRequestQueue)中,然后有一个线程不停的去里面获取拉取请求,去执行拉取的操作
这里说一个RocketMQ消费者这边设计的一个亮点
它将拉取消息,消费消息通过两个任务队列的方式进行解耦,然后每一个模块仅需要负责它自己的功能。(虽然大佬们觉得很常见,但是当时我看的时候还是感觉妙呀~)
另外还有一点需要注意的是:拉取消息的时候broker和consumer都会对消息进行过滤,只不过broker是根据tag的hash进行过滤的,而consumer是根据具体的tag字符串匹配过滤的。这也是有的时候,明明拉取到了消息,但是却没有需要消费的消息产生的原因
既然说到了消息过滤,这边先简单提一下RocketMQ消息过滤的几种方式

  • 表达式过滤
    • tag
    • SQL92
  • 类过滤

消费消息

RocketMQ基础篇 Consumer消费消息 流程图.png

这边也先说几个注意点吧,后面再单独更新一篇文章。
(一)顺序消费和非顺序消费消费失败的处理
非顺序消费:直接丢入延时队列中,等待重试。顺序消息:本地重试
(二)消费失败偏移量的更新:只有当前这批消息全部消费成功后,才会将偏移量更新成为这批消息最后一条的偏移量
(三)广播消息失败不会重试,仅打印失败日志

一些补充

为什么同一个消费组下消费者的订阅信息要相同

首先,先说一下什么叫做同一个消费组下消费者的订阅信息要相同
即:在相同的GroupId下,每一个消费者他们的订阅内容(Topic+Tag)要保持一致,否则会导致消息无法被正常消费
参考文档:阿里云:订阅关系一致

RocketMQ基础篇 Consumer消费消息 订阅关系一致说明.png

我们在看待这个问题的时候,可以把它分为两类情况考虑

  • topic不一致
  • tag不一致

(一)topic不一致的问题

首先先说一个场景,消费者A监听了TopicA,消费者B监听了TopicB,但是消费者A和消费者B同属一个groupTest
在Rebalance阶段,消费者A对TopicA进行负载均衡时,会去查询groupTest下的所有消费者信息。获取到了消费者A和消费者B。此时就会将TopicA的队列对消费者A和消费者B进行负载均衡(例如消费者A分配到了1234四个队列,消费者B分配到了5678四个队列)。此时消费者B没有针对TopicA的处理逻辑,就会导致推送到5678这几个队列里面的消息没有办法得到处理。
偏移量不发生变化

(二)tag不一致的问题

随着消费者A,消费者B负载均衡的不断进行,会不断把最新的订阅信息(消息过滤规则)上报给broker。broker就会不断的覆盖更新,导致tag信息不停地变化,而tag的变化在消费者拉取消息时broker的过滤就会产生影响,会导致一些本来要被消费者拉取到的消息被broker过滤掉
过滤了,偏移量在消费后直接更新

关于消息消费的推拉模式

MQPushConsumer方式,consumer把轮询过程封装了,并注册MessageListener监听器,取到消息后,唤醒MessageListener的consumeMessage()来消费,对用户而言,感觉消息是被推送(push)过来的。主要用的也是这种方式。(我们日常开发时使用到的模式)
MQPullConsumer方式,取消息的过程需要用户自己写,首先通过打算消费的Topic拿到MessageQueue的集合,遍历MessageQueue集合,然后针对每个MessageQueue批量取消息,一次取完后,记录该队列下一次要取的开始offset,直到取完了,再换另一个MessageQueue。
总结:我们日常开发中使用到的是RocketMQ的推模式(虽然底层上的本质还是拉模式),但是不需要我们去处理拉取消息的动作触发等。如果我们要使用RocketMQ的拉模式,就需要我们自己实现从队列中拿拉取请求,然后显示调用拉消息的api,另外还要去往队列中放置拉取请求等操作。

消费幂等控制

除了基于业务状态等操作控制幂等的基础上,还可以通过消息的唯一id进行判断当前消息是否消费过
下图是生产者生产消息时产生的唯一标识 **UNIQ_KEY **,消息重试发送,这个msgid是不会变化的


RocketMQ基础篇 Consumer消费消息 通过msgid控制消费幂等.png

对应的消息内容

ConsumeMessageThread_8 Receive New Messages: [MessageExt [brokerName=broker-a  , queueId=14, storeSize=178, queueOffset=3, sysFlag=0, bornTimestamp=1659604568508, bornHost=/192.168.196.123:50628, storeTimestamp=1659604568547, storeHost=/172.30.0.4:10911, msgId=AC1E000400002A9F0000000000002B12, commitLogOffset=11026, bodyCRC=717801981, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=6, CONSUME_START_TIME=1659663419735, UNIQ_KEY=0000010163E018B4AAC21327B1BC003E, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 54, 50], transactionId='null'}]]

消费者总结

讲了这么多的消费者的内容,出现了好多名词,也把消费者的一些比较核心的内容逐个讲了一遍。
那么,在这里,我们将消费者这个模块里面的所有东西,在进行一个完整的串联。然后消费者这一方面的介绍就要告一段落了


RocketMQ基础篇 Consumer消费消息 完整流程图2.png

附一张丁威老师的流程图


RocketMQ基础篇 Consumer消费消息 丁威老师流程图.jpeg
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,133评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,682评论 3 390
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,784评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,508评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,603评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,607评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,604评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,359评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,805评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,121评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,280评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,959评论 5 339
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,588评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,206评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,442评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,193评论 2 367
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,144评论 2 352

推荐阅读更多精彩内容