RocketMq ConsumerGroup Tag对消费的影响

官方对consumer的定义如下:
Similar to previously mentioned producer group, consumers of the exactly same role are grouped together and named Consumer Group.
Consumer Group is a great concept with which achieving goals of load-balance and fault-tolerance, in terms of message consuming, is super easy.
Warning: consumer instances of a consumer group must have exactly the same topic subscription(s).
大意是消费者准确按照相同角色来分组,分组的目的是负载均衡和失败转移,并且警告同个分组中的消费者一定要订阅相同的topic。
在代码中看到consumerGroup变量定义注释如下,大意差不多。
Consumers of the same role is required to have exactly same subscriptions and consumerGroup to correctly achieve load balance. It's required and needs to be globally unique.

通过上面的定义,可以准确知道同一组的consumer一定要订阅相同的topic,那么问题来了,订阅的时候除了topic还有tags,这个tag会有影响吗?没有找到相关资料,我就自己做了测试。

instanceName groupName Topic Tag
A GroupA TopicA TagA
B GroupA TopicA TagB

测试目标:测试在相同的消费组中的消费者,订阅相同的topic时,tag不同会不会影响消费和负载均衡。
测试计划:
1.创建生产者发送100条消息;
2.创建消费者

instanceName groupName Topic Tag
A GroupA TopicA TagA
B GroupA TopicA TagB

3.观察消息消费情况和队列分配情况。

发消息

public class SimpleProducer {
    public static void sendSync() throws Exception {
        ClientConfig clientConfig=new ClientConfig();
        clientConfig.setNamesrvAddr("localhost:9876");
        MQClientInstance clientInstance=MQClientManager.getInstance().getAndCreateMQClientInstance(clientConfig);
        DefaultMQProducer producer = clientInstance.getDefaultMQProducer();
        producer.setProducerGroup("GroupA");
        producer.start();
        for (int i = 0; i < 100; i++) {
            Message msg = new Message("TopicA", "TagA", ("Hello mq" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(msg);
            System.out.println("send " + i + " , result:" + sendResult.getMsgId());
        }
        producer.shutdown();
    }
}

消费者

public class SimpleConsumer {
    public static void pushConsume(final String instanceName, final String group, final String topic, final String tag) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group);
        consumer.setNamesrvAddr("localhost:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe(topic, tag);
        consumer.setInstanceName(instanceName);
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("[" + instanceName + "," + group + "," + topic + "," + tag + "] consume: " + new String(msg.getBody()));
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
    }
}

测试

public static void main(String[] args) throws Exception {
        
        try {
            SimpleProducer.sendSync();
        } catch (Exception e) {
            e.printStackTrace();
        }

        Thread t2 = new Thread() {
            @Override
            public void run() {
                try {
                    SimpleConsumer.pushConsume("A", "GroupA", "TopicA", "TagA");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };
        t2.start();

        Thread t3 = new Thread() {
            @Override
            public void run() {
                try {
                    SimpleConsumer.pushConsume("B", "GroupA", "TopicA", "TagB");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };
        t3.start();


        t2.join();
        t3.join();
    }

结果:

....
send 97 , result:AC1100013B2118B4AAC2808264CC0061
send 98 , result:AC1100013B2118B4AAC2808264CD0062
send 99 , result:AC1100013B2118B4AAC2808264CE0063
[A,GroupA,TopicA,TagA] consume: Hello mq1
[A,GroupA,TopicA,TagA] consume: Hello mq2
[A,GroupA,TopicA,TagA] consume: Hello mq5
....

查看队列分布情况:
消费者A

./bin/mqadmin consumerStatus -n "localhost:9876" -g "GroupA" -i "172.17.0.1@A"
#Consumer MQ Detail#
#Topic                            #Broker Name                      #QID  #ProcessQueueInfo   
%RETRY%GroupA                     mo-x                              0     ProcessQueueInfo [commitOffset=0, cachedMsgMinOffset=0, cachedMsgMaxOffset=0, cachedMsgCount=0, cachedMsgSizeInMiB=0, transactionMsgMinOffset=0, transactionMsgMaxOffset=0, transactionMsgCount=0, locked=true, tryUnlockTimes=0, lastLockTimestamp=20180625231130503, droped=false, lastPullTimestamp=20180625231132580, lastConsumeTimestamp=20180625231129554]
TopicA                            mo-x                              0     ProcessQueueInfo [commitOffset=50, cachedMsgMinOffset=0, cachedMsgMaxOffset=0, cachedMsgCount=0, cachedMsgSizeInMiB=0, transactionMsgMinOffset=0, transactionMsgMaxOffset=0, transactionMsgCount=0, locked=true, tryUnlockTimes=0, lastLockTimestamp=20180625231130503, droped=true, lastPullTimestamp=20180625231132552, lastConsumeTimestamp=20180625231129547]
TopicA                            mo-x                              1     ProcessQueueInfo [commitOffset=50, cachedMsgMinOffset=0, cachedMsgMaxOffset=0, cachedMsgCount=0, cachedMsgSizeInMiB=0, transactionMsgMinOffset=0, transactionMsgMaxOffset=0, transactionMsgCount=0, locked=true, tryUnlockTimes=0, lastLockTimestamp=20180625231130503, droped=true, lastPullTimestamp=20180625231132572, lastConsumeTimestamp=20180625231129549]

消费者B

./bin/mqadmin consumerStatus -n "localhost:9876" -g "GroupA" -i "172.17.0.1@B"
#Consumer MQ Detail#
#Topic                            #Broker Name                      #QID  #ProcessQueueInfo   
TopicA                            mo-x                              2     ProcessQueueInfo [commitOffset=25, cachedMsgMinOffset=0, cachedMsgMaxOffset=0, cachedMsgCount=0, cachedMsgSizeInMiB=0, transactionMsgMinOffset=0, transactionMsgMaxOffset=0, transactionMsgCount=0, locked=true, tryUnlockTimes=0, lastLockTimestamp=20180625231150500, droped=false, lastPullTimestamp=20180625231209016, lastConsumeTimestamp=20180625231149571]
TopicA                            mo-x                              3     ProcessQueueInfo [commitOffset=25, cachedMsgMinOffset=0, cachedMsgMaxOffset=0, cachedMsgCount=0, cachedMsgSizeInMiB=0, transactionMsgMinOffset=0, transactionMsgMaxOffset=0, transactionMsgCount=0, locked=true, tryUnlockTimes=0, lastLockTimestamp=20180625231150500, droped=false, lastPullTimestamp=20180625231209016, lastConsumeTimestamp=20180625231149565]
mo@mo-x:~/rocket-mq$ ./bin/mqadmin consumerProgress -n localhost:9876 -g "GroupA"
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option PermSize=128m; support was removed in 8.0
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=128m; support was removed in 8.0
#Topic                            #Broker Name                      #QID  #Broker Offset        #Consumer Offset      #Client IP           #Diff                 #LastTime
%RETRY%GroupA                     mo-x                              0     0                     0                     172.17.0.1           0                     1970-01-01 08:00:00
TopicA                            mo-x                              0     25                    25                    172.17.0.1           0                     2018-06-25 23:11:29
TopicA                            mo-x                              1     25                    25                    172.17.0.1           0                     2018-06-25 23:11:29
TopicA                            mo-x                              2     25                    25                    172.17.0.1           0                     2018-06-25 23:11:29
TopicA                            mo-x                              3     25                    25                    172.17.0.1           0                     2018-06-25 23:11:29

测试结果总结:
1.生产者发送了100条TagA消息到TopicA
2.消费者A和消费者B都在GroupA中,都订阅TopicA
3.消费者A订阅TagA,消费者B订阅TagB
4.消费者A收到了部分消息
5.消费者A分配到了两个GroupA-TopicA的队列
6.消费者B分配到了两个GroupA-TopicA的队列

总结:
Tag对同组同Topic的消费者有影响,当存在不同Tag的时候,会导致消费混乱,比如TagA的消息被TagB的消费者消费了。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,384评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,845评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,148评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,640评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,731评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,712评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,703评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,473评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,915评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,227评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,384评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,063评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,706评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,302评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,531评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,321评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,248评论 2 352

推荐阅读更多精彩内容