消息进度管理

总览

消息消费者在消费一批消息后,需要记录该批消息已经消费完毕,然后也会从ProcessQueue处理队列中移除该批消息,返回ProcessQueue最小偏移量,并存入消息进度表中。

广播模式

同一个消费组的所有消息消费者都需要消费主体下的所有消息,也就是统一组内的消费者的消费行为是相互独立的,互相不影响,故消息进度需要独立存储,与消费者绑定,消费进度存储在消费者本地LocalFileOffsetStore。

集群模式

同一个消费组内的所有消费消费者共享主题下的所有消息,同一条消息在同一时间只会被消费者组内的一个消费者消费,并且消息队列的动态变化重新负载,所以消费进度需要保存在一个每一个消费者都能访问到的地方,因此消息消费进度存储文件存放在消息服务器端Broker。

消费进度存储维度

消息消费进度的存储是以MessageQueue为维度的,因为消费者消费也是以MessageQueue为维度的,一个消费者可以消费topic下的一个MessageQueue,或者两个MessageQueue,而这个MessageQueue不会被两个消费者消费,所以这个MessageQueue的消费进度可以暂时缓存在消费者端;缓存在消费者端,进行了消息消费,更新这个进度实时性高、方便。 所以,在一直进行消息消费的时候,会更新这个缓存的offset;默认5秒定时持久化一次offset到Broker端。只有在发生消息队列发生负载均衡的时候,消费者消费新增了MessageQueue队列,才回去Broker端请求这个MessageQueue的offset;一般在消息实时消费的时候,请求消费者端缓存的offset就可以,它也代表了这个MessageQueue的消费进度。

消费流控

消费者消费ProcessQueue中的消息是以多线程的方式,将这个ConsumeRequest提交到线程池进行消息消费。可以多个ConsumeRequest并发消费。消费进度的更新是使用当前ProcessQueue的最小进度的,比如Task1消费(50,70)的offset之间的数据、Task2消费(90,120)的offset之间的数据,Task3消费(10,30)的offset的数据,如果Task1,Task2消费完毕了,Task3正在进行消费,Task1、Task2并不会去更新消费进度的,因为Task3的10这个offset还没有被消费;Task3如果发生了死锁,导致一直无法被消费,消费进度无法向前推进。为了避免发生这种情况,RocketMQ引入了消息拉取的流控措施:DefaultMQPushConsumer#consumeConcurrentlyMaxSpan=2000,消息处理队列ProcessQueue中最大消息偏移量与最小偏移量不能超过该值,如超过该值,触发流控,将延迟该消息队列的消息拉取。

LocalFileOffsetStore

广播模式采用LocalFileOffsetStore是将消费进度存储在消费者端。
存储消费进度在消费端本地文件。基本方法都是消息进度启动加载、更新消费进度、查询消费进度、持久化消费进度等方法。

存储文件为:${RocketMQ_Home}/.rocketmq_offsets/{mQClientFactory.getClientId()}/groupName/offsets.json。

消费进度内存文件结构: ConcurrentMap<MessageQueue, AtomicLong> offsetTable =
new ConcurrentHashMap<MessageQueue, AtomicLong>()

定时任务持久化消费进度,默认5s一次。

// MQClientInstance定时任务持久化消费进度,默认5s一次
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            MQClientInstance.this.persistAllConsumerOffset();
        } catch (Exception e) {
            log.error("ScheduledTask persistAllConsumerOffset exception", e);
        }
    }
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

RemoteBrokerOffsetStore

集群模式采用RemoteBrokerOffsetStore将消息进度存储文件存放在消息服务端Broker端。RemoteBrokerOffsetStore和LocalFileOffsetStore非常相似,只有消费进度最终持久化在Broker端,略有不同。

消费进度读取

消费进度读取分为从内存读取、从磁盘读取:
在消息持续消费的时候,从内存读取;
在发生队列负载均衡的时候,从磁盘读取;请求Broker端,读取新队列的消费的offset。

public long readOffset(final MessageQueue mq, final ReadOffsetType type) {
    if (mq != null) {
        // 消息消费进度读取模式
        switch (type) {
            case MEMORY_FIRST_THEN_STORE:
            case READ_FROM_MEMORY: {
                AtomicLong offset = this.offsetTable.get(mq);
                if (offset != null) {
                    return offset.get();
                } else if (ReadOffsetType.READ_FROM_MEMORY == type) {
                    return -1;
                }
            }
            case READ_FROM_STORE: {
                try {
                    long brokerOffset = this.fetchConsumeOffsetFromBroker(mq);
                    AtomicLong offset = new AtomicLong(brokerOffset);
                    this.updateOffset(mq, offset.get(), false);
                    return brokerOffset;
                }
                // No offset in broker
                catch (MQBrokerException e) {
                    return -1;
                }
                //Other exceptions
                catch (Exception e) {
                    log.warn("fetchConsumeOffsetFromBroker exception, " + mq, e);
                    return -2;
                }
            }
            default:
                break;
        }
    }

    return -1;
}
消费进度更新

更新Broker端消息消费进度。请求命令为UPDATE_CONSUMER_OFFSET请求,更新ConsumerOffsetManager的offsetTable;Broker端默认10s持久化一次消息进度,存储文件名:${RocketMQ_ HOME}/store/config/consumerOffset.json。

样例
{
    "offsetTable":{
        "TopicTest@please_rename_unique_group_name_4":{0:139653,1:139653,2:139655,3:139656
        }
}
public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
    MQBrokerException, InterruptedException, MQClientException {
    FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
    if (null == findBrokerResult) {
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
        findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
    }

    if (findBrokerResult != null) {
        UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();
        requestHeader.setTopic(mq.getTopic());
        requestHeader.setConsumerGroup(this.groupName);
        requestHeader.setQueueId(mq.getQueueId());
        requestHeader.setCommitOffset(offset);

        if (isOneway) {
            this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway(
                findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
        } else {
            this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset(
                findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
        }
    } else {
        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,635评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,628评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,971评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,986评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,006评论 6 394
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,784评论 1 307
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,475评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,364评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,860评论 1 317
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,008评论 3 338
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,152评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,829评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,490评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,035评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,156评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,428评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,127评论 2 356

推荐阅读更多精彩内容