本来想将broker和client分开写。但是他们的每个功能都是共同协作完成的,写broker的时候,难免会涉及到consumer和producer的细节,于是以大杂烩的方式粗略总结了rocketmq的主要功能,主要是broker。
一、nameSvr
RocketMq是Metaq的3.0版本,摒弃了之前使用的zk,用以新的命名服务namesrv,namesrv的代码量比较少,我们从这里开始我们的源码分析路程。
可以看到Main-ClassNamesrvStartup
除了读取配置,初始化日志以外,主要功能是由NamesrvController
完成的。我们进一步查看NamesrvController类,发现其是有以下几个模块构成:RemotingServer
,KVConfigManager
,RouteInfoManager
1.2 namesrv的内部组件
1.2.1 KVConfigManager
KVConfigManager
为broker提供namespace,K,V 的双层kv存储/读取功能。当发生数据变更时,会实时刷盘。内部用读写锁来控制并发操作。
1.2.2 NettyRemotingServer
提供对外消息接收处理服务。NettyRemotingServer
继承自NettyRemotingAbstract
,后者对netty进行了封装,抽象出server和client的公共部分。server的实现即为NettyRemotingServer
。namesrv
通过其registerDefaultProcessor
方法注册了消息处理对象DefaultRequestProcessor
。处理如下消息:
- K,V 系统增删改查功能;
- broker的注册注销;topic的增删改查;
- 获取集群信息,包括broker列表和broker的集群名称和broker组名称;
具体实现根据功能转由KVConfigManager
和RouteInfoManager
模块处理。
1.2.3 RouteInfoManager
路由信息管理,namesrv的核心模块。维护以下列表的增删改查
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
1.3 定时任务scheduledExecutorService
处理两个事情:
剔除失联的broker,默认2min超时。
打印KVConfigManager中的存储信息,默认10min触发一次。
二、broker的组件
1.remotingServer 监听10911端口,用于监听comsumer和producer的消息
2.fastRemotingServer 监听端口10909 fastRemotingServer,端口号由10911-2得到。功能和10911高度重合(不懂作用是啥?可能是类似于vip通道?)
3.brokerOuterAPI broker的客户端网络接口,维护与namesvr的通讯,每隔30s执行一次
registerBroker
,用于把自身维护的topic信息发送到所有namesvr,同时这次报文也充当心跳的作用。4.messageStore 存储模块,可以说是broker的核心。
4.1scheduleMessageService负责延时将队列的消息写到真实队列。
4.2flushConsumeQueueService定期将consumequeue刷盘。
4.3commitLog
4.4storeStatsService 统计消息写入commitlog耗时,qps等信息。
4.5 reputMessageService 每1ms将commitlog中的变化量写入consumerqueue.
4.6 haService
4.7cleanCommitLogService负责清理commitLog的过期文件,如果满足时间(每天凌晨4点)/磁盘空间不足/有人手动删除过(???没看懂) 中的一项,即可删去3天内没有修改记录的commitlog。
4.8cleanConsumeQueueService 清理consumequeue的过期文件,触发条件比较苛刻,一个consumequeue文件最多可以存储30w个消息位置信息,检查最后一个消息的offset(即这个文件中最新的消息)是否小于commitlog的最小offset,如果是则删除。即如果某个文件的消息不满30w,肯定不会被删除。
5.pullRequestHoldService consumer 拉取消息如果还没有消息,则可以阻塞一定的时间直到有新的消息或超时。pullRequestHoldService用于维护这些拉取请求。
6.clientHousekeepingService 用于清理1,2,3中心跳超时的链接。
7.filterServerManager 定期执行脚本
startfsrv.sh
,启动Filtersrv服务,暂未研究其工作内容。
三、一条producer发来的消息处理流程
2.1 正常流程
- 预步骤:
adminProcessor
用于处理remotingServer
和fastRemotingServer
的处理器,除却BrokerController#registerProcessor
方法中特殊注册的处理器,其他都由adminProcessor
处理。在这里,将处理器SendMessageProcessor注册到通讯服务中。
- 预步骤:
- producer把消息发给broker,由netty转交给
SendMessageProcessor#sendMessage
函数处理。
- producer把消息发给broker,由netty转交给
- 检查本地是否有该topic的该队列。如果没有且配置了自动创建,按照
TBW102
中的perm
(位于~/store/config/topics.json,perm共3bit,分别代表可读,可写,可自动创建。注意客户端配置的queue不能超过默认)队列的配置来创建一个新的topic及其队列。(默认创建不太好,消息服务无法控制消息来源。一个broker默认创建了新的队列以后,根据默认的pushConsumer的实现,如果使用者没有额外配置其他broker来消费者这个topic,后续该topic下的消息均会打到这个broker上。还是由运维人员手动配置比较好)。
- 检查本地是否有该topic的该队列。如果没有且配置了自动创建,按照
- 由
DefaultMessageStore#putMessage
->CommitLog#putMessage
来将消息写入commitLog中。 如果消息配置了延迟处理的话(18个级别,分别对应时间 "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"),会放入队列SCHEDULE_TOPIC_XXXX
,
broker会触发DeliverDelayedMessageTimerTask
任务,在时间到达的时候将消息再写入原始的topic(所以物理上会存了两遍)。刷盘策略见后文。
- 由
5.每隔1ms执行
ReputMessageService#doReput
,目的是以队列分类,记录每条消息的大小,保存在consumequeue/TopicTest/0/00000000000000000000
(示例topic+queue+offset)中,从commitlog中读取还没有写入consumer的消息。(疑问:CommitLog#checkMessageAndReturnSize
的时候又读了一次commitLog,为什么不做成包装一个简要信息塞入队列的方式抛给ReputMessageService)。6.触发等待该队列的consumer的pullRequest。
2.2 处理失败的消息
consumer对于消费失败的消息,可以在MessageListenerConcurrently#consumeMessage
处理的时候抛出异常或者返回RECONSUME_LATER
来标示消息处理失败。对于失败的消息将会把原始消息的offset发回给broker。
broker在commitlog中找到原始的消息内容,取出来并投递到新的retry topic(名称%RETRY%+consumerGroupName)中。这里有两个关键点:根据延迟level指数退避先投递到延迟队列,如果最后retry topic依然消费失败。那么进入死信队列(名称%DLQ%+consumerGroupName)。
四、producer和consumer的工作流程
在了解消费流程之前,我们先看一下client(producer和consumer)的工作流程。
producer和consumer的工作流程大致是一致的,但是也有差异部分。详见MQClientInstance#start
。
3.1 公共部分
3.1.1 MQClientAPIImpl 负责和namesrv、broker通讯。其中带
oneway
的发送接口作用:发送成功即可,不管broker的反应。3.1.2 定时任务,是和MQClientAPIImpl模块配合使用。
从nameserver获取broker信息。
向broker发送心跳,并清除client中超时broker
consumer通知broker更新消费进度
动态更新本地线程池大小(3.4.6 版本中还没有实现完全).
3.2 consumer独有
其实producer也有启动相应线程,但是没有触发条件,无法执行逻辑。
3.2.1. RebalanceService
服务,每10s执行一次,对于每个消费组的每个topic,从broker获取到consumer同胞,然后根据负载算法均摊所有的队列。broker可以控制每个topic队列的多少来完成带权重的消息负载,producer可以通过指定发送的队列来实现权重生产。consumer如果要实现类似功能,可以调用setAllocateMessageQueueStrategy
修改rocketmq的负载策略。
3.2.2. PullMessageService
服务,负责发起拉取消息的任务。RebalanceService
服务调用抽象方法RebalanceImpl#dispatchPullRequest
将新增的broker队列分发出去,其中pushconsumer的实现RebalancePushImpl
会调用PullMessageService
的接口向目标broker发起拉取消息的请求。concumer从namesrv中获取同组同topic的消费者,每个消费者分配不重复的队列,所以具体使用的时候,消费者的数量应该要大于队列的数量是没有意义的。具体实现是rebalanceByTopic#rebalanceByTopic
。
在上文<<一条producer发来的消息处理流程>>也有说过,消息有可能会消费失败,消费失败的消息最后都进了%RETRY%consumerGroupName
队列,因此消费者在消费的时候,除了订阅自己负责的topic,还需要订阅本消费组的retry队列。
五、一条push方式consumer发来的pull请求处理流程
1.consumer启动的时候触发rebalance,集群模式下需要从broker取得topic+consumerGroup的消费进度,方法
RebalancePushImpl#computePullFromWhere
。broker处理函数ClientManageProcessor#queryConsumerOffset
,从本地配置config/consumerOffset.json
中读取出进度返回给consumer。2.consumer从拿到的offset开始消费,broker处理入口
PullMessageProcessor#processRequest
,对请求参数做一系列的检查。先从consumequeue/TopicName/queueId/offset
(由于consumerqueue文件每个消息存储均用了20字节,因此可以很方便的根据offset读出实际消息在commitlog中的位置)中读取到实际conmmitlog中的消息位置,然后再到commitlog中找到具体的消息,返回给consumer。
consumer的拉取模式
对于push模式来说,rocketmq采用的却是pull的方式来获取消息。pull的间隔似乎决定了broker把消息推给consumer的延时,间隔太长,消息实时性无法保证,时间太短,徒增cpu和网络资源。但是rocketmq给出了一个比较好的解决方案。consumer对于分配到自己身上的每个broker的每个队列,在pull请求的时候给出一个挂起时间pollingTimeMills
(第一次是由RebalanceService
触发的,pollingTimeMills默认15s),如果对某个队列查询的结果是没有新消息,那么挂起pollingTimeMills
时间,期间如果有新消息到来,调用brokerPullRequestHoldService#notifyMessageArriving
来重新触发一次消息拉取返回给consumer,如果超时了也返回给consumer。consumer在接收到回复以后立即发起下一条查询。
producer发送同步消息如何实现?
每条消息有一个唯一标识opaque,发送一条消息前,创建一个ResponseFuture,ResponseFuture内部维护了一个计数为1的CountDownLatch对象,保存到上下文列表responseTable(类型:ConcurrentHashMap<Integer /* opaque */, ResponseFuture>)中, 在当前进程接收到消息后,先判断是请求还是回复,如果是回复则清除掉responseTable中的记录,并减少CountDownLatch的计数。用户在发送接口中等到CountDownLatch的结果就可以返回了。
顺序消息
顺序消息由以下两点来保证。
1.从producer到broker的顺序性,producer对于同一类顺序消息,选择同一个broker的同一个queue(调用SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout) 接口来定制消息传往那个queue),tcp传输是有序的且broker中commitlog和consumer各队列消息的offset均是顺序读写。因此这点得到了保证。
2.消息从broker到consumer的有序性,同一个队列有且对应了一个consumer且从broker发送到consumer的时候是顺序的。此外consumer注册监听器为MessageListenerOrderly
类型,client内部判断如果是这种类型,在执行投递到线程池的ConsumeRequest
任务时,需要获取到对应queue队列的锁才能继续调用到用户代码,保证了第二点的顺序性。
六、存储模块补充
5.1 abort
broker在启动的时候会创建一个空的名为abort
的文件,并在shutdown时将其删除,用于标识进程是否正常退出,如果不正常退出,会在启动时做故障恢复(todo:分析具体逻辑)
5.2 commitlog和consumequeue
发给同一个broker的所有topic消息均顺序写在commitlog当中(包括消费失败的消息)。每条消息的大小不定,因为commitlog本身是无序且不定长的。所以需要有一种文件来记录每个topic每条消息存储的物理offset,即consumequeue。每个consumequeue文件顺序记录了某个broker中的某个queue的commitlog offset。但是要做到以groupName来分组消费,我们还需要以每种groupName创建一类可以存储每个group消费进度的文件,即config/consumerOffset.json。
5.2.1 刷盘逻辑
同步刷盘 需要broker配置为同步刷盘且producer在发送消息前调用
setWaitStoreMsgOK(true)
允许等待broker刷盘结果(默认也是true)。实现为GroupCommitService
,broker会尝试两次刷盘,并给出结果给客户端。异步刷盘 实现
FlushRealTimeService
,每秒触发,如果脏数据超过4页刷一次盘。每10s强制刷一次盘,最终是调用MappedByteBuffer#force()
方法。
5.3 index文件
index是rocketmq的索引文件,如果producer要让一条消息支持索引查询,在发送前需要指定message的key
字段。producer或者consumer可以根据方法queryMessage
(协议号12)查询所有broker中key是该值的消息记录。消息塞入实现在IndexFile#putKey
,消息获取实现在IndexFile#selectPhyOffset
。
每个index文件由 header(40byte),slot table(4byte500w,每个索引消息的位置: hash(topic+key)%500w),index list(20byte200w,存储消息在commitlog的位置信息) 三个部分组成。hash冲突如何解决?因为要写入文件,开链法肯定行不通。rocketmq采取的方式是indexList部分顺序写,同时每个index记录存储了前一个相同hash的index的位置。而最尾部的index节点位置存储在slot table中。
index文件有如下几个缺点(自己总结的,可能有谬误)
index文件中没有存储topic+key
的值,因此对给定一个key,查询出来的结果可能包含无效值(其他hash值一样的key),需要client二次过滤,因此client需要尽量确保key是唯一的。
client在查询时,给定key,maxNum,如果实际获取的list比较大,会查询不全。但是rocketmq没有提供分页的机制。
七、待完善的部分
1.事务
2.Filtersrv服务
3.HA