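I. Kafka ordered messages
- Producer side: Kafka achieves ordering through the partition key. Messages of one kind (for example, the successive states of the same order) are written to the same partition. Kafka therefore guarantees ordering only within a single partition and cannot guarantee global ordering across a topic.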
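// Custom partitioner logic: the same key always maps to the same partition.
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
// Clear the sign bit instead of calling Math.abs(), which stays negative for Integer.MIN_VALUE.
return (key.hashCode() & 0x7fffffff) % partitions.size();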
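- Consumer side: the Kafka Java consumer is single-threaded by design (a multithreaded scheme has to be implemented by the application), and within a consumer group each partition is assigned to exactly one consumer. One partition therefore maps to one consuming thread, which guarantees that its messages are consumed in order.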
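The key-to-partition mapping described above can be sketched without any Kafka dependency. This is only an illustration: the class KeyPartitionSketch, the method partitionFor and the key "order-1001" are hypothetical names, and Kafka's built-in partitioner hashes the serialized key bytes with murmur2 rather than calling String.hashCode().

```java
/** Hypothetical helper for illustration; not part of the Kafka client API. */
final class KeyPartitionSketch {

    /** Maps a key to a partition index in [0, numPartitions). */
    static int partitionFor(String key, int numPartitions) {
        // Clearing the sign bit keeps the result non-negative for every hash value,
        // including Integer.MIN_VALUE, for which Math.abs() returns a negative number.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 4;
        String orderId = "order-1001"; // hypothetical partition key
        for (String state : new String[] {"CREATED", "PAID", "SHIPPED"}) {
            // Every state change of the same order lands in the same partition.
            System.out.println(state + " -> partition " + partitionFor(orderId, numPartitions));
        }
    }
}
```

Because the partition depends only on the key and the partition count, changing the number of partitions changes the mapping, so ordering for a key is only preserved while the partition count stays fixed.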
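II. RocketMQ ordered messages
- Producer side: RocketMQ (RMQ) handles ordered messages much like Kafka. The message routing mechanism sends related messages to a designated MessageQueue (see "Kafka/RocketMQ producer routing comparison").
- Consumer side: the RMQ Java consumer is built around a thread pool, so ordered consumption in clustering mode has to be enforced by the series of mechanisms described below.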
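As a rough picture of the producer-side routing, the simulation below sends "orderId:state" events to queues by order id and records what each queue receives. It uses plain Java collections instead of the RocketMQ client; QueueRoutingSketch, select, route and the queue names are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Stand-alone simulation; not RocketMQ code. */
final class QueueRoutingSketch {

    /** Picks a queue for an order id; floorMod keeps the index non-negative. */
    static String select(List<String> queues, long orderId) {
        return queues.get((int) Math.floorMod(orderId, (long) queues.size()));
    }

    /** Routes "orderId:state" events and returns what each queue received, in arrival order. */
    static Map<String, List<String>> route(List<String> queues, List<String> events) {
        Map<String, List<String>> byQueue = new LinkedHashMap<>();
        for (String queue : queues) {
            byQueue.put(queue, new ArrayList<>());
        }
        for (String event : events) {
            long orderId = Long.parseLong(event.substring(0, event.indexOf(':')));
            byQueue.get(select(queues, orderId)).add(event);
        }
        return byQueue;
    }

    public static void main(String[] args) {
        List<String> queues = Arrays.asList("queue-0", "queue-1", "queue-2", "queue-3");
        List<String> events = Arrays.asList("7:CREATED", "8:CREATED", "7:PAID", "8:PAID", "7:SHIPPED");
        System.out.println(route(queues, events));
    }
}
```

Events of different orders interleave across queues, but the events of any single order stay in one queue in the order they were sent, which is the property the consumer side then has to preserve.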
1. The consumer registers a MessageListenerOrderly
public static void main(String[] args) throws MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Group_1");
    consumer.setNamesrvAddr("192.168.0.99:9876");
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    consumer.subscribe("TopicTest", "*");
    consumer.registerMessageListener(new MessageListenerOrderly() {
        @Override
        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
            // TODO
            return ConsumeOrderlyStatus.SUCCESS;
        }
    });
    consumer.start();
}
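2. Periodically send the broker a request to lock the set of queues currently being consumed
2.1 On startup, the consumer creates the consume service that matches the message listener type
// Create the consume service according to whether consumption is orderly.
// ConsumeMessageService does the actual message consumption and maintains a thread pool internally.
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
    this.consumeOrderly = true;
    this.consumeMessageService =
        new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
}
this.consumeMessageService.start();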
2.2 ConsumeMessageOrderlyService.start schedules a periodic task (default interval: 20 s)
public void start() {
    if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
        // In clustering mode, start a periodic task that asks the broker to lock, in one batch,
        // the queues this consumer is currently consuming. The consumer collects those queues and
        // sends the lock request; the broker returns the set of queues that were locked successfully.
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                ConsumeMessageOrderlyService.this.lockMQPeriodically();
            }
        }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
    }
}
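2.3 The lock request itself: the consumer collects the queues it is currently consuming and sends a batch lock request to the broker, and the broker returns the set of queues that were locked successfully. On receiving the response, the consumer sets the locked flag on each of those queues. This ensures that every message queue on the broker is consumed by only one consumer in the group.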
public void lockAll() {
    // broker name -> the MessageQueues on that broker that this consumer is currently consuming
    HashMap<String, Set<MessageQueue>> brokerMqs = this.buildProcessQueueTableByBrokerName();
    Iterator<Entry<String, Set<MessageQueue>>> it = brokerMqs.entrySet().iterator();
    while (it.hasNext()) {
        Entry<String, Set<MessageQueue>> entry = it.next();
        final String brokerName = entry.getKey();
        final Set<MessageQueue> mqs = entry.getValue();
        if (mqs.isEmpty())
            continue;
        // Look up the address of the master broker
        FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true);
        if (findBrokerResult != null) {
            // Build the batch lock request
            LockBatchRequestBody requestBody = new LockBatchRequestBody();
            requestBody.setConsumerGroup(this.consumerGroup);
            requestBody.setClientId(this.mQClientFactory.getClientId());
            requestBody.setMqSet(mqs); // the MessageQueues to lock
            try {
                // Send the request to the broker, which returns the set of MessageQueues it locked
                Set<MessageQueue> lockOKMQSet =
                    this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
                for (MessageQueue mq : lockOKMQSet) {
                    ProcessQueue processQueue = this.processQueueTable.get(mq);
                    if (processQueue != null) {
                        if (!processQueue.isLocked()) {
                            log.info("the message queue locked OK, Group: {} {}", this.consumerGroup, mq);
                        }
                        // Mark the queues locked by the broker as locked locally;
                        // the flag is checked later when pulled messages are consumed
                        processQueue.setLocked(true);
                        processQueue.setLastLockTimestamp(System.currentTimeMillis());
                    }
                }
                ......
            } catch (Exception e) {
                log.error("lockBatchMQ exception, " + mqs, e);
            }
        }
    }
}
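The broker side of this exchange is not shown in the excerpt. As a rough mental model only, a lock table that grants each queue to one client at a time and lets a lease lapse when it is not renewed could look like the sketch below. QueueLockTableSketch, tryLockBatch and the lease length passed to the constructor are assumptions made for illustration, not the broker's actual code, and the clock is passed in so the behaviour is easy to follow.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Simplified model of a broker-side queue lock table; all names are hypothetical. */
final class QueueLockTableSketch {

    private static final class LockEntry {
        final String clientId;
        final long lastUpdateMillis;

        LockEntry(String clientId, long lastUpdateMillis) {
            this.clientId = clientId;
            this.lastUpdateMillis = lastUpdateMillis;
        }
    }

    private final long maxLiveMillis; // assumed lease length, chosen by the caller
    private final Map<String, LockEntry> table = new HashMap<>();

    QueueLockTableSketch(long maxLiveMillis) {
        this.maxLiveMillis = maxLiveMillis;
    }

    /** Tries to lock every queue for clientId and returns the queues the client now owns. */
    synchronized Set<String> tryLockBatch(String clientId, Set<String> queues, long nowMillis) {
        Set<String> locked = new HashSet<>();
        for (String queue : queues) {
            LockEntry current = table.get(queue);
            boolean free = current == null || nowMillis - current.lastUpdateMillis > maxLiveMillis;
            if (free || current.clientId.equals(clientId)) {
                // Grant a new lease, or renew the lease this client already holds.
                table.put(queue, new LockEntry(clientId, nowMillis));
                locked.add(queue);
            }
        }
        return locked;
    }

    public static void main(String[] args) {
        QueueLockTableSketch broker = new QueueLockTableSketch(60_000L);
        Set<String> wantedByA = new HashSet<>(Arrays.asList("q0", "q1"));
        Set<String> wantedByB = new HashSet<>(Arrays.asList("q1", "q2"));
        System.out.println("A locks " + broker.tryLockBatch("client-A", wantedByA, 0L));
        System.out.println("B locks " + broker.tryLockBatch("client-B", wantedByB, 1_000L));
    }
}
```

In this model a client that keeps renewing (as the periodic task in 2.2 does) keeps its queues, while a client that stops renewing loses them once the lease runs out, so another client can take over.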
3. Consume messages serially by means of locks
3.1 DefaultMQPushConsumerImpl.pullMessage pulls messages and submits them to consumeExecutor, the thread pool of ConsumeMessageOrderlyService, for execution
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
    pullResult.getMsgFoundList(),
    processQueue,
    pullRequest.getMessageQueue(),
    dispatchToConsume);
3.2 ConsumeMessageOrderlyService.ConsumeRequest.run consumes the messages. It locks the message queue while consuming, so that the messages of one queue are processed serially.
@Override
public void run() {
    if (this.processQueue.isDropped()) {
        log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
        return;
    }
    // Fetch the lock object of the current MessageQueue
    final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
    synchronized (objLock) {
        // Proceed in broadcasting mode, or when the ProcessQueue is locked (by lockAll()) and the lock
        // has not expired; otherwise retry after a 100 ms delay
        if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
            || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
            final long beginTime = System.currentTimeMillis();
            for (boolean continueConsume = true; continueConsume; ) {
                if (this.processQueue.isDropped()) {
                    log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                    break;
                }
                // The ProcessQueue is not locked
                if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                    && !this.processQueue.isLocked()) {
                    log.warn("the message queue not locked, so consume later, {}", this.messageQueue);
                    ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
                    break;
                }
                // The ProcessQueue lock has expired
                if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                    && this.processQueue.isLockExpired()) {
                    log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);
                    ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
                    break;
                }
                // Maximum time one consume task may run continuously. It can be set with
                // -Drocketmq.client.maxTimeConsumeContinuously and defaults to 60 s.
                long interval = System.currentTimeMillis() - beginTime;
                if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
                    ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
                    break;
                }
                // Consume batch size. The default is 1, i.e. one message at a time;
                // it can be raised in production.
                final int consumeBatchSize =
                    ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
                // Take up to consumeBatchSize messages from the TreeMap in order;
                // the take is guarded by a read-write lock
                List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
                if (!msgs.isEmpty()) {
                    ......
                    long beginTimestamp = System.currentTimeMillis();
                    ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
                    boolean hasException = false;
                    try {
                        // Consume lock on the queue: keeps ordered messages from being consumed twice
                        this.processQueue.getLockConsume().lock();
                        if (this.processQueue.isDropped()) {
                            log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
                                this.messageQueue);
                            break;
                        }
                        // Consume the messages and obtain the consume result
                        status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
                    } catch (Throwable e) {
                        log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
                            RemotingHelper.exceptionSimpleDesc(e),
                            ConsumeMessageOrderlyService.this.consumerGroup,
                            msgs,
                            messageQueue);
                        hasException = true;
                    } finally {
                        // Release the consume lock
                        this.processQueue.getLockConsume().unlock();
                    }
                    ......
                    // Handle the consume result
                    continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
                } else {
                    continueConsume = false;
                }
            }
        } else {
            // Reached when the queue is not locked or its lock has expired: try to lock it and
            // re-submit the request, which eventually runs the code above again
            ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
        }
    }
}
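The core of the method above is that every task first takes the per-queue lock object and only then takes the next messages from the shared queue. A stripped-down sketch of that idea using only the JDK is shown below. PerQueueLockSketch, consume and the queue name are hypothetical, and the broker lock checks, batching and result handling of the real code are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Simplified illustration of per-queue locking in a thread pool; not RocketMQ code. */
final class PerQueueLockSketch {

    private final ConcurrentMap<String, Object> locks = new ConcurrentHashMap<>();

    /** Returns the single lock object associated with a queue name. */
    Object fetchLockObject(String queueName) {
        return locks.computeIfAbsent(queueName, name -> new Object());
    }

    /** Drains pending with several worker threads and returns the order in which messages were handled. */
    List<Integer> consume(String queueName, Queue<Integer> pending, int threads) {
        List<Integer> handled = new ArrayList<>(); // only touched while holding the queue lock
        int tasks = pending.size();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                synchronized (fetchLockObject(queueName)) {
                    // Taking and handling happen under the same lock, so whichever
                    // thread wins the lock always processes the oldest pending message.
                    Integer next = pending.poll();
                    if (next != null) {
                        handled.add(next);
                    }
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return handled;
    }

    public static void main(String[] args) {
        Queue<Integer> pending = new ConcurrentLinkedQueue<>();
        for (int i = 0; i < 20; i++) {
            pending.add(i);
        }
        System.out.println(new PerQueueLockSketch().consume("queue-0", pending, 4));
    }
}
```

The threads may acquire the lock in any order, yet the handled list stays in queue order because no message is taken outside the lock. Different queues use different lock objects, so they can still be consumed in parallel.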
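4. Summary
1. RebalanceImpl.lockAll runs periodically and locks the queues the consumer is currently consuming, setting the locked attribute of the local ProcessQueue to true. This ensures that every message queue on the broker is consumed by only one consumer.
2. The consumer also uses locks locally to ensure that each ProcessQueue is consumed by only one thread at a time.
3. When consumers are added or removed, the change in consumer count triggers a rebalance and each client recomputes the queues it should consume. The client releases the locks on the ProcessQueues it no longer needs and asks the broker to lock the newly assigned queues. If locking fails, the messages in that queue cannot be consumed; only a successfully locked queue can be consumed.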