顺序消息
顺序消息是指消息消费的顺序和生产者发送消息的顺序一样的。
例如:一个订单产生了三条消息分别是订单创建、订单付款、订单完成。消费时要按照这个顺序消费才能有意义,但是同时订单之间是可以并行消费的。RocketMQ可以严格的保证消息有序。
分区有序
分区有序是指这个Topic下这个队列下的消息是有顺序的,生产者发送消息的时候,将严格按照消息的顺序,将消息们发送到一个Topic下的一个队列,从而保证了生产者分区消息有序,消费者进行消费时,进行单线程单队列消费,保证了消费有序。
适用场景:性能要求高,以 sharding key 作为分区字段,在同一个队列中严格的按照 FIFO 原则进行消息发布和消费的场景。sharding key 比如订单Id,一个订单的创建、付款、完成有序的,根据算法将这个订单的所有事件发送到同一个队列中去。
全局有序
全局有序是指某个Topic下的所有消息都要保证顺序,可以通过一个Topic只有一个消息队列,保证了全局有序,实际上市分区有序的变种。
消息顺序性保证
全局有序是分区有序的一个特列,只需要设置Topic下消息队列的个数为1即可,因此分区有序消息有序,就可以保证顺序消息。
顺序消息保证三个条件:
- 生产者将消息有序的发送到同一个分区队列
- 同一个队列的消息是顺序存储的
- 消费者以这个发送顺序进行消费
消费者顺序消费实现
消息消费是以消费者组为维度的,一个消费者组可以消费这个topic下的所有消息队列,要保证顺序消费,这个topic下的一个消息队列只能由消费者组中的一个消费者消费,然后消费者消费这个消息队列是单线程消费的,这样就保证了顺序消息消费。
topic下的一个消息队列只能由消费者组中的一个消费者消费,这个由Broker端对消息队列加锁来实现。加锁采用了ConcurrentHashMap。ConcurrentMap<String/* group */, ConcurrentHashMap<MessageQueue, LockEntry>> mqLockTable,
一个消费者组对应ConcurrentHashMap<MessageQueue, LockEntry>,LockEntry包含clientId属性,clientId代表一个消费者实例,key为消费者组,一个topic的消息队列,只能由这个消费者中的clientId的消费者消费消息。
一个消息队列由一个线程消费,由一个消息队列一个锁、消费时synchronized关键字共同维护单线程消息消费的。
顺序消费步骤
并发消息消费的流程包含4个步骤:消息队列负载均衡、消息拉取、消息消费、消息消费进度存储。顺序消费略有不同,每个步骤都有加锁或并发控制。
消息队列负均衡
RebalanceService服务每隔20秒执行一次负载均衡方法,在负载均的过程中,针对顺序消息,lock()方法会向Broker端申请锁定MessageQueue,如果锁定失败,说明messageQueue正在消费者消费,不能被拉取消息,等待下次锁定。
// mqSet,为这次负载均衡之后需要消费的队列
for (MessageQueue mq : mqSet) {
// 新的MessageQueue,新建对应的ProcessQueue
if (!this.processQueueTable.containsKey(mq)) {
// 顺序消息,锁定broker端的MessageQueue消息队列,锁定失败,说明messageQueue正在消费者消费,不能被拉取消息;等待下次锁定
if (isOrder && !this.lock(mq)) {
log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
continue;
}
// 清空这个消费队列原来的消费进度
this.removeDirtyOffset(mq);
// 新建MessageQueue对应的消息处理队列ProcessQueue队列
ProcessQueue pq = new ProcessQueue();
// 计算从哪里拉取message
long nextOffset = this.computePullFromWhere(mq);
if (nextOffset >= 0) {
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
if (pre != null) {
log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
} else {
log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
// 一个PullRequest对应一个MessageQueue,一个ProcessQueue
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(nextOffset);
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(pq);
pullRequestList.add(pullRequest);
changed = true;
}
} else {
log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
}
}
}
Broker端MessageQueue加锁
RebalanceLockManager是处理Broker的MessageQueue加锁的类,加锁采用了ConcurrentHashMap。
ConcurrentMap<String/* group */, ConcurrentHashMap<MessageQueue, LockEntry>> mqLockTable,
topic下的一个消息队列只能由消费者组中的一个消费者消费,一个消费者组对应ConcurrentHashMap<MessageQueue, LockEntry>,LockEntry包含clientId属性,clientId代表一个消费者实例,key为消费者组,一个topic的消息队列,只能由这个消费者中的clientId的消费者消费消息。
LockEntry判定一个MessageQueue是否被锁定,默认锁定60秒,60秒之后消息队列解锁,下次再去锁定。
负载均衡时会执行MessageQueue锁定方法,默认20秒一次负载均衡定时任务,因此下次再锁定时间间隔为20秒。
// 顺序消息,判断MessageQueue是否被锁定
private boolean isLocked(final String group, final MessageQueue mq, final String clientId) {
ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group);
if (groupValue != null) {
LockEntry lockEntry = groupValue.get(mq);
if (lockEntry != null) {
boolean locked = lockEntry.isLocked(clientId);
if (locked) {
lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
}
return locked;
}
}
return false;
}
public boolean isLocked(final String clientId) {
boolean eq = this.clientId.equals(clientId);
return eq && !this.isExpired();
}
public boolean isExpired() {
boolean expired =
(System.currentTimeMillis() - this.lastUpdateTimestamp) > REBALANCE_LOCK_MAX_LIVE_TIME;
return expired;
}
/**
* 顺序消息broker锁定消息队列集合
* @param group
* @param mqs
* @param clientId
* @return
*/
public Set<MessageQueue> tryLockBatch(final String group, final Set<MessageQueue> mqs,
final String clientId) {
Set<MessageQueue> lockedMqs = new HashSet<MessageQueue>(mqs.size());
Set<MessageQueue> notLockedMqs = new HashSet<MessageQueue>(mqs.size());
for (MessageQueue mq : mqs) {
// 锁定加入到锁定队列
if (this.isLocked(group, mq, clientId)) {
lockedMqs.add(mq);
} else {
// 未锁定队列
notLockedMqs.add(mq);
}
}
// 存在未锁定队列,进行队列锁定
if (!notLockedMqs.isEmpty()) {
try {
// 获取线程锁,进行锁定操作
this.lock.lockInterruptibly();
try {
// 新建被锁定组的HashMap
ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group);
if (null == groupValue) {
groupValue = new ConcurrentHashMap<>(32);
this.mqLockTable.put(group, groupValue);
}
// 进行队列锁定
for (MessageQueue mq : notLockedMqs) {
LockEntry lockEntry = groupValue.get(mq);
if (null == lockEntry) {
lockEntry = new LockEntry();
lockEntry.setClientId(clientId);
groupValue.put(mq, lockEntry);
log.info(
"tryLockBatch, message queue not locked, I got it. Group: {} NewClientId: {} {}",
group,
clientId,
mq);
}
// JVM MQClientInstance 实例锁定,一个JVM实例下,两个消费者,不能属于同一个组,
// 要是消费者组相同,只能是两个JVM实例,构成消费者Cluster。
if (lockEntry.isLocked(clientId)) {
lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
lockedMqs.add(mq);
continue;
}
String oldClientId = lockEntry.getClientId();
// 锁定实销,重新锁定
if (lockEntry.isExpired()) {
lockEntry.setClientId(clientId);
lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
log.warn(
"tryLockBatch, message queue lock expired, I got it. Group: {} OldClientId: {} NewClientId: {} {}",
group,
oldClientId,
clientId,
mq);
lockedMqs.add(mq);
continue;
}
log.warn(
"tryLockBatch, message queue locked by other client. Group: {} OtherClientId: {} NewClientId: {} {}",
group,
oldClientId,
clientId,
mq);
}
} finally {
this.lock.unlock();
}
} catch (InterruptedException e) {
log.error("putMessage exception", e);
}
}
return lockedMqs;
}
消息拉取
DefaultMQPushConsumerImpl#pullMessage为消息拉取的主要方法,在这里针对顺序消息进行了PullRequest拉取请求锁定:
- ProcessQueue被锁定,第一次拉取消息,pullRequest初始化为未被锁定,首先计算拉取偏移量,然后向消息服务端拉取消息。
- processQueue未被上锁,推迟3秒进行pullRequest提交,放入pullRequestQueue队列中,等待broker端对messageQueue进行锁定。
// ProcessQueue被锁定
if (processQueue.isLocked()) {
// 第一次拉取消息,pullRequest初始化为未被锁定,首先计算拉取偏移量,然后向消息服务端拉取消息。
if (!pullRequest.isLockedFirst()) {
// 获取messageQueue的开始消费位置
final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
boolean brokerBusy = offset < pullRequest.getNextOffset();
log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
pullRequest, offset, brokerBusy);
if (brokerBusy) {
log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
pullRequest, offset);
}
// 设置pullRequest被锁定
pullRequest.setLockedFirst(true);
// 修正offset,从上次broker开始位置消费
pullRequest.setNextOffset(offset);
}
} else {
// processQueue未被上锁,推迟3秒进行pullRequest提交,放入pullRequestQueue队列中,等待broker端对messageQueue进行锁定。
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
log.info("pull message later because not locked in broker, {}", pullRequest);
return;
}
消费消费
ConsumeMessageOrderlyService是消息顺序消费的类。
MessageQueueLock messageQueueLock对象消息队列锁容器,严格保证一个消息只有一个线程消费,通过队列锁来实现,一个队列一个锁,获得锁才能进行消息消费。
start()方法每隔20秒,执行一次锁定分配给自己的消息消费队列,该值建议与一次消息负载频率设置相同。在未锁定消息队列之前无法执行消息拉取任务,ConsumeMessageOrderlyService 以每秒20s频率对分配给自己的消息队列进行自动加锁操作,从而消费加锁成功的消息消费队列。
持续消费消息,这个消费是以时间为维度的,每次在broker端锁定一个队列60秒,因此线程消费消息60秒。
public void start() {
// 默认每隔20秒,执行一次锁定分配给自己的消息消费队列,该值建议与一次消息负载频率设置相同。
// 集群模式下顺序消息消费在创建拉取任务时并未将ProcessQueue的locked状态设置为true,(在负载均衡新建ProcessQueue时,默认locked = false)
// 在未锁定消息队列之前无法执行消息拉取任务,ConsumeMessageOrderlyService 以每秒20s频率对分配给自己的消息队列进行自动加锁操作,
// 从而消费加锁成功的消息消费队列。
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
// 顺序消息,每20秒,在broker端进行一次消费队列锁定
ConsumeMessageOrderlyService.this.lockMQPeriodically();
}
}, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
}
}
ConsumeRequest消费消息请求,实现了Runnable接口,可以被提交到消息消费的线程池中,被并发消费。
这里通过messageQueueLock获取消息队列锁,保证一个消息队列一个线程消费。synchronized保证了消费过程也是单线程的。
public void run() {
if (this.processQueue.isDropped()) {
log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
return;
}
// 获取消息队列锁,一个线程消费一个消息队列
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
synchronized (objLock) {
// 广播模式||processQueue被锁定||processQueue没有失效
if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
|| (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
final long beginTime = System.currentTimeMillis();
// 持续消费消息,这个是以时间为消费为维度的,每次锁定线程消费60秒;
for (boolean continueConsume = true; continueConsume; ) {
if (this.processQueue.isDropped()) {
log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
break;
}
// 集群模式&&processQueue未被锁定,尝试加锁,并延迟提交请求,在进行拉取消息
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
&& !this.processQueue.isLocked()) {
log.warn("the message queue not locked, so consume later, {}", this.messageQueue);
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
break;
}
// 集群模式&&processQueue已失效,尝试加锁,并延迟提交请求,在进行拉取消息
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
&& this.processQueue.isLockExpired()) {
log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
break;
}
// 顺序消息消费处理逻辑,每一个ConsumeRequest消费任务不是以消费消息条数来计算的,
// 而是根据消费时间,默认当消费时长大于MAX_TIME_CONSUME_CONTINUOUSLY,
// 默认60s后,本次消费任务结束,由消费组内其他线程继续消费
// 消费时间间隔,每次消费任务最大持续时间,60s;延迟提交请求,在进行拉取消息
long interval = System.currentTimeMillis() - beginTime;
if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
break;
}
// ConsumeRequest 中包含的消息条数,默认1条
final int consumeBatchSize =
ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
// 取出消息进行消费,并放入ProcessQueue的consumingMsgOrderlyTreeMap临时存储
List<MessageExt> msgs = this.processQueue.takeMessages(consumeBatchSize);
// 还原真实的topic
defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
// 有消息需要消费
if (!msgs.isEmpty()) {
final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);
ConsumeOrderlyStatus status = null;
// 消息钩子
ConsumeMessageContext consumeMessageContext = null;
if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext = new ConsumeMessageContext();
consumeMessageContext
.setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup());
consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());
consumeMessageContext.setMq(messageQueue);
consumeMessageContext.setMsgList(msgs);
consumeMessageContext.setSuccess(false);
// init the consume context type
consumeMessageContext.setProps(new HashMap<String, String>());
ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
}
long beginTimestamp = System.currentTimeMillis();
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
boolean hasException = false;
// 申请消息消费锁,如果消息队列被丢弃,放弃该消息消费队列的消费,
// 然后执行消息消息监听器,调用业务方具体消息监听器执行真正的消息消费处理逻辑,
// 并通知RocketMQ消息消费结果。
// processQueue 上锁
try {
this.processQueue.getLockConsume().lock();
if (this.processQueue.isDropped()) {
log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
this.messageQueue);
break;
}
// 消息消费
status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
RemotingHelper.exceptionSimpleDesc(e),
ConsumeMessageOrderlyService.this.consumerGroup,
msgs,
messageQueue);
hasException = true;
} finally {
// 释放锁
this.processQueue.getLockConsume().unlock();
}
// 日志
if (null == status
|| ConsumeOrderlyStatus.ROLLBACK == status
|| ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
log.warn("consumeMessage Orderly return not OK, Group: {} Msgs: {} MQ: {}",
ConsumeMessageOrderlyService.this.consumerGroup,
msgs,
messageQueue);
}
// 设置返回状态
long consumeRT = System.currentTimeMillis() - beginTimestamp;
if (null == status) {
if (hasException) {
returnType = ConsumeReturnType.EXCEPTION;
} else {
returnType = ConsumeReturnType.RETURNNULL;
}
} else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
returnType = ConsumeReturnType.TIME_OUT;
} else if (ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
returnType = ConsumeReturnType.FAILED;
} else if (ConsumeOrderlyStatus.SUCCESS == status) {
returnType = ConsumeReturnType.SUCCESS;
}
if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
}
if (null == status) {
status = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
// 执行消息钩子
if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext.setStatus(status.toString());
consumeMessageContext
.setSuccess(ConsumeOrderlyStatus.SUCCESS == status || ConsumeOrderlyStatus.COMMIT == status);
ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
}
// 消费状态统计
ConsumeMessageOrderlyService.this.getConsumerStatsManager()
.incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
// 返回消费消息结果,是否进行持续消费
continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
} else {
// 未取到消息结束本次循环
continueConsume = false;
}
}
} else {
if (this.processQueue.isDropped()) {
log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
return;
}
// 尝试加锁,并延迟提交请求,在进行拉取消息
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
}
}
}
消息消费进度存储
进行消费进度的更新,其他和并发消息一样,采用ConcurrentHashMap并发安全容器。