启动
- org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl
/**
 * Starts this push consumer.
 *
 * <p>Only a consumer in state {@code CREATE_JUST} performs the start-up sequence: validate the
 * configuration, copy subscriptions, create the client instance, wire the rebalance
 * implementation, choose and load the offset store, start the consume service, register with the
 * client instance and finally start the client instance. Any other known state is rejected.
 * After a successful start the subscription info is refreshed, the client is checked against the
 * brokers, a heartbeat is sent and a rebalance is triggered immediately.
 *
 * @throws MQClientException if the consumer is not in state {@code CREATE_JUST}, or if the
 *         consumer group is already registered in the client instance
 */
public synchronized void start() throws MQClientException {
    // ServiceState values: CREATE_JUST, RUNNING, SHUTDOWN_ALREADY, START_FAILED
    switch (this.serviceState) {
        case CREATE_JUST:
            this.log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
                this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
            // Mark as failed up front; the state only becomes RUNNING once every step below succeeded.
            this.serviceState = ServiceState.START_FAILED;

            this.checkConfig();
            this.copySubscription();

            // Clustering mode load-balances queues across all consumers of the group, each one
            // owning a share of the queues, so the instance name is switched to the PID.
            if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                this.defaultMQPushConsumer.changeInstanceNameToPID();
            }

            this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);

            this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
            this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
            this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
            this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

            this.pullAPIWrapper = new PullAPIWrapper(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup(), this.isUnitMode());
            this.pullAPIWrapper.registerFilterMessageHook(this.filterMessageHookList);

            // The message model decides where offsets are kept unless a store was supplied:
            //   BROADCASTING -> local file, CLUSTERING -> broker side.
            if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
            } else {
                switch (this.defaultMQPushConsumer.getMessageModel()) {
                    case BROADCASTING:
                        this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                        break;
                    case CLUSTERING:
                        this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                        break;
                    default:
                        break;
                }
                this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
            }
            // LocalFileOffsetStore reads the last consumed offsets from a local file here;
            // per the original note, RemoteBrokerOffsetStore.load is an empty implementation.
            this.offsetStore.load();

            // Orderly or concurrent consumption, depending on the registered listener type.
            if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                this.consumeOrderly = true;
                this.consumeMessageService = new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
            } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                this.consumeOrderly = false;
                this.consumeMessageService = new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
            }
            this.consumeMessageService.start();

            // Register this consumer with the client instance under its group name; a failed
            // registration means the group name is already taken in this instance.
            // The client instance is started only after a successful registration.
            boolean registerOK = this.mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
            if (!registerOK) {
                this.serviceState = ServiceState.CREATE_JUST;
                this.consumeMessageService.shutdown();
                throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup() + "] has been created before, specify another name please." + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"), null);
            }

            this.mQClientFactory.start();
            this.log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The PushConsumer service state not OK, maybe started once, " + this.serviceState + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"), null);
        default:
            break;
    }

    this.updateTopicSubscribeInfoWhenSubscriptionChanged();
    this.mQClientFactory.checkClientInBroker();
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    this.mQClientFactory.rebalanceImmediately();
}
广播模式消费位点初始化
- org.apache.rocketmq.client.consumer.store.LocalFileOffsetStore
/**
 * Restores the offsets saved on local disk into this store's in-memory offset table and logs
 * every restored entry.
 *
 * @throws MQClientException declared by the contract; presumably raised by
 *         {@code readLocalOffset()} when the file cannot be read — verify there
 */
public void load() throws MQClientException {
    // Per the original note the file holds a JSON map: key = message queue, value = offset.
    final OffsetSerializeWrapper persisted = this.readLocalOffset();
    if (persisted == null || persisted.getOffsetTable() == null) {
        return;
    }

    this.offsetTable.putAll(persisted.getOffsetTable());
    for (MessageQueue mq : persisted.getOffsetTable().keySet()) {
        AtomicLong offset = persisted.getOffsetTable().get(mq);
        log.info("load consumer's offset, {} {} {}", this.groupName, mq, offset.get());
    }
}
ConsumeService 启动
其实这里跟主流程并没有多大关系,浏览一眼,先跳过。
并发消费
- org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService
/**
 * Schedules the periodic clean-up of expired messages.
 *
 * <p>Both the initial delay and the period are the consumer's consume timeout, interpreted in
 * minutes (the original note gives 15 minutes as the default).
 */
public void start() {
    final Runnable cleanTask = new Runnable() {
        public void run() {
            ConsumeMessageConcurrentlyService.this.cleanExpireMsg();
        }
    };
    this.cleanExpireMsgExecutors.scheduleAtFixedRate(
        cleanTask,
        this.defaultMQPushConsumer.getConsumeTimeout(),
        this.defaultMQPushConsumer.getConsumeTimeout(),
        TimeUnit.MINUTES);
}
/**
 * Asks every process queue currently held by the rebalance implementation to clean up its
 * expired messages.
 */
private void cleanExpireMsg() {
    for (Entry<MessageQueue, ProcessQueue> entry
        : this.defaultMQPushConsumerImpl.getRebalanceImpl().getProcessQueueTable().entrySet()) {
        ProcessQueue processQueue = entry.getValue();
        processQueue.cleanExpiredMsg(this.defaultMQPushConsumer);
    }
}
- org.apache.rocketmq.client.impl.consumer.ProcessQueue
/**
 * Sends messages whose consumption has exceeded the consume timeout back to the broker and
 * removes them from this queue. Does nothing for orderly consumers.
 *
 * <p>At most 16 messages are examined per call, always starting from the first entry of the
 * message tree map. The scan stops at the first head message that is not expired, that has no
 * consume-start timestamp yet, or when the calling thread is interrupted.
 *
 * @param pushConsumer consumer supplying the consume timeout (minutes) and the send-back call
 */
public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer) {
    if (pushConsumer.getDefaultMQPushConsumerImpl().isConsumeOrderly()) {
        return;
    }

    int loop = Math.min(this.msgTreeMap.size(), 16);
    for (int i = 0; i < loop; ++i) {
        MessageExt msg = null;
        try {
            this.lockTreeMap.readLock().lockInterruptibly();
            try {
                if (this.msgTreeMap.isEmpty()) {
                    break;
                }
                // Always inspect the entry with the smallest key.
                MessageExt head = (MessageExt) this.msgTreeMap.firstEntry().getValue();
                String consumeStart = MessageAccessor.getConsumeStartTimeStamp((Message) head);
                // The start timestamp is only stamped once consumption of the message begins;
                // a head message without one cannot be expired, and parsing it would throw.
                if (consumeStart == null || consumeStart.isEmpty()) {
                    break;
                }
                if (System.currentTimeMillis() - Long.parseLong(consumeStart) <= pushConsumer.getConsumeTimeout() * 60L * 1000L) {
                    break;
                }
                msg = head;
            } finally {
                this.lockTreeMap.readLock().unlock();
            }
        } catch (InterruptedException interrupted) {
            this.log.error("getExpiredMsg exception", interrupted);
            // No message was selected; restore the flag and stop instead of sending back null.
            Thread.currentThread().interrupt();
            return;
        }

        try {
            // Send the expired message back to the broker (retry queue, per the original note).
            pushConsumer.sendMessageBack(msg, 3);
            this.log.info("send expire msg back. topic={}, msgId={}, storeHost={}, queueId={}, queueOffset={}", msg.getTopic(), msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(), msg.getQueueOffset());
            try {
                this.lockTreeMap.writeLock().lockInterruptibly();
                try {
                    // Remove locally only if the message is still the head. If removal fails the
                    // next clean-up handles it again, so duplicate delivery is possible here.
                    if (!this.msgTreeMap.isEmpty() && msg.getQueueOffset() == ((Long) this.msgTreeMap.firstKey()).longValue()) {
                        try {
                            this.removeMessage(Collections.singletonList(msg));
                        } catch (Exception removeFailure) {
                            this.log.error("send expired msg exception", removeFailure);
                        }
                    }
                } finally {
                    this.lockTreeMap.writeLock().unlock();
                }
            } catch (InterruptedException interrupted) {
                this.log.error("getExpiredMsg exception", interrupted);
                Thread.currentThread().interrupt();
                return;
            }
        } catch (Exception sendFailure) {
            this.log.error("send expired msg exception", sendFailure);
        }
    }
}
顺序消费
- org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService
/**
 * Starts the periodic queue-lock task. Only applies in CLUSTERING mode; for any other message
 * model this method does nothing.
 *
 * <p>The task first runs after 1000 ms and then every
 * {@code ProcessQueue.REBALANCE_LOCK_INTERVAL} milliseconds.
 */
public void start() {
    if (!MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {
        return;
    }

    final Runnable lockTask = new Runnable() {
        public void run() {
            ConsumeMessageOrderlyService.this.lockMQPeriodically();
        }
    };
    this.scheduledExecutorService.scheduleAtFixedRate(
        lockTask, 1000L, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
}
MQClient 启动
- org.apache.rocketmq.client.impl.factory.MQClientInstance
/**
 * Starts the client instance once. Only state {@code CREATE_JUST} triggers the start-up
 * sequence; {@code RUNNING} and {@code SHUTDOWN_ALREADY} are no-ops.
 *
 * @throws MQClientException if a previous start attempt left the instance in
 *         {@code START_FAILED}
 */
public void start() throws MQClientException {
    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                // Stays START_FAILED unless every step below completes.
                this.serviceState = ServiceState.START_FAILED;

                // No name server address configured: look it up. Per the original notes this
                // goes through an HTTP endpoint configured by rocketmq.namesrv.domain and
                // rocketmq.namesrv.domain.subgroup; clientConfig is the DefaultMQPushConsumer
                // itself (it extends ClientConfig).
                if (this.clientConfig.getNamesrvAddr() == null) {
                    this.mQClientAPIImpl.fetchNameServerAddr();
                }

                // Request-response channel (per the original notes: a netty client).
                this.mQClientAPIImpl.start();

                // Scheduled tasks. Per the original notes:
                //  - refresh topic route info from the name server (default every 30s)
                //  - clean offline brokers and send heartbeats to all brokers (default every 30s)
                //  - persist consume offsets (default every 5s)
                //  - adjust the consume thread pool core size by backlog (every 1 min, fixed)
                this.startScheduledTask();

                // Pull service: per the original notes, a thread taking pull requests from an
                // unbounded blocking queue.
                this.pullMessageService.start();

                // Rebalance service. Per the original notes, queues are sorted and split evenly
                // across consumers; interval via rocketmq.client.rebalance.waitInterval
                // (default 20s).
                this.rebalanceService.start();

                // Internal producer owned by this client instance.
                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);

                log.info("the client factory [{}] start OK", this.clientId);
                this.serviceState = ServiceState.RUNNING;
                break;
            case START_FAILED:
                throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
            case RUNNING:
            case SHUTDOWN_ALREADY:
            default:
                break;
        }
    }
}
到这里已经启动完毕,但还留下两个疑问:PullRequest 是谁放进 pullRequestQueue 的?消费又是从什么时候开始的?
往回找,可以判断消费是在 consumeMessageService 里完成的,先看一下它的构造方法
- org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService
public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
MessageListenerConcurrently messageListener) {
this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
this.messageListener = messageListener;
this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
// 初始化消费线程池,设置的最小消费线程数为 corePoolSize, 最大消费线程数为 maxPoolSize, 任务队列为 无界 ComsumeRequest 队列
this.consumeExecutor = new ThreadPoolExecutor(
this.defaultMQPushConsumer.getConsumeThreadMin(),
this.defaultMQPushConsumer.getConsumeThreadMax(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.consumeRequestQueue,
new ThreadFactoryImpl("ConsumeMessageThread_"));
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_"));
}
// 这个方法里会往消费线程池里提交任务
/**
 * Hands pulled messages to the consume thread pool.
 *
 * <p>If the list fits into one batch (consumeMessageBatchMaxSize) it is submitted as a single
 * request. Otherwise it is cut into batches of that size. When the pool rejects a request, all
 * messages not yet batched are appended to the rejected request, which is then resubmitted
 * later (after 5s, per the original note).
 *
 * @param msgs messages to consume
 * @param processQueue local queue the messages belong to
 * @param messageQueue broker queue the messages were pulled from
 * @param dispatchToConsume not used by this implementation
 */
public void submitConsumeRequest(
    final List<MessageExt> msgs,
    final ProcessQueue processQueue,
    final MessageQueue messageQueue,
    final boolean dispatchToConsume) {
    final int batchLimit = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();

    if (msgs.size() <= batchLimit) {
        final ConsumeRequest single = new ConsumeRequest(msgs, processQueue, messageQueue);
        try {
            this.consumeExecutor.submit(single);
        } catch (RejectedExecutionException e) {
            this.submitConsumeRequestLater(single);
        }
        return;
    }

    int cursor = 0;
    while (cursor < msgs.size()) {
        final List<MessageExt> batch = new ArrayList<MessageExt>(batchLimit);
        for (int taken = 0; taken < batchLimit && cursor < msgs.size(); taken++, cursor++) {
            batch.add(msgs.get(cursor));
        }

        final ConsumeRequest request = new ConsumeRequest(batch, processQueue, messageQueue);
        try {
            this.consumeExecutor.submit(request);
        } catch (RejectedExecutionException e) {
            // Pool is saturated: fold everything that is left into this request and defer it.
            while (cursor < msgs.size()) {
                batch.add(msgs.get(cursor));
                cursor++;
            }
            this.submitConsumeRequestLater(request);
        }
    }
}
查找 submitConsumeRequest 的调用方,发现它在 DefaultMQPushConsumerImpl.pullMessage() 注册的 PullCallback 里被调用;而 pullMessage() 本身是 PullMessageService 从 pullRequestQueue 取到 PullRequest 后调用的。这样 pull 和 consume 就串起来了。
接下来去找 PullRequest 是在哪里产生的
- org.apache.rocketmq.client.impl.consumer.PullMessageService
/**
 * Enqueues a pull request for execution without delay.
 *
 * <p>If the calling thread is interrupted while enqueuing, the request is not queued, the error
 * is logged and the thread's interrupt status is restored so callers can still observe it.
 *
 * @param pullRequest request to put on the pull request queue
 */
public void executePullRequestImmediately(final PullRequest pullRequest) {
    try {
        this.pullRequestQueue.put(pullRequest);
    } catch (InterruptedException e) {
        log.error("executePullRequestImmediately pullRequestQueue.put", e);
        // Catching InterruptedException clears the flag; set it again rather than swallow it.
        Thread.currentThread().interrupt();
    }
}
找调用
做完 queue 的负载均衡后会立即进行一次 dispatchPullRequest
- org.apache.rocketmq.client.impl.consumer.RebalanceImpl
/**
 * Registers a process queue and builds a pull request for every queue in {@code mqSet} that is
 * not yet in the process queue table, then dispatches all new pull requests at once.
 * (Abridged snippet: part of the method is elided at the "..." line.)
 *
 * @param topic topic being rebalanced (not used in the visible part)
 * @param mqSet queues assigned to this consumer
 * @param isOrder whether consumption is orderly; a new queue must then be locked first
 * @return {@code true} if at least one new queue was added in the visible part; the elided
 *         part may also set this flag — verify against the full source
 */
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
final boolean isOrder) {
boolean changed = false;
...
List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
for (MessageQueue mq : mqSet) {
if (!this.processQueueTable.containsKey(mq)) {
// Orderly consumption: skip the queue for this round if it cannot be locked.
if (isOrder && !this.lock(mq)) {
log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
continue;
}
this.removeDirtyOffset(mq);
ProcessQueue pq = new ProcessQueue();
// Determine the offset to start pulling from, then create the PullRequest.
long nextOffset = this.computePullFromWhere(mq);
if (nextOffset >= 0) {
// putIfAbsent guards against the queue having been added concurrently.
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
if (pre != null) {
log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
} else {
log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(nextOffset);
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(pq);
pullRequestList.add(pullRequest);
changed = true;
}
} else {
// A negative offset means the start position could not be determined.
log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
}
}
}
this.dispatchPullRequest(pullRequestList);
return changed;
}
确认消费位点
- org.apache.rocketmq.client.impl.consumer.RebalancePushImpl
/**
 * Determines the offset from which pulling should start for the given queue.
 *
 * <p>In every mode a stored offset {@code >= 0} wins. A stored offset of {@code -1} means no
 * offset is stored yet, and the configured {@code ConsumeFromWhere} decides. Any other negative
 * stored offset yields {@code -1}.
 *
 * @param mq queue to compute the start offset for
 * @return the start offset, or {@code -1} if it cannot be determined
 */
public long computePullFromWhere(MessageQueue mq) {
    final ConsumeFromWhere consumeFromWhere = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere();
    final OffsetStore offsetStore = this.defaultMQPushConsumerImpl.getOffsetStore();

    switch (consumeFromWhere) {
        case CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST:
        case CONSUME_FROM_MIN_OFFSET:
        case CONSUME_FROM_MAX_OFFSET:
        case CONSUME_FROM_LAST_OFFSET: {
            final long stored = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
            if (stored >= 0) {
                return stored;
            }
            if (stored != -1) {
                return -1;
            }
            // First start, nothing stored: retry topics start at 0, everything else at the
            // queue's current max offset.
            if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                return 0L;
            }
            try {
                return this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
            } catch (MQClientException e) {
                return -1;
            }
        }
        case CONSUME_FROM_FIRST_OFFSET: {
            final long stored = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
            if (stored >= 0) {
                return stored;
            }
            // First start, nothing stored: begin at offset 0.
            return stored == -1 ? 0L : -1;
        }
        case CONSUME_FROM_TIMESTAMP: {
            final long stored = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
            if (stored >= 0) {
                return stored;
            }
            if (stored != -1) {
                return -1;
            }
            // First start, nothing stored: retry topics start at the queue's max offset,
            // everything else at the offset found for the configured consume timestamp.
            if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                try {
                    return this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
                } catch (MQClientException e) {
                    return -1;
                }
            }
            try {
                final long timestamp = UtilAll.parseDate(this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeTimestamp(),
                    UtilAll.YYYYMMDDHHMMSS).getTime();
                return this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);
            } catch (MQClientException e) {
                return -1;
            }
        }
        default:
            return -1;
    }
}
拉取消息的流程
拉取
- org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl
/**
 * Executes one pull request: applies flow control, checks the orderly-mode lock, looks up the
 * subscription and issues an asynchronous pull whose result is handled by {@code pullCallback}.
 * (Abridged snippet: the initial state checks and the callback body are elided at the "..."
 * lines.)
 *
 * <p>Whenever the request cannot be executed now (flow control, queue not locked, missing
 * subscription, exception from the pull call) it is rescheduled via
 * {@code executePullRequestLater}.
 *
 * @param pullRequest the request to execute; supplies the message queue and the next offset
 */
public void pullMessage(final PullRequest pullRequest) {
...
前面会做一些状态检查,代码太多,这部分不贴了
...
long cachedMessageCount = processQueue.getMsgCount().get();
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
// ProcessQueue is the local message cache: messages are pulled from the broker into it first.
// If the cached count or cached size exceeds its threshold (defaults per the original note:
// 1000 messages / 100 MB), this PullRequest is delayed.
if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
// Log only the first of every 1000 flow-control hits.
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
}
return;
}
if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
}
return;
}
// Concurrent mode: if the span of cached messages grows beyond the configured maximum
// (consumption is stuck on old messages), apply flow control and delay this PullRequest too.
if (!this.consumeOrderly) {
if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
log.warn(
"the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
pullRequest, queueMaxSpanFlowControlTimes);
}
return;
}
} else {
// Orderly mode: the process queue must be locked. On the first pull after the lock was
// obtained the pull offset is recomputed; if the queue is not locked, pull again later.
if (processQueue.isLocked()) {
if (!pullRequest.isLockedFirst()) {
final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
boolean brokerBusy = offset < pullRequest.getNextOffset();
log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
pullRequest, offset, brokerBusy);
if (brokerBusy) {
log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
pullRequest, offset);
}
pullRequest.setLockedFirst(true);
pullRequest.setNextOffset(offset);
}
} else {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
log.info("pull message later because not locked in broker, {}", pullRequest);
return;
}
}
// Look up the subscription for the request's topic; without one the pull is retried later.
final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (null == subscriptionData) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
log.warn("find the consumer's subscription failed, {}", pullRequest);
return;
}
final long beginTimestamp = System.currentTimeMillis();
PullCallback pullCallback = new PullCallback() {
...
注册回调,单独拿出来看
...
};
// In CLUSTERING mode a positive in-memory offset is sent along with the pull (commit offset).
boolean commitOffsetEnable = false;
long commitOffsetValue = 0L;
if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
if (commitOffsetValue > 0) {
commitOffsetEnable = true;
}
}
// The subscription expression is sent only when postSubscriptionWhenPull is enabled and the
// subscription is not in class-filter mode.
String subExpression = null;
boolean classFilter = false;
SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (sd != null) {
if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
subExpression = sd.getSubString();
}
classFilter = sd.isClassFilterMode();
}
int sysFlag = PullSysFlag.buildSysFlag(
commitOffsetEnable, // commitOffset
true, // suspend
subExpression != null, // subscription
classFilter // class filter
);
// Issue the asynchronous pull; the result is delivered to pullCallback.
try {
this.pullAPIWrapper.pullKernelImpl(
pullRequest.getMessageQueue(),
subExpression,
subscriptionData.getExpressionType(),
subscriptionData.getSubVersion(),
pullRequest.getNextOffset(),
this.defaultMQPushConsumer.getPullBatchSize(),
sysFlag,
commitOffsetValue,
BROKER_SUSPEND_MAX_TIME_MILLIS,
CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
CommunicationMode.ASYNC,
pullCallback
);
} catch (Exception e) {
log.error("pullKernelImpl exception", e);
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
}
}
拉取回调
// Callback for the asynchronous pull issued by pullMessage (abridged snippet: parts are elided
// at the "..." lines). On success it dispatches on the pull status; on failure it reschedules
// the pull request.
PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData);
switch (pullResult.getPullStatus()) {
case FOUND:
...
// Nothing left in the found list: pull again right away.
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
} else {
firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
// Cache the messages in the process queue, then hand them to the consume service.
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
// Schedule the next pull: after pullInterval if one is configured, otherwise immediately.
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
} else {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}
}
...
break;
case NO_NEW_MSG:
// No new messages (body elided here): per the original note, the offset is corrected
// from pullResult and this PullRequest is submitted again.
break;
case NO_MATCHED_MSG:
// No matching messages (body elided here): per the original note, the offset is
// corrected from pullResult and this PullRequest is submitted again.
break;
case OFFSET_ILLEGAL:
// Illegal offset: mark the process queue as dropped; per the original note, later
// PullRequests for this queue are then discarded.
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
pullRequest.getProcessQueue().setDropped(true);
// After a delay of 10000 (presumably milliseconds), store and persist the corrected
// offset and remove the process queue.
DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
@Override
public void run() {
try {
DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
pullRequest.getNextOffset(), false);
DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
log.warn("fix the pull request offset, {}", pullRequest);
} catch (Throwable e) {
log.error("executeTaskLater Exception", e);
}
}
}, 10000);
break;
default:
break;
}
}
}
@Override
public void onException(Throwable e) {
// Failures on retry topics are not logged; every failure reschedules the pull.
if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("execute the pull request exception", e);
}
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
}
};
消费
- org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService
/**
 * Unit of work run on the consume thread pool: invokes the user's listener for one batch of
 * messages and passes the outcome to {@code processConsumeResult}. (Abridged snippet: parts of
 * {@code run()}, including the declarations of {@code status} and {@code context}, are elided
 * at the "..." lines.)
 */
class ConsumeRequest implements Runnable {
private final List<MessageExt> msgs;
private final ProcessQueue processQueue;
private final MessageQueue messageQueue;
public ConsumeRequest(List<MessageExt> msgs, ProcessQueue processQueue, MessageQueue messageQueue) {
this.msgs = msgs;
this.processQueue = processQueue;
this.messageQueue = messageQueue;
}
public List<MessageExt> getMsgs() {
return msgs;
}
public ProcessQueue getProcessQueue() {
return processQueue;
}
@Override
public void run() {
...
// This listener is the user's own consume logic, registered through
// consumer.registerMessageListener.
MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
...
long beginTimestamp = System.currentTimeMillis();
boolean hasException = false;
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
try {
ConsumeMessageConcurrentlyService.this.resetRetryTopic(msgs);
// Stamp each message with the time consumption starts.
if (msgs != null && !msgs.isEmpty()) {
for (MessageExt msg : msgs) {
MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
}
}
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
// Listener failures are logged and recorded, not rethrown.
log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
RemotingHelper.exceptionSimpleDesc(e),
ConsumeMessageConcurrentlyService.this.consumerGroup,
msgs,
messageQueue);
hasException = true;
}
long consumeRT = System.currentTimeMillis() - beginTimestamp;
// Classify the outcome: exception / null return / timeout (consume timeout in minutes) /
// failed / success.
if (null == status) {
if (hasException) {
returnType = ConsumeReturnType.EXCEPTION;
} else {
returnType = ConsumeReturnType.RETURNNULL;
}
} else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
returnType = ConsumeReturnType.TIME_OUT;
} else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {
returnType = ConsumeReturnType.FAILED;
} else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {
returnType = ConsumeReturnType.SUCCESS;
}
...
// Process the consume result, unless the process queue has been dropped in the meantime.
if (!processQueue.isDropped()) {
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
} else {
log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
}
}
public MessageQueue getMessageQueue() {
return messageQueue;
}
}
处理消费结果
/**
 * Handles the outcome of one consume request: decides which messages count as failed, deals
 * with the failed ones according to the message model, removes the handled messages from the
 * process queue and updates the consume offset. (Abridged snippet: part of the CONSUME_SUCCESS
 * branch is elided at the "..." line.)
 *
 * @param status status returned by the listener
 * @param context consume context; supplies the ack index
 * @param consumeRequest the request whose messages were consumed
 */
public void processConsumeResult(
final ConsumeConcurrentlyStatus status,
final ConsumeConcurrentlyContext context,
final ConsumeRequest consumeRequest
) {
int ackIndex = context.getAckIndex();
if (consumeRequest.getMsgs().isEmpty())
return;
// ackIndex is the index of the last acknowledged message; every message after it is treated
// as failed. Per the original note it is Integer.MAX_VALUE on success, and it is forced to
// -1 on failure, so either none or all of the batch is handled as failed.
switch (status) {
case CONSUME_SUCCESS:
// Clamp to the last valid index of the batch.
if (ackIndex >= consumeRequest.getMsgs().size()) {
ackIndex = consumeRequest.getMsgs().size() - 1;
}
...
break;
case RECONSUME_LATER:
ackIndex = -1;
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
consumeRequest.getMsgs().size());
break;
default:
break;
}
// BROADCASTING: failed messages are only logged and dropped.
// CLUSTERING: failed messages are sent back to the broker.
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
}
break;
case CLUSTERING:
List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
boolean result = this.sendMessageBack(msg, context);
if (!result) {
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
msgBackFailed.add(msg);
}
}
// Messages that could not be sent back to the broker are taken out of this request and
// resubmitted for local consumption later (after 5s, per the original note).
if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);
this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
}
break;
default:
break;
}
// Remove the handled messages from the process queue and update the consume offset, unless
// the queue was dropped or no valid offset was returned.
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
}