RocketMQ源码解析(六)-Broker#MessageStore

Broker将消息存储抽象成MessageStore接口,默认实现类是DefaultMessageStore。主要提供如下方法:

  • 保存消息,包括单条和批量保存
  • 根据topic、queue和offset批量获取消息,consumer使用该方法来拉取消息
  • 根据消息offset读取消息详情,根据messageId查询消息时使用该方法
  • 根据messageKey查询消息,可提供给终端用户使用
    下面我们根据一个MessageStore的数据结构图来看下消息是如何存储的

数据结构图

MessageStore数据结构图

【注】以上图片转载自博客RocketMQ消息存储流程图及数据结构

数据结构

通过上面的图可以看到消息存储涉及到一下几个数据结构:
CommitLog,存储消息的详细内容,按照消息收到的顺序,所有消息都存储在一起。每个消息存储后都会有一个offset,代表在commitLog中的偏移量。举个例子,当前commitLog文件的大小是12413435字节,那下一条消息到来后它的offset就是12413436。这个说法不是非常准确,但是offset大概是这么计算来的。commitLog并不是一个文件,而是一系列文件(上图中的MappedFile)。每个文件的大小都是固定的(默认1G),写满一个会生成一个新的文件,新文件的文件名就是它存储的第一条消息的offset。
ConsumeQueue,既然所有消息都是存储在一个commitLog中,但是consumer是按照topic+queue的维度来消费消息的,没有办法直接从commitLog中读取,所以针对每个topic的每个queue都会生成consumeQueue文件。ConsumeQueue文件中存储的是消息在commitLog中的offset,可以理解成一个按queue建的索引,每条消息占用20字节(上图中的一个cq)。跟commitLog一样,每个Queue文件也是一系列连续的文件组成,每个文件默认放30w个offset。
IndexFile,CommitLog的另外一种形式的索引文件,只是索引的是messageKey,每个MsgKey经过hash后计算存储的slot,然后将offset存到IndexFile的相应slot上。根据msgKey来查询消息时,可以先到IndexFile中查询offset,然后根据offset去commitLog中查询消息详情。

线程服务

MessageStore除了上面的数据结构以外,还需要相应的服务来对数据做操作。
IndexService,负责读写IndexFile的服务
ReputMessageService,消息存储到commitLog后,MessageStore的接口调用就直接返回了,后续由ReputMessageService负责将消息分发到ConsumeQueueIndexService
HAService,负责将master-slave之间的消息数据同步
以上就是MessageStore的整体结构了,下面看下它的启动过程。

MessageStore启动

启动入口在DefaultMessageStore.start()方法:

public void start() throws Exception {
        //1、写lock 文件,尝试获取lock文件锁,保证磁盘上的文件只会被一个messageStore读写
        lock = lockFile.getChannel().tryLock(0, 1, false);
        if (lock == null || lock.isShared() || !lock.isValid()) {
            throw new RuntimeException("Lock failed,MQ already started");
        }

        lockFile.getChannel().write(ByteBuffer.wrap("lock".getBytes()));
        lockFile.getChannel().force(true);
        //2、启动FlushConsumeQueueService,是一个单线程的服务,定时将consumeQueue文件的数据刷新到磁盘,周期由参数flushIntervalConsumeQueue设置,默认1sec
        this.flushConsumeQueueService.start();
        //3、启动CommitLog
        this.commitLog.start();
        //4、消息存储指标统计服务,RT,TPS...
        this.storeStatsService.start();
        //5、针对master,启动延时消息调度服务
        if (this.scheduleMessageService != null && SLAVE != messageStoreConfig.getBrokerRole()) {
            this.scheduleMessageService.start();
        }
        //6、启动reputMessageService,该服务负责将CommitLog中的消息offset记录到cosumeQueue文件中
        if (this.getMessageStoreConfig().isDuplicationEnable()) {
            this.reputMessageService.setReputFromOffset(this.commitLog.getConfirmOffset());
        } else {
            this.reputMessageService.setReputFromOffset(this.commitLog.getMaxOffset());
        }
        this.reputMessageService.start();
        //7、启动haService,数据主从同步的服务
        this.haService.start();
        //8、对于新的broker,初始化文件存储的目录
        this.createTempFile();
        //9、启动定时任务
        this.addScheduleTask();
        this.shutdown = false;
    }

以上就是整个MessageStore服务启动的过程,其中有几项下面解释一下:

  • 第2步,数据写入文件后,因为多级缓存的原因不会马上写到磁盘上,所以会有一个单独的线程定时调用flush,这里是flush consumeQueue文件的。CommitLogIndexFile的也有类似的逻辑,只是不是在这里启动的
  • 第3步,启动CommitLog,CommitLog.start()代码如下:
    public void start() {
        //加载刷盘服务
        this.flushCommitLogService.start();
        //storePool flush
        if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
            this.commitLogService.start();
        }
    }

FlushCommitLogService,跟第2步类似的,该服务负责将CommitLog的数据flush到磁盘,针对同步刷盘和异步刷盘,有两种实现方式
CommitLogService,这个service只有在采用内存池缓存消息的时候才需要启动。在使用内存池的时候,这个服务会定时将内存池中的数据刷新到FileChannel中,这个我们后面讲CommitLog的文章中再详细讲。

  • 第5步,在consumer的时候讲过,如果消息失败,broker会延时重发。对于延时重发消息(topic=SCHEDULE_TOPIC_XXXX),这个服务负责检查是否有消息到了发送时间,到了时间则从延时队列中取出后重新发送
  • 第7步,如果是Master,HAService默认监听10912端口,接收slave的连接请求,然后将消息推送给slave;如果是Slave,则通过该服务连接Master接收数据
  • 第9步,这里的定时任务主要有以下几个:
  1. 定时清理过期的commitLog、cosumeQueue和Index数据文件, 默认文件写满后会保存72小时
  2. 定时自检commitLog和consumerQueue文件,校验文件是否完整。主要用于监控,不会做修复文件的动作。
  3. 定时检查commitLog的Lock时长(因为在write或者flush时侯会lock),如果lock的时间过长,则打印jvm堆栈,用于监控。

以上就是整个启动的过程了,后续的文章开始讲解Broker是怎样接收Producer消息,还有怎样将消息交给Consumer的。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,530评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 86,403评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,120评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,770评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,758评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,649评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,021评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,675评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,931评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,659评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,751评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,410评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,004评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,969评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,042评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,493评论 2 343

推荐阅读更多精彩内容