总览
消息消费的模式有两种推模式、拉模式;拉模式已经不再推荐使用,因此本章将详细介绍推模式。
推模式
消息到达消息服务器后,服务器推送消息给消费者。 RocketMQ消息推模式并不是真正意义上的服务端将消息推送给消费端。本质实现为消费端消息拉取线程在从服务器拉取到一批消息后,然后提交到消息消费线程池后,又“马不停蹄”的继续向服务器再次尝试拉取消息,再提交到消息消费线程池,然后由消费端消费线程异步的从消费线程池获取消息进行消息消费,解耦了消息拉取和消息消费过程。如果未拉取到消息,则延迟一下又继续拉取。
负载均衡
负载均衡为消息队列和消费者的负载均衡,是在客户端实现的。
一个消费组内可以包含多个消费者,一个消费者组订阅相同的消息主题,一个主题Topic拥有多个消息存储队列。每一个组内的消费者可以消费这个Topic下的多个消息队列,Topic下的一个消息队列只能被一个同一个组的一个消费者消费。
负载均衡算法
- 平均分配策略(默认)(AllocateMessageQueueAveragely)
- 环形分配策略(AllocateMessageQueueAveragelyByCircle)
- 机房分配策略(AllocateMessageQueueByMachineRoom)
- 一致性哈希分配策略(AllocateMessageQueueConsistentHash)
- 根据配置分配策略(AllocateMessageQueueByConfig)
AllocateMessageQueueStrategy为负载均衡策略接口,allocate()方法为分配方式具体实现。以上5中具体算法都实现了该接口,重写了allocate()具体的分配策略。
负载均衡流程
MQClientInstance的start()方法中的RebalanceService.start()方法开启了负载均衡服务线程。 此线程每隔20秒执行一次负载均衡方法,根据每个Topic进行消息队列和消费者的负载均衡。其中updateProcessQueueTableInRebalance()根据consumer分配到topic下的MessageQueue和当前consumer正在处理的processQueue进行对比,进行processQueue队列更新,后面会有更详细的介绍。负载均衡之后如果有消费者消费的队列发生了变化,messageQueueChanged()方法更新限流信息,向broker发送心跳,更新订阅数据信息。
/**
* 根据 topic 下消息队列,负载均衡 consumer 组下的 consumer
* @param topic
* @param isOrder 是否严格顺序消息
*/
private void rebalanceByTopic(final String topic, final boolean isOrder) {
switch (messageModel) {
// 广播模式
case BROADCASTING: {
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
if (mqSet != null) {
boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
if (changed) {
this.messageQueueChanged(topic, mqSet, mqSet);
log.info("messageQueueChanged {} {} {} {}",
consumerGroup,
topic,
mqSet,
mqSet);
}
} else {
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
}
break;
}
// 集群模式
case CLUSTERING: {
// 从主题订阅信息表中获取主题的队列信息
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
// topic下,一个消费者组的所有消费者
//
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
if (null == mqSet) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
}
}
if (null == cidAll) {
log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
}
if (mqSet != null && cidAll != null) {
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
mqAll.addAll(mqSet);
// 对消息队列排序
Collections.sort(mqAll);
// 对消费客户端排序
Collections.sort(cidAll);
// 负载均衡策略
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
List<MessageQueue> allocateResult = null;
try {
// topic 下的消费队列,消费者组,负载均衡策略结果
allocateResult = strategy.allocate(
this.consumerGroup,
this.mQClientFactory.getClientId(),
mqAll,
cidAll);
} catch (Throwable e) {
log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
e);
return;
}
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
if (allocateResult != null) {
allocateResultSet.addAll(allocateResult);
}
// 根据 consumer 分配到 topic 下的 MessageQueue, 和当前 consumer 正在处理的 processQueue 进行对比,进行 processQueue 队列更新。
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
// 如果发生改变更
if (changed) {
log.info(
"rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
allocateResultSet.size(), allocateResultSet);
// topic下对应的消费的messagequeue发生改变
this.messageQueueChanged(topic, mqSet, allocateResultSet);
}
}
break;
}
default:
break;
}
}
ProcessQueue
ProcessQueue是MessageQueue在消费端的重现,消息存储的快照。
PullMessageService从消息服务器默认每条拉取32条消息,按消息的偏移量顺序存放在ProcessQueue中,PullMessageService然后将消息提交到消费者线程池,消息成功消费后从ProcessQueue中移除。
PullRequest
PullRequest是拉取消息请求,是针对消息队列来说的,并不是针对一个offset为粒度的。
一个PullRequest对应一个MessageQueue,一个ProcessQueue,在消息拉取的过程中一个MessageQueue队列只有这一个PullRequest,拉取消息会反复重用这个PullRequest,只是里面的nextOffset会被更新。
public class PullRequest {
// 消费者组
private String consumerGroup;
// 待拉取消费队列
private MessageQueue messageQueue;
// 消息处理队列,从Broker拉取到的消息先存入ProccessQueue,然后再提交到消费者消费线程池消费
private ProcessQueue processQueue;
// 待拉取的MessageQueue偏移量
private long nextOffset;
// 是否第一次被锁定
private boolean lockedFirst = false;
}
现在详细介绍updateProcessQueueTableInRebalance()方法,根据consumer分配到topic下的MessageQueue和当前consumer正在处理的processQueue进行对比,进行processQueue队列更新。mqSet为这次负载均衡之后需要消费的队列,发现新加入需要消费的MessageQueue,新建对应的ProcessQueue;然后再新建PullRequest对象,再放入pullRequestList集合中,dispatchPullRequest()方法将pullRequestList集合中的拉取消息的请求对象PullRequest,放入到阻塞队列pullRequestQueue,等待PullMessageService服务线程进行拉取请求处理。pullRequestQueue是拉取消息请求阻塞队列,存放拉取消息的请求,解耦负载均衡创建的拉取请求对象和PullMessageService的消息拉取。
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
final boolean isOrder) {
boolean changed = false;
Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
while (it.hasNext()) {
Entry<MessageQueue, ProcessQueue> next = it.next();
MessageQueue mq = next.getKey();
ProcessQueue pq = next.getValue();
if (mq.getTopic().equals(topic)) {
if (!mqSet.contains(mq)) {
pq.setDropped(true);
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
it.remove();
changed = true;
log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
}
} else if (pq.isPullExpired()) {
switch (this.consumeType()) {
case CONSUME_ACTIVELY:
break;
case CONSUME_PASSIVELY:
pq.setDropped(true);
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
it.remove();
changed = true;
log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
consumerGroup, mq);
}
break;
default:
break;
}
}
}
}
List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
// mqSet,为这次负载均衡之后需要消费的队列
for (MessageQueue mq : mqSet) {
// 新的MessageQueue,新建对应的ProcessQueue
if (!this.processQueueTable.containsKey(mq)) {
// 顺序消息,锁定broker端的MessageQueue消息队列,锁定失败,说明messageQueue正在消费者消费,不能被拉取消息;等待下次锁定
if (isOrder && !this.lock(mq)) {
log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
continue;
}
// 清空这个消费队列原来的消费进度
this.removeDirtyOffset(mq);
// 新建MessageQueue对应的消息处理队列ProcessQueue队列
ProcessQueue pq = new ProcessQueue();
// 计算从哪里拉取message
long nextOffset = this.computePullFromWhere(mq);
if (nextOffset >= 0) {
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
if (pre != null) {
log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
} else {
log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
// 一个PullRequest对应一个MessageQueue,一个ProcessQueue
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(nextOffset);
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(pq);
pullRequestList.add(pullRequest);
changed = true;
}
} else {
log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
}
}
}
// 处理这一批新建的ProcessQueue队列,
this.dispatchPullRequest(pullRequestList);
return changed;
}
// 将拉取消息请求放入队列
public void executePullRequestImmediately(final PullRequest pullRequest) {
try {
// 放入拉取消息请求阻塞队列,等待请求被处理
this.pullRequestQueue.put(pullRequest);
} catch (InterruptedException e) {
log.error("executePullRequestImmediately pullRequestQueue.put", e);
}
}
消息拉取
PullMessageService是消息拉取服务,成员变量LinkedBlockingQueue<PullRequest> pullRequestQueue是拉取消息请求阻塞队列,存放拉取消息的请求,解耦负载均衡创建的拉取请求对象和PullMessageService的消息拉取。
拉取线程从pullRequestQueue中获取一个PullRequest消息拉取任务,如果pullRequestQueue为空,则线程将阻塞,直到有拉取任务被放入队列。获取PullRequest对象之后,进行消息拉取。
public void run() {
log.info(this.getServiceName() + " service started");
// while (!this.isStopped()),这是一种通用的设计技巧,stopped声明为volatile,每执行一次业务逻辑检测一下运行状态,
// 可以通过其他线程将stopped设置为true,从而停止该线程
while (!this.isStopped()) {
try {
// 从pullRequestQueue中获取一个PullRequest消息拉取任务,如果pullRequestQueue为空,则线程将阻塞,直到有拉取任务被放入队列。
PullRequest pullRequest = this.pullRequestQueue.take();
// 拉取消息
this.pullMessage(pullRequest);
} catch (InterruptedException ignored) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);
}
}
log.info(this.getServiceName() + " service end");
}
DefaultMQPushConsumerImpl的pullMessage()拉取消息方法,主要步骤:
- 封装拉取消息请求
- 消息服务器查找并返回消息
- 消息拉取客户端处理返回的消息
使用CallBack的方式来处理从服务端拉取消息成功或者失败。拉取过程抛出异常,等待一段时间,再将pullRequest放入pullRequestQueue中,等待pullRequest被处理。拉取成功后进行拉取消息结果处理。
- 获取消息成功:
1.1 更新PullRequest的下次拉取偏移量,如果msgFoundList为空,则立即将PullRequest放入到PullMessageService的pullRequestQueue,以便PullMessageService能及时唤醒并再次执行消息拉取。
1.2 首先将拉取到的消息存入ProcessQueue,然后将拉取到的消息提交到ConsumeMessageService中的线程池,供消费者消费,该方法是一个异步方法,也就是PullCallBack将消息提交到ConsumeMessageService中就会立即返回,至于这些消息如何消费,PullCallBack不关注。
1.3 等待pullInterval时间后将pullRequest放入pullRequestQueue中,等待pullRequest被PullMessageService进行新的消息请求拉取,否则立即将pullRequest放入pullRequestQueue中,等待pullRequest被PullMessageService进行新的消息请求拉取。达到持续消息拉取,实现准时拉取消息的效果。这里就是实现消息被马不停蹄拉的地方。 - 获取消息失败:
如果返回NO_NEW_MSG(没有新消息)、NO_MATCHED_MSG(没有匹配消息),则直接使用服务器端校正的偏移量进行下一次消息的拉取, 再来看看服务端是如何校正offset的。
// 1. 封装拉取消息请求 2. 消息服务器查找并返回消息 3. 消息拉取客户端处理返回的消息
public void pullMessage(final PullRequest pullRequest) {
// 从pullRequest中获取ProcessQueue,如果处理队列当前状态未被丢弃,则更新ProcessQueue的lastPullTimestamp为当前时间戳;
// 如果当前消费者被挂起,则将拉取任务延迟1s再次放入到PullMessageService的拉取任务队列中,结束本次拉取。
final ProcessQueue processQueue = pullRequest.getProcessQueue();
if (processQueue.isDropped()) {
log.info("the pull request[{}] is dropped.", pullRequest.toString());
return;
}
pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
try {
this.makeSureStateOK();
} catch (MQClientException e) {
log.warn("pullMessage exception, consumer state not ok", e);
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
return;
}
if (this.isPause()) {
log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
return;
}
// 进行消息拉取流控。从消息消费数量与消费间隔两个维度进行控制
long cachedMessageCount = processQueue.getMsgCount().get();
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
// 消息处理总数,如果ProcessQueue当前处理的消息条数超过了pullThresholdForQueue=1000将触发流控,放弃本次拉取任务,
// 并且该队列的下次拉取任务将在50毫秒后才加入到拉取任务队列,并打印log
if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
}
return;
}
// ProcessQueue中队列最大偏移量与最小偏移量的间距,不能超过consumeConcurrentlyMaxSpan,否则触发流控,将延迟该消息队列的消息拉取。
// 这里主要的考量是担心一条消息堵塞,消息进度无法向前推进,可能造成大量重复消费
if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
}
return;
}
// 不是严格顺序
if (!this.consumeOrderly) {
if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
log.warn(
"the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
pullRequest, queueMaxSpanFlowControlTimes);
}
return;
}
// 严格顺序
} else {
// ProcessQueue被锁定
if (processQueue.isLocked()) {
// 第一次拉取消息,pullRequest未被锁定,首先计算拉取偏移量,然后向消息服务端拉取消息。
if (!pullRequest.isLockedFirst()) {
// 获取messageQueue的开始消费位置
final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
boolean brokerBusy = offset < pullRequest.getNextOffset();
log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
pullRequest, offset, brokerBusy);
if (brokerBusy) {
log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
pullRequest, offset);
}
pullRequest.setLockedFirst(true);
// 修正offset,从上次broker开始位置消费
pullRequest.setNextOffset(offset);
}
} else {
// processQueue未被上锁,推迟3秒进行pullRequest提交,放入pullRequestQueue队列中,等待broker端对messageQueue进行锁定。
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
log.info("pull message later because not locked in broker, {}", pullRequest);
return;
}
}
// 拉取该主题订阅消息,如果为空,结束本次消息拉取,关于该队列的下一次拉取任务延迟3s。
final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (null == subscriptionData) {
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
log.warn("find the consumer's subscription failed, {}", pullRequest);
return;
}
final long beginTimestamp = System.currentTimeMillis();
PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
// 调用pullAPIWrapper的processPullResult将消息字节数组解码成消息列表填充msgFoundList,并对消息进行消息过滤(TAG)
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData);
switch (pullResult.getPullStatus()) {
// 消息被找到
case FOUND:
// 更新PullRequest的下次拉取偏移量,如果msgFoundList为空,则立即将PullRequest放入到PullMessageService的pullRequestQueue,
// 以便PullMessageService能及时唤醒并再次执行消息拉取。
// 为什么PullStatus.FOUND,msgFoundList还会为空呢?
// 因为在RocketMQ根据TAG消息过滤,在服务端只是验证了TAG的hashCode,在客户端再次对消息过滤,故可能出现msgFoundList为空的情况。
long prevRequestOffset = pullRequest.getNextOffset();
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
long pullRT = System.currentTimeMillis() - beginTimestamp;
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullRT);
long firstMsgOffset = Long.MAX_VALUE;
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
// 放入pullRequestQueue拉取消息队列,等待pullRequest再次被处理
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
} else {
firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
// 将从broker中获取的消息放入processQueue中,立即返回,然后等待ConsumeMessageService的消费线程去消费processQueue中的消息
// 首先将拉取到的消息存入ProcessQueue,然后将拉取到的消息提交到ConsumeMessageService中供消费者消费,该方法是一个异步方法,也就是PullCallBack将消息提交到ConsumeMessageService中就会立即返回,
// 至于这些消息如何消费,PUllCallBack不关注。
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
// 提交消息消费请求到消息消费线程池中去,唤醒消费者消费消息
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
// 等待pullInterval时间后将pullRequest放入pullRequestQueue中,等待pullRequest被PullMessageService进行新的消息请求拉取,否则立即将pullRequest放入pullRequestQueue中,等待pullRequest被PullMessageService进行新的消息请求拉取。达到持续消息拉取,实现准时拉取消息的效果。这里就是实现消息被马不停蹄拉的地方。
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
} else {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}
}
if (pullResult.getNextBeginOffset() < prevRequestOffset
|| firstMsgOffset < prevRequestOffset) {
log.warn(
"[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
pullResult.getNextBeginOffset(),
firstMsgOffset,
prevRequestOffset);
}
break;
// 如果返回NO_NEW_MSG(没有新消息)、NO_MATCHED_MSG(没有匹配消息),则直接使用服务器端校正的偏移量进行下一次消息的拉取, 再来看看服务端是如何校正offset的。
// NO_NEW_MSG对应,对应GetMessageResult.OFFSET_FOUND_NULL GetMessageResult.OFFSET_OVERFLOW_ONE;
// OFFSET_OVERFLOW_ONE:待拉取offset等于消息队列最大的偏移量,如果有新的消息到达,此时会创建一个新的ConsumeQueue文件,按照上一个ConsumeQueue的最大偏移量就是下一个文件的起始偏移量,所以如果按照offset第二次拉取消息时能成功。
// OFFSET_FOUND_NULL:根据ConsumeQueue的偏移量没有找到内容,将偏移量定位到下一个ConsumeQueue,其实就是offset+(一个ConsumeQueue包含多少个条目=MappedFileSize/20)
case NO_NEW_MSG:
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
break;
case NO_MATCHED_MSG:
// 更新拉取偏移量
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
// 矫正OffsetStore消费进度存储
DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
// 立即将pullRequest放入pullRequestQueue中,等待pullRequest被处理。
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
break;
// 如果拉取结果显示偏移量非法,首先将ProcessQueue设置dropped为ture表示丢弃该消费队列,意味着ProcessQueue中拉取的消息将止消费,
// 然后根据服务端下一次校对的偏移量尝试更新消息消费进度( 内存中 ),然后尝试持久化消息消费进度,并将该消息队列从RebalanceImpl的处理队列中移除,
// 意味着暂停该消息队列的消息拉取,等待下一次消息队列重新负载。OFFSET_ILLEGAL对应服务端的GetMessageResult状态的NO_MATCHED_LOGIC_QUEUE、NO_MESSAGE_IN_QUEUE、OFFSET_OVERFLOW_BADLY、OFFSET_TOO_SMALL
// 这些状态服务端偏移量校正基本上使用原offset,在客户端更新消息消费进度时只有当消息进度比当前消费进度大才会覆盖,保证消息进度的准确性。
case OFFSET_ILLEGAL:
log.warn("the pull request offset illegal, {} {}",
pullRequest.toString(), pullResult.toString());
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
pullRequest.getProcessQueue().setDropped(true);
DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
@Override
public void run() {
try {
// 更新消费者消费进度
DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
pullRequest.getNextOffset(), false);
// 持久化消息消费进度
DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
// 将消息队列从消费者组中移除
DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
log.warn("fix the pull request offset, {}", pullRequest);
} catch (Throwable e) {
log.error("executeTaskLater Exception", e);
}
}
}, 10000);
break;
default:
break;
}
}
}
// 异常等待一段时间,再将将pullRequest放入pullRequestQueue中,等待pullRequest被处理。
@Override
public void onException(Throwable e) {
if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("execute the pull request exception", e);
}
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
}
};
// 构建消息拉取系统标记
boolean commitOffsetEnable = false;
long commitOffsetValue = 0L;
if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
// 读取消息消费进度
commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
if (commitOffsetValue > 0) {
commitOffsetEnable = true;
}
}
String subExpression = null;
boolean classFilter = false;
SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (sd != null) {
if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
subExpression = sd.getSubString();
}
classFilter = sd.isClassFilterMode();
}
int sysFlag = PullSysFlag.buildSysFlag(
commitOffsetEnable, // commitOffset
true, // suspend
subExpression != null, // subscription
classFilter // class filter
);
try {
// 调用pullKernelImpl()方法后与服务端交互
this.pullAPIWrapper.pullKernelImpl(
pullRequest.getMessageQueue(),
subExpression,
subscriptionData.getExpressionType(),
subscriptionData.getSubVersion(),
pullRequest.getNextOffset(),
this.defaultMQPushConsumer.getPullBatchSize(),
sysFlag,
commitOffsetValue,
BROKER_SUSPEND_MAX_TIME_MILLIS,
CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
CommunicationMode.ASYNC,
pullCallback
);
} catch (Exception e) {
log.error("pullKernelImpl exception", e);
// 报错等待一段时间,再进行消息拉取
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
}
}
总结
到这里讲解消息队列和消费者负载均衡,还有消息的拉取;LinkedBlockingQueue<PullRequest> pullRequestQueue是一个巧妙的设计,解耦负载均衡创建的拉取请求对象和PullMessageService的消息拉取。下一节将要详细介绍消息的消费。