PullMessageService负责拉取消息,从远端服务器拉取消息后存储到ProcessQueue中,然后调用ConsumeMessageService#submitConsumeRequest 方法进行消费,适应小城池来消费消息,确保消息拉取与消息消费的解耦。
消息消费
ConsumeMessageConcurrentlyService#submitConsumeRequest 负责提交消费请求
@Override
public void submitConsumeRequest(
final List<MessageExt> msgs,
final ProcessQueue processQueue,
final MessageQueue messageQueue,
final boolean dispatchToConsume) {
//获取消费批次
final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
//小于最大消息数则直接提交
if (msgs.size() <= consumeBatchSize) {
ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
this.consumeExecutor.submit(consumeRequest);
} else {
//消息切割提交
for (int total = 0; total < msgs.size(); ) {
ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
try {
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException e) {
//失败后重试
this.submitConsumeRequestLater(consumeRequest);
}
}
}
}
消息消费只是先提交到线程池中,每一个ConsumeRequest 为一个线程。
ConsumeMessageConcurrentlyService$ConsumeRequest#run 整体流程如下
消费进度管理
- 集群模式:消费进度保存在broker,需要每个消费端都可以访问到
- 广播模式:消息进度存储在本地,只需要本机访问即可。
核心类图
广播模式消息进度存储
org.apache.rocketmq.client.consumer.store.LocalFileOffsetStore
存储位置
public final static String LOCAL_OFFSET_STORE_DIR = System.getProperty(
"rocketmq.client.localOffsetStoreDir",
System.getProperty("user.home") + File.separator + ".rocketmq_offsets");
MQClientInstance#startScheduledTask
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.persistAllConsumerOffset();
} catch (Exception e) {
log.error("ScheduledTask persistAllConsumerOffset exception", e);
}
}
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
每隔十秒持久化一次。
集群模式消息进度存储
org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore
private long fetchConsumeOffsetFromBroker(MessageQueue mq) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException {
FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
if (null == findBrokerResult) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
}
if (findBrokerResult != null) {
QueryConsumerOffsetRequestHeader requestHeader = new QueryConsumerOffsetRequestHeader();
requestHeader.setTopic(mq.getTopic());
requestHeader.setConsumerGroup(this.groupName);
requestHeader.setQueueId(mq.getQueueId());
return this.mQClientFactory.getMQClientAPIImpl().queryConsumerOffset(
findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
} else {
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
}
所以mq一个队列同一时刻也只允许一个消费者消费,也是避免这里更新的并发问题了。