RocketMQ基础概念以及常见的坑

使用场景

异步处理、服务解耦、流量控制
消息队列的典型应用场景:
• 订单系统:在电商系统中,订单的创建、支付、发货等步骤可以通过消息队列进行异步处理和解耦。
• 日志处理:使用消息队列将日志从应用系统传输到日志处理系统,实现实时分析和监控。
• 任务调度:在批量任务处理、任务调度系统中,通过消息队列将任务分发给多个工作节点,进行并行处理。
• 数据同步:在数据同步系统中,消息队列可以用于将变更的数据异步同步到不同的存储系统或服务。

整体架构

image.png

Producer: 负责生产消息,一般由业务系统生产消息,可通过集群方式部署。RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要Broker返回确认信息,单向发送不需要。
Consumer: 负责消费消息,一般是后台系统负责异步消费,可通过集群方式部署。一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。提供pull/push两者消费模式。
Broker Server: 负责存储消息、转发消息。RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备,存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。
Name Server: 名字服务,充当路由消息的提供者。生产者或消费者能够通过名字服务查找各主题相应的Broker IP列表。多个NameServer实例组成集群,相互独立,没有信息交换。
Topic(逻辑概念):表示一类消息的集合,每个Topic包含若干条消息,每条消息只能属于一个Topic,是RocketMQ进行消息订阅的基本单位。创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。一个Topic可以分片在多个Broker集群上,每一个Topic分片包含多个queue,具体结构可以参考下图:
image.png

CommitLog(物理文件):消息存储文件,所有Topic的消息都存储在CommitLog文件中。单个文件大小默认1G, 文件名长度为 20 位,左边补零,剩余为起始偏移量,比如 00000000000000000000 代表了第一个文件,起始偏移量为 0,文件大小为 1G=1073741824;当第一个文件写满了,第二个文件为 00000000001073741824,起始偏移量为 1073741824,以此类推。数据结构如下图:
image.png

下面表格说明下每条消息包含哪些字段,以及这些字段占用空间大小和字段简介。
image.png

RocketMQ中CommitLog的生命周期及其清理时机如下:

  1. 默认情况下,Broker会清理单个CommitLog文件中最后一条消息超过 72小时的CommitLog文件。除了用户手动清理外,以下几种情况会被默认清理。
  2. 如果CommitLog所在磁盘分区的磁盘占用率超过75%,则会触发CommitLog文件清理。
  3. 如果CommitLog所在磁盘分区的磁盘占用率超过85%,则会强制删除CommitLog文件。
  4. 如果磁盘占用率达到系统危险警戒线(默认90%),Broker将拒绝消息写入。
  5. Broker在启动的时候会注册定时任务,定时清理过期的数据,默认是每10s执行一次,分别清理CommitLog文件和ConsumeQueue文件。
  6. 通常情况下每天凌晨4点删除超过72小时的CommitLog;如果CommitLog所在磁盘分区的磁盘占用率超过75%,则会触发CommitLog文件清理。

在RocketMQ中,可以调整以下参数来设置CommitLog的生命周期:

  1. fileReservedTime:文件的保留时间,默认72小时。这个参数可以设置消息在CommitLog中保留的时间。
  2. deleteWhen:清理过期日志时间,例如设置为"04"表示凌晨4点开始清理。
  3. diskMaxUsedSpaceRatio:磁盘最大使用率,超过使用率会发起日志清理操作。

ConsumeQueue:消息消费队列,消息到达CommitLog文件后,将异步转发ConsumeQueue,供消费者消费。ConsumeQueue仅记录消息的偏移位置以及消息长度,以及tag信息.实际消息内容存在CommitLog。ConsumeQueue 文件采取定长设计,每一个条目共 20 个字节,分别为 8 字节的 CommitLog 物理偏移量、4 字节的消息长度、8 字节 tag hashcode,单个文件由 30W 个条目组成,可以像数组一样随机访问每一个条目,每个 ConsumeQueue 文件大小约 5.72M。数据结构如下图:

image.png

IndexFile:CommitLog的索引文件,提供了一种可以通过 key 或时间区间来查询消息的方法。单个 IndexFile 文件大小约为 400M,一个 IndexFile 可以保存 2000W 个索引,IndexFile 的底层存储设计类似 JDK 的 HashMap 数据结构。主要由 Header、Slot Table、Index Linked List 三部分组成。

image.png

Header:IndexFile 的头部,占 40 个字节。主要包含以下字段:
image.png

Slot Table:默认包含 500w 个 Hash 槽,每个 Hash 槽存储的是相同 hash 值的第一个 IndexItem 存储位置 。
Index Linked List:默认最多包含 2000w 个 IndexItem
image.png

Topic,CommitLog,ConsumeQueue 三者的关系如下图:

生产端选择需要发送的队列(Topic),一个Topic可以对应多个ConsumeQueue,消息是顺序写到CommmitLog里面.ConsumeQueue、IndexFile都是基于CommitLog文件构建的。一个完整的消息写入流程包括:同步写入 Commitlog 文件缓存区,异步构建 ConsumeQueue、IndexFile 文件。
RocketMQ通过开启一个线程ReputMessageServcie来实时读取CommitLog文件新增内容,使用reputFromOffset来标记已经追踪到的位置。


image.png

功能

普通消息

SendReceipt sendReceipt = producer.send(message);

顺序消息

• 生产者:使用 MessageQueueSelector 将具有相同 orderId 的消息发送到同一个消息队列(MessageQueue),确保这些消息被发送到同一队列,从而保证顺序。
• 消费者:通过 DefaultMQPushConsumer 订阅 OrderTopic,RocketMQ 保证每个队列中的消息是按顺序消费的。只要相同订单 ID 的消息在同一队列中,就可以保证它们的消费顺序。

实现MQ顺序消息关键点

在发送时设置分片路由规则,让相同key的消息只落到指定queue上,然后消费过程中对顺序消息所在的queue加锁,保证消息的有序性,让这个queue上的消息就按照FIFO顺序来进行消费。因此我们满足以下三个条件是否就可以呢?
1)消息顺序发送: 多线程发送的消息无法保证有序性,因此,需要业务方在发送时,针对同一个业务编号(如同一笔订单)的消息需要保证在一个线程内顺序发送,在上一个消息发送成功后,在进行下一个消息的发送。对应到mq中,消息发送方法就得使用同步发送,异步发送无法保证顺序性。

//采用的同步发送方式,在一个线程内顺序发送,异步发送方式为:producer.send(msg, new SendCallback() {...})
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {//…}

2)消息顺序存储:MQ 的topic下会存在多个queue,要保证消息的顺序存储,同一个业务编号的消息需要被发送到一个queue中。对应到mq中,需要使用MessageQueueSelector来选择要发送的queue。即可以对业务编号设置路由规则,像根据队列数量对业务字段hash取余,将消息发送到一个queue中。

//使用"%"操作,使得订单id取余后相同的数据路由到同一个queue中,也可以自定义路由规则
long index = id % mqs.size();  
return mqs.get((int) index);

3)消息顺序消费:要保证消息顺序消费,同一个queue就只能被一个消费者所消费,因此对broker中消费队列加锁是无法避免的。同一时刻,一个消费队列只能被一个消费者消费,消费者内部,也只能有一个消费线程来消费该队列。这里RocketMQ已经为我们实现好了。

List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
    for (MessageQueue mq : mqSet) {
        if (!this.processQueueTable.containsKey(mq)) {
            if (isOrder && !this.lock(mq)) {
                log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
                continue;
            }
 
         //....省略
        }
    }

消费者重新负载,并且分配完消费队列后,需要向mq服务器发起消息拉取请求,代码实现在RebalanceImpl#updateProcessQueueTableInRebalance()中,针对顺序消息的消息拉取,mq做了以上判断,即消费客户端先向broker端发起对messageQueue的加锁请求,只有加锁成功时才创建pullRequest进行消息拉取

public boolean lock(final MessageQueue mq) {
     FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);
     if (findBrokerResult != null) {
         LockBatchRequestBody requestBody = new LockBatchRequestBody();
         requestBody.setConsumerGroup(this.consumerGroup);
         requestBody.setClientId(this.mQClientFactory.getClientId());
         requestBody.getMqSet().add(mq);
 
         try {
             Set<MessageQueue> lockedMq =
                 this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
             for (MessageQueue mmqq : lockedMq) {
                 ProcessQueue processQueue = this.processQueueTable.get(mmqq);
                 if (processQueue != null) {
                     processQueue.setLocked(true);
                     processQueue.setLastLockTimestamp(System.currentTimeMillis());
                 }
             }
 
             boolean lockOK = lockedMq.contains(mq);
             log.info("the message queue lock {}, {} {}",
                 lockOK ? "OK" : "Failed",
                 this.consumerGroup,
                 mq);
             return lockOK;
         } catch (Exception e) {
             log.error("lockBatchMQ exception, " + mq, e);
         }
     }
 
     return false;
 }

可以看到,就是调用lockBatchMQ方法发送了一个加锁请求,成功获取到消息处理队列就设为获取到锁,返回锁定成功,如果加锁成功,同一时刻只有一个线程进行消息消费。加锁失败,会延迟1000ms重新尝试向broker端申请锁定messageQueue,锁定成功后重新提交消费请求。

顺序消息可能存在的坑

经过我们正常发送顺序消息,存储顺序消息,消费者只需要老老实实地按照拉取到地消息顺序消费,就可以保证顺序性。
但是,仍然会出现一些问题,比如消费者如果消费失败了,对于一个普通消息来说,它就会进入消息重试,如果默认设置的16次重试都失败了,消息就会进入死信队列,需要人工介入处理。
这样对于顺序消息来说可能是不可接受的,因为前置的消息消费失败了,后续的消息若是消费成功,消息的顺序性就被打乱了。
比如下单操作,用户下单的消息消费失败了,相当于订单未能创建,但是付款的消息却执行成功了,用户一看,钱都扣了咋订单没了??于是客服电话又双叒叕被打爆了!

因此我们需要考虑顺序消息能不能重试,能不能放入死信队列?
RocketMQ 对于顺序消息的重试默认实现是执行 Integer.MAX_VALUE 次。 若是一直无法消费成功,那么就可能会把业务堵塞了。
因此顺序消息消费失败的处理的一个方法是将可能需要支持相关联的所有消息都直接失败,然后找个地方持久化保存这些消息,等待后续修复后再重新消费。

重平衡机制的影响

消费者的重平衡机制也会影响消息的顺序消费。当一个消费者突然宕机了,当前队列没有消费者,那么就会触发重平衡机制,让其他消费者顶替这个挂掉的消费者的位置。
如果这个挂掉的消费者此时没有消费消息倒是还好,新来的消费者就能继续完成后续消费。但是如果挂掉的消费者刚好消费消息到一半,那么此时它还未去修改 Broker 的消费位点,那么就会导致新来的消费者重复消费,甚至可能使得顺序不一致。
顺序消费的三把锁
第一把锁:分布式锁
RocketMQ 提供了一个 ConsumeMessageOrderlyService 类来保证顺序消费。
这个 service 启动的时候会向 Broker 申请当前消费者负责的队列锁,会将消费组、客户端ID、以及负责的队列发往 Broker。 Broker 会将这个队列与当前消费者进行绑定,将关系存储到本地。
这个锁实现了同一个消费者组内,只有一个消费者可以消费这个队列
这个锁有过期时间,消费者会定期(默认20s)给这个锁续期,确保对分布式锁的占有。
别的消费者要想消费队列,就必须也来加分布式锁,但是如果队列已经被别的消费者给绑定了,那么就无法消费。

第二把锁: Synchronized
这把锁确保了同一时刻只能有一个线程去消费这个队列。
这里需要了解一个前提:
一个消费者可以同时消费多个队列,而一个队列某刻只能被一个消费者消费。
由于占有分布式锁的消费者拿到消息后,它仍然是丢到线程池(因为消费者可能在消费多个队列,并发消费不同队列可以增加性能)去并发消费,但是我们的顺序消息必须保证有序,因此只能让一个线程去消费顺序消息

第三把锁:ReentrantLock
线程获取到 Synchronized 锁之后,还需要再到 ProcessQueue 中获取到 consumeLock 锁。这是一个 ReentrantLock。
这把锁是为了表明消费者正在消费消息
由于 RocketMQ 存在重平衡机制,我们前面了解到,如果当前消费者正在处理消费消息,还未向 Broker 提交消费点位,如果当前的消费者挂了,触发重平衡机制,那么新来的消费者就可能导致重复消费。
这个锁的功能就是表明当前这个队列还有消息正在被消费,无法重平衡,等待下一次重平衡。
当消费者去请求占有这个锁的时候,如果获取失败,就说明队列正在被消费,则重平衡失败。如果获取锁成功,那么就表明当前队列没有被消费消息,就可以去 Broker 中解除分布式锁,让新的消费者接管这个队列了。

实际上,这三把锁并不能完全保障顺序性和不重复
例如:当一个 Broker 挂了,那么 Broker 对应的队列就全部不可用了,此时会让集群中其他的 Broker 顶替这个挂掉的 Broker,原来队列中相关的消息只能被发送到别的队列里,那么就会被别的消费者消费。
要想保证顺序性,那么就得牺牲可用性,不能让消息发送到别得队列。要想保证可用性又会牺牲顺序性。
RocketMQ 对这两个模式都提供了方案:如果要绝对的顺序性,则创建 Topic 时要指定 -o 参数(–order)为true,且 NameServer 中的配置 orderMessageEnable 和 returnOrderTopicConfigToBroker 必须是 true。
列中相关的消息只能被发送到别的队列里,那么就会被别的消费者消费。
要想保证顺序性,那么就得牺牲可用性,不能让消息发送到别得队列。要想保证可用性又会牺牲顺序性。
RocketMQ 对这两个模式都提供了方案:如果要绝对的顺序性,则创建 Topic 时要指定 -o 参数(–order)为true,且 NameServer 中的配置 orderMessageEnable 和 returnOrderTopicConfigToBroker 必须是 true。


延时消息

message.setDelayTimeLevel(2);
SendResult sendResult = producer.send(message);

1)预定义延时级别
● RocketMQ不支持任意时间的延时消息,而是提供了18个等级的延迟时间,包括1秒、5秒、10秒、30秒、1分钟、2分钟、3分钟、4分钟、5分钟、6分钟、7分钟、8分钟、9分钟、10分钟、20分钟、30分钟、1小时和2小时。这种设计主要是出于性能考虑,如果支持任意时间的延迟,就会涉及到消息的排序,会有一定的性能损耗。而RocketMQ这种利用固定延迟级别到单个队列的实现方式是一种妥协,灵活性和极致性能的妥协。此外,RocketMQ 5.0版本引入了新的时间轮算法,简单理解就是把时间按照精度划分成N个Slot,消息会按照延迟时间加入到对应的Slot,然后线程定时扫描时间轮,把Slot对应的到期消息重新投递即可。

2)存储转换
• 当一条延时消息被发送到 RocketMQ 后,并不会立即存入目标主题(Topic)的消息队列中,而是首先被发送到了一个特殊的内部主题 SCHEDULE_TOPIC_XXXX 中。
• 在这个内部主题中,消息根据其延时级别被分配到不同的队列里,每个队列对应一个延时级别。

3)定时调度
• RocketMQ 内部有一个调度服务(Schedule Service),它会定期扫描 SCHEDULE_TOPIC_XXXX 主题下的消息队列。
• 根据消息的延时级别以及当前时间判断是否已经到达了该消息应该被投递的时间点。如果达了,则将这条消息重新发送到最初指定的目标 Topic 中,供消费者消费。

事务消息

image.png

1.在消息队列上开启一个事务主题。
2.事务中第一个执行的服务发送一条“半消息”(半消息和普通消息的唯一区别是,在事务提交之前,对于消费者来说,这个消息是不可见的)给消息队列。
3.半消息发送成功后,发送半消息的服务就会开始执行本地事务,根据本地事务执行结果来决定事务消息提交或者回滚。
补偿流程:RocketMQ 提供事务反查来解决异常情况,如果 RocketMQ 没有收到提交或者回滚的请求,Broker 会定时到生产者上去反查本地事务的状态,然后根据生产者本地事务的状态来处理这个“半消息”是提交还是回滚。值得注意的是我们需要根据自己的业务逻辑来实现反查逻辑接口,然后根据返回值 Broker 决定是提交还是回滚。而且这个反查接口需要是无状态的,请求到任意一个生产者节点都会返回正确的数据。
4.本地事务成功后会让这个“半消息”变成正常消息,供分布式事务后面的步骤执行自己的本地事务。(这里的事务消息,Producer 不会因为 Consumer 消费失败而做回滚,采用事务消息的应用,其所追求的是高可用和最终一致性,消息消费失败的话,RocketMQ 自己会负责重推消息,直到消费成功。)
其中,补偿流程用于解决消息 Commit 或者 Rollback 发生超时或者失败的情况。在 RocketMQ 事务消息的主要流程中,一阶段的消息如何对用户不可见。其中,事务消息相对普通消息最大的特点就是一阶段发送的消息对用户是不可见的。那么,如何做到写入消息但是对用户不可见呢?RocketMQ 事务消息的做法是:如果消息是“半消息”,将备份原消息的主题与消息消费队列,然后改变主题为 RMQ_SYS_TRANS_HALF_TOPIC。由于消费组未订阅该主题,故消费端无法消费“半消息”的消息,然后 RocketMQ 会开启一个定时任务,从 Topic 为 RMQ_SYS_TRANS_HALF_TOPIC 中拉取消息进行消费,根据生产者组获取一个服务提供者发送回查事务状态请求,根据事务状态来决定是提交或回滚消息。


常见问题

消息不丢失

image.png

1.生产者需要处理好 Broker 的响应,出错情况下利用重试、报警等手段。
2.Broker需要控制响应的时机,单机情况下是消息刷盘后返回响应,集群多副本情况下,即发送至两个副本及以上的情况下再返回响应。
3.消费者需要在执行完真正的业务逻辑之后再返回响应给 Broker。
但是要注意消息可靠性增强了,性能就下降了,等待消息刷盘、多副本同步后返回都会影响性能。因此还是看业务,例如日志的传输可能丢那么一两条关系不大,因此没必要等消息刷盘再响应。

消息的ACK模式

消息被消费,那么如何保证被消费成功呢?这里只有使用方控制,只有使用方确认成功了,才会消费成功,否则会重新投递。
RocketMQ其实是通过ACK机制来对失败消息进行重试和通知的,具体流程如下所示:

image.png

重试间隔
10秒/30秒/1分钟/2分钟/3分钟/4分钟/5分钟/6分钟/7分钟/8分钟/9分钟/10分钟/20分钟/30分钟/1小时/2小时


消息堆积

消息堆积是指在消息队列中,消息的生产速度远大于消费速度,导致大量消息积压在队列中。
我们需要先定位消费慢的原因,如果是 bug 则处理 bug,同时可以临时扩容增加消费速率,减少线上的资损。
如果是因为本身消费能力较弱,则可以优化下消费逻辑
常见有以下几种方式提升消费者的消费能力:

  1. 增加消费者线程数量:提高并发消费能力。
  2. 增加消费实例:在分布式系统中,可以水平扩展多个消费实例,从而提高消费速率。
  3. 优化消费者逻辑:检查消费者的代码,减少单个消息的处理时间。例如,减少 I/O 操作、使用批量处理等。
    注意上述的第二点:
    • 增加消费实例,一定要注意注意 Topic 对应的分区/队列数需要大于等于消费实例数,不然新增加的消费者是没东西消费的。因为一个 Topic中,一个分区/队列只会分配给一个消费实例
    除此之外还可以进行限流和降级处理:
    • 对消息生产端进行限流,降低生产速率,避免消息积压进一步恶化。
    • 对非关键消息进行丢弃或延迟处理,只保留高优先级的消息,提高系统的响应速度。

优化消费者逻辑常见做法

批量消费
• 通过一次性从队列中消费多条消息(如批量读取 100 条),可以减少每次拉取消息的网络开销,提高处理效率。
异步消费
• 使用异步处理方法,在消费的同时不阻塞后续消息的消费。处理完一条消息后立即开始处理下一条消息,提升并发度(但是要注意消息丢失的风险)。
优化数据库操作
• 如果消费者在处理消息时需要频繁访问数据库,可以通过数据库连接池、SQL 优化、缓存等手段减少数据库操作的时间。
• 使用批量插入或更新操作,而不是逐条处理,可以显著提升效率。
临时扩展队列的策略

临时扩展多个消费者队列
• 在消息积压严重时,可以通过临时扩展多个消费者队列,将积压的消息分配到不同的队列中进行消费。消费完成后,可以将这些临时队列关闭。

使用多队列调度机制
• 例如,使用 RabbitMQ 的 Exchange 机制,将消息按照特定规则路由到多个队列中。这样可以在消息堆积时,将不同类型的消息分开处理。

限流与降级的实现方式
生产者限流:
• 在生产者端增加限流逻辑,使用令牌桶、漏桶算法等限流策略,限制生产者的发送速率,从而避免消息队列被快速填满。
• 例如,在 Kafka 中可以通过配置生产者的 linger.ms 和 batch.size 来缓解消息发送的速度。

消费者降级:
• 在消息堆积严重时,对低优先级的消息进行丢弃或延迟处理。只保留高优先级的消费,确保系统核心功能的正常运行。
• 可以在消费端增加优先级队列或通过消息属性区分优先级,先处理高优先级的消息。


RocketMQ常见调优

  1. 磁盘瓶颈的识别与优化:首先需要通过工具来监测当前系统的磁盘I/O操作状况,这有助于理解目前磁盘是否在IOPS或吞吐量上遇到了限制。如果发现是读吞吐量达到瓶颈,而IOPS仍有空间,可以尝试减少预读设置。若IOPS已接近极限但吞吐量较低,则可适当增加预读大小以提高效率。对于无法单纯通过调整配置解决的情况,考虑横向(添加更多节点)或纵向(升级单个节点配置)扩展存储资源。
  2. 内存利用与“冷读”处理:由于RocketMQ依赖PageCache缓存数据,因此部署时应优先选用内存较大的机器,这样能有效减少直接从磁盘读取数据的机会。对于5.1.2版本及以上版本的RocketMQ,当遇到因拉取长期积压消息导致的大量磁盘访问时,可以通过设置 dataReadAheadEnable=false 来降低CommitLog文件的预读量,从而缓解这一问题。
  3. 文件清理机制调优:合理设定参数值,根据业务特点调整 deleteWhen 参数指定的消息清理时间点,设置合适的 fileReservedTime 控制消息保留周期,调整 diskMaxUsedSpaceRatio 确保磁盘利用率保持在一个健康水平。
  4. 集群参数调优:对Broker的几个属性可能影响到集群性能的稳定性,下面进行特别说明。开启异步刷盘提高集群吞吐,开启Slave读权限提高Master内存利用率,消费一次拉取消息数量由broker和consumer客户端共同决定,发送队列等待时间由参数waitTimeMillsInSendQueue设置,主从异步复制提高集群性能,提高集群稳定性。
  5. 消息延迟问题优化:分散定时消息的触发时间,合理选择延迟等级,增加集群资源,监控与调优。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容