RocketMQ第五讲

broker是RockerMQ用来存储消息的服务,是RocketMQ的核心组件。NameServer和Producer都比较简单,Consumer和Broker都是比较复杂的。

从Client-Server视角来看,NameServer属于Server,Consumer和Producer都属于Client,而Broker两者兼具。Broker是Consumer和Producer的Server,同时又是NameServer的Client。

此篇文档是RocketMQ分析的第五篇文档,介绍Broker。下一篇将对照RocketMQ官方文档,介绍一下RocketMQ中设计到的一些概念性的知识。

rocketmq_architecture_3.png

核心业务及架构设计

broker是RocketMQ的核心,核心工作就是接收生成这的消息,进行存储。同时,收到消费者的请求后,从磁盘读取内容,把结果返回给消费者。

CommitLog

消息主体以及元数据的存储主体,存储Producer端写入的消息主体内容,消息内容不是定长的。单个文件大小默认1G ,文件名长度为20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文件;

CommitLog文件中保存了消息的全量内容。不同的Topic的消息,在CommitLog都是顺序存放的。就是来一个消息,不管Topic是什么,直接追加的CommitLog中。

broker启动了一个专门的线程来构建索引,把CommitLog中的消息,构建了两种类型的索引。ConsumerQueue和Index。正常消费的时候,是根据Topic来消费,会用到ConsumerQueue索引。

rocketmq_design_11.png

也可根据返回的offsetMsgId,解析出ip,端口和CommitLog中的物理消息偏移量,直接去CommitLog中取数据。

ConsumerQueue

引入的目的主要是提高消息消费的性能,由于RocketMQ是基于主题topic的订阅模式,消息消费是针对主题进行的,如果要遍历commitlog文件中根据topic检索消息是非常低效的。Consumer即可根据ConsumeQueue来查找待消费的消息。

rocketmq_design_7.png

其中,ConsumeQueue(逻辑消费队列)作为消费消息的索引,保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。consumequeue文件可以看成是基于topic的commitlog索引文件,故consumequeue文件夹的组织方式如下:topic/queue/file三层组织结构,具体存储路径为:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。同样consumequeue文件采取定长设计,每一个条目共20个字节,分别为8字节的commitlog物理偏移量、4字节的消息长度、8字节tag hashcode,单个文件由30W个条目组成,可以像数组一样随机访问每一个条目,每个ConsumeQueue文件大小约5.72M。

rocketmq_design_1.png

Index

IndexFile(索引文件)提供了一种可以通过key或时间区间来查询消息的方法。Index文件的存储位置是:HOME \store\index{fileName},文件名fileName是以创建时的时间戳命名的,固定的单个IndexFile文件大小约为400M,一个IndexFile可以保存 2000W个索引,IndexFile的底层存储设计为在文件系统中实现HashMap结构,故rocketmq的索引文件其底层实现为hash索引。

rocketmq_design_13.png

按照Message Key查询消息的时候,会用到这个索引文件。

IndexFile索引文件为用户提供通过“按照Message Key查询消息”的消息索引查询服务,IndexFile文件的存储位置是:HOME\store\index{fileName},文件名fileName是以创建时的时间戳命名的,文件大小是固定的,等于40+500W4+2000W20= 420000040个字节大小。如果消息的properties中设置了UNIQ_KEY这个属性,就用 topic + “#” + UNIQ_KEY的value作为 key 来做写入操作。如果消息设置了KEYS属性(多个KEY以空格分隔),也会用 topic + “#” + KEY 来做索引。

其中的索引数据包含了Key Hash/CommitLog Offset/Timestamp/NextIndex offset 这四个字段,一共20 Byte。NextIndex offset 即前面读出来的 slotValue,如果有 hash冲突,就可以用这个字段将所有冲突的索引用链表的方式串起来了。Timestamp记录的是消息storeTimestamp之间的差,并不是一个绝对的时间。整个Index File的结构如图,40 Byte 的Header用于保存一些总的统计信息,4500W的 Slot Table并不保存真正的索引数据,而是保存每个槽位对应的单向链表的头。202000W 是真正的索引数据,即一个 Index File 可以保存 2000W个索引。

“按照Message Key查询消息”的方式,RocketMQ的具体做法是,主要通过Broker端的QueryMessageProcessor业务处理器来查询,读取消息的过程就是用topic和key找到IndexFile索引文件中的一条记录,根据其中的commitLog offset从CommitLog文件中读取消息的实体内容。

接口和类图

RocketMQ中有两个核心模块,remoting模块和store模块。remoting模块在NameServer,Produce,Consumer和Broker都用到。store只在Broker中用到,包含了存储文件操作的API,对消息实体的操作是通过DefaultMessageStore进行操作。

store模块

  • DefaultMessageStore

属性和方法很多,就不往这里放了。

  • CommitLog

文件存储实现类,包括多个内部类

  • MappedFile

· 对于文件夹下的一个文件

image.png

borker模块

  • SendMessageProcessor
  • PullMessageProcessor
  • QueryMessageProcessor
  • BrokerController:核心控制器
image.png

remoting模块

  • NettyRemotingClient
  • NettyRemotingServer
image.png

启动流程

实例化BrokerController

  • 实例化brokerConfig
  • 实例化nettyServerConfig
  • 实例化nettyClientConfig
  • 实例化messageStoreConfig
  • 实例化brokerController,并初始化
  1. 从磁盘加载文件到内存:topicConfigManager、consumerOffsetManager、subscriptionGroupManager、consumerFilterManager、messageStore。
  2. 实例化remotingServer
  3. 实例化各种线程池,sendMessageExecutor、pullMessageExecutor、replyMessageExecutor、queryMessageExecutor、adminBrokerExecutor、clientManageExecutor、heartbeatExecutor、endTransactionExecutor、consumerManageExecutor。
  4. 调用registerProcessor()方法,注册各种线程池到各种对于的process上,process包括:SendMessageProcessor、PullMessageProcessor、ReplyMessageProcessor、QueryMessageProcessor、ClientManageProcessor、ConsumerManageProcessor、EndTransactionProcessor、AdminBrokerProcessor。
  5. 启动一些定时任务
  • 启动各种服务
  • messageStore
    reputMessageService:构建索引线程
    flushConsumeQueueService
    flushCommitLogService:内存数据刷到磁盘服务
    storeStatsService
  • remotingServer:NettyServer服务
  • brokerOuterAPI:与NameServer通信的NettyClient服务
  • pullRequestHoldService
  • 启动定时任务

线程模型

上面介绍了broker的核心业务流程和架构,关键接口和类,启动流程。最后介绍一下broker的线程模型,只有知道了线程模型,才能大概知道前面介绍的那些事如何协同工作的,对broker才能有一个立体的认识。

RocketMQ的RPC通信采用Netty组件作为底层通信库,同样也遵循了Reactor多线程模型,同时又在这之上做了一些扩展和优化。关于Reactor线程模型,可以看看我之前写的这篇文档:Reactor线程模型

rocketmq_design_6.png

上面的框图中可以大致了解RocketMQ中NettyRemotingServer的Reactor 多线程模型。一个 Reactor 主线程(eventLoopGroupBoss,即为上面的1)负责监听 TCP网络连接请求,建立好连接,创建SocketChannel,并注册到selector上。RocketMQ的源码中会自动根据OS的类型选择NIO和Epoll,也可以通过参数配置),然后监听真正的网络数据。拿到网络数据后,再丢给Worker线程池(eventLoopGroupSelector,即为上面的“N”,源码中默认设置为3),在真正执行业务逻辑之前需要进行SSL验证、编解码、空闲检查、网络连接管理,这些工作交给defaultEventExecutorGroup(即为上面的“M1”,源码中默认设置为8)去做。而处理业务操作放在业务线程池中执行,根据 RomotingCommand 的业务请求码code去processorTable这个本地缓存变量中找到对应的 processor,然后封装成task任务后,提交给对应的业务processor处理线程池来执行(sendMessageExecutor,以发送消息为例,即为上面的 “M2”)。

上面的图和这段画是从官方文档抄过来的,但是文字和图对应的不是很好,画的也不够详细,但是主要流程是这个样子。以后有时间了,我重新安装自己的理解,画一张更详细的图。

备注

NameSev线程说明

AsyncAppender-Worker-Thread-0:异步打印日志,logback使用,应该是守护线程

FileWatchService:

NettyEventExecutor:

NettyNIOBoss_:一个

NettyServerNIOSelector_:默认为三个

NSScheduledThread:定时任务线程

ServerHouseKeepingService:守护线程

ThreadDeathWatch-2-1:守护线程,Netty用,已经废弃

RemotingExecutorThread(1-8):工作线程池,没有共用NettyServerNIOSelector_,直接初始化8个线程

Broker线程说明

logback日志异步线程

AsyncAppender-Worker-Thread-0:异步打印日志,logback使用,共九个:

RocketmqBrokerAppender_inner

RocketmqFilterAppender_inner

RocketmqProtectionAppender_inner

RocketmqRemotingAppender_inner

RocketmqRebalanceLockAppender_inner

RocketmqStoreAppender_inner

RocketmqStoreErrorAppender_inner

RocketmqWaterMarkAppender_inner

RocketmqTransactionAppender_inner

Processor使用线程

SendMessageThread_:remotingServer.registerProcessor(RequestCode.SEND_MESSAGE

PullMessageThread_:remotingServer.registerProcessor(RequestCode.PULL_MESSAGE

ProcessReplyMessageThread_:remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE

QueryMessageThread_:remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE

AdminBrokerThread_:remotingServer.registerDefaultProcessor

ClientManageThread_:remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT

HeartbeatThread_:remotingServer.registerProcessor(RequestCode.HEART_BEAT

EndTransactionThread_:remotingServer.registerProcessor(RequestCode.END_TRANSACTION

ConsumerManageThread_:remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP,RequestCode.UPDATE_CONSUMER_OFFSET,RequestCode.QUERY_CONSUMER_OFFSET

RemotingClient线程

brokerOutApi_thread_:BrokerController.registerBrokerAll(true, false, true);

==================================================================

定时任务线程

BrokerControllerScheduledThread:=>

BrokerController.this.getBrokerStats().record();

BrokerController.this.consumerOffsetManager.persist();

BrokerController.this.consumerFilterManager.persist();

BrokerController.this.protectBroker();

BrokerController.this.printWaterMark();

log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());

BrokerController.this.brokerOuterAPI.fetchNameServerAddr();

BrokerController.this.printMasterAndSlaveDiff();

BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());

BrokerFastFailureScheduledThread:=>

FilterServerManagerScheduledThread:=>

FilterServerManager.this.createFilterServer();

ClientHousekeepingScheduledThread:=>

ClientHousekeepingService.this.scanExceptionChannel();

服务线程

PullRequestHoldService

FileWatchService

未知线程

AllocateMappedFileService

AcceptSocketService

BrokerStatsThread1

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,053评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,527评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,779评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,685评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,699评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,609评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,989评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,654评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,890评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,634评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,716评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,394评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,976评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,950评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,191评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,849评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,458评论 2 342

推荐阅读更多精彩内容