rocketmq 消费者负载均衡(转载)

原文链接:https://blog.csdn.net/qq_38082304/article/details/112378245

RocketMQ消费负载策略

Consumer在拉取消息之前需要对TopicMessage进行负载操作,负载操作由一个定时器来完成单位,定时间隔默认20s

简单来说就是将Topic下的MessageQueue分配给这些Consumer,至于怎么分,就是通过这些负载策略定义的算法规则来划分。

AllocateMessageQueueAveragely

平均负载策略,RocketMQ默认使用的就是这种方式,如果某个Consumer集群,订阅了某个Topic,Topic下面的这些MessageQueue会被平均分配给集群中的Consumer,为了帮助大家理解,我画了个图


20210107154034.jpg

我来讲下这个图代表的意思,假设topic为 testMsg,该testMsg下面有4个MessageQueue,然后这些Consumer组成了一个集群(都监听了该Topic并且消费groupId是一样的),首先会给Consumer和MessageQueue进行排序,谁是老大,谁先拿MessageQueue, 平均分配分为两种情况,

MessageQueue数量大于Consumer数量

如果队列数量不是消费者数量的整数倍,跟上图中2个Consumer和4个Consumer的情况一样,先分每个Consumer应得的数量,拿2个Consumer来举个例子,C1 和C2 各自分到了2个MessageQueue,C1排序时在C2前面,所以C1先把 Q0 和Q1拿走,C2再拿2个,也就是Q2和Q3。

如果队列数量不是消费者的整数倍,跟上图3个Consumer和5个Consumer的情况一样,5个Comsumer的比较特殊,我们过会再讲,我们拿3个Consumer的情况来举例,4个消息队列,每个Consumer能分到1个,还剩下1个,弱肉强食嘛,剩下的当然给排在前面的大哥C1啦,最后分下来,C1分到2个队列,C2和C3只分到一个,分完数量以后,也是按照顺序来拿,C1拿到了Q0和Q1,然后C2就只能从Q2开始拿,C3只能拿剩下的Q3啦

MessageQueue数量小于Consumer数量

这种情况平均下来,每个人1个Consumer都分不到一个,也就是我们上图中的5个Comsumer的情况老规矩,按照排序顺序,每人先拿1个队列,由于C5排在最后面,队列全被别人拿走了,C5就一直分不到消息队列,除非前面的某个Consumer挂了,20s之后,在队列重新负载的时候就能拿到MessageQueue。

具体算法:

//consumer的排序后的
int index = cidAll.indexOf(currentCID);
//取模
int mod = mqAll.size() % cidAll.size();
//如果队列数小于消费者数量,则将分到队列数设置为1,如果余数大于当前消费者的index,则
//能分到的队列数+1,否则就是平均值
int averageSize =
  mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
                                       + 1 : mqAll.size() / cidAll.size());
//consumer获取第一个MessageQueue的索引
int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
// 如果消费者大于队列数,rang会是负数,循环也就不会执行 
int range = Math.min(averageSize, mqAll.size() - startIndex);
for (int i = 0; i < range; i++) {
  result.add(mqAll.get((startIndex + i) % mqAll.size()));
AllocateMessageQueueAveragelyByCircle

环形平均分配,这个和平均分配唯一的区别就是,再分队列的时候,平均队列是将属于自己的MessageQueue全部拿走,而环形平均则是,一人拿一个,拿到的Queue不是连续的。我也画了张图来帮助大家理解


20210107162520.jpg

这种环形平均分配和平均分配,每个Consumer拿到的MessageQueue数量是不变的,我们就拿3个Consumer的情况举个例子,
也是对Consumer和MessageQueue排序,先确定每个Consumer能拿到的MessageQueue数量,C1能分到2个,C2和C3只能分到1个
C1先拿1个,然后C2拿一个,C3拿一个,C1再拿一个。也就是图上3个Consumer画的这个情况。
另外,如果Consumer的数量大于消息队列的数量,处理方式和平均分配时一样的。

//当前consumer排序后的索引 
int index = cidAll.indexOf(currentCID);  
//index会是consumer第一个拿到的消息队列索引
for (int i = index; i < mqAll.size(); i++) {
  //这里采用了取模的方式
  if (i % cidAll.size() == index) { 
    result.add(mqAll.get(i));  
  }
}
AllocateMessageQueueByConfig

用户自定义配置,用户在创建Consumer的时候,可以设置要使用的负载策略,如果我们设置为AllocateMessageQueueByConfig方式时,我们可以自己指定需要监听的MessageQueues,它维护了一个List messageQueueList,我们可以往这里面塞目标的MessageQueues,这个策略了解一下就行,用的不多,具体设置代码如下:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testGroup");
consumer.setNamesrvAddr("127.0.0.1:9876");
//订阅topic
consumer.subscribe("testMsg","*");
//注册消息监听
consumer.registerMessageListener(new MessageListenerConcurrently() {
  @Override
  public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
    // do job
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  }
});
//用户自定义queue策略
AllocateMessageQueueByConfig allocateMessageQueueByConfig = new AllocateMessageQueueByConfig();
//指定MessageQueue
allocateMessageQueueByConfig.setMessageQueueList(Arrays.asList(new MessageQueue("testMsg","broker-a",0)));
//设置consumer的负载策略
consumer.setAllocateMessageQueueStrategy(allocateMessageQueueByConfig);
//启动consumer
consumer.start();
AllocateMessageQueueByMachineRoom

机房负载策略,其实这个策略就是当前Consumer只负载处在指定的机房内的MessageQueue,还有brokerName的命名必须要按要求的格式来设置: 机房名@brokerName

我们先看下具体的使用

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testGroup");
consumer.setNamesrvAddr("127.0.0.1:9876");
//订阅topic
consumer.subscribe("testMsg","*");
//注册消息监听
consumer.registerMessageListener(new MessageListenerConcurrently() {
  @Override
  public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
    // do job
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  }
});
AllocateMessageQueueByMachineRoom allocateMachineRoom = new AllocateMessageQueueByMachineRoom();
//指定机房名称  machine_room1、machine_room2
allocateMachineRoom.setConsumeridcs(new HashSet<>(Arrays.asList("machine_room1","machine_room2")));
//设置consumer的负载策略
consumer.setAllocateMessageQueueStrategy(allocateMachineRoom);
//启动consumer
consumer.start();

我们再看下源码

 //当前consumer的下标
int currentIndex = cidAll.indexOf(currentCID);
if (currentIndex < 0) {
  return result;
}
//符合机房条件的队列
List<MessageQueue> premqAll = new ArrayList<MessageQueue>();
for (MessageQueue mq : mqAll) {
  //brokerName命名规则   machine_room1@broker-a
  String[] temp = mq.getBrokerName().split("@");
  //判断是否符合指定的机房条件
  if (temp.length == 2 && consumeridcs.contains(temp[0])) {
    premqAll.add(mq);
  }
}
//分配到的队列数
int mod = premqAll.size() / cidAll.size();
//取模
int rem = premqAll.size() % cidAll.size();
//当前分配到的第一个队列索引
int startIndex = mod * currentIndex;
//分配到的最后一个队列索引
int endIndex = startIndex + mod;
//取startIndex到endIndex的队列
for (int i = startIndex; i < endIndex; i++) {
  result.add(mqAll.get(i));
}
//MessageQueue数量和Consumer不是整数倍时  有点像平均分配因为队列下标取到的也是连续的
if (rem > currentIndex) {
  result.add(premqAll.get(currentIndex + mod * cidAll.size()));
}

总结一下源码的意思:

  1. 首先赛选出当前Topic处在指定机房的队列
  2. 赛选出队列后,按照平均负载策略进行具体的分配(算法极其相似)
    哈哈,发现总结的真简洁呀,别慌,来张图给你润润喉


    20210107162520.jpg

其实这个策略就是对MessageQueue进行了过滤,过滤完了以后,后续操作就按照平均负载策略来进行具体负载操作。这个算法和平均负载的算法得到的结果是一样的,我感觉这两个策略应该是两个人写的,不然不会写两套不同的算法来实现一个功能。也不知道是不是我自己太差了,理解不到大佬的思路

AllocateMachineRoomNearby

这个策略我个人感觉是AllocateMessageQueueByMachineRoom的改进版本,因为这个策略的处理方式要比AllocateMessageQueueByMachineRoom更加灵活,还考虑到了那些同机房只有MessageQueue却没有Consumer的情况,下面我们来具体讲这个策略。使用该策略需要自己定义一个类,来区分每个broker处于哪个机房,该策略RocketMQ有个测试单元,我稍微改造了一下,就是把这个类提出来了。

public class MyMachineResolver implements AllocateMachineRoomNearby.MachineRoomResolver {

    /**
     * 判断当前broker处于哪个机房
     * @param messageQueue
     * @return
     */
    @Override
    public String brokerDeployIn(MessageQueue messageQueue) {
        return messageQueue.getBrokerName().split("-")[0];
    }

    /**
     * 判断consumer处于哪个机房
     * @param clientID
     * @return
     */
    @Override
    public String consumerDeployIn(String clientID) {
        return clientID.split("-")[0];
    }
}

我们从代码中,可以看出来需要在设置brokerName和Consumer的Id的时候需要加上机房名称,eg:hz_aliyun_room1-broker-a、hz_aliyun_root1-Client1。我们先看一下在代码里面怎么使用这个同机房分配策略

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testGroup");
consumer.setNamesrvAddr("127.0.0.1:9876");
//订阅topic
consumer.subscribe("testMsg","*");
//注册消息监听
consumer.registerMessageListener(new MessageListenerConcurrently() {
  @Override
  public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
    // do job
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  }
});
//用户同机房分配策略  
consumer.setAllocateMessageQueueStrategy(new AllocateMachineRoomNearby(new AllocateMessageQueueAveragely()
                ,new MyMachineResolver()));
//启动consumer
consumer.start();

我们可以看到,我们创建这个同机房分配策略的时候,还加了一个平均分配的策略进去,它本身就是一个策略,为啥还要传另一个策略。(该策略只会讲MessageQueue和Consumer按机房进行分组,分组以后具体的负载,就是通过我们传的另外一个负载策略来分配的)我们到源码里面去看,后面会解释到

//消息队列按机房分组
Map<String/*machine room */, List<MessageQueue>> mr2Mq = new TreeMap<String, List<MessageQueue>>();
for (MessageQueue mq : mqAll) {
  //这里调用我们自己定义的类方法,得到broker的机房的名称
  String brokerMachineRoom = machineRoomResolver.brokerDeployIn(mq);
  //机房不为空,将broker放到分组中
  if (StringUtils.isNoneEmpty(brokerMachineRoom)) {
    if (mr2Mq.get(brokerMachineRoom) == null) {
      mr2Mq.put(brokerMachineRoom, new ArrayList<MessageQueue>());
    }
    mr2Mq.get(brokerMachineRoom).add(mq);
  } else {
    throw new IllegalArgumentException("Machine room is null for mq " + mq);
  }
}

//consumer按机房分组
Map<String/*machine room */, List<String/*clientId*/>> mr2c = new TreeMap<String, List<String>>();
for (String cid : cidAll) {
  //这里调用我们自己定义的类方法,得到broker的机房的名称
  String consumerMachineRoom = machineRoomResolver.consumerDeployIn(cid);
  if (StringUtils.isNoneEmpty(consumerMachineRoom)) {
    if (mr2c.get(consumerMachineRoom) == null) {
      mr2c.put(consumerMachineRoom, new ArrayList<String>());
    }
    mr2c.get(consumerMachineRoom).add(cid);
  } else {
    throw new IllegalArgumentException("Machine room is null for consumer id " + cid);
  }
}

//当前consumer分到的所有MessageQueue
List<MessageQueue> allocateResults = new ArrayList<MessageQueue>();
//1.给当前consumer分当前机房的那些MessageQeueue
String currentMachineRoom = machineRoomResolver.consumerDeployIn(currentCID);
//得到当前机房的MessageQueue
List<MessageQueue> mqInThisMachineRoom = mr2Mq.remove(currentMachineRoom);
//得到当前机房的Consumer
List<String> consumerInThisMachineRoom = mr2c.get(currentMachineRoom);
if (mqInThisMachineRoom != null && !mqInThisMachineRoom.isEmpty()) {
  //得到当前机房所有MessageQueue和Consumers后根据指定的策略再负载
  allocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup, currentCID, mqInThisMachineRoom, consumerInThisMachineRoom));
}
//2.如果该MessageQueue的机房 没有同机房的consumer,将这些MessageQueue按配置好的备用策略分配给所有的consumer
for (String machineRoom : mr2Mq.keySet()) {
  if (!mr2c.containsKey(machineRoom)) { 
    //添加分配到的游离态MessageQueue
    allocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup, currentCID, mr2Mq.get(machineRoom), cidAll));
  }
}

总结一下源码的意思

  1. 分别给MessageQueue和Consumer按机房分组
  2. 得到当前Consumer所在机房的所有Consumer和MessageQueue
  3. 通过设置的负载策略,再进行具体的负载,得到当前Consumer分到的MessageQueue
    4.如果存在MessageQueue的某个机房中,没有和MessageQueue同机房的Consumer,将这些MessageQueue按配置的负载策略分配给集群中所有的Consumer去负载
  4. 最终该Consumer分到的MessageQueue会包含同机房分配到的和部分游离态分配的
    这里我也画个图来解释一下,


    20210107191911.jpg

先同机房的Consumer和MessageQueue进行负载,这里按照平均负载来分(我们创建机房就近策略使用的是平均负载),然后将游离态的通过设置的负载策略来分。
AllocateMessageQueueConsistentHash
一致性哈希策略,这里我简单介绍一下一致性哈希,这里不好我先给个图,我们再来解释


20210108182537.jpg

一致性哈希有一个哈希环的概念,哈希环由数值 0到2^32-1 组成,不管内容多长的字符,经过哈希计算都能得到一个等长的数字,最后都会落在哈希环上的某个点,哈希环上的点都是虚拟的,比如我们这里使用Consumer的Id来进行哈希计算,得到的这几个是物理的点,然后把得到的点存到TreeMap里面,然后将所有的MessageQueue依次进行同样的哈希计算,得到距离MessageQueue顺时针方向最近的那个Consumer点,这个就是MessageQeueu最终归属的那个Consumer。

//将所有consumer变成节点 到时候经过hash计算 分布在hash环上
Collection<ClientNode> cidNodes = new ArrayList<ClientNode>();
for (String cid : cidAll) {
  cidNodes.add(new ClientNode(cid));
}

final ConsistentHashRouter<ClientNode> router; 
//构建哈希环
if (customHashFunction != null) {
  router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt, customHashFunction);
} else {
  //默认使用MD5进行Hash计算
  router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt);
}

List<MessageQueue> results = new ArrayList<MessageQueue>();
for (MessageQueue mq : mqAll) {
  //对messageQueue进行hash计算,找到顺时针最近的consumer节点
  ClientNode clientNode = router.routeNode(mq.toString());
  //判断是否是当前consumer
  if (clientNode != null && currentCID.equals(clientNode.getKey())) {
    results.add(mq);
  }
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,616评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,020评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,078评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,040评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,154评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,265评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,298评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,072评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,491评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,795评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,970评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,654评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,272评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,985评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,223评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,815评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,852评论 2 351

推荐阅读更多精彩内容