序
本文主要研究一下rocketmq的consumeConcurrentlyMaxSpan
consumeConcurrentlyMaxSpan
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {
private final InternalLogger log = ClientLogger.getLog();
//......
/**
* Concurrently max span offset.it has no effect on sequential consumption
*/
private int consumeConcurrentlyMaxSpan = 2000;
public int getConsumeConcurrentlyMaxSpan() {
return consumeConcurrentlyMaxSpan;
}
public void setConsumeConcurrentlyMaxSpan(int consumeConcurrentlyMaxSpan) {
this.consumeConcurrentlyMaxSpan = consumeConcurrentlyMaxSpan;
}
//......
}
- DefaultMQPushConsumer定义了consumeConcurrentlyMaxSpan属性,默认值为2000
checkConfig
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
//......
private void checkConfig() throws MQClientException {
Validators.checkGroup(this.defaultMQPushConsumer.getConsumerGroup());
//......
// consumeConcurrentlyMaxSpan
if (this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan() < 1
|| this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan() > 65535) {
throw new MQClientException(
"consumeConcurrentlyMaxSpan Out of range [1, 65535]"
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
null);
}
//......
}
//......
}
- checkConfig方法要求defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()大于等于1且小于等于65535
pullMessage
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
/**
* Delay some time when exception occur
*/
private static final long PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION = 3000;
/**
* Flow control interval
*/
private static final long PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL = 50;
//......
public void pullMessage(final PullRequest pullRequest) {
final ProcessQueue processQueue = pullRequest.getProcessQueue();
if (processQueue.isDropped()) {
log.info("the pull request[{}] is dropped.", pullRequest.toString());
return;
}
pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
//......
if (!this.consumeOrderly) {
if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
log.warn(
"the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
pullRequest, queueMaxSpanFlowControlTimes);
}
return;
}
} else {
if (processQueue.isLocked()) {
if (!pullRequest.isLockedFirst()) {
final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
boolean brokerBusy = offset < pullRequest.getNextOffset();
log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
pullRequest, offset, brokerBusy);
if (brokerBusy) {
log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
pullRequest, offset);
}
pullRequest.setLockedFirst(true);
pullRequest.setNextOffset(offset);
}
} else {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
log.info("pull message later because not locked in broker, {}", pullRequest);
return;
}
}
//......
}
private void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
this.mQClientFactory.getPullMessageService().executePullRequestLater(pullRequest, timeDelay);
}
//......
}
- pullMessage方法在不是consumeOrderly的时候,会判断processQueue.getMaxSpan()是否大于this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan(),大于则执行executePullRequestLater方法进行流控,PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL默认值为50
ProcessQueue
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
public class ProcessQueue {
public final static long REBALANCE_LOCK_MAX_LIVE_TIME =
Long.parseLong(System.getProperty("rocketmq.client.rebalance.lockMaxLiveTime", "30000"));
public final static long REBALANCE_LOCK_INTERVAL = Long.parseLong(System.getProperty("rocketmq.client.rebalance.lockInterval", "20000"));
private final static long PULL_MAX_IDLE_TIME = Long.parseLong(System.getProperty("rocketmq.client.pull.pullMaxIdleTime", "120000"));
private final InternalLogger log = ClientLogger.getLog();
private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock();
private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>();
//......
public long getMaxSpan() {
try {
this.lockTreeMap.readLock().lockInterruptibly();
try {
if (!this.msgTreeMap.isEmpty()) {
return this.msgTreeMap.lastKey() - this.msgTreeMap.firstKey();
}
} finally {
this.lockTreeMap.readLock().unlock();
}
} catch (InterruptedException e) {
log.error("getMaxSpan exception", e);
}
return 0;
}
//......
}
- ProcessQueue的getMaxSpan取的是msgTreeMap.lastKey() - this.msgTreeMap.firstKey()
PullMessageService
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/PullMessageService.java
public class PullMessageService extends ServiceThread {
private final InternalLogger log = ClientLogger.getLog();
private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();
private final MQClientInstance mQClientFactory;
private final ScheduledExecutorService scheduledExecutorService = Executors
.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "PullMessageServiceScheduledThread");
}
});
public PullMessageService(MQClientInstance mQClientFactory) {
this.mQClientFactory = mQClientFactory;
}
public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
if (!isStopped()) {
this.scheduledExecutorService.schedule(new Runnable() {
@Override
public void run() {
PullMessageService.this.executePullRequestImmediately(pullRequest);
}
}, timeDelay, TimeUnit.MILLISECONDS);
} else {
log.warn("PullMessageServiceScheduledThread has shutdown");
}
}
public void executePullRequestImmediately(final PullRequest pullRequest) {
try {
this.pullRequestQueue.put(pullRequest);
} catch (InterruptedException e) {
log.error("executePullRequestImmediately pullRequestQueue.put", e);
}
}
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
PullRequest pullRequest = this.pullRequestQueue.take();
this.pullMessage(pullRequest);
} catch (InterruptedException ignored) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);
}
}
log.info(this.getServiceName() + " service end");
}
private void pullMessage(final PullRequest pullRequest) {
final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
if (consumer != null) {
DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
impl.pullMessage(pullRequest);
} else {
log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
}
}
//......
}
- executePullRequestLater方法往scheduledExecutorService调度一个延时任务,该任务执行的是executePullRequestImmediately(pullRequest)方法,该方法往pullRequestQueue队列放入pullRequest;run方法会从pullRequestQueue取pullRequest,然后执行pullMessage方法;pullMessage方法首先通过mQClientFactory.selectConsumer取出consumer,然后执行该consumer的pullMessage方法
小结
DefaultMQPushConsumer定义了consumeConcurrentlyMaxSpan属性,默认值为2000;DefaultMQPushConsumerImpl的pullMessage方法在不是consumeOrderly的时候,会判断processQueue.getMaxSpan()是否大于this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan(),大于则执行executePullRequestLater方法进行流控,PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL默认值为50