RocketMQ中的负载均衡都在Client端完成,具体来说的话,主要可以分为Producer端发送消息时候的负载均衡和Consumer端订阅消息的负载均衡。
Producer的负载均衡
如图所示,5 个队列可以部署在一台机器上,也可以分别部署在 5 台不同的机器上,发送消息通过轮询队列的方式 发送,每个队列接收平均的消息量。通过增加机器,可以水平扩展队列容量。 另外也可以自定义方式选择发往哪个队列。
# 创建主题 [root@node1 ~]# mqadmin updateTopic -n localhost:9876 -t tp_demo_02 -w 6 -b
localhost:10911
DefaultMQProducer producer = new DefaultMQProducer("producer_grp_02");
producer.setNamesrvAddr("node1:9876");
producer.start(); Message message = new Message();
message.setTopic("tp_demo_02");
message.setBody("hello lagou".getBytes()); // 指定MQ
SendResult result = producer.send(message, new MessageQueue("tp_demo_06", "node1", 5), 1_000 );
System.out.println(result.getSendStatus());
producer.shutdown();
Consumer的负载均衡
如图所示,如果有 5 个队列,2 个 consumer,那么第一个 Consumer 消费 3 个队列,第二consumer 消费 2 个队列。 这样即可达到平均消费的目的,可以水平扩展 Consumer 来提高消费能力。但是 Consumer 数量要小于等于队列数 量,如果 Consumer 超过队列数量,那么多余的Consumer 将不能消费消息 。
在RocketMQ中,Consumer端的两种消费模式(Push/Pull)底层都是基于拉模式来获取消息的,而在Push模式只是对pull模式的一种封装,其本质实现为消息拉取线程在从服务器拉取到一批消息后,然后提交到消息消费线程池后,又“马不停蹄”的继续向服务器再次尝试拉取消息。
如果未拉取到消息,则延迟一下又继续拉取。
在两种基于拉模式的消费方式(Push/Pull)中,均需要Consumer端在知道从Broker端的哪一个消息队列中去获取消息。
因此,有必要在Consumer端来做负载均衡,即Broker端中多个MessageQueue分配给同一个ConsumerGroup中的哪些Consumer消费。
要做负载均衡,必须知道一些全局信息,也就是一个ConsumerGroup里到底有多少个
Consumer。
知道了全局信息,才可以根据某种算法来分配,比如简单地平均分到各Consumer。
在RocketMQ中,负载均衡或者消息分配是在Consumer端代码中完成的,Consumer从Broker处获得全局信息,然后自己做负载均衡,只处理分给自己的那部分消息。
Pull Consumer可以看到所有的Message Queue,而且从哪个Message Queue读取消息,读消息时的Offset都由使用者控制,使用者可以实现任何特殊方式的负载均衡。
DefaultMQPullConsumer有两个辅助方法可以帮助实现负载均衡,一个是registerMessageQueueListener函数,一个是MQPullConsumerScheduleService(使用这个Class类似使用DefaultMQPushConsumer,但是它把Pull消息的主动性留给了使用者)
public class MyConsumer {
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("consumer_pull_grp_01");
consumer.setNamesrvAddr("node1:9876");
consumer.start();
Set<MessageQueue> messageQueues = consumer.fetchSubscribeMessageQueues("tp_demo_01");
for (MessageQueue messageQueue : messageQueues) {
// 指定从哪个MQ拉取数据 PullResult result = consumer.pull(messageQueue, "*", 0L, 10);
List<MessageExt> msgFoundList = result.getMsgFoundList();
for (MessageExt messageExt : msgFoundList) {
System.out.println(messageExt);
}
}
consumer.shutdown();
}
}
DefaultMQPushConsumer的负载均衡过程不需要使用者操心,客户端程序会自动处理,每个DefaultMQPushConsumer启动后,会马上会触发一个doRebalance动作;而且在同一个ConsumerGroup里加入新的DefaultMQPush-Consumer时,各个Consumer都会被触发doRebalance动作。
负载均衡的分配粒度只到Message Queue,把Topic下的所有Message Queue分配到不同的Consumer中。
如下图所示,具体的负载均衡算法有几种,默认用的是AllocateMessageQueueAveragely。
我们可以设置负载均衡的算法:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_push_grp_01");
consumer.setNamesrvAddr("node1:9876");
// 设置负载均衡算法
consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragely());
consumer.setMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
// todo 处理接收到的消息 return null;
}
});
consumer.start();
以AllocateMessageQueueAveragely策略为例,如果创建Topic的时候,把Message Queue数设为3,当Consumer数量为2的时候,有一个Consumer需要处理Topic三分之二的消息,另一个处理三分之一的消息;当Consumer数量为4的时候,有一个Consumer无法收到消息,其他3个Consumer各处理Topic三分之一的消息。
可见Message Queue数量设置过小不利于做负载均衡,通常情况下,应把一个Topic的MessageQueue数设置为16。
1、Consumer端的心跳包发送
在Consumer启动后,它就会通过定时任务不断地向RocketMQ集群中的所有Broker实例发送心跳包(其中包含了消息消费分组名称、订阅关系集合、消息通信模式和客户端id的值等信息)。Broker端在收到Consumer的心跳消息后,会将它维护在ConsumerManager的本地缓存变量—consumerTable,同时并将封装后的客户端网络通道信息保存在本地缓存变量—channelInfoTable中,为之后做Consumer端的负载均衡提供可以依据的元数据信息。
2、Consumer端实现负载均衡的核心类—RebalanceImpl
在Consumer实例的启动流程中启动MQClientInstance实例的部分,会完成负载均衡服务线程—RebalanceService的启动(每隔20s执行一次)。
通过查看源码可以发现,RebalanceService线程的run()方法最终调用的是RebalanceImpl类的rebalanceByTopic()方法,该方法是实现Consumer端负载均衡的核心。
这里,rebalanceByTopic()方法根据消费者通信类型为“广播模式”还是“集群模式”做不同的逻辑处理。
对于集群模式:
case CLUSTERING: {
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
if (null == mqSet) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
}
}
if (null == cidAll) {
log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
}
if (mqSet != null && cidAll != null) {
List<MessageQueue> mqAll = new ArrayList<MessageQueue>(); mqAll.addAll(mqSet);
// 对MQ进行排序
Collections.sort(mqAll);
// 对消费者ID进行排序
Collections.sort(cidAll);
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
List<MessageQueue> allocateResult = null;
try {
// 计算当前消费者应该分配的MQ集合
allocateResult = strategy.allocate(
// 当前消费者所属的消费组
this.consumerGroup,
// 当前消费者ID
this.mQClientFactory.getClientId(),
// MQ集合
mqAll,
// 消费组中消费者ID集合 cidAll);
} catch (Throwable e) {
log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(), e);
return;
}
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
if (allocateResult != null) {
allocateResultSet.addAll(allocateResult);
}
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
if (changed) {
log.info( "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet= {}", strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(), allocateResultSet.size(), allocateResultSet);
this.messageQueueChanged(topic, mqSet, allocateResultSet);
}
}
break;
}
默认的负载均衡算法:
AllocateMessageQueueAveragely是默认的MQ分配对象。
算法:
// 获取当前消费者在cidAll集合中的下标
int index = cidAll.indexOf(currentCID);
// mqAll对cidAll大小取模
int mod = mqAll.size() % cidAll.size();
// 计算每个消费者应该分配到的mq数量
// 如果mq个数小于等于消费者个数,每个消费者最多分配一个mq
// 如果mq个数大于消费者个数,
// 如果mq个数小于等于消费组中消费者个数,平均数就是1,
int averageSize = mqAll.size() <= cidAll.size() ? 1: (
// 否则,看mod和index大小,
mod > 0 && index < mod ?
//如果余数大于0并且当前消费者下标小于余数,则当前消费者应该消费平均数个mq+1
mqAll.size() / cidAll.size() + 1 :
// 如果余数大于0并且当前消费者下标大于等于余数,则当前消费者应该消 费平均数个mq
mqAll.size() / cidAll.size()
);
// 计算当前消费者消费mq的起始位置
int startIndex = (mod > 0 && index < mod) ?index * averageSize :index * averageSize + mod;
// 计算当前消费者消费mq的跨度,即当前消费者分几个MQ
int range = Math.min(averageSize, mqAll.size() - startIndex);
// 分配MQ,放到result集合中返回
for (int i = 0; i < range; i++) {
result.add(mqAll.get((startIndex + i) % mqAll.size()));
}
return result;
消息消费队列在同一消费组不同消费者之间的负载均衡,其核心设计理念是在一个消息消费队列在同一时间只允许被同一消费组内的一个消费者消费,一个消息消费者能同时消费多个消息队列。