终于讲到Broker消息处理流程的最后一步了。讲Consumer
的时候说到消费消息分为Pull和Push两种模式,底层其实都是依靠Pull实现的。在Broker这端处理PushConsumer
的Pull请求的时候,如果消息不存在,会hold住请求,直到超时或者有新的消息到达Broker。所以我们先看下Broker是怎么处理Pull请求的。
Pull请求处理过程
Broker
通过PullMessageProcessor
来处理Consumer
的拉取请求,process的代码如下:(方法比较长,一些参数检查的部分就不贴了)
/**
 * Handles one pull-message request from a consumer.
 *
 * <p>NOTE: this listing is an abridged excerpt; lines consisting of {@code ...} mark code that was
 * omitted, so the listing is not compilable as shown.
 *
 * <p>Visible flow: validate broker / consumer-group / topic / queue permissions, resolve the
 * subscription data and message filter, read from the message store, map the store status to a
 * response code, then act on that code (return the messages, hold the request, or report a moved
 * offset). Finally, when allowed, commit the consumer's offset.
 *
 * @param channel channel the request arrived on; used for the direct (FileRegion) write, for
 *        logging the remote address, and stored in the held {@code PullRequest}
 * @param request the pull request command
 * @param brokerAllowSuspend whether this invocation may hold the request when no message is found;
 *        it also gates the offset commit at step 17
 * @return the response command, or {@code null} when the reply was already handed to the channel
 *         (step 16.3) or the request was suspended (step 16.4)
 * @throws RemotingCommandException declared by the signature; the visible code does not show which
 *         call raises it (presumably header decoding -- confirm)
 */
private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
throws RemotingCommandException {
RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();
final PullMessageRequestHeader requestHeader =
(PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
response.setOpaque(request.getOpaque());
log.debug("receive PullMessage request command, {}", request);
// Check whether the broker is currently readable
if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(String.format("the broker[%s] pulling message is forbidden", this.brokerController.getBrokerConfig().getBrokerIP1()));
return response;
}
// Check whether the consumer group is registered
SubscriptionGroupConfig subscriptionGroupConfig =
this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());
if (null == subscriptionGroupConfig) {
response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
response.setRemark(String.format("subscription group [%s] does not exist, %s", requestHeader.getConsumerGroup(), FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST)));
return response;
}
// Check whether consumption is enabled for the consumer group
if (!subscriptionGroupConfig.isConsumeEnable()) {
response.setCode(ResponseCode.NO_PERMISSION);
...
return response;
}
final boolean hasSuspendFlag = PullSysFlag.hasSuspendFlag(requestHeader.getSysFlag());
final boolean hasCommitOffsetFlag = PullSysFlag.hasCommitOffsetFlag(requestHeader.getSysFlag());
final boolean hasSubscriptionFlag = PullSysFlag.hasSubscriptionFlag(requestHeader.getSysFlag());
final long suspendTimeoutMillisLong = hasSuspendFlag ? requestHeader.getSuspendTimeoutMillis() : 0;
// Check that the requested topic exists and is readable, and that the requested queueId is valid
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
if (null == topicConfig) {
...
response.setCode(ResponseCode.TOPIC_NOT_EXIST);
...
return response;
}
if (!PermName.isReadable(topicConfig.getPerm())) {
response.setCode(ResponseCode.NO_PERMISSION);
...
return response;
}
if (requestHeader.getQueueId() < 0 || requestHeader.getQueueId() >= topicConfig.getReadQueueNums()) {
...
response.setCode(ResponseCode.SYSTEM_ERROR);
...
return response;
}
SubscriptionData subscriptionData = null;
ConsumerFilterData consumerFilterData = null;
// 1. The subscription flag is set: this is the first pull or the filter needs updating
if (hasSubscriptionFlag) {
try {
subscriptionData = FilterAPI.build(
requestHeader.getTopic(), requestHeader.getSubscription(), requestHeader.getExpressionType()
);
// 2. Check whether expression (non-tag) filtering is used
if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {
consumerFilterData = ConsumerFilterManager.build(
requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getSubscription(),
requestHeader.getExpressionType(), requestHeader.getSubVersion()
);
assert consumerFilterData != null;
}
} catch (Exception e) {
response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
response.setRemark("parse the consumer's subscription failed");
return response;
}
} else {
// 3. The subscription flag is not set: the group subscribed earlier, so check the request against the stored subscription
ConsumerGroupInfo consumerGroupInfo =
this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());
if (null == consumerGroupInfo) {
...
}
// 4. Check that the message model is consistent
if (!subscriptionGroupConfig.isConsumeBroadcastEnable()
&& consumerGroupInfo.getMessageModel() == MessageModel.BROADCASTING) {
...
}
// 5. Check whether the consumer has not subscribed to the topic
subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());
if (null == subscriptionData) {
...
}
// 6. The version in the pull request does not match the version supplied at subscribe time
if (subscriptionData.getSubVersion() < requestHeader.getSubVersion()) {
...
}
// 7. The filter-condition version is inconsistent
// NOTE(review): lines were elided in this region, so the braces below do not balance as printed
if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {
...
}
if (consumerFilterData.getClientVersion() < requestHeader.getSubVersion()) {
...
}
}
}
// 8. Check whether SQL92 expression filtering is supported
if (!ExpressionType.isTagType(subscriptionData.getExpressionType())
&& !this.brokerController.getBrokerConfig().isEnablePropertyFilter()) {
...
}
MessageFilter messageFilter;
if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) {// also filter %RETRY% messages
messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData,
this.brokerController.getConsumerFilterManager());
} else {
messageFilter = new ExpressionMessageFilter(subscriptionData, consumerFilterData,
this.brokerController.getConsumerFilterManager());
}
// 9. Read messages from the message store
final GetMessageResult getMessageResult =
this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);
// 10. Currently null is returned only on shutdown or when there is no read permission; otherwise the result is non-null whether or not the read failed
if (getMessageResult != null) {
// 11. Populate the response header from what was read
response.setRemark(getMessageResult.getStatus().name());
responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());
responseHeader.setMinOffset(getMessageResult.getMinOffset());
responseHeader.setMaxOffset(getMessageResult.getMaxOffset());
// 12. If reading from the master is slow, suggest by default that the client read from a slave; this value may be overwritten in step 13
if (getMessageResult.isSuggestPullingFromSlave()) {
responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
} else {
responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
}
switch (this.brokerController.getMessageStoreConfig().getBrokerRole()) {
case ASYNC_MASTER:
case SYNC_MASTER:
break;
case SLAVE:// if this broker is a slave and slave reads are disabled, tell the client to read from the master
if (!this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
}
break;
}
// 13. If slave reads are enabled, overwrite the earlier suggestion
if (this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
// consume too slow ,redirect to another machine
if (getMessageResult.isSuggestPullingFromSlave()) {
responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
}
// consume ok
else {
responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
}
} else {
responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
}
// 14. Map the message store's result status to a response code
switch (getMessageResult.getStatus()) {
case FOUND:// success
response.setCode(ResponseCode.SUCCESS);
break;
case MESSAGE_WAS_REMOVING:// the message is being deleted by a background task; advise the client to retry
response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
break;
case NO_MATCHED_LOGIC_QUEUE:
case NO_MESSAGE_IN_QUEUE:
if (0 != requestHeader.getQueueOffset()) {// the queue does not exist or the offset does not exist
response.setCode(ResponseCode.PULL_OFFSET_MOVED);
// XXX: warn and notify me
log.info("the broker store no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}",
requestHeader.getQueueOffset(),
getMessageResult.getNextBeginOffset(),
requestHeader.getTopic(),
requestHeader.getQueueId(),
requestHeader.getConsumerGroup()
);
} else {
response.setCode(ResponseCode.PULL_NOT_FOUND);
}
break;
case NO_MATCHED_MESSAGE:
response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
break;
case OFFSET_FOUND_NULL:
response.setCode(ResponseCode.PULL_NOT_FOUND);
break;
case OFFSET_OVERFLOW_BADLY:// the requested offset is far too large
response.setCode(ResponseCode.PULL_OFFSET_MOVED);
// XXX: warn and notify me
log.info("the request offset: {} over flow badly, broker max offset: {}, consumer: {}",
requestHeader.getQueueOffset(), getMessageResult.getMaxOffset(), channel.remoteAddress());
break;
case OFFSET_OVERFLOW_ONE:
response.setCode(ResponseCode.PULL_NOT_FOUND);
break;
case OFFSET_TOO_SMALL:
response.setCode(ResponseCode.PULL_OFFSET_MOVED);
log.info("the request offset too small. group={}, topic={}, requestOffset={}, brokerMinOffset={}, clientIp={}",
requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueOffset(),
getMessageResult.getMinOffset(), channel.remoteAddress());
break;
default:
assert false;
break;
}
// 15. Invoke the user-registered consume-message hook
if (this.hasConsumeMessageHook()) {
...
this.executeConsumeMessageHookBefore(context);
}
// 16. Handle the result according to the response code
switch (response.getCode()) {
case ResponseCode.SUCCESS:
// 16.1 On success, update the statistics
this.brokerController.getBrokerStatsManager().incGroupGetNums(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
getMessageResult.getMessageCount());
this.brokerController.getBrokerStatsManager().incGroupGetSize(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
getMessageResult.getBufferTotalSize());
this.brokerController.getBrokerStatsManager().incBrokerGetNums(getMessageResult.getMessageCount());
// 16.2 Read the messages from file and set them as the response body
if (this.brokerController.getBrokerConfig().isTransferMsgByHeap()) {
final long beginTimeMills = this.brokerController.getMessageStore().now();
final byte[] r = this.readGetMessageResult(getMessageResult, requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());
this.brokerController.getBrokerStatsManager().incGroupGetLatency(requestHeader.getConsumerGroup(),
requestHeader.getTopic(), requestHeader.getQueueId(),
(int) (this.brokerController.getMessageStore().now() - beginTimeMills));
response.setBody(r);
} else {
// 16.3 Let netty read the memory-mapped file directly, saving one copy
try {
FileRegion fileRegion =
new ManyMessageTransfer(response.encodeHeader(getMessageResult.getBufferTotalSize()), getMessageResult);
channel.writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
getMessageResult.release();
if (!future.isSuccess()) {
log.error("transfer many message by pagecache failed, {}", channel.remoteAddress(), future.cause());
}
}
});
} catch (Throwable e) {
log.error("transfer many message by pagecache exception", e);
getMessageResult.release();
}
// The reply was handed to the channel above, so no RemotingCommand is returned
response = null;
}
break;
case ResponseCode.PULL_NOT_FOUND:
// 16.4 No message was read: hold the request and wake it when a new message arrives. When it is re-run after the wait times out, brokerAllowSuspend=false
if (brokerAllowSuspend && hasSuspendFlag) {
long pollingTimeMills = suspendTimeoutMillisLong;
if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
}
String topic = requestHeader.getTopic();
long offset = requestHeader.getQueueOffset();
int queueId = requestHeader.getQueueId();
PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
// The request is held; no response is returned now
response = null;
break;
}
// Not suspended: there is no break here, so control falls through to PULL_RETRY_IMMEDIATELY
case ResponseCode.PULL_RETRY_IMMEDIATELY:
break;
case ResponseCode.PULL_OFFSET_MOVED:
// 16.5 The starting offset is invalid: smaller than the minimum offset or larger than the maximum offset
if (this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE
|| this.brokerController.getMessageStoreConfig().isOffsetCheckInSlave()) {
MessageQueue mq = new MessageQueue();
mq.setTopic(requestHeader.getTopic());
mq.setQueueId(requestHeader.getQueueId());
mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
OffsetMovedEvent event = new OffsetMovedEvent();
event.setConsumerGroup(requestHeader.getConsumerGroup());
event.setMessageQueue(mq);
event.setOffsetRequest(requestHeader.getQueueOffset());
event.setOffsetNew(getMessageResult.getNextBeginOffset());
this.generateOffsetMovedEvent(event);// publish a message to the OFFSET_MOVED_EVENT topic, which a monitoring system can consume
log.warn(
"PULL_OFFSET_MOVED:correction offset. topic={}, groupId={}, requestOffset={}, newOffset={}, suggestBrokerId={}",
requestHeader.getTopic(), requestHeader.getConsumerGroup(), event.getOffsetRequest(), event.getOffsetNew(),
responseHeader.getSuggestWhichBrokerId());
} else {
responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
log.warn("PULL_OFFSET_MOVED:none correction. topic={}, groupId={}, requestOffset={}, suggestBrokerId={}",
requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueOffset(),
responseHeader.getSuggestWhichBrokerId());
}
break;
default:
assert false;
}
} else {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("store getMessage return null");
}
// The offset is committed only when suspension is allowed for this call, the request carries
// the commit-offset flag, and this broker is not a slave
boolean storeOffsetEnable = brokerAllowSuspend;
storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;
storeOffsetEnable = storeOffsetEnable
&& this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;
// 17. Record the consumer's read position (commit offset)
if (storeOffsetEnable) {
this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),
requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
}
return response;
}
首先对一些参数做检查、然后检查请求中带的过滤条件是否一致。这里需要注意的是,对于同一个ConsumerGroup
下的多个consumer客户端,Broker要求订阅参数设置必须要是一致的,要不然会造成数据混乱。这里很好理解,比如ConsumerA
和ConsumerB
属于同一个ConsumerGroup
,采用cluster模式消费,如果两个设置的消息过滤条件不一样,那Broker就不知道以哪个设置为准了。在实际项目中,其实就是属于同一个ConsumerGroup
的消费者,代码是同一套。
- 第9步,从
MessageStore
中读取消息,这里的步骤分为如下几步:1)找到topic+queue对应的ConsumeQueue
;2)根据PullRequest
传入的offset找到MappedFile
;3)从MappedFile
中读取指定数量的CQUnit
,根据TagCode做下过滤,然后得到过滤后的Commit log
的offset;4)根据offset从CommitLog中获取消息详情;5)根据消息详情再做一次过滤,然后返回结果。代码就不详细看了,如果看过上一篇ConsumeQueue
生成过程,应该很容易理解 - 第12、13步,如果当前Broker发现读取消息很慢(比如内存不足),可以建议
Consumer
从Slave读取,Consumer
的负载均衡会根据Broker的返回值来重新决定后续从哪个Broker读 - 第16.2、16.3步,如果消息读取成功,从
MessageStore
返回的消息仍然是只返回了指向内存文件的ByteBuffer
,根据配置有两种返回数据的方式,第一种是从ByteBuffer中将数据读取到response中,然后返回。第二种是让netty直接读取
ByteBuffer,将消息写给客户端,相对前一种,不需要将
ByteBuffer中的数据copy到java Heap中,少一次内存copy。但是第二种方式无法记录监控信息,比如consumer消费延时等指标。 - 第16.4步,如果没有读取到消息,会判断是否是
PushConsumer
(参数hasSuspendFlag
),如果是的话,则将请求挂起,方式就是封装成PullRequest
提交给PullRequestHoldService
。
这里还有另外一个参数brokerAllowSuspend
,这个参数是false的话也不会挂起请求。PullRequest
被hold住不会无限期等下去,而是有个超时时间,如果等待超时后,就会重新提交到这个Processor
,这个时候brokerAllowSuspend
参数会设置成false。所以,这一次无论是否有消息读取到,都不会再hold住了。 - 第16.5步,如果consumer请求中的offset不在正常范围内,会发一条告警消息出来
- 第17步,记录consumer当前消费的offset,便于
Consumer
故障恢复。
下面看下Push模式下PullRequestHoldService
的处理逻辑
PullRequestHoldService
上面的16.4步中,通过suspendPullRequest()
方法提交hold请求,我们从这里开始看下代码:
/**
 * Registers a pull request to be held for the given topic and queue.
 *
 * <p>The request is appended to the {@code ManyPullRequest} stored under the topic/queue key,
 * creating that entry first when none exists yet.
 *
 * @param topic topic the request is waiting on
 * @param queueId queue id the request is waiting on
 * @param pullRequest the request to hold
 */
public void suspendPullRequest(final String topic, final int queueId, final PullRequest pullRequest) {
    final String tableKey = this.buildKey(topic, queueId);
    ManyPullRequest holder = this.pullRequestTable.get(tableKey);
    if (holder == null) {
        final ManyPullRequest fresh = new ManyPullRequest();
        // putIfAbsent returns the entry that was already mapped, if any; prefer that one
        final ManyPullRequest existing = this.pullRequestTable.putIfAbsent(tableKey, fresh);
        holder = (existing == null) ? fresh : existing;
    }
    holder.addPullRequest(pullRequest);
}
pullRequest被放入一个以topic+queue
为key的Map队列中异步处理。任务被放入等待队列后,有两种情况会被执行:1)定时;2)Broker有新的消息到达。
定时执行
PullRequestHoldService
是一个单线程任务,定时检查队列中的request重新执行。下面是具体的逻辑:
/**
 * Periodic check: for every topic/queue key in the hold table, looks up the queue's current
 * max offset and re-evaluates the held requests through {@code notifyMessageArriving}.
 * Keys that do not split into exactly two parts are skipped.
 */
private void checkHoldRequest() {
    for (final String tableKey : this.pullRequestTable.keySet()) {
        final String[] parts = tableKey.split(TOPIC_QUEUEID_SEPARATOR);
        if (parts.length != 2) {
            continue;
        }
        final String topic = parts[0];
        final int queueId = Integer.parseInt(parts[1]);
        final long maxOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
        try {
            this.notifyMessageArriving(topic, queueId, maxOffset);
        } catch (Throwable e) {
            // A failure on one queue is logged and the loop moves on to the next key
            log.error("check hold request failed. topic={}, queueId={}", topic, queueId, e);
        }
    }
}
/**
 * Re-evaluates the pull requests held for a topic/queue.
 *
 * <p>A held request is re-submitted to the pull-message processor when either (a) the queue's
 * offset has advanced past the request's pull offset and the request's message filter matches,
 * or (b) the request has waited at least its timeout. All other requests are put back to wait.
 *
 * @param topic topic whose held requests are examined
 * @param queueId queue id whose held requests are examined
 * @param maxOffset max offset known to the caller; re-read from the store per request when it
 *        is not beyond that request's pull offset
 * @param tagsCode tags code passed to the consume-queue filter check
 * @param msgStoreTime store time passed into the {@code CqExtUnit} used for filtering
 * @param filterBitMap filter bitmap passed into the {@code CqExtUnit} used for filtering
 * @param properties message properties; when non-null and the first check matched, the
 *        commit-log filter check is evaluated as well
 */
public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode,
    long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
    final String tableKey = this.buildKey(topic, queueId);
    final ManyPullRequest holder = this.pullRequestTable.get(tableKey);
    if (holder == null) {
        return;
    }

    // 1. Take the current waiting list (cloneListAndClear -- by its name, this also empties it)
    final List<PullRequest> pending = holder.cloneListAndClear();
    if (pending == null) {
        return;
    }

    final List<PullRequest> stillWaiting = new ArrayList<PullRequest>();
    // 2. Examine each held request in turn
    for (final PullRequest waiting : pending) {
        long latestOffset = maxOffset;
        if (latestOffset <= waiting.getPullFromThisOffset()) {
            latestOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
        }

        boolean wakeUp = false;
        // 3. Did new messages arrive while the request was waiting?
        if (latestOffset > waiting.getPullFromThisOffset()) {
            // 4. Does the message satisfy the request's filter?
            wakeUp = waiting.getMessageFilter().isMatchedByConsumeQueue(tagsCode,
                new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap));
            // match by bit map, need eval again when properties is not null.
            if (wakeUp && properties != null) {
                wakeUp = waiting.getMessageFilter().isMatchedByCommitLog(null, properties);
            }
        }

        // 6. A timed-out request is always handed to the processor, matched or not
        if (!wakeUp) {
            wakeUp = System.currentTimeMillis() >= (waiting.getSuspendTimestamp() + waiting.getTimeoutMillis());
        }

        if (!wakeUp) {
            stillWaiting.add(waiting);
            continue;
        }

        // 5. Re-run the message read through the pull-message processor
        try {
            this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(waiting.getClientChannel(),
                waiting.getRequestCommand());
        } catch (Throwable e) {
            log.error("execute request when wakeup failed.", e);
        }
    }

    // 7. Requests that neither matched nor timed out go back to waiting
    if (!stillWaiting.isEmpty()) {
        holder.addPullRequest(stillWaiting);
    }
}
以上就是request的执行逻辑,对于已经等待超时或者符合过滤条件的request,重新交给PullMessageProcessor
处理。
第4步中判断match的逻辑对于定时执行时永远返回true。
主动触发
当ReputMessageService
发现有新的消息时,也会调用notifyMessageArriving()
方法,看当前消息是否符合第4步的条件,如果符合等待的request的过滤条件,会将这些request重新执行。这个可以看下上一篇讲ConsumeQueue
的时候ReputMessageService
的部分。
总结
用了10几篇文章终于把RocketMQ主要的架构和代码流程讲完了,还有顺序消息和事务消息的部分没有涉及,后续会继续补充。