五、RocketMQ-Consumer启动流程

一、概述

一个最简单的Consumer的启动代码如下:

public static void main(String[] args) throws Exception{
            // Instantiate with specified consumer group name.
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("c1");
            // Specify name server addresses.
            consumer.setNamesrvAddr("10.1.11.155:9876");
            // Subscribe one more more topics to consume.
            consumer.subscribe("qqq", "TagA||TagB");
            // Register callback to execute on arrival of messages fetched from brokers.
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                                ConsumeConcurrentlyContext context) {
                    msgs.forEach(m-> System.out.println(new String(m.getBody())));
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            //Launch the consumer instance.
            consumer.start();
            System.out.printf("Consumer Started.%n");
        }

最重要的有几个步骤:

  • new DefaultMQPushConsumer("c1");
  • consumer.subscribe("qqq", "TagA||TagB");
  • registerMessageListener()
  • start()
    下面就这几个方法进行深入

二、实例化一个DefaultMQPushConsumer

通过下面的代码就能够实例化一个DefaultMQPushConsumer

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("c1");

这个默认构造方法其实调用了下面的构造方法

public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
        AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
        this.consumerGroup = consumerGroup;
        this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
        defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
    }

一共三个参数:

  • consumerGroup
    消费者组
  • rpcHook
    远程调用的hook,我看Rocket的德行一般就在RPC调用之前,针对访问控制(ACL)做了一个前置
  • allocateMessageQueueStrategy
    队列负载均衡策略,默认会用 AllocateMessageQueueAveragely(平均算法)
    系统提供了6种策略:后续研究下
    • AllocateMachineRoomNearby
    • AllocateMessageQueueAveragely
    • AllocateMessageQueueAveragelyByCircle
    • AllocateMessageQueueByConfig
    • AllocateMessageQueueByMachineRoom
    • AllocateMessageQueueConsistentHash

三、定义监听的主题及subExpression

代码如下:

consumer.subscribe("qqq", "TagA||TagB");

解析一下topic及subExpression,生成一个SubscriptionData实体(包含了topic、subExpression的value和hash值),并放到map中

四、注册监听处理器 registerMessageListener

consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                                ConsumeConcurrentlyContext context) {
                    // msgs.forEach(m-> System.out.println(new String(m.getBody())));
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });

新建了MessageListenerConcurrently接口的匿名类,并实现了consumeMessage()方法,注意,这里MessageListenerConcurrently是一个接口,继承自MessageListener,同样继承自MessageListener的还有MessageListenerOrderly(顺序消费监听器)

五、启动 start()

这个是核心的核心了

1、this.checkConfig();

检查group等参数是否合法

2、this.copySubscription();

复制监听规则,其实就是把之前生成的SubscriptionData复制一份,交给RebalancePushImpl(这个应该是负载均衡消费的核心类),除了复制监听规则,还会把以下信息复制过去:

  • setConsumerGroup 复制组名称
  • setMessageModel 复制消费模式:广播/集群
  • setAllocateMessageQueueStrategy 复制负载均衡算法
  • setmQClientFactory(在下面3中初始化) 复制mqClient

3、实例化一个MQClientInstance

this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);

4、指定offsetStore

如果是集群模式,则会实例化一个LocalFileOffsetStore
如果是广播模式,则会实例化一个RemoteBrokerOffsetStore
都会调用load的方法,但是RemoteBrokerOffsetStore是一个空实现

 switch (this.defaultMQPushConsumer.getMessageModel()) {
                        case BROADCASTING:
                            this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                            break;
                        case CLUSTERING:
                            this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                            break;
                        default:
                            break;
                    }
      this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
 this.offsetStore.load();

5、实例化ConsumeMessageService

主要有两种:并发消费service及顺序消费service

  • 并发消费 ConsumeMessageOrderlyService.start();
    start() 会启动一个定时任务,延迟15分钟,每15分钟执行一次,清理失效的消息处理线程
  • 顺序消费 ConsumeMessageConcurrentlyService
    start() 会启动一个定时任务,延迟1秒,每20秒执行一次 lockMQPeriodically()方法,这个方法应该是顺序消费的精髓,可以深入研读一下

6、启动客户端 mQClientFactory.start();

producer启动的时候也会调用这个方法
上面1 ~ 5 的步骤,大部分是为了这个start()做准备,这个start做了很多事情,如下:

  • this.mQClientAPIImpl.fetchNameServerAddr();
    如果没有指定namesrv的地址,则会通过RPC获取,这个最终是通过调用一个外部的URL连接来获取的,这个链接可以自己定义,官方也推荐这种方法,最底层也最灵活。
  • this.mQClientAPIImpl.start();
    启动netty客户端,可以和brokernamesrv通讯的通道
  • this.startScheduledTask();
    启动一批自动任务
    • MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
      定期获取namesrv的地址,前面在启动的时候已经获取一次了
    • MQClientInstance.this.updateTopicRouteInfoFromNameServer();
      定期从namesrv拉取meta信息及topic等路由信息
    • cleanOfflineBroker()、sendHeartbeatToAllBrokerWithLock()
      定期检查下线的broker及发送心跳
    • MQClientInstance.this.persistAllConsumerOffset();
      定期固化消费者的offset
    • MQClientInstance.this.adjustThreadPool();
      定期调整处理线程池的大小,这个4.4版本好像是个空实现,没用
  • this.pullMessageService.start();
    启动客户端主动pull的服务
  • this.rebalanceService.start();
    启动负载均衡服务

最后还会调用一次getDefaultMQProducerImpl().start(false),没错,就是producerstart,但是参数是false,不会最终调用producerstart,就把配置刷过去了。

7、其他方法

  • this.updateTopicSubscribeInfoWhenSubscriptionChanged();
  • this.mQClientFactory.checkClientInBroker();
  • this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
  • this.mQClientFactory.rebalanceImmediately();
    这几个方法后续研究下

到此为止,consumer的启动流程完毕

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,204评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,091评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,548评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,657评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,689评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,554评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,302评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,216评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,661评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,851评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,977评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,697评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,306评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,898评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,019评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,138评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,927评论 2 355

推荐阅读更多精彩内容

  • 大致可以通过上述情况进行排除 1.kafka服务器问题 查看日志是否有报错,网络访问问题等。 2. kafka p...
    生活的探路者阅读 7,589评论 0 10
  • MQ在我们日常开发过程中有着不可替代的作用,不仅可以帮助我们做到信息在系统间的传递,还能进行系统间的解耦合,也就是...
    数齐阅读 3,440评论 2 7
  • 每个人的想法不同 , RocketMQ 介绍的时候就说 是阿里从他们使用的上 解耦出来 近一步简化 便捷的 目...
    楼亭樵客阅读 406评论 0 0
  • consumer 1.启动 有别于其他消息中间件由broker做负载均衡并主动向consumer投递消息,Rock...
    veShi文阅读 4,936评论 0 2
  • Swift1> Swift和OC的区别1.1> Swift没有地址/指针的概念1.2> 泛型1.3> 类型严谨 对...
    cosWriter阅读 11,101评论 1 32