这一讲分析的是RocketMQ的消费者,下一讲分析broker。从第三讲开始,主要是对照源码来分析,所以这次的分享不仅仅是讲消费者的逻辑和消费过程,还整理了client端(生产者和消费者都是client)的接口图,启动之后消费端的状态。
读官方文档,读RocketMQ相关的书,还有查阅网上相关的一些博客,别人分析的再系统,再有条理,从再多的维度去分析,看再多变,我总是觉得不是很清晰,理解的不够透彻,因为涉及到的逻辑太多了,过几天就忘了。所以在学习的过程中,必须整理出自己理解,虽不能面面俱到,但因为是自己的理解,就不会忘记。
client接口和类图
client端是依赖remoting模块的,所以把remoting模块的一些接口图也整合进来了。
remoting接口
client接口
主要接口图
接口依赖关系图
从下面这张图中可以清晰的看出,client只用的了remoting接口的RemotingClient。RemotingServer接口在Namesrv和Broker中用到了。
接口总结
RocketMQ的设计中,分了三层接口:
用户接口,实现接口,remoting接口。用户接口是开箱即用的,有默认的实现类,实现类是把用户的配置,业务逻辑,配置文件信息,在启动的时候,初始化进去。核心的业务逻辑在实现接口的实现类里。
实现接口的实现类持有MQClientInstance,MQClientInstance通过MQClientAPIImpl和remoting接口层打交道。MQClientInstance,MQClientAPIImpl和MQAdminImpl都没有接口,MQAdminImpl持有MQClientInstance,访问remoting接口也是通过MQClientAPIImpl类。
RocketMQ消息消费概述
消息消费以组的模式开展,一个消费组内可以包含多个消费者,每一个消费组可订阅多个主题,消费组之间有集群模式和广播模式两种消费模式。
集群模式,主题下的同一条消息只允许被其中一个消费者消费。广播模式,主题下的同一条消息将被集群内的所有消费者消费一次。消息服务器与消费者之间的消息传送也有两种方式:推模式、拉模式。所谓的拉模式,是消费端主动发起拉消息请求,而推模式是消息到达消息服务器后,推送给消息消费者RocketMQ消息推模式的实现基于拉模式,在拉模式上包装一层,一个拉取任务完成后开始下一个拉取任务。
集群模式下,多个消费者如何对消息队列进行负载?消息队列负载机制遵循一个通用的思想:一个消息队列同一时间只允许被同一个消费者消费,一个消费者可以消费多少个消息队列。
消息消费者初探
推模式的消费者MQPushConsume
实现类DefaultMQPushConsumer
DefaultMQPushConsumerImpl分析
- DefaultMQPushConsumerImpl :消息消息者默认实现类,应用程序中直接用该类的实例完成消息的消费,并回调业务方法。
RebalanceImpl :字面上的意思(重新平衡)也就是消费端消费者与消息队列的重新分布,与消息应该分配给哪个消费者消费息息相关。
MQClientInstance: 消息客户端实例,负载与MQ服务器(Broker,Nameserver)交互的网络实现
PullAPIWrapper: pull与Push在RocketMQ中,其实就只有Pull模式,所以Push其实就是用pull封装一下
MessageListenerInner: 消费消费回调类,当消息分配给消费者消费时,执行的业务代码入口
OffsetStore: 消息消费进度保存
ConsumeMessageService: 消息消费逻辑
消费者启动流程
- 构建主题订阅信息SubscriptionData并加入到RebalanceImpl的订阅消息中
- 初始化MQClientInstance、RebalanceImple等
- 初始化消息进度。如果消费时集群模式,那么消息进度保存在Broker上;如果是广播模式,那么消息进度存储在消费端。
- 根据是否是顺序消费,创建消费端线程服务
- 向MQClientInstance注册消费者
启动流程图
启动分析
DefaultMQPushConsumer
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
/*
* Instantiate with specified consumer group name.
*/
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");
/*
* Specify name server addresses.
* <p/>
*
* Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR
* <pre>
* {@code
* consumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
* }
* </pre>
*/
consumer.setNamesrvAddr("localhost:9876");
/*
* Specify where to start in case the specified consumer group is a brand new one.
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
/*
* Subscribe one more more topics to consume.
*/
consumer.subscribe("TopicTest", "*");
/*
* Register callback to execute on arrival of messages fetched from brokers.
*/
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
/*
* Launch the consumer instance.
*/
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
## start方法
public void start() throws MQClientException {
setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
this.defaultMQPushConsumerImpl.start();
if (null != traceDispatcher) {
try {
traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
} catch (MQClientException e) {
log.warn("trace dispatcher start failed ", e);
}
}
}
defaultMQPushConsumerImpl
public synchronized void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
this.serviceState = ServiceState.START_FAILED;
this.checkConfig();
this.copySubscription();
if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
this.defaultMQPushConsumer.changeInstanceNameToPID();
}
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
this.pullAPIWrapper = new PullAPIWrapper(
mQClientFactory,
this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
if (this.defaultMQPushConsumer.getOffsetStore() != null) {
this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
case CLUSTERING:
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
}
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
this.offsetStore.load();
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
this.consumeOrderly = true;
this.consumeMessageService =
new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
this.consumeOrderly = false;
this.consumeMessageService =
new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}
this.consumeMessageService.start();
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
this.consumeMessageService.shutdown();
throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
mQClientFactory.start();
log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
this.updateTopicSubscribeInfoWhenSubscriptionChanged();
this.mQClientFactory.checkClientInBroker();
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
this.mQClientFactory.rebalanceImmediately();
}
关键部分
this.consumeMessageService =
new ConsumeMessageConcurrentlyService(this,
(MessageListenerConcurrently) this.getMessageListenerInner());
this.consumeMessageService.start();
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
this.mQClientFactory = MQClientManager.getInstance()
.getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
mQClientFactory.start();
MQClientInstance
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// If not specified,looking address from name server
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
// Start request-response channel
this.mQClientAPIImpl.start();
// Start various schedule tasks
this.startScheduledTask();
// Start pull service
this.pullMessageService.start();
// Start rebalance service
this.rebalanceService.start();
// Start push service
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}
没有消息的时候
有消息的时候
启动后线程
- PullMessageService
- RebalanceService
- 定时任务线程MQClientFactoryScheduledThread和PullMessageServiceScheduledThread
- Netty相关线程,NettyClientPublicExecutor,NettyClientWorkerThread,NettyClientSelector
- 消费者线程ConsumeMessageThread
消息拉取
基于PUSH模式来详细分析消息拉取机制。
从MQClientInstance的启动流程中可以看出,RocketMQ使用一个单独的线程PullMessageService来负责消息的拉取。
PullMessageService实现机制
ProcessQueue实现机制
ProcessQueue是MessageQueue在消费端的重现、快照。PullMessageService从消息服务器默认每次拉取32条消息,按照消息的队列偏移量顺序存放在ProcessQueue中。PullMessageService然后将消息提交到消费者消费线程池,消息成功消费后从ProcessQueue中移除。
ProcessQueue核心属性
ProcessQueue核心方法
消息拉取基本流程
- 消息拉取客户端消息拉取请求封装
- 消息服务器查找并返回消息
- 消息拉取客户端处理返回的消息