下面的源码包是本人在阅读《RocketMQ技术内幕》这本书的过程中,按照书中的讲解一步步将全部注释在写好在源码中,这个过程也是学习的过程。
当然大家如果直接拿着这份写好注释的源码来阅读书籍的话,会更加方便了。源码版本是4.7.0。
书籍介绍如下:
2.1 NameServer 架构设计
消息中间件的设计思路一般基于主题的订阅发布机制 消息生产者( Producer)发送某一 主题的消息到消息服务器,消息服务器负责该消息的持久化存储,消息消费者(Consumer)订阅感兴趣的主题,消息服务器根据订阅信息(路由信息)将消息推送到消费者( PUSH 模式)或者消息消费者主动向消息服务器拉取消息( PULL 模式),从而实现消息生产者与消息消费者解调。 为了避免消息服务器的单点故障导致的整个系统瘫痪,通常会部署多台消息服务器共同承担消息的存储。 那消息生产者如何知道消息要发往哪台消息服务器呢?如果某一台消息服务器若机了,那么生产者如何在不重启服务的情况下感知呢?NameServer 就是为了解决上述 问题而设计的。
3.1 漫谈 RocketMQ 消息发送
RocketMQ 支持 种消息发送方式 :同 步(sync ) 、 异步(async)、单向(neway ) 。同步 : 发送者向 MQ 发送消息时,同步等待, 直到消息服务器返回发送结果 。异步: 发送者向 MQ 执行发送消息执行发送消息 API 时,指定消息发送成功后的回掉函数,然后调用消息发送 API 后,立即返回,消息发送者线程不阻塞 ,直到运行结束,消息发送成功或失败的回调任务在一个新的线程中执行 。单向:消息发送者向 MQ 执行发送消息 时,直接返回,不等待消息服务器的结果,也不注册回调函数,简单地说,就是只管发,不在乎消息是否成功存储在消息服务器上。
4.1 存储概要设计
RocketMQ 主要存储的 文件 包括 Comitlog 文件、 ConsumeQueue 文件、 IndexFile 文件 。 RocketMQ 将所有主题的消息存储在同一个文件 中 ,确保消息发送时顺序写文件,尽最大的能力确保消息发送的高性能与高吞吐量 。 但由于消息中间件一般是基于消息主题的订阅机制,这样便给按照消息主题检索消息带来了极大的不便 。 为了提高消息消费的效率, RocketMQ 引入了 ConsumeQueue 消息队列 文件,每个消息主题包含多个消息消费队列,每一个消息队列有一个消息文件 。 IndexFile 索引文件,其主要设计理念就是为了加速消息的检索性能,根据消息的属性快速从 Commitlog 文件中检索消息 。 RocketMQ 是一款高性能的消息中间件,存储部分的设计是核心,存储的核心是 IO 访问性能,本章也会重点剖析 RocketMQ 是如何提高 IO 访问性能的。
5.1 RocketMQ 消息消费概述
消息消费以组的模式开展, 一个消费组内可以包含多个消费者,每一个消费组可订阅多个主题,消费组之间有集群模式与广播模式两种消费模式 。 集群模式,主题下的同一条消息只允许被其中一个消费者消费 。 广播模式,主题下的同一条消息将被集群内的所有消费者消费一次。 消息服务器与消费者之间的消息传送也有两种方式:推模式、拉模式 。 所谓的拉模式,是消费端主动发起拉消息请求,而推模式是消息到达消息服务器后,推送给消息消费者。 RocketMQ 消息推模式的实现基于拉模式,在拉模式上包装一层,一个拉取任务完成后开始下一个拉取任务 。
源码示例如下:
public void onSuccess(PullResult pullResult) {
// 回调函数处理接收到的消息
if (pullResult !=null) {
// Step2:调用 pullAPIWrapper的processPullResult将消息字节数组解码成消息列表填充msgFoundList,
// 并对消息进行消息过滤( TAG )模式 。 PullResult 类图如图 5-8 所示。
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData);
switch (pullResult.getPullStatus()) {
case FOUND:
long prevRequestOffset =pullRequest.getNextOffset();
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
long pullRT = System.currentTimeMillis() -beginTimestamp;
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullRT);
long firstMsgOffset = Long.MAX_VALUE;
// Step3 :更新 PullResult 的下 一 次拉取偏移量,如果 MsgFoundList 为空, 则 立 即 将
// PullReqeuest 放入到 PullMessageService 的 pullRequestQueue,以便 PullMessageService 能及时唤醒并再次执行消息拉取 。
// 为什么 PullStatus.FOUND,msgFoundList 还会为空呢?因为在 RocketMQ 根据 TAG 消息过滤,
// 在服务端只是验证了 TAG 的 hashcode ,在客户端再次对消息进行过滤 ,
// 故可能会出现 msgFoundList为空的情况。 更多有关消息过滤的知识将在 5.8节重点介绍 。
if (pullResult.getMsgFoundList() ==null || pullResult.getMsgFoundList().isEmpty()) {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}else {
firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
// Step4 :首先将拉取到的消息存入 ProcessQueue ,然后将拉取到的消息提交到 ConsumeMessageService 中供消费者消费,
// 该方法是一个异步方法,也就是 PullCallBack 将消息提交到 ConsumeMessageService 中就会立即返回,
// 至于这些消息如何消费,PullCallBack不关注。
boolean dispatchToConsume =processQueue.putMessage(pullResult.getMsgFoundList());
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
// Step5 :将消息提交给消费者线程之后 PullCallBack 将立 即返回,可以说本次消息拉取顺利完成,
// 然后根据 pullInterval 参数,如果 pullInterval>O ,则等待 pullInterval 毫秒后将
// PullRequest 对象放入到 PullMessageService 的 pullRequestQueue中,
// 该消息队列的下次拉取即将被激活,达到持续消息拉取,实现准实时拉取消息的效果。
// 再来分析消息拉取异常处理,如何校对拉取偏移量。
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() >0) {
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
}else {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}
}
if (pullResult.getNextBeginOffset() < prevRequestOffset
|| firstMsgOffset < prevRequestOffset) {
log.warn(
"[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
pullResult.getNextBeginOffset(),
firstMsgOffset,
prevRequestOffset);
}
break;
// 如果返回 NO_NEW_MSG (没有新消息)、 NO_MATCHED_MSG (没有匹配消息), 则直
// 接使用服务器端校正的偏移量进行下一次消息的拉取 。 再来看看服务端是如何校正 Offset。
// NO_NEW_MSG ,对应 GetMessageResult.OFFSET_FOUND_NULL 、 GetMessageResult.OFFSET_OVERFLOW_ONE 。
// OFFSET_OVERFLOW_ONE :待拉取 offset 等于消息队列最大的偏移量,如果有新的消息到达,
// 此时会创建一个新的 ConsumeQueue 文件,按照上一个 ConsueQueue 的最大偏移量就是下一个文件的起始偏移量 ,
// 所以如果按照该 offset 第二次拉取消息时能成功 。
//OFFSET_FOUND_NULL : 是根据 Consumequeue 的偏移量没有找到内容,将偏移量定位到下一个 ConsumeQueue ,
//其实就是 offset +(一个 ConsumeQueue 包含多少个条目=MappedFileSize/20)。
case NO_NEW_MSG:
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
break;
case NO_MATCHED_MSG:
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
break;
// 2) OFFSET_ILLEGAL
// 如果拉取结果显示偏移量非法,首先将 ProcessQueue 设置 dropped 为ture ,表示丢弃该消费队列,
// 意味着 ProcessQueue 中拉取的消息将停止消 费,然后根据服务端下一次校
// 对的偏移量尝试更新消息消费进度(内存中),然后尝试持久化消息消费进度,
// 并将该消息队列从 Rebalacnlmpl 的处理队列中移除,意味着暂停该消息队列的消息拉取,
// 等待下一次消息队列 重新负载 。 OFFSET_ILLEGAL 对应服务端 GetMessageResult 状 态 的
// NO_MATCHED_LOGIC_QUEUE、NO_MESSAGE_IN_QUEUE、OFFSET_OVERFLOW_BADLY、
// OFFSET_TOO_SMALL中,这些状态服务端偏移量校正基本上使用原 offset,
// 在客户端更新消息消费进度时只有当消息进度比当前消费进度大才会覆盖,保证消息进度的准确性。
case OFFSET_ILLEGAL:
log.warn("the pull request offset illegal, {} {}",
pullRequest.toString(), pullResult.toString());
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
pullRequest.getProcessQueue().setDropped(true);
DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
@Override
public void run() {
try {
DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
pullRequest.getNextOffset(),false);
DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
log.warn("fix the pull request offset, {}",pullRequest);
}catch (Throwable e) {
log.error("executeTaskLater Exception", e);
}
}
},10000);
break;
default:
break;
}
}
}
手写注释的源码包下载地址如下: