提出疑问
第一次pullMessage时是不是根据offset去获取呢
offset从远程获取到还是存在本地呢
consumer端在启动时会开启负载均衡服务RebalanceService
,一系列调用后会执行RebalanceImpl.doRebalance()
方法,内部调用方法rebalanceByTopic()
,这里主要是为了找到可供消费的队列,如果存在,则获取这个队列可供消费的消息偏移量
// RebalanceImpl
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
final boolean isOrder) {
boolean changed = false;
...
List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
for (MessageQueue mq : mqSet) {
if (!this.processQueueTable.containsKey(mq)) {
if (isOrder && !this.lock(mq)) {
log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
continue;
}
this.removeDirtyOffset(mq);
ProcessQueue pq = new ProcessQueue();
// 获取nextOffset
long nextOffset = this.computePullFromWhere(mq);
if (nextOffset >= 0) {
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
if (pre != null) {
log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
} else {
log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(nextOffset);
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(pq);
pullRequestList.add(pullRequest);
changed = true;
}
} else {
log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
}
}
}
// 构造请求发送至PullService,不断的拉取消息
this.dispatchPullRequest(pullRequestList);
return changed;
}
可以看到开始的偏移量是从computePullFromWhere(mq)
获取到的。主要看一下PushImpl的内容
// RebalancePushImpl 部分代码
@Override
public long computePullFromWhere(MessageQueue mq) {
long result = -1;
final OffsetStore offsetStore = this.defaultMQPushConsumerImpl.getOffsetStore();
long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
if (lastOffset >= 0) {
result = lastOffset;
}
// First start,no offset
else if (-1 == lastOffset) {
if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
result = 0L;
} else {
try {
result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
} catch (MQClientException e) {
result = -1;
}
}
} else {
result = -1;
}
return result;
}
可以看到是从offsetStore中获取的。
在DefaultMQPushConsumerImpl的start方法中可知默认的集群模式下使用的是RemoteBrokerOffsetStore。
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
case CLUSTERING:
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
}
去查看RemoteBrokerOffsetStore类代码,type为READ_FROM_STORE
@Override
public long readOffset(final MessageQueue mq, final ReadOffsetType type) {
if (mq != null) {
switch (type) {
case MEMORY_FIRST_THEN_STORE:
case READ_FROM_MEMORY: {
AtomicLong offset = this.offsetTable.get(mq);
if (offset != null) {
return offset.get();
} else if (ReadOffsetType.READ_FROM_MEMORY == type) {
return -1;
}
}
case READ_FROM_STORE: {
try {
long brokerOffset = this.fetchConsumeOffsetFromBroker(mq);
AtomicLong offset = new AtomicLong(brokerOffset);
this.updateOffset(mq, offset.get(), false);
return brokerOffset;
}
// No offset in broker
catch (MQBrokerException e) {
return -1;
}
//Other exceptions
catch (Exception e) {
log.warn("fetchConsumeOffsetFromBroker exception, " + mq, e);
return -2;
}
}
default:
break;
}
}
return -1;
}
看一下fetchConsumeOffsetFromBroker
方法
private long fetchConsumeOffsetFromBroker(MessageQueue mq) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException {
FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
if (null == findBrokerResult) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
}
if (findBrokerResult != null) {
QueryConsumerOffsetRequestHeader requestHeader = new QueryConsumerOffsetRequestHeader();
requestHeader.setTopic(mq.getTopic());
requestHeader.setConsumerGroup(this.groupName);
requestHeader.setQueueId(mq.getQueueId());
return this.mQClientFactory.getMQClientAPIImpl().queryConsumerOffset(
findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
} else {
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
}
可以发现是去broker端查询offset。
这里要注意,提到的offset对应的是ConsumeQueue的偏移量,ConsumeQueue中存储的为定长20字节的数据。根据offset找到要消费的消息在CommitLog中的offset和消息长度,再去CommitLog中获取真正的消息。
第一次如果找到消息,broker会返回下次要消费的消息偏移量,在DefaultMQPushConsumerImpl.pullMessage
方法中会将该偏移量设置到原request,下次直接使用该偏移量拉取即可。
// DefaultMQPushConsumerImpl.pullMessage pullCallback部分代码
case FOUND:
long prevRequestOffset = pullRequest.getNextOffset();
pullRequest.setNextOffset(pullResult.getNextBeginOffset());