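As a message middleware, RocketMQ spends most of its logic passing and processing Messages. Taking the lifecycle of a Message as the entry point, let's explore how RocketMQ works. From the RocketMQ architecture diagram we can roughly trace the path a Message follows.
Rough path of a Message: Producer -> Broker -> Consumer
Next, we walk through the whole process in detail using the RocketMQ source code.
Producer sends the message to the broker
1.Call the producer's send method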
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
···
···
SendResult sendResult = producer.send(msg);
2.The sendDefaultImpl method of DefaultMQProducerImpl
This is the real implementation behind send; each producer owns one DefaultMQProducerImpl instance.
private SendResult sendDefaultImpl(
Message msg,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
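3.The tryToFindTopicPublishInfo method of DefaultMQProducerImpl
Looks up the topic's route in the local topicPublishInfoTable first; on a miss it sends a GET_ROUTEINTO_BY_TOPIC request to Namesrv to obtain what is needed to send the message, such as the broker information and the MessageQueue list.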
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;
} else {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}
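The lookup above follows a cache-then-fetch pattern. Here is a minimal sketch of that pattern in plain Java, not RocketMQ code: the class RouteCacheSketch, the method findRoute, and the Function standing in for the Namesrv call are all hypothetical names introduced for illustration.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical, simplified stand-in for the producer's route cache; not RocketMQ code.
final class RouteCacheSketch {
    // topic -> queue descriptors, playing the role of topicPublishInfoTable
    private final Map<String, List<String>> routeTable = new ConcurrentHashMap<>();
    // stands in for the remote call made by updateTopicRouteInfoFromNameServer
    private final Function<String, List<String>> nameServerLookup;
    // plain counter: this sketch is meant to be driven from a single thread
    private int remoteLookups = 0;

    RouteCacheSketch(Function<String, List<String>> nameServerLookup) {
        this.nameServerLookup = nameServerLookup;
    }

    List<String> findRoute(String topic) {
        List<String> cached = routeTable.get(topic);
        if (cached != null && !cached.isEmpty()) {
            return cached; // cache hit: no network round trip
        }
        remoteLookups++;
        List<String> fetched = nameServerLookup.apply(topic);
        if (fetched == null || fetched.isEmpty()) {
            return Collections.emptyList(); // topic unknown to the name server
        }
        routeTable.put(topic, fetched);
        return fetched;
    }

    int remoteLookups() {
        return remoteLookups;
    }

    public static void main(String[] args) {
        RouteCacheSketch cache =
            new RouteCacheSketch(t -> Collections.singletonList(t + "@broker-a:0"));
        System.out.println(cache.findRoute("TopicTest"));
        System.out.println(cache.findRoute("TopicTest")); // served from the cache
        System.out.println("remote lookups: " + cache.remoteLookups());
    }
}
```

The real method additionally falls back to a second lookup that passes the producer's defaults, as the else branch above shows; the sketch leaves that out.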
4.selectOneMessageQueue of DefaultMQProducerImpl
Selects one MessageQueue from the route information to carry the message.
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
}
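The selection itself is delegated to mqFaultStrategy, whose body is not shown here. As a rough illustration of what such a selection can look like, the sketch below does a round-robin pick that skips the broker used by the previous (failed) attempt. It is an assumption about the general approach, not the library's code; QueueSelectorSketch, Queue, and selectOne are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of round-robin queue selection; not RocketMQ code.
final class QueueSelectorSketch {
    // Minimal stand-in for MessageQueue: broker name plus queue id.
    static final class Queue {
        final String brokerName;
        final int queueId;

        Queue(String brokerName, int queueId) {
            this.brokerName = brokerName;
            this.queueId = queueId;
        }

        @Override
        public String toString() {
            return brokerName + "-" + queueId;
        }
    }

    private final AtomicInteger counter = new AtomicInteger(0);

    // Returns the next queue in round-robin order whose broker differs from
    // lastBrokerName; falls back to plain round-robin if every queue is on it.
    Queue selectOne(List<Queue> queues, String lastBrokerName) {
        if (queues.isEmpty()) {
            throw new IllegalArgumentException("no queues in route info");
        }
        for (int i = 0; i < queues.size(); i++) {
            Queue candidate = next(queues);
            if (lastBrokerName == null || !candidate.brokerName.equals(lastBrokerName)) {
                return candidate;
            }
        }
        return next(queues);
    }

    private Queue next(List<Queue> queues) {
        // floorMod keeps the index non-negative even after the counter overflows
        int pos = Math.floorMod(counter.getAndIncrement(), queues.size());
        return queues.get(pos);
    }

    public static void main(String[] args) {
        List<Queue> queues = Arrays.asList(
            new Queue("broker-a", 0), new Queue("broker-a", 1), new Queue("broker-b", 0));
        QueueSelectorSketch selector = new QueueSelectorSketch();
        System.out.println(selector.selectOne(queues, null));
        System.out.println(selector.selectOne(queues, "broker-a"));
    }
}
```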
5.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName())
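Gets brokerAddr (the broker's address); as the code shows, it returns the address registered under MixAll.MASTER_ID, i.e. the master.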
public String findBrokerAddressInPublish(final String brokerName) {
HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);
if (map != null && !map.isEmpty()) {
return map.get(MixAll.MASTER_ID);
}
return null;
}
6.Send to the broker through NettyRemotingClient.
Broker receives and stores the message
After the producer sends the message to the broker over netty, the broker processes it.
1.SendMessageProcessor.processRequest()
This is the broker's entry point for receiving messages.
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
SendMessageContext mqtraceContext;
···
···
final RemotingCommand response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);
this.executeSendMessageHookAfter(response, mqtraceContext);
return response;
}
}
2.SendMessageProcessor.sendMessage()
Wraps the requestHeader into the broker's internal message type, MessageExtBrokerInner.
3.this.brokerController.getMessageStore().putMessage(msgInner)
This method eventually calls CommitLog.putMessage(msg); CommitLog is the broker's file-based message store.
PutMessageResult result = this.commitLog.putMessage(msg);
4.CommitLog.mappedFileQueue.getLastMappedFile()
Calls CommitLog.mappedFileQueue.getLastMappedFile() to get the mappedFile the message will be written to, then calls mappedFile.appendMessage(msg, this.appendMessageCallback) to write the message.
if (null == mappedFile || mappedFile.isFull()) {
mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
}
if (null == mappedFile) {
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
}
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
5.AppendMessageCallback
mappedFile.appendMessage is passed an AppendMessageCallback, whose doAppend(fileFromOffset, byteBuffer, maxBlank, Object msg) callback method stores msg.
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
6.MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(),wroteOffset)
Builds the msgId from the IP and port of the node storing the message plus the offset about to be written (within the file obtained earlier).
String msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(hostHolder), wroteOffset);
public static String createMessageId(final ByteBuffer input, final ByteBuffer addr, final long offset) {
input.flip();
input.limit(MessageDecoder.MSG_ID_LENGTH);
input.put(addr);
input.putLong(offset);
return UtilAll.bytes2string(input.array());
}
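To make the layout of such an id concrete, here is a small standalone sketch. It assumes an IPv4 store host encoded as 4 address bytes plus a 4-byte port, followed by the 8-byte offset, giving a 16-byte id rendered as 32 hex characters. That layout, and the names MessageIdSketch, toHex, and offsetOf, are assumptions made for illustration rather than the library's API.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of an "address + offset" message id; not RocketMQ code.
final class MessageIdSketch {
    // Assumed layout: 4 bytes IPv4 + 4 bytes port + 8 bytes commit log offset.
    static final int MSG_ID_LENGTH = 16;

    static String createMessageId(byte[] ipv4, int port, long commitLogOffset) {
        if (ipv4.length != 4) {
            throw new IllegalArgumentException("this sketch only handles IPv4 addresses");
        }
        ByteBuffer buf = ByteBuffer.allocate(MSG_ID_LENGTH);
        buf.put(ipv4);
        buf.putInt(port);
        buf.putLong(commitLogOffset);
        return toHex(buf.array());
    }

    // Renders each byte as two upper-case hex digits.
    static String toHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder(bytes.length * 2);
        for (byte b : bytes) {
            sb.append(String.format("%02X", b));
        }
        return sb.toString();
    }

    // Recovers the offset from the last 8 bytes (16 hex characters) of the id.
    static long offsetOf(String msgId) {
        return Long.parseUnsignedLong(msgId.substring(16), 16);
    }

    public static void main(String[] args) {
        String id = createMessageId(new byte[] {10, 0, 0, 1}, 10911, 1024L);
        System.out.println(id);
        System.out.println(offsetOf(id));
    }
}
```

Because the id embeds both the storing node and the write offset, a message can be located from its id alone, without a separate index.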
7.Update topicQueueTable
Uses (topic-queueId) as the key to fetch queueOffset from topicQueueTable; if queueOffset is null, it is set to 0 and stored in topicQueueTable.
keyBuilder.setLength(0);
keyBuilder.append(msgInner.getTopic());
keyBuilder.append('-');
keyBuilder.append(msgInner.getQueueId());
String key = keyBuilder.toString();
Long queueOffset = CommitLog.this.topicQueueTable.get(key);
if (null == queueOffset) {
queueOffset = 0L;
CommitLog.this.topicQueueTable.put(key, queueOffset);
}
8.MessageSysFlag.getTransactionValue()
Calls MessageSysFlag.getTransactionValue(msgInner.getSysFlag()) to get tranType and determine whether the message is a transactional message. If it is TransactionPreparedType or TransactionRollbackType, queueOffset=0; messages of these 2 types are not consumed.
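The check above boils down to masking two bits out of the message's sysFlag. The sketch below illustrates the idea; the bit positions (bits 2 and 3) and the constant values are assumptions for illustration, and TransactionFlagSketch, isConsumable, and effectiveQueueOffset are hypothetical names, not the library's API.

```java
// Hypothetical sketch of reading a transaction type out of a flag word; not RocketMQ code.
final class TransactionFlagSketch {
    // Assumed encoding: the transaction type occupies bits 2 and 3 of sysFlag.
    static final int NOT_TYPE = 0;
    static final int PREPARED_TYPE = 0x1 << 2;
    static final int COMMIT_TYPE = 0x2 << 2;
    static final int ROLLBACK_TYPE = 0x3 << 2;

    // ROLLBACK_TYPE has both bits set, so it doubles as the mask.
    static int getTransactionValue(int sysFlag) {
        return sysFlag & ROLLBACK_TYPE;
    }

    // Prepared and rollback messages are never handed to consumers.
    static boolean isConsumable(int sysFlag) {
        int tranType = getTransactionValue(sysFlag);
        return tranType != PREPARED_TYPE && tranType != ROLLBACK_TYPE;
    }

    // Mirrors the rule in the text: non-consumable messages get queueOffset 0.
    static long effectiveQueueOffset(int sysFlag, long queueOffset) {
        return isConsumable(sysFlag) ? queueOffset : 0L;
    }

    public static void main(String[] args) {
        System.out.println(effectiveQueueOffset(PREPARED_TYPE, 7L));
        System.out.println(effectiveQueueOffset(COMMIT_TYPE | 0x1, 7L));
    }
}
```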
9.byteBuffer.put()
Calls byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen) to write to the file.
// Write messages to the queue buffer
byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);
Consumer pulls messages
1.DefaultMQPushConsumer.start()
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.start();
2.RebalanceService.run()
This method periodically calls RebalanceImpl.doRebalance(), which iterates over the subscribed topics and runs rebalanceByTopic(topic) for each.
public void doRebalance(final boolean isOrder) {
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
if (subTable != null) {
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();
try {
this.rebalanceByTopic(topic, isOrder);
} catch (Throwable e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("rebalanceByTopic Exception", e);
}
}
}
}
this.truncateMessageQueueNotMyTopic();
}
3.RebalanceImpl.updateProcessQueueTableInRebalance()
Builds a PullRequest, gets nextOffset from the Broker, calls pullRequest.setNextOffset(nextOffset), and updates the local consume-progress record.
4.RebalancePushImpl.dispatchPullRequest(List)
Dispatches the requests.
public void dispatchPullRequest(List<PullRequest> pullRequestList) {
for (PullRequest pullRequest : pullRequestList) {
this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);
}
}
5.PullMessageService.executePullRequestImmediately(final PullRequest)
Calls PullMessageService.executePullRequestImmediately(final PullRequest) to put the request into the pullRequestQueue request queue.
public void executePullRequestImmediately(final PullRequest pullRequest) {
try {
this.pullRequestQueue.put(pullRequest);
} catch (InterruptedException e) {
log.error("executePullRequestImmediately pullRequestQueue.put", e);
}
}
6.PullMessageService.run()
Takes a PullRequest from the pullRequestQueue queue and calls DefaultMQPushConsumerImpl.pullMessage(pullRequest) to pull messages.
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
PullRequest pullRequest = this.pullRequestQueue.take();
if (pullRequest != null) {
this.pullMessage(pullRequest);
}
} catch (InterruptedException e) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);
}
}
log.info(this.getServiceName() + " service end");
}
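7.Build the PullCallback callback
The callback processes the pull result, PullResult. Specifically, it decodes the list of pulled messages from PullResult; if the subscription's tags are not empty and the subscription is not in classFilter mode, it filters the messages by tag and puts the filtered list back into PullResult. It then takes nextBeginOffset from pullResult and stores it as the NextOffset of the current pullRequest, updates the statistics, submits a ConsumeRequest asynchronously to consume the messages, and finally resubmits pullRequest to prepare the next pull.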
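The tag filtering step described above can be sketched as follows. This is a simplified illustration with hypothetical types (PullCallbackSketch, Msg, filterByTags), not the client's actual code; it only reproduces the rule stated in the text: filter when the tag set is non-empty and classFilter mode is off.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of client-side tag filtering; not RocketMQ code.
final class PullCallbackSketch {
    // Minimal stand-in for a pulled message.
    static final class Msg {
        final String tags;
        final String body;

        Msg(String tags, String body) {
            this.tags = tags;
            this.body = body;
        }
    }

    // Keeps only messages whose tag is in tagsSet. An empty tagsSet means
    // "subscribe to everything"; classFilterMode skips tag filtering entirely.
    static List<Msg> filterByTags(List<Msg> pulled, Set<String> tagsSet, boolean classFilterMode) {
        if (tagsSet.isEmpty() || classFilterMode) {
            return pulled;
        }
        List<Msg> kept = new ArrayList<>(pulled.size());
        for (Msg msg : pulled) {
            if (msg.tags != null && tagsSet.contains(msg.tags)) {
                kept.add(msg);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Msg> pulled = Arrays.asList(
            new Msg("TagA", "m1"), new Msg("TagB", "m2"), new Msg(null, "m3"));
        Set<String> tags = new HashSet<>(Collections.singletonList("TagA"));
        for (Msg msg : filterByTags(pulled, tags, false)) {
            System.out.println(msg.body);
        }
    }
}
```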
Broker responds to the Consumer's request
1.PullMessageProcessor.processRequest()
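On receiving a pull request, it runs a few simple checks (Broker permission, that the subscription group exists, that the topic exists) and then fetches the messages from messageStore.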
final GetMessageResult getMessageResult =
this.brokerController.getMessageStore().getMessage(
requestHeader.getConsumerGroup(),
requestHeader.getTopic(),
requestHeader.getQueueId(),
requestHeader.getQueueOffset(),
requestHeader.getMaxMsgNums(),
subscriptionData
);
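Details: DefaultMessageStore uses the requested Topic and queueId to find the corresponding ConsumeQueue and, starting from the given queueOffset, takes the target buffer from it. It then reads the buffer in a loop, 20 bytes at a time: the offset offsetPy (8 bytes), the message length sizePy (4 bytes), and the filter tag tagCode (8 bytes). If the subscription matches tagCode, it uses offsetPy and sizePy to fetch the message body buffer from commitLog and stores it in GetMessageResult, then moves on to the next entry. Finally it returns GetMessageResult.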
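The 20-byte entry layout described above (8 + 4 + 8 bytes) can be read with a plain ByteBuffer. The following sketch shows one way to scan such a buffer and keep the entries whose tagCode matches; ConsumeQueueScanSketch, Entry, append, and scan are hypothetical names, and the exact-equality match is a simplification of the subscription check.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of scanning fixed-size index entries; not RocketMQ code.
final class ConsumeQueueScanSketch {
    // offsetPy (8 bytes) + sizePy (4 bytes) + tagCode (8 bytes)
    static final int ENTRY_SIZE = 20;

    static final class Entry {
        final long offsetPy; // where the message starts in the commit log
        final int sizePy;    // how many bytes the message occupies
        final long tagCode;  // filter code derived from the message tag

        Entry(long offsetPy, int sizePy, long tagCode) {
            this.offsetPy = offsetPy;
            this.sizePy = sizePy;
            this.tagCode = tagCode;
        }
    }

    // Writes one 20-byte entry at the buffer's current position.
    static void append(ByteBuffer buf, long offsetPy, int sizePy, long tagCode) {
        buf.putLong(offsetPy).putInt(sizePy).putLong(tagCode);
    }

    // Reads entries 20 bytes at a time and keeps those matching the subscribed tag code.
    static List<Entry> scan(ByteBuffer buf, long subscribedTagCode) {
        ByteBuffer view = buf.duplicate(); // leave the caller's position untouched
        List<Entry> matched = new ArrayList<>();
        while (view.remaining() >= ENTRY_SIZE) {
            long offsetPy = view.getLong();
            int sizePy = view.getInt();
            long tagCode = view.getLong();
            if (tagCode == subscribedTagCode) {
                matched.add(new Entry(offsetPy, sizePy, tagCode));
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(3 * ENTRY_SIZE);
        append(buf, 0L, 100, 7L);
        append(buf, 100L, 50, 9L);
        append(buf, 150L, 30, 7L);
        buf.flip();
        for (Entry e : scan(buf, 7L)) {
            System.out.println("read " + e.sizePy + " bytes at commit log offset " + e.offsetPy);
        }
    }
}
```

Fixed-size entries are what make the lookup cheap: the position of the n-th entry is simply n * 20, so a queueOffset maps straight to a byte position.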
2.Handle GetMessageResult
Takes the three properties nextBeginOffset, minOffset, and maxOffset from GetMessageResult and sets them on responseHeader, then packs GetMessageResult into the response and sends it to the Consumer.
response.setRemark(getMessageResult.getStatus().name());
responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());
responseHeader.setMinOffset(getMessageResult.getMinOffset());
responseHeader.setMaxOffset(getMessageResult.getMaxOffset());