RocketMQ的负载均衡

RocketMQ中的负载均衡都在Client端完成,具体来说的话,主要可以分为Producer端发送消息时候的负载均衡和Consumer端订阅消息的负载均衡。

Producer的负载均衡

image.png

如图所示,5 个队列可以部署在一台机器上,也可以分别部署在 5 台不同的机器上,发送消息通过轮询队列的方式 发送,每个队列接收平均的消息量。通过增加机器,可以水平扩展队列容量。 另外也可以自定义方式选择发往哪个队列。

# 创建主题 [root@node1 ~]# mqadmin updateTopic -n localhost:9876 -t tp_demo_02 -w 6 -b 
localhost:10911
DefaultMQProducer producer = new DefaultMQProducer("producer_grp_02"); 
producer.setNamesrvAddr("node1:9876");
producer.start(); Message message = new Message(); 
message.setTopic("tp_demo_02"); 
message.setBody("hello lagou".getBytes()); // 指定MQ 
SendResult result = producer.send(message, new MessageQueue("tp_demo_06", "node1", 5), 1_000 );
System.out.println(result.getSendStatus()); 
producer.shutdown();

Consumer的负载均衡

image.png

如图所示,如果有 5 个队列,2 个 consumer,那么第一个 Consumer 消费 3 个队列,第二consumer 消费 2 个队列。 这样即可达到平均消费的目的,可以水平扩展 Consumer 来提高消费能力。但是 Consumer 数量要小于等于队列数 量,如果 Consumer 超过队列数量,那么多余的Consumer 将不能消费消息 。
在RocketMQ中,Consumer端的两种消费模式(Push/Pull)底层都是基于拉模式来获取消息的,而在Push模式只是对pull模式的一种封装,其本质实现为消息拉取线程在从服务器拉取到一批消息后,然后提交到消息消费线程池后,又“马不停蹄”的继续向服务器再次尝试拉取消息。
如果未拉取到消息,则延迟一下又继续拉取。
在两种基于拉模式的消费方式(Push/Pull)中,均需要Consumer端在知道从Broker端的哪一个消息队列中去获取消息。
因此,有必要在Consumer端来做负载均衡,即Broker端中多个MessageQueue分配给同一个ConsumerGroup中的哪些Consumer消费。
要做负载均衡,必须知道一些全局信息,也就是一个ConsumerGroup里到底有多少个
Consumer。
知道了全局信息,才可以根据某种算法来分配,比如简单地平均分到各Consumer。
在RocketMQ中,负载均衡或者消息分配是在Consumer端代码中完成的,Consumer从Broker处获得全局信息,然后自己做负载均衡,只处理分给自己的那部分消息。
Pull Consumer可以看到所有的Message Queue,而且从哪个Message Queue读取消息,读消息时的Offset都由使用者控制,使用者可以实现任何特殊方式的负载均衡。
DefaultMQPullConsumer有两个辅助方法可以帮助实现负载均衡,一个是registerMessageQueueListener函数,一个是MQPullConsumerScheduleService(使用这个Class类似使用DefaultMQPushConsumer,但是它把Pull消息的主动性留给了使用者)

public class MyConsumer { 
    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException { 
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("consumer_pull_grp_01"); 
        consumer.setNamesrvAddr("node1:9876"); 
        consumer.start(); 
        Set<MessageQueue> messageQueues = consumer.fetchSubscribeMessageQueues("tp_demo_01"); 
        
        for (MessageQueue messageQueue : messageQueues) { 
            // 指定从哪个MQ拉取数据 PullResult result = consumer.pull(messageQueue, "*", 0L, 10); 
            List<MessageExt> msgFoundList = result.getMsgFoundList(); 
            for (MessageExt messageExt : msgFoundList) { 
                System.out.println(messageExt); 
            } 
        }
        consumer.shutdown(); 
    } 
}

DefaultMQPushConsumer的负载均衡过程不需要使用者操心,客户端程序会自动处理,每个DefaultMQPushConsumer启动后,会马上会触发一个doRebalance动作;而且在同一个ConsumerGroup里加入新的DefaultMQPush-Consumer时,各个Consumer都会被触发doRebalance动作。
负载均衡的分配粒度只到Message Queue,把Topic下的所有Message Queue分配到不同的Consumer中。
如下图所示,具体的负载均衡算法有几种,默认用的是AllocateMessageQueueAveragely。
我们可以设置负载均衡的算法:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_push_grp_01"); 
consumer.setNamesrvAddr("node1:9876"); 

// 设置负载均衡算法 
consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragely()); 
consumer.setMessageListener(new MessageListenerConcurrently() { 
    
    @Override 
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { 
        // todo 处理接收到的消息 return null; 
    } 
});
 
consumer.start();

以AllocateMessageQueueAveragely策略为例,如果创建Topic的时候,把Message Queue数设为3,当Consumer数量为2的时候,有一个Consumer需要处理Topic三分之二的消息,另一个处理三分之一的消息;当Consumer数量为4的时候,有一个Consumer无法收到消息,其他3个Consumer各处理Topic三分之一的消息。
可见Message Queue数量设置过小不利于做负载均衡,通常情况下,应把一个Topic的MessageQueue数设置为16。
1、Consumer端的心跳包发送
在Consumer启动后,它就会通过定时任务不断地向RocketMQ集群中的所有Broker实例发送心跳包(其中包含了消息消费分组名称、订阅关系集合、消息通信模式和客户端id的值等信息)。Broker端在收到Consumer的心跳消息后,会将它维护在ConsumerManager的本地缓存变量—consumerTable,同时并将封装后的客户端网络通道信息保存在本地缓存变量—channelInfoTable中,为之后做Consumer端的负载均衡提供可以依据的元数据信息。
2、Consumer端实现负载均衡的核心类—RebalanceImpl
在Consumer实例的启动流程中启动MQClientInstance实例的部分,会完成负载均衡服务线程—RebalanceService的启动(每隔20s执行一次)。


image.png

image.png

通过查看源码可以发现,RebalanceService线程的run()方法最终调用的是RebalanceImpl类的rebalanceByTopic()方法,该方法是实现Consumer端负载均衡的核心。


image.png

image.png

这里,rebalanceByTopic()方法根据消费者通信类型为“广播模式”还是“集群模式”做不同的逻辑处理。
对于集群模式:
case CLUSTERING: { 
    Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic); 
    List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup); 
    
    if (null == mqSet) { 
        if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { 
            log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic); 
        } 
    }
    
    if (null == cidAll) { 
        log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic); 
    }
    
    if (mqSet != null && cidAll != null) { 
        List<MessageQueue> mqAll = new ArrayList<MessageQueue>(); mqAll.addAll(mqSet); 
        
        // 对MQ进行排序 
        Collections.sort(mqAll); 
        
        // 对消费者ID进行排序 
        Collections.sort(cidAll); 
        
        AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy; 
        List<MessageQueue> allocateResult = null; 
        
        try {
            // 计算当前消费者应该分配的MQ集合 
            allocateResult = strategy.allocate( 
                // 当前消费者所属的消费组 
                this.consumerGroup, 
                // 当前消费者ID
                this.mQClientFactory.getClientId(), 
                // MQ集合 
                mqAll, 
                // 消费组中消费者ID集合 cidAll); 
            } catch (Throwable e) { 
                log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(), e); 
                return; 
            }

            Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>(); 
            if (allocateResult != null) { 
                allocateResultSet.addAll(allocateResult); 
            }

            boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder); 
            
            if (changed) { 
                log.info( "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet= {}", strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(), allocateResultSet.size(), allocateResultSet); 
                this.messageQueueChanged(topic, mqSet, allocateResultSet); 
            } 
    }
    break; 
}

默认的负载均衡算法:


image.png

image.png

AllocateMessageQueueAveragely是默认的MQ分配对象。
算法:


image.png
// 获取当前消费者在cidAll集合中的下标 
int index = cidAll.indexOf(currentCID); 
// mqAll对cidAll大小取模 
int mod = mqAll.size() % cidAll.size(); 
// 计算每个消费者应该分配到的mq数量 
// 如果mq个数小于等于消费者个数,每个消费者最多分配一个mq 
// 如果mq个数大于消费者个数, 
// 如果mq个数小于等于消费组中消费者个数,平均数就是1,
int averageSize =  mqAll.size() <= cidAll.size() ?  1:  (
// 否则,看mod和index大小,
mod > 0 && index < mod ? 
//如果余数大于0并且当前消费者下标小于余数,则当前消费者应该消费平均数个mq+1
mqAll.size() / cidAll.size() + 1 :
// 如果余数大于0并且当前消费者下标大于等于余数,则当前消费者应该消 费平均数个mq
mqAll.size() / cidAll.size()  
); 

// 计算当前消费者消费mq的起始位置 
int startIndex = (mod > 0 && index < mod) ?index * averageSize :index * averageSize + mod; 

// 计算当前消费者消费mq的跨度,即当前消费者分几个MQ 
int range = Math.min(averageSize, mqAll.size() - startIndex); 

// 分配MQ,放到result集合中返回 
for (int i = 0; i < range; i++) { 
    result.add(mqAll.get((startIndex + i) % mqAll.size())); 
}
return result;

消息消费队列在同一消费组不同消费者之间的负载均衡,其核心设计理念是在一个消息消费队列在同一时间只允许被同一消费组内的一个消费者消费,一个消息消费者能同时消费多个消息队列。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,904评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,581评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,527评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,463评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,546评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,572评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,582评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,330评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,776评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,087评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,257评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,923评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,571评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,192评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,436评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,145评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,127评论 2 352

推荐阅读更多精彩内容