消息进度管理

总览

消息消费者在消费一批消息后,需要记录该批消息已经消费完毕,然后也会从ProcessQueue处理队列中移除该批消息,返回ProcessQueue最小偏移量,并存入消息进度表中。

广播模式

同一个消费组的所有消息消费者都需要消费主体下的所有消息,也就是统一组内的消费者的消费行为是相互独立的,互相不影响,故消息进度需要独立存储,与消费者绑定,消费进度存储在消费者本地LocalFileOffsetStore。

集群模式

同一个消费组内的所有消费消费者共享主题下的所有消息,同一条消息在同一时间只会被消费者组内的一个消费者消费,并且消息队列的动态变化重新负载,所以消费进度需要保存在一个每一个消费者都能访问到的地方,因此消息消费进度存储文件存放在消息服务器端Broker。

消费进度存储维度

消息消费进度的存储是以MessageQueue为维度的,因为消费者消费也是以MessageQueue为维度的,一个消费者可以消费topic下的一个MessageQueue,或者两个MessageQueue,而这个MessageQueue不会被两个消费者消费,所以这个MessageQueue的消费进度可以暂时缓存在消费者端;缓存在消费者端,进行了消息消费,更新这个进度实时性高、方便。 所以,在一直进行消息消费的时候,会更新这个缓存的offset;默认5秒定时持久化一次offset到Broker端。只有在发生消息队列发生负载均衡的时候,消费者消费新增了MessageQueue队列,才回去Broker端请求这个MessageQueue的offset;一般在消息实时消费的时候,请求消费者端缓存的offset就可以,它也代表了这个MessageQueue的消费进度。

消费流控

消费者消费ProcessQueue中的消息是以多线程的方式,将这个ConsumeRequest提交到线程池进行消息消费。可以多个ConsumeRequest并发消费。消费进度的更新是使用当前ProcessQueue的最小进度的,比如Task1消费(50,70)的offset之间的数据、Task2消费(90,120)的offset之间的数据,Task3消费(10,30)的offset的数据,如果Task1,Task2消费完毕了,Task3正在进行消费,Task1、Task2并不会去更新消费进度的,因为Task3的10这个offset还没有被消费;Task3如果发生了死锁,导致一直无法被消费,消费进度无法向前推进。为了避免发生这种情况,RocketMQ引入了消息拉取的流控措施:DefaultMQPushConsumer#consumeConcurrentlyMaxSpan=2000,消息处理队列ProcessQueue中最大消息偏移量与最小偏移量不能超过该值,如超过该值,触发流控,将延迟该消息队列的消息拉取。

LocalFileOffsetStore

广播模式采用LocalFileOffsetStore是将消费进度存储在消费者端。
存储消费进度在消费端本地文件。基本方法都是消息进度启动加载、更新消费进度、查询消费进度、持久化消费进度等方法。

存储文件为:${RocketMQ_Home}/.rocketmq_offsets/{mQClientFactory.getClientId()}/groupName/offsets.json。

消费进度内存文件结构: ConcurrentMap<MessageQueue, AtomicLong> offsetTable =
new ConcurrentHashMap<MessageQueue, AtomicLong>()

定时任务持久化消费进度,默认5s一次。

// MQClientInstance定时任务持久化消费进度,默认5s一次
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            MQClientInstance.this.persistAllConsumerOffset();
        } catch (Exception e) {
            log.error("ScheduledTask persistAllConsumerOffset exception", e);
        }
    }
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

RemoteBrokerOffsetStore

集群模式采用RemoteBrokerOffsetStore将消息进度存储文件存放在消息服务端Broker端。RemoteBrokerOffsetStore和LocalFileOffsetStore非常相似,只有消费进度最终持久化在Broker端,略有不同。

消费进度读取

消费进度读取分为从内存读取、从磁盘读取:
在消息持续消费的时候,从内存读取;
在发生队列负载均衡的时候,从磁盘读取;请求Broker端,读取新队列的消费的offset。

public long readOffset(final MessageQueue mq, final ReadOffsetType type) {
    if (mq != null) {
        // 消息消费进度读取模式
        switch (type) {
            case MEMORY_FIRST_THEN_STORE:
            case READ_FROM_MEMORY: {
                AtomicLong offset = this.offsetTable.get(mq);
                if (offset != null) {
                    return offset.get();
                } else if (ReadOffsetType.READ_FROM_MEMORY == type) {
                    return -1;
                }
            }
            case READ_FROM_STORE: {
                try {
                    long brokerOffset = this.fetchConsumeOffsetFromBroker(mq);
                    AtomicLong offset = new AtomicLong(brokerOffset);
                    this.updateOffset(mq, offset.get(), false);
                    return brokerOffset;
                }
                // No offset in broker
                catch (MQBrokerException e) {
                    return -1;
                }
                //Other exceptions
                catch (Exception e) {
                    log.warn("fetchConsumeOffsetFromBroker exception, " + mq, e);
                    return -2;
                }
            }
            default:
                break;
        }
    }

    return -1;
}
消费进度更新

更新Broker端消息消费进度。请求命令为UPDATE_CONSUMER_OFFSET请求,更新ConsumerOffsetManager的offsetTable;Broker端默认10s持久化一次消息进度,存储文件名:${RocketMQ_ HOME}/store/config/consumerOffset.json。

样例
{
    "offsetTable":{
        "TopicTest@please_rename_unique_group_name_4":{0:139653,1:139653,2:139655,3:139656
        }
}
public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
    MQBrokerException, InterruptedException, MQClientException {
    FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
    if (null == findBrokerResult) {
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
        findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
    }

    if (findBrokerResult != null) {
        UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();
        requestHeader.setTopic(mq.getTopic());
        requestHeader.setConsumerGroup(this.groupName);
        requestHeader.setQueueId(mq.getQueueId());
        requestHeader.setCommitOffset(offset);

        if (isOneway) {
            this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway(
                findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
        } else {
            this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset(
                findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
        }
    } else {
        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容