总览
消息消费者在消费一批消息后,需要记录该批消息已经消费完毕,然后也会从ProcessQueue处理队列中移除该批消息,返回ProcessQueue最小偏移量,并存入消息进度表中。
广播模式
同一个消费组的所有消息消费者都需要消费主体下的所有消息,也就是统一组内的消费者的消费行为是相互独立的,互相不影响,故消息进度需要独立存储,与消费者绑定,消费进度存储在消费者本地LocalFileOffsetStore。
集群模式
同一个消费组内的所有消费消费者共享主题下的所有消息,同一条消息在同一时间只会被消费者组内的一个消费者消费,并且消息队列的动态变化重新负载,所以消费进度需要保存在一个每一个消费者都能访问到的地方,因此消息消费进度存储文件存放在消息服务器端Broker。
消费进度存储维度
消息消费进度的存储是以MessageQueue为维度的,因为消费者消费也是以MessageQueue为维度的,一个消费者可以消费topic下的一个MessageQueue,或者两个MessageQueue,而这个MessageQueue不会被两个消费者消费,所以这个MessageQueue的消费进度可以暂时缓存在消费者端;缓存在消费者端,进行了消息消费,更新这个进度实时性高、方便。 所以,在一直进行消息消费的时候,会更新这个缓存的offset;默认5秒定时持久化一次offset到Broker端。只有在发生消息队列发生负载均衡的时候,消费者消费新增了MessageQueue队列,才回去Broker端请求这个MessageQueue的offset;一般在消息实时消费的时候,请求消费者端缓存的offset就可以,它也代表了这个MessageQueue的消费进度。
消费流控
消费者消费ProcessQueue中的消息是以多线程的方式,将这个ConsumeRequest提交到线程池进行消息消费。可以多个ConsumeRequest并发消费。消费进度的更新是使用当前ProcessQueue的最小进度的,比如Task1消费(50,70)的offset之间的数据、Task2消费(90,120)的offset之间的数据,Task3消费(10,30)的offset的数据,如果Task1,Task2消费完毕了,Task3正在进行消费,Task1、Task2并不会去更新消费进度的,因为Task3的10这个offset还没有被消费;Task3如果发生了死锁,导致一直无法被消费,消费进度无法向前推进。为了避免发生这种情况,RocketMQ引入了消息拉取的流控措施:DefaultMQPushConsumer#consumeConcurrentlyMaxSpan=2000,消息处理队列ProcessQueue中最大消息偏移量与最小偏移量不能超过该值,如超过该值,触发流控,将延迟该消息队列的消息拉取。
LocalFileOffsetStore
广播模式采用LocalFileOffsetStore是将消费进度存储在消费者端。
存储消费进度在消费端本地文件。基本方法都是消息进度启动加载、更新消费进度、查询消费进度、持久化消费进度等方法。
存储文件为:${RocketMQ_Home}/.rocketmq_offsets/{mQClientFactory.getClientId()}/groupName/offsets.json。
消费进度内存文件结构: ConcurrentMap<MessageQueue, AtomicLong> offsetTable =
new ConcurrentHashMap<MessageQueue, AtomicLong>()
定时任务持久化消费进度,默认5s一次。
// MQClientInstance定时任务持久化消费进度,默认5s一次
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.persistAllConsumerOffset();
} catch (Exception e) {
log.error("ScheduledTask persistAllConsumerOffset exception", e);
}
}
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
RemoteBrokerOffsetStore
集群模式采用RemoteBrokerOffsetStore将消息进度存储文件存放在消息服务端Broker端。RemoteBrokerOffsetStore和LocalFileOffsetStore非常相似,只有消费进度最终持久化在Broker端,略有不同。
消费进度读取
消费进度读取分为从内存读取、从磁盘读取:
在消息持续消费的时候,从内存读取;
在发生队列负载均衡的时候,从磁盘读取;请求Broker端,读取新队列的消费的offset。
public long readOffset(final MessageQueue mq, final ReadOffsetType type) {
if (mq != null) {
// 消息消费进度读取模式
switch (type) {
case MEMORY_FIRST_THEN_STORE:
case READ_FROM_MEMORY: {
AtomicLong offset = this.offsetTable.get(mq);
if (offset != null) {
return offset.get();
} else if (ReadOffsetType.READ_FROM_MEMORY == type) {
return -1;
}
}
case READ_FROM_STORE: {
try {
long brokerOffset = this.fetchConsumeOffsetFromBroker(mq);
AtomicLong offset = new AtomicLong(brokerOffset);
this.updateOffset(mq, offset.get(), false);
return brokerOffset;
}
// No offset in broker
catch (MQBrokerException e) {
return -1;
}
//Other exceptions
catch (Exception e) {
log.warn("fetchConsumeOffsetFromBroker exception, " + mq, e);
return -2;
}
}
default:
break;
}
}
return -1;
}
消费进度更新
更新Broker端消息消费进度。请求命令为UPDATE_CONSUMER_OFFSET请求,更新ConsumerOffsetManager的offsetTable;Broker端默认10s持久化一次消息进度,存储文件名:${RocketMQ_ HOME}/store/config/consumerOffset.json。
样例
{
"offsetTable":{
"TopicTest@please_rename_unique_group_name_4":{0:139653,1:139653,2:139655,3:139656
}
}
public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
MQBrokerException, InterruptedException, MQClientException {
FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
if (null == findBrokerResult) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
}
if (findBrokerResult != null) {
UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();
requestHeader.setTopic(mq.getTopic());
requestHeader.setConsumerGroup(this.groupName);
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setCommitOffset(offset);
if (isOneway) {
this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway(
findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
} else {
this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset(
findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
}
} else {
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
}