RockerMQ里有个非常重要的数据结构叫ProcessQueue,很多功能,例如消费进度,消费等等功能的底层核心数据保存都是有ProcessQueue提供,下面介绍一下ProcessQueue提供的功能,而整个涉及的流程不会在这展开,在另外的功能分析文章如果涉及才会深入分析
看下代码上的注释:
Queue consumption snapshot
即消息快照的意思,为什么要这样形容呢?主要是因为在消息拉取到的时候,会把消息存放在其中。另外在拉取消息的时候,使用是的PullRequest去请求,其内部结构如下:
public class PullRequest {
private String consumerGroup;
private MessageQueue messageQueue;
private ProcessQueue processQueue;
private long nextOffset;
private boolean lockedFirst = false;
}
可以看到,ProcessQueue和一个MessageQueue是对应的,即一个队列会有一个ProcessQueue的数据结构,看下其主要的字段
public class ProcessQueue {
public final static long RebalanceLockMaxLiveTime =
Long.parseLong(System.getProperty("rocketmq.client.rebalance.lockMaxLiveTime", "30000"));
public final static long RebalanceLockInterval = Long.parseLong(System.getProperty("rocketmq.client.rebalance.lockInterval", "20000"));
private final static long PullMaxIdleTime = Long.parseLong(System.getProperty("rocketmq.client.pull.pullMaxIdleTime", "120000"));
// 用来保存拉取到的消息
private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>();
// 当前保存的消息数,放进来的时候会加,移除的时候会减
private final AtomicLong msgCount = new AtomicLong();
// 消费锁,主要在顺序消费和移除ProcessQueue的时候使用
private final Lock lockConsume = new ReentrantLock();
// 顺序消费的时候使用
private final TreeMap<Long, MessageExt> msgTreeMapTemp = new TreeMap<Long, MessageExt>();
// 记录了废弃ProcessQueue的时候lockConsume的次数
private final AtomicLong tryUnlockTimes = new AtomicLong(0);
// ProcessQueue中保存的消息里的最大offset,为ConsumeQueue的offset
private volatile long queueOffsetMax = 0L;
// 该数据结构里的消息是否废弃
private volatile boolean dropped = false;
// 上次执行拉取消息的时间
private volatile long lastPullTimestamp = System.currentTimeMillis();
// 上次消费完消息后记录的时间
private volatile long lastConsumeTimestamp = System.currentTimeMillis();
private volatile boolean locked = false;
// 上次锁定的时间
private volatile long lastLockTimestamp = System.currentTimeMillis();
// 是否正在消息
private volatile boolean consuming = false;
// 该参数为调整线程池的时候提供了数据参考
private volatile long msgAccCnt = 0;
}
isLockExpired
public boolean isLockExpired() {
boolean result = (System.currentTimeMillis() - this.lastLockTimestamp) > RebalanceLockMaxLiveTime;
return result;
}
顺序消费的时候使用,消费之前会判断一下ProcessQueue锁定时间是否超过阈值(默认30000ms),如果没有超时,代表还是持有锁,具体细节在顺序消费的时候会详细说明.
负载
isPullExpired
public boolean isPullExpired() {
boolean result = (System.currentTimeMillis() - this.lastPullTimestamp) > PullMaxIdleTime;
return result;
}
在拉取的时候更新lastPullTimestamp的值,然后在rebalance的时候会去判断ProcessQueue已经超过一定的时间没有去拉取消息,如果是的话,则将ProcessQueue废弃(setDropped(true))且从ProcessQueue和MessageQueue的对应关系中移除该ProcessQueue,代码细节如下:
if (pq.isPullExpired()) {
switch (this.consumeType()) {
case CONSUME_ACTIVELY:
break;
case CONSUME_PASSIVELY:
pq.setDropped(true);
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
it.remove();
changed = true;
log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
consumerGroup, mq);
}
break;
default:
break;
}
}
根据打的日志推测,这个应该是个BUG,在某种情况下,拉取会停止,导致时间没有更新,这时候重建ProcessQueue,具体是什么原因,这点不太清楚
cleanExpiredMsg
public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer) {
if (pushConsumer.getDefaultMQPushConsumerImpl().isConsumeOrderly()) {// 顺序消费不处理
return;
}
// 最多处理16条消息
int loop = msgTreeMap.size() < 16 ? msgTreeMap.size() : 16;
for (int i = 0; i < loop; i++) {
MessageExt msg = null;
try {
this.lockTreeMap.readLock().lockInterruptibly();
try {
// 存在待处理的消息
// 且offset最小的消息消费时间大于consumeTimeout() * 60 * 1000(默认15分钟)
if (!msgTreeMap.isEmpty()
&& System.currentTimeMillis() -
Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msgTreeMap.firstEntry().getValue()))
> pushConsumer.getConsumeTimeout() * 60 * 1000) {
msg = msgTreeMap.firstEntry().getValue();
} else {
break;
}
} finally {
this.lockTreeMap.readLock().unlock();
}
} catch (InterruptedException e) {
log.error("getExpiredMsg exception", e);
}
try {
// 将消息发回Broker,等待重试,且延迟级别为3
// 该效果是消费失败重试原理类似
pushConsumer.sendMessageBack(msg, 3);
try {
this.lockTreeMap.writeLock().lockInterruptibly();
try {
// 如果这个时候,ProcessQueue里offset最小的消息还等于上面取到的消息
// 那么就将其移除,有可能在上面取出消息处理的过程中,消息已经被消费,且从ProcessQueue中移除
if (!msgTreeMap.isEmpty() && msg.getQueueOffset() == msgTreeMap.firstKey()) {
try {
msgTreeMap.remove(msgTreeMap.firstKey());
} catch (Exception e) {
log.error("send expired msg exception", e);
}
}
} finally {
this.lockTreeMap.writeLock().unlock();
}
} catch (InterruptedException e) {
log.error("getExpiredMsg exception", e);
}
} catch (Exception e) {
log.error("send expired msg exception", e);
}
}
}
上面是并发消费模式下,定时清理消费时间超过15分钟的消息的逻辑,在消费者启动的时候,就好开启一个定时任务定时调用该方法
public void start() {
this.CleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
cleanExpireMsg();
}
}, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES);
}
15分钟执行一次
putMessage
public boolean putMessage(final List<MessageExt> msgs) {
// 返回值,顺序消费有用,返回true表示可以消费
boolean dispatchToConsume = false;
try {
this.lockTreeMap.writeLock().lockInterruptibly();
try {
int validMsgCnt = 0;
for (MessageExt msg : msgs) {
// 以offset为key,放到treemap中
MessageExt old = msgTreeMap.put(msg.getQueueOffset(), msg);
if (null == old) {// 正常
validMsgCnt++;
// 更新当前ProcessQueue中消息最大的offset
this.queueOffsetMax = msg.getQueueOffset();
}
}
// 新增消息数量
msgCount.addAndGet(validMsgCnt);
// 如果ProcessQueue有需要处理的消息(从上可知,如果msgs不为空那么msgTreeMap不为空)
// 如果consuming为false,将其设置为true,表示正在消费
// 这个值在放消息的时候会设置为true,在顺序消费模式,取不到消息则设置为false
if (!msgTreeMap.isEmpty() && !this.consuming) {
// 有消息,且为未消费状态,则顺序消费模式可以消费
dispatchToConsume = true;
this.consuming = true;
}
if (!msgs.isEmpty()) {
MessageExt messageExt = msgs.get(msgs.size() - 1);
// property为ConsumeQueue里最大的offset
String property = messageExt.getProperty(MessageConst.PROPERTY_MAX_OFFSET);
if (property != null) {
long accTotal = Long.parseLong(property) - messageExt.getQueueOffset();
if (accTotal > 0) {// 当前消息的offset与最大消息的差值,相当于还有多少offset没有消费
this.msgAccCnt = accTotal;
}
}
}
} finally {
this.lockTreeMap.writeLock().unlock();
}
} catch (InterruptedException e) {
log.error("putMessage exception", e);
}
return dispatchToConsume;
}
getMaxSpan
public long getMaxSpan() {
try {
this.lockTreeMap.readLock().lockInterruptibly();
try {
if (!this.msgTreeMap.isEmpty()) {
return this.msgTreeMap.lastKey() - this.msgTreeMap.firstKey();
}
}
finally {
this.lockTreeMap.readLock().unlock();
}
}
catch (InterruptedException e) {
log.error("getMaxSpan exception", e);
}
return 0;
}
获取当前这批消息中最大最小offset之前的差距,这个方法主要在拉取消息的时候,用来判断当前有多少消息未处理,如果大于某个值(默认2000),则进行流控处理
//DefaultMQPushConsumerImpl.pullMessage
if (!this.consumeOrderly) {
// 大于2000
if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
// 将PullRequest存起来一会再执行(PullRequest是用来发起拉取消息请求的参数载体)
//PullTimeDelayMillsWhenFlowControl默认为50
this.executePullRequestLater(pullRequest, PullTimeDelayMillsWhenFlowControl);
// 流控次数加一,每1000次则打印日志
if ((flowControlTimes2++ % 1000) == 0) {
log.warn(
"the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
pullRequest, flowControlTimes2);
}
return;
}
} else {
//....
}
//DefaultMQPushConsumerImpl.executePullRequestLater
private void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
this.mQClientFactory.getPullMessageService().executePullRequestLater(pullRequest, timeDelay);
}
//PullMessageService.executePullRequestLater
public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
// timeDelay毫秒后再执行拉取请求
this.scheduledExecutorService.schedule(new Runnable() {
@Override
public void run() {
// 将PullRequest放回拉取消息的队列中,这样拉取线程就会取到,马上进行请求
PullMessageService.this.executePullRequestImmediately(pullRequest);
}
}, timeDelay, TimeUnit.MILLISECONDS);
}
removeMessage
public long removeMessage(final List<MessageExt> msgs) {
// 返回给外部的值,代表当前消费进度的offset
long result = -1;
final long now = System.currentTimeMillis();
try {
this.lockTreeMap.writeLock().lockInterruptibly();
this.lastConsumeTimestamp = now;
try {
if (!msgTreeMap.isEmpty()) {
result = this.queueOffsetMax + 1;
int removedCnt = 0;
// 遍历消息,将其从TreeMap中移除
for (MessageExt msg : msgs) {
MessageExt prev = msgTreeMap.remove(msg.getQueueOffset());
if (prev != null) {// 不为空证明移除成功
removedCnt--;// 移除消息数
}
}
// msgCount是ProcessQueue中的消息数量,移除了则需要减去该值,即加上该值的负数
msgCount.addAndGet(removedCnt);
// 如果还有消息存在,则使用当前最小的offset作为消费进度
// 如果已经没有消息了,则使用之前ProcessQueue里最大的offset作为消费进度
if (!msgTreeMap.isEmpty()) {
result = msgTreeMap.firstKey();
}
}
} finally {
this.lockTreeMap.writeLock().unlock();
}
} catch (Throwable t) {
}
return result;
}
这里的返回值和消费进度有很大的关系,在后面分析消费进度的时候会再深入分析
takeMessags
public List<MessageExt> takeMessags(final int batchSize) {
List<MessageExt> result = new ArrayList<MessageExt>(batchSize);
final long now = System.currentTimeMillis();
try {
this.lockTreeMap.writeLock().lockInterruptibly();
this.lastConsumeTimestamp = now;
try {
if (!this.msgTreeMap.isEmpty()) {
// 从treeMap中获取batchSize条数据,每次都返回offset最小的那条并移除
for (int i = 0; i < batchSize; i++) {
Map.Entry<Long, MessageExt> entry = this.msgTreeMap.pollFirstEntry();
if (entry != null) {
// 放到返回列表和一个临时用的treemapp中
result.add(entry.getValue());
msgTreeMapTemp.put(entry.getKey(), entry.getValue());
} else {
break;
}
}
}
// 取到消息了就会开始进行消费,如果没取到,则不需要消费,那么consuming设为false
if (result.isEmpty()) {
consuming = false;
}
} finally {
this.lockTreeMap.writeLock().unlock();
}
} catch (InterruptedException e) {
log.error("take Messages exception", e);
}
return result;
}
该方法顺序消费模式中使用的,取到该消息后就会调用我们定义的MessageListener进行消费
commit
在顺序消费模式下,调用takeMessages取消息,其内部逻辑中,将treeMap的消息放到一个临时用的treeMap里,然后进行消费,消费完成后需要将这个临时的map清除,则是调用commit方法
public long commit() {
try {
this.lockTreeMap.writeLock().lockInterruptibly();
try {
// msgTreeMapTemp是这次消费的消息集合,lastKey代表当前消费的进度
Long offset = this.msgTreeMapTemp.lastKey();
// 消费完成,减去该批次的消息数量
msgCount.addAndGet(this.msgTreeMapTemp.size() * (-1));
// 清除消息
this.msgTreeMapTemp.clear();
if (offset != null) {
// 返回消费进度
return offset + 1;
}
} finally {
this.lockTreeMap.writeLock().unlock();
}
} catch (InterruptedException e) {
log.error("commit exception", e);
}
return -1;
}
makeMessageToCosumeAgain
当顺序模式返回SUSPEND_CURRENT_QUEUE_A_MOMENT,那么可能会调用该方法,该方法名称意思为:让消息重新消费。
回顾一下上面说的流程:
- 取消息:从treeMap里取出消息然后放到临时treeMap,等待消费成功
- 消费成功:删除临时treeMap
从上面两部可以猜出,当消费失败的时候,不能无视临时treeMap和treeMap,应该要将临时treeMap的消息放回去,如果不放回去的话,一会重新消费的时候,从treeMap中就取不到原来那批消费失败的数据了,具体逻辑在后面分析
public void makeMessageToCosumeAgain(List<MessageExt> msgs) {
try {
this.lockTreeMap.writeLock().lockInterruptibly();
try {
// 将这批没消费成功的消息从临时treeMap中移除
// 并放回treeMap,等待下次消费
for (MessageExt msg : msgs) {
this.msgTreeMapTemp.remove(msg.getQueueOffset());
this.msgTreeMap.put(msg.getQueueOffset(), msg);
}
} finally {
this.lockTreeMap.writeLock().unlock();
}
} catch (InterruptedException e) {
log.error("makeMessageToCosumeAgain exception", e);
}
}
isDropped
返回true代表这个ProcessQueue被废弃了,具体出现的原因大概如下:
- rebalance之后,原来存在的MessageQueue不在本地新分配的MessageQueue之中,则把对应的ProcessQueue废弃。
举个栗子:0123这个4个队列,一开始分配给A消费者,这时候启动一个B消费者后,A消费者分配了01这两个队列,那么原来34队列的ProcessQueue就会设置成true - rebalance的时候,会将未订阅的topic下对应的ProcessQueue设置成true
- 还有就是上面isPullExpired讨论的情况
- 当拉取消息的时候,如果broker返回OFFSET_ILLEGAL,那么这时候将对应的ProcessQueue废弃
- consumer关闭(调用shutdown方法)的时候也会废弃
上面就是ProcessQueue提供的一些功能,有很多上层的功能都依赖他的实现,看别的东西前要先了解ProcessQueue,所以上面对ProcessQueue的功能进行了分析,稍微发散了一下,因为涉及的面比较广,所以相关的细节没有展开,待后面文章遇到了再进行分析
ProcessQueue相关的知识点:
- 消费模式:顺序、并行
- 消费进度管理
- rebalance负载均衡
- 等等....