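The broker is the most important component of RocketMQ: it connects producers and consumers and manages message storage. Brokers can be deployed as a cluster, and each logical broker node in that cluster can itself consist of one Master node and several Slave nodes. RocketMQ currently offers two ways to deal with the failure of a single node. The first is the classic master/slave mode, in which a failed master is not replaced automatically and an operator has to step in. The second brings in the DLedger framework, which not only replicates message storage but also switches the master automatically, so the node stays available.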
BrokerController
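BrokerController is the central management class of the broker service. The broker holds connections to producer clients, to consumer clients, between the master and slaves of the same broker node, and to namesrv. The broker is therefore both the place where produced messages are stored and the place where consumers query them. Every exchange between these services follows an agreed protocol and, as discussed in the communication article, each request type handled on the server side has a corresponding processor.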
public void registerProcessor() {
    /**
     * SendMessageProcessor
     */
    SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
    sendProcessor.registerSendMessageHook(sendMessageHookList);
    sendProcessor.registerConsumeMessageHook(consumeMessageHookList);
    this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
    this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
    this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
    this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
    /**
     * PullMessageProcessor
     */
    this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
    this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
    /**
     * QueryMessageProcessor
     */
    NettyRequestProcessor queryProcessor = new QueryMessageProcessor(this);
    this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);
    this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);
    /**
     * ClientManageProcessor
     */
    ClientManageProcessor clientProcessor = new ClientManageProcessor(this);
    this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);
    this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
    this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);
    /**
     * ConsumerManageProcessor
     */
    ConsumerManageProcessor consumerManageProcessor = new ConsumerManageProcessor(this);
    this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
    this.remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
    this.remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
    /**
     * EndTransactionProcessor
     */
    this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor);
    /**
     * Default
     */
    AdminBrokerProcessor adminProcessor = new AdminBrokerProcessor(this);
    this.remotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
    this.fastRemotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
}
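Message handling is split into several categories here:
1. Message production, handled by SendMessageProcessor, covering request codes such as SEND_MESSAGE, SEND_MESSAGE_V2 and SEND_BATCH_MESSAGE.
2. Message pulling, handled by PullMessageProcessor, covering the PULL_MESSAGE request code.
3. Message query, handled by QueryMessageProcessor (declared through the NettyRequestProcessor interface), covering request codes such as QUERY_MESSAGE and VIEW_MESSAGE_BY_ID.
4. Client connection management, handled by ClientManageProcessor, which mainly covers client heartbeats, client unregistration and client config checks.
5. Consumer offset management, transaction management, and the admin processor, which is registered as the default processor.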
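The registration pattern above boils down to a lookup table from request code to a processor and the executor that runs it, plus a default entry for everything else. Here is a minimal sketch of that idea. The class ProcessorTable, its Entry type and the use of plain String bodies are all assumptions made for illustration; this is not the RocketMQ remoting API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function;

/** Hypothetical dispatcher: maps a request code to a handler and the executor that runs it. */
class ProcessorTable {
    /** A handler paired with its dedicated executor (illustrative type, not from RocketMQ). */
    static final class Entry {
        final Function<String, String> processor;
        final Executor executor;

        Entry(Function<String, String> processor, Executor executor) {
            this.processor = processor;
            this.executor = executor;
        }
    }

    private final Map<Integer, Entry> table = new HashMap<>();
    private Entry defaultEntry;

    void registerProcessor(int code, Function<String, String> processor, Executor executor) {
        table.put(code, new Entry(processor, executor));
    }

    void registerDefaultProcessor(Function<String, String> processor, Executor executor) {
        defaultEntry = new Entry(processor, executor);
    }

    /** Looks up the entry for the code, falls back to the default, and runs it on its executor. */
    CompletableFuture<String> dispatch(int code, String body) {
        Entry entry = table.getOrDefault(code, defaultEntry);
        if (entry == null) {
            throw new IllegalStateException("no processor for request code " + code);
        }
        return CompletableFuture.supplyAsync(() -> entry.processor.apply(body), entry.executor);
    }
}
```

Giving each category its own executor means a flood of one request type (say, sends) cannot starve the threads that serve another (say, heartbeats).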
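As also covered in the communication article, handling a request generally means wrapping it in a task object and adding that task to the queue of a thread pool. Take SendMessageProcessor, the processor for sent messages, as an example: it has its own thread pool, sendMessageExecutor, which is built like this: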
this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
    this.brokerConfig.getSendMessageThreadPoolNums(),
    this.brokerConfig.getSendMessageThreadPoolNums(),
    1000 * 60,
    TimeUnit.MILLISECONDS,
    this.sendThreadPoolQueue,
    new ThreadFactoryImpl("SendMessageThread_"));
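This sets the core and maximum number of threads (to the same value) and also passes in the task queue sendThreadPoolQueue. Why does that queue have to be declared in BrokerController? Take send requests as an example. When producers send messages on a large scale, the broker wraps each incoming request in a task and runs it through the sendMessageExecutor thread pool. Each task needs a certain amount of time before it finishes, though. Producing a message means storing it, and to preserve message order the broker takes a lock during storage so that threads write one after another. Requests can therefore be handled too slowly and leave clients waiting for a long time. Is there a good way to simply discard the requests that could not be handled in time and tell the client that the server is busy?
That is why the broker introduces BrokerFastFailure, a fast-failure response service.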
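To make the point about the queue concrete, here is a small sketch of a pool whose backing queue is kept as a field of the owning object, so that another component can look at the backlog. SendPoolOwner and SendPoolDemo are hypothetical names, and a plain ThreadPoolExecutor stands in for BrokerFixedThreadPoolExecutor; the queue capacity of 10000 is likewise just an assumed value.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical owner that keeps a reference to the queue backing its thread pool. */
class SendPoolOwner {
    // The queue is a field, so other components can inspect or prune the backlog.
    private final BlockingQueue<Runnable> sendThreadPoolQueue = new LinkedBlockingQueue<>(10000);
    private final ThreadPoolExecutor sendMessageExecutor;

    SendPoolOwner(int threads) {
        // Core size == max size, mirroring the fixed-size pool in the text.
        this.sendMessageExecutor = new ThreadPoolExecutor(
            threads, threads, 60, TimeUnit.SECONDS, sendThreadPoolQueue);
    }

    BlockingQueue<Runnable> getSendThreadPoolQueue() {
        return sendThreadPoolQueue;
    }

    ThreadPoolExecutor getSendMessageExecutor() {
        return sendMessageExecutor;
    }
}

class SendPoolDemo {
    /** Blocks the only worker, queues extra tasks, and reports how many are waiting. */
    static int backlogWhileWorkerBusy(int extraTasks) {
        SendPoolOwner owner = new SendPoolOwner(1);
        CountDownLatch release = new CountDownLatch(1);
        // The first task is handed straight to the new worker thread and holds it.
        owner.getSendMessageExecutor().execute(() -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        // With the worker busy, these tasks can only wait in the queue.
        for (int i = 0; i < extraTasks; i++) {
            owner.getSendMessageExecutor().execute(() -> { });
        }
        int backlog = owner.getSendThreadPoolQueue().size();
        release.countDown();
        owner.getSendMessageExecutor().shutdown();
        return backlog;
    }
}
```

Because the owner exposes the queue, a watchdog can see how many requests are waiting without touching the executor itself.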
BrokerFastFailure
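Every request is wrapped in a task and placed in a ThreadPoolQueue; whenever a worker thread is idle, it polls a task from the queue and runs it. So it is enough to watch the thread pool queues held by BrokerController and check whether the tasks in them have waited too long. BrokerFastFailure starts a scheduled task that checks these queues every 10 ms: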
public void start() {
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            if (brokerController.getBrokerConfig().isBrokerFastFailureEnable()) {
                cleanExpiredRequest();
            }
        }
    }, 1000, 10, TimeUnit.MILLISECONDS);
}

private void cleanExpiredRequest() {
    while (this.brokerController.getMessageStore().isOSPageCacheBusy()) {
        try {
            if (!this.brokerController.getSendThreadPoolQueue().isEmpty()) {
                final Runnable runnable = this.brokerController.getSendThreadPoolQueue().poll(0, TimeUnit.SECONDS);
                if (null == runnable) {
                    break;
                }
                final RequestTask rt = castRunnable(runnable);
                rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[PCBUSY_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", System.currentTimeMillis() - rt.getCreateTimestamp(), this.brokerController.getSendThreadPoolQueue().size()));
            } else {
                break;
            }
        } catch (Throwable ignored) {
        }
    }
    cleanExpiredRequestInQueue(this.brokerController.getSendThreadPoolQueue(),
        this.brokerController.getBrokerConfig().getWaitTimeMillsInSendQueue());
    cleanExpiredRequestInQueue(this.brokerController.getPullThreadPoolQueue(),
        this.brokerController.getBrokerConfig().getWaitTimeMillsInPullQueue());
    cleanExpiredRequestInQueue(this.brokerController.getHeartbeatThreadPoolQueue(),
        this.brokerController.getBrokerConfig().getWaitTimeMillsInHeartbeatQueue());
    cleanExpiredRequestInQueue(this.brokerController.getEndTransactionThreadPoolQueue(), this
        .brokerController.getBrokerConfig().getWaitTimeMillsInTransactionQueue());
}

void cleanExpiredRequestInQueue(final BlockingQueue<Runnable> blockingQueue, final long maxWaitTimeMillsInQueue) {
    while (true) {
        try {
            if (!blockingQueue.isEmpty()) {
                final Runnable runnable = blockingQueue.peek();
                if (null == runnable) {
                    break;
                }
                final RequestTask rt = castRunnable(runnable);
                if (rt == null || rt.isStopRun()) {
                    break;
                }
                final long behind = System.currentTimeMillis() - rt.getCreateTimestamp();
                if (behind >= maxWaitTimeMillsInQueue) {
                    if (blockingQueue.remove(runnable)) {
                        rt.setStopRun(true);
                        rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", behind, blockingQueue.size()));
                    }
                } else {
                    break;
                }
            } else {
                break;
            }
        } catch (Throwable ignored) {
        }
    }
}
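A scheduled task is started first; after an initial delay of 1000 ms it runs cleanExpiredRequest every 10 ms. While the OS page cache is busy, queued send requests are drained and rejected right away. Beyond that, each Runnable carries the original information of its RequestTask, so comparing the task's creation time with the current time tells whether it should be removed from the queue. How is the client told about this? RequestTask has a returnResponse method: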
public void returnResponse(int code, String remark) {
    final RemotingCommand response = RemotingCommand.createResponseCommand(code, remark);
    response.setOpaque(request.getOpaque());
    this.channel.writeAndFlush(response);
}
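The channel here is the client's channel. Once the response command has been created and its opaque value set to that of the request, the client can match the response to its pending request and learn that the broker is busy.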
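The timeout check is easier to follow once it is stripped of the broker plumbing. The sketch below is a simplified stand-in, not the RocketMQ code: QueuedRequest and ExpiredRequestCleaner are hypothetical names, the current time is passed in as a parameter, and the list of rejected opaque ids stands in for the SYSTEM_BUSY responses written to the client channel.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;

/** Hypothetical queued request: remembers when it was created and which request it answers. */
class QueuedRequest implements Runnable {
    final int opaque;
    final long createTimestamp;
    volatile boolean stopRun;

    QueuedRequest(int opaque, long createTimestamp) {
        this.opaque = opaque;
        this.createTimestamp = createTimestamp;
    }

    @Override
    public void run() {
        if (stopRun) {
            return; // already rejected by the cleaner, nothing left to do
        }
        // real request handling would go here
    }
}

class ExpiredRequestCleaner {
    /**
     * Removes requests from the head of the queue while they have waited at least maxWaitMillis.
     * Returns the opaque ids that were rejected, standing in for SYSTEM_BUSY responses.
     */
    static List<Integer> clean(BlockingQueue<Runnable> queue, long maxWaitMillis, long now) {
        List<Integer> rejected = new ArrayList<>();
        while (true) {
            Runnable head = queue.peek();
            if (!(head instanceof QueuedRequest)) {
                break; // empty queue (null head) or an unknown task type
            }
            QueuedRequest request = (QueuedRequest) head;
            if (request.stopRun || now - request.createTimestamp < maxWaitMillis) {
                break; // FIFO order: everything behind the head is newer
            }
            if (queue.remove(head)) {
                request.stopRun = true;
                rejected.add(request.opaque);
            }
        }
        return rejected;
    }
}
```

Only the head needs checking on each pass: in a FIFO queue, if the oldest request has not expired yet, none of the ones behind it have either.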
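ProducerManager: producer connection management
Producers in a cluster are always assigned a user-defined producer group, so one producer group may contain several producers.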
private final HashMap<String /* group name */, HashMap<Channel, ClientChannelInfo>> groupChannelTable =
    new HashMap<String, HashMap<Channel, ClientChannelInfo>>();
Producers may also be registered under different producer groups, which is why the table is keyed by group name first.
public void registerProducer(final String group, final ClientChannelInfo clientChannelInfo) {
    try {
        ClientChannelInfo clientChannelInfoFound = null;
        if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
            try {
                HashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);
                if (null == channelTable) {
                    channelTable = new HashMap<>();
                    this.groupChannelTable.put(group, channelTable);
                }
                clientChannelInfoFound = channelTable.get(clientChannelInfo.getChannel());
                if (null == clientChannelInfoFound) {
                    channelTable.put(clientChannelInfo.getChannel(), clientChannelInfo);
                    log.info("new producer connected, group: {} channel: {}", group,
                        clientChannelInfo.toString());
                }
            } finally {
                this.groupChannelLock.unlock();
            }
            if (clientChannelInfoFound != null) {
                clientChannelInfoFound.setLastUpdateTimestamp(System.currentTimeMillis());
            }
        } else {
            log.warn("ProducerManager registerProducer lock timeout");
        }
    } catch (InterruptedException e) {
        log.error("", e);
    }
}
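Registering a producer takes very little information: just the producer group name and the client's connection information, which are then put into the groupChannelTable map. Producers register with the broker through the heartbeat request.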
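The two-level table can be sketched in a few lines. Note the assumptions: ProducerRegistry and ClientInfo are hypothetical names, a String channel id replaces the Netty Channel, and ConcurrentHashMap is swapped in for the explicit lock that the original code uses, so this shows the data layout rather than the exact locking behaviour.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical registry: group name -> (channel id -> client info). */
class ProducerRegistry {
    static final class ClientInfo {
        final String clientId;
        volatile long lastUpdateTimestamp;

        ClientInfo(String clientId, long lastUpdateTimestamp) {
            this.clientId = clientId;
            this.lastUpdateTimestamp = lastUpdateTimestamp;
        }
    }

    private final Map<String, Map<String, ClientInfo>> groupChannelTable = new ConcurrentHashMap<>();

    /** Called on every heartbeat; returns true only the first time a channel is seen in a group. */
    boolean registerProducer(String group, String channelId, String clientId, long now) {
        Map<String, ClientInfo> channels =
            groupChannelTable.computeIfAbsent(group, g -> new ConcurrentHashMap<>());
        ClientInfo existing = channels.putIfAbsent(channelId, new ClientInfo(clientId, now));
        if (existing != null) {
            // Known channel: the heartbeat only refreshes its timestamp.
            existing.lastUpdateTimestamp = now;
            return false;
        }
        return true;
    }

    int producerCount(String group) {
        Map<String, ClientInfo> channels = groupChannelTable.get(group);
        return channels == null ? 0 : channels.size();
    }

    /** Returns the last heartbeat time of a channel, or -1 if it is not registered. */
    long lastUpdate(String group, String channelId) {
        Map<String, ClientInfo> channels = groupChannelTable.get(group);
        ClientInfo info = channels == null ? null : channels.get(channelId);
        return info == null ? -1L : info.lastUpdateTimestamp;
    }
}
```

Because registration rides on the heartbeat, a repeated call for the same channel is the normal case, and all it does is push the last-update timestamp forward.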
ConsumerManager: consumer group management
private final ConcurrentMap<String/* Group */, ConsumerGroupInfo> consumerTable =
    new ConcurrentHashMap<String, ConsumerGroupInfo>(1024);
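Every consumer consumes under a group name. Different groups that consume the same queue do not interfere with each other. Consumption is managed per consumer group, across all consumers in that group. The same consumer application can also join several different consumer groups.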
public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
    ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
    final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {
    ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
    if (null == consumerGroupInfo) {
        ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
        ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
        consumerGroupInfo = prev != null ? prev : tmp;
    }
    boolean r1 =
        consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
            consumeFromWhere);
    boolean r2 = consumerGroupInfo.updateSubscription(subList);
    if (r1 || r2) {
        if (isNotifyConsumerIdsChangedEnable) {
            this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
        }
    }
    this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);
    return r1 || r2;
}
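This code registers a consumer. A consumer carries quite a lot of information: besides the client information there is the consume type ConsumeType, the message model MessageModel, the starting position ConsumeFromWhere, and the subscription data SubscriptionData. Within one consumer group these settings are required to be identical. If two consumers in the same group register inconsistent information, consumption can no longer be controlled properly. Whenever a consumer is added or the subscription changes, the consumers in the group are notified of the change; each of them then fetches the current list of consumers in the group and rebalances.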
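The register-then-notify flow can be sketched on its own. Everything named here (ConsumerRegistry, GroupInfo, the BiConsumer listener, String client ids, a topic-to-expression map for subscriptions) is an assumption for illustration, and the sketch only adds or updates subscriptions, whereas a real implementation would also have to drop topics that are no longer subscribed.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BiConsumer;

/** Hypothetical consumer registry that notifies a listener when a group changes. */
class ConsumerRegistry {
    static final class GroupInfo {
        final Set<String> clientIds = ConcurrentHashMap.newKeySet();
        final Map<String, String> subscriptions = new ConcurrentHashMap<>(); // topic -> expression
    }

    private final ConcurrentMap<String, GroupInfo> consumerTable = new ConcurrentHashMap<>();
    private final BiConsumer<String, Set<String>> changeListener;

    ConsumerRegistry(BiConsumer<String, Set<String>> changeListener) {
        this.changeListener = changeListener;
    }

    /** Returns true if the group gained a consumer or its subscriptions changed. */
    boolean registerConsumer(String group, String clientId, Map<String, String> subs) {
        GroupInfo info = consumerTable.get(group);
        if (info == null) {
            // putIfAbsent keeps whichever instance won the race to create the group.
            GroupInfo tmp = new GroupInfo();
            GroupInfo prev = consumerTable.putIfAbsent(group, tmp);
            info = prev != null ? prev : tmp;
        }
        boolean channelAdded = info.clientIds.add(clientId);
        boolean subscriptionChanged = false;
        for (Map.Entry<String, String> e : subs.entrySet()) {
            String old = info.subscriptions.put(e.getKey(), e.getValue());
            if (!e.getValue().equals(old)) {
                subscriptionChanged = true;
            }
        }
        boolean changed = channelAdded || subscriptionChanged;
        if (changed) {
            // Pass a sorted snapshot so the listener sees a stable view of the group.
            changeListener.accept(group, new TreeSet<>(info.clientIds));
        }
        return changed;
    }
}
```

A repeated heartbeat from a known consumer with unchanged subscriptions returns false and triggers no notification, so the group is only asked to rebalance when something actually changed.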
ConsumerGroupInfo manages the consumer connection information of one consumer group:
private final ConcurrentMap<String/* Topic */, SubscriptionData> subscriptionTable =
    new ConcurrentHashMap<String, SubscriptionData>();
private final ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =
    new ConcurrentHashMap<Channel, ClientChannelInfo>(16);
private volatile ConsumeType consumeType;
private volatile MessageModel messageModel;
private volatile ConsumeFromWhere consumeFromWhere;
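These fields tell us that ConsumeType, MessageModel and ConsumeFromWhere are expected to be the same across a consumer group; if a consumer registers different values, though, the group's values are simply overwritten: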
public boolean updateChannel(final ClientChannelInfo infoNew, ConsumeType consumeType,
    MessageModel messageModel, ConsumeFromWhere consumeFromWhere) {
    boolean updated = false;
    this.consumeType = consumeType;
    this.messageModel = messageModel;
    this.consumeFromWhere = consumeFromWhere;
    ClientChannelInfo infoOld = this.channelInfoTable.get(infoNew.getChannel());
    if (null == infoOld) {
        ClientChannelInfo prev = this.channelInfoTable.put(infoNew.getChannel(), infoNew);
        if (null == prev) {
            log.info("new consumer connected, group: {} {} {} channel: {}", this.groupName, consumeType,
                messageModel, infoNew.toString());
            updated = true;
        }
        infoOld = infoNew;
    } else {
        if (!infoOld.getClientId().equals(infoNew.getClientId())) {
            log.error("[BUG] consumer channel exist in broker, but clientId not equal. GROUP: {} OLD: {} NEW: {} ",
                this.groupName,
                infoOld.toString(),
                infoNew.toString());
            this.channelInfoTable.put(infoNew.getChannel(), infoNew);
        }
    }
    this.lastUpdateTimestamp = System.currentTimeMillis();
    infoOld.setLastUpdateTimestamp(this.lastUpdateTimestamp);
    return updated;
}
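Why explain here how a consumer notices changes to the other consumers in its group? Because when consumers fetch messages in clustering mode, each consume queue is normally handled by a single consumer. When consumers are added or removed, the assignment of consume queues is bound to be affected. To balance the load, each client fetches the current consumer list and reassigns the queues. This is covered in detail in the consumer article.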
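To see why a change in group membership forces a reassignment, here is a rough sketch of an averaging allocation. It is an illustrative strategy only: AverageAllocator is a hypothetical class, queues are represented by plain integer ids, and it should not be read as the exact algorithm the RocketMQ client uses.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical allocator: every consumer computes its own share from the same sorted inputs. */
class AverageAllocator {
    /** Gives each consumer a contiguous slice of queue ids; earlier consumers absorb the remainder. */
    static List<Integer> allocate(String currentId, List<String> consumerIds, int queueCount) {
        List<String> sorted = new ArrayList<>(consumerIds);
        Collections.sort(sorted); // all consumers must agree on the order
        int index = sorted.indexOf(currentId);
        if (index < 0) {
            return Collections.emptyList(); // not a member of the group
        }
        int consumers = sorted.size();
        int base = queueCount / consumers;
        int remainder = queueCount % consumers;
        int size = base + (index < remainder ? 1 : 0);
        int start = index * base + Math.min(index, remainder);
        List<Integer> result = new ArrayList<>();
        for (int q = start; q < start + size; q++) {
            result.add(q);
        }
        return result;
    }
}
```

With 8 queues and consumers c1, c2 and c3, the slices are [0, 1, 2], [3, 4, 5] and [6, 7]; once c3 leaves, c1 and c2 recompute to [0, 1, 2, 3] and [4, 5, 6, 7]. Each client runs the same calculation locally, which is why all of them need the same up-to-date consumer list.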
ClientHousekeepingService
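The broker has to manage many client channels, including connections from consumers, producers and filter servers. To check whether these connections are still usable, it scans all of them periodically and closes those that have been idle for too long.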
public void start() {
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                ClientHousekeepingService.this.scanExceptionChannel();
            } catch (Throwable e) {
                log.error("Error occurred when scan not active client channels.", e);
            }
        }
    }, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);
}

private void scanExceptionChannel() {
    this.brokerController.getProducerManager().scanNotActiveChannel();
    this.brokerController.getConsumerManager().scanNotActiveChannel();
    this.brokerController.getFilterServerManager().scanNotActiveChannel();
}
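This starts a scheduled task that runs scanExceptionChannel every 10 seconds. The method simply calls ProducerManager, ConsumerManager and FilterServerManager, each of which scans its own connections to check that they are still usable.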
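What one of these scans might look like can be sketched as follows. IdleChannelScanner is a hypothetical class, channels are represented by String ids, the timeout is whatever the caller passes in, and "closing" a channel is reduced to removing it from the table and returning its id.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical scanner: drops channels whose last heartbeat is older than the timeout. */
class IdleChannelScanner {
    private final Map<String, Long> lastUpdateByChannel = new ConcurrentHashMap<>();
    private final long timeoutMillis;

    IdleChannelScanner(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Records a heartbeat for the channel. */
    void heartbeat(String channelId, long now) {
        lastUpdateByChannel.put(channelId, now);
    }

    /** Removes every channel idle for longer than the timeout and returns the removed ids. */
    List<String> scanNotActiveChannel(long now) {
        List<String> closed = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastUpdateByChannel.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (now - entry.getValue() > timeoutMillis) {
                String channelId = entry.getKey();
                it.remove(); // a real broker would also close the underlying connection here
                closed.add(channelId);
            }
        }
        return closed;
    }

    int activeCount() {
        return lastUpdateByChannel.size();
    }
}
```

This ties back to the heartbeat-driven registration: a client that stops sending heartbeats stops refreshing its timestamp, and the next scan after the timeout removes it.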