RocketMQ Rebalance流程分析

 这节介绍Rebalance流程。在介绍Consumer消费消息流程前,先介绍Rebalance得流程,该过程涉及到Consumer的启动。

 之前介绍过,Topic是一个逻辑概念,Topic下可以划分多个Queue以增加Consumer消费的并行度。在一个Consumer Group内,Queue和Consumer之间的对应关系是一对多的关系:一个Queue最多只能分配给一个Consumer,一个Cosumer可以分配得到多个Queue,如下图

file

而Rebalance是一种协议,规定了一个 Consumer Group 下的所有 consumer如何达成一致来分配Queue。当Consumer订阅的Topic发生变化,或者Consumer Group内的Consumer实例发送变化时便会触发Rebalance以重新分配每个实例对应的Queue。

1.RebalanceService

 前面介绍客户端启动流程时提到,MQClientInstance在start方法中会启动一系列后台任务,其中就包括Rebalance任务,主要调用了RebalanceService的start方法。RebalanceService继承自ServiceThread,start方法会启动一个后台线程,确保每隔一段时间(默认20秒)会调用一次MQClientInstance的doRebalance方法。如下图:

file

MQClientInstance.doRebalance会调用MQConsumerInner.doRebalance。MQConsumerInner是DefaultMQPullConsumerImpl和DefaultMQPushConsumerImp的父接口,如下,即MQClientInstance将doRebalance方法交给了Consumer实例处理。

file

接着Consumer实例会调用内部RebalnaceImpl的doRebalance方法完成真正的,动作。

 这里提一点,RebalanceService被MQClientInstane持有,一个MQClientInstance只有一个Rebalance实例,之前在讲客户端启动时提到,MQClientInstance由MQClientManager管理,跟本机ip,进程pid有关。RebalanceImpl跟Consumer实例相关,一个Consumer实例对应一个RebalanceImpl对象。

2.RebalanceImpl

 先介绍该类的基本情况

1.属性

protected final ConcurrentMapConcurrentMap</*Queue*/MessageQueue, /*Queue消费进度镜像*/ProcessQueue> processQueueTable = new ConcurrentHashMap<MessageQueue, ProcessQueue>(64);

//DefaultMQXxxxConsumerImpl updateTopicSubscribeInfo时添加
protected final ConcurrentMap<String/* topic */, Set<MessageQueue>> topicSubscribeInfoTable = new ConcurrentHashMap<String, Set<MessageQueue>>();
    
//DefaultMQXxxxConsumerImpl subscript时会添加
protected final ConcurrentMap<String /* topic */, /*消息的过滤条件*/SubscriptionData> subscriptionInner = new ConcurrentHashMap<String, SubscriptionData>();

protected String consumerGroup;//Consumer实例所在的ConsumerGroup
    
protected MessageModel messageModel;//消息消费模式
    
protected AllocateMessageQueueStrategy allocateMessageQueueStrategy;//Queue分配策略,默认为AllocateMessageQueueAveragely

2.继承关系

file

 接第一Part的内容,会调用该类的doRebalce方法,主要逻辑如下:

file

(1) 轮训该实例订阅的所有topic,通过遍历subscriptionInner的值来获取topic信息,该属性内容会在客户端实例调用subscript时增加

(2) 根据topic调用rebalanceByTopic执行rebalance

(3)如果是广播模式,则从topicSubscribeInfoTable获取该topic下的所有Queue,用于后续更新。即广播模式,每个客户端都能收到topic下的所有q,为客户端分配的Queue集合为全量的集合。

(4)如果是集群模式,会获取topic下的所有Queue;从broker获取该topic下所有客户端id列表;排序后调用AllocateMessageAueueStrateg获得ConsumerGroup下该客户端应该分配到的Queue集合。即集群模式,每个客户端分到的q列表由AllocateMessageQueueStrategy来分配。

(5) 获取该客户端所属的Queue集合后,调用updateProcessQueueTableInRebalance更新。

(6) 执行完后,如果有发生变化,则调用messageQueueChanged交给子类具体处理。

(7) 调用truncateMessageQueueNotMyTopic移除缓存中不是该实例处理的Queue。

2.1. 从broker查找该topic对应的客户端id列表

 从MQClientInstance的缓存中获取该topic对应的broker地址,然后调用Netty直接访问broker获取结果

2.2. 分配客户端实例所属Queue集合

 前面说过,广播模式每个客户端实例分配到全量的Queue集合,这里主要介绍集群模式下AllocateMessageQueueStrategy的处理情况。默认为平均分配,实现类为AllocateMessageQueueAveragely。

 先看AllocateMessageQueueStrategy的定义

List<MessageQueue> allocate(
        final String consumerGroup,//当前Consumer实例所属ConsumerGroup
        final String currentCID,//当前客户端应用ID
        final List<MessageQueue> mqAll,//待分配的Queue列表
        final List<String> cidAll //该topic,该ConsumerGroup下的所有客户端应用ID列表
    );

该方法会选出currentCID所属的Queue列表。AllocateMessageQueueAveragely则是按照currentCID所属的位置进行平均分配,过程如下:

file

上面在源码里加上了对应的注释。前面提到传进来的mqAll和cidAll都是排过序的,该过程就是按照客户端序号,从所有Queue列表中平均分配自己所属的Queue,涉及到的各种可能如下:

file

大概过程就是:能整除,则平均分;不能整除,则cid在mod数内的则多分1个Queue,在mod数外的则少分一个。

 前面说过,该分配策略执行时,会将topic下的Queue列表和客户端进行排序,在分配时便会导致排在前面的客户端能分到Queue,且分的Queue会多点。考虑一种情况,如果一个ConsumerGroup订阅了2个topic,Topic_X和Topic_Y,每个topic都有2个Queue,同时该ConsumerGroup下有4个客户端实例,因为Rebalance是根据topic来的,所以不会出现4个Queue被平均消费的情况,结果如下如下:

file

因而在初始化时,最好保证ConsumerGroup下的客户端数量<=Topic下的Queue数量。

2.3. updateProcessQueueTableInRebalance

 该方法的定义如下:

    /**
     * @param topic topic
     * @param mqSet Rebalance后该客户端实例,该topic下的所有现有分配到的q集合
     * @param isOrder 是否为顺序消费
     * @return
     */
    private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,final boolean isOrder) {}

前面提到,RebalanceImpl只有一个processQueueTable属性,该属性维护了当前客户端真在处理的所有Queue,以及Queue对应的消费进度,updateProcessQueueTableInRebalance则会更新该属性。

1 找出rebalance后不属于当前客户端实例的Queue或者已经过期的Queue,标记为drop,并由子类判断是否是否需要移除,如果需要移除,则该客户端实例所属的Queue便有改变。

2 判断Rebalance后分配的Queue是否有新增的Queue,如果有则移除缓存中该Queue的消费偏移量(保底操作),计算该Queue的消费偏移量,然后构造一个PullRequest对象存到列表里,标记Rebalane后处理的Queue有改变

3 分发rebalance后新增的Queue列表,即PullRequest列表,由具体的子类处理。pull模式不处理,push模式,则会放到PullMessageService中,循环处理,具体在RebalancePushImpl的dispatchPullRequest方法中

@Override
    public void dispatchPullRequest(List<PullRequest> pullRequestList) {
        for (PullRequest pullRequest : pullRequestList) {
            this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
            log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);
        }
    }

这一步中,会调回DefaultMQPushConsumerImpl的executePullRequestImmediately方法,往里面加PullRequest对象,以触发整个Push模式的执行,具体过程会在下节介绍<1>。

2.4. messageQueueChanged

 Rebalance后如果处理的Queue列表发生了变更,则执行相应的动作。对于Push模式,会更新客户端订阅topic的版本号(以当前时间时间戳)并通知broker;对于Pull模式,会回调DefaultMQPullConsumerImpl的MessageQueueListener有Queue发生改变。在MQPullConsumerScheduleService中会用到,用于Pull模式定时消费消息<2>。

2.5. truncateMessageQueueNotMyTopic

 移除Queue中topic不是该实例订阅的对象。

 上面介绍的<1>,<2>点可以用于触发Consumer自动/定时拉取消息,具体会在下节客户端消费过程时介绍。

3. Broker端通知Rebalance

 上面提到的Rebalance是客户端自己定时(默认20秒)执行的,还存在一种Broker端主动通知的情况。

 Broker有一个ConsumerManager,当客户端实例发生变更时(上下线)会通知到各个客户端,客户端收到通知后会调用MQClientInstance的rebalanceImmediately直接执行rebalance。该方法会唤醒ServiceThread,使的RebalanceService不再等待,直接执行。

 RocketMQ与Kafka Rebalance机制类似,二者Rebalance分配都是在客户端进行,不同的是:

  1. Kafka:会在消费者组的多个消费者实例中,选出一个作为Group Leader,由这个Group Leader来进行分区分配,分配结果通过Cordinator(特殊角色的broker)同步给其他消费者。相当于Kafka的分区分配只有一个大脑,就是Group Leader。
  2. RocketMQ:每个消费者,自己负责给自己分配队列,相当于每个消费者都是一个大脑。

4. Rebalance的潜在危害

  1. 消费暂停:考虑在只有Consumer 1的情况下,其负责消费所有5个队列;在新增Consumer 2,触发Rebalance时,需要分配2个队列给其消费。那么Consumer 1就需要停止这2个队列的消费,等到这两个队列分配给Consumer 2后,这两个队列才能继续被消费。
  • 重复消费:Consumer 2 在消费分配给自己的2个队列时,必须接着从Consumer 1之前已经消费到的offset继续开始消费。然而默认情况下,offset是异步提交的,如consumer 1当前消费到offset为10,但是异步提交给broker的offset为8;那么如果consumer 2从8的offset开始消费,那么就会有2条消息重复。也就是说,Consumer 2 并不会等待Consumer1提交完offset后,再进行Rebalance,因此提交间隔越长,可能造成的重复消费就越多。
  • 消费突刺:由于rebalance可能导致重复消费,如果需要重复消费的消息过多;或者因为rebalance暂停时间过长,导致积压了部分消息。那么都有可能导致在rebalance结束之后瞬间可能需要消费很多消息。

附上该部分当时源码阅读过程做的笔记简图:

file

更多原创内容请搜索微信公众号:啊驼(doubaotaizi)

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,099评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,828评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,540评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,848评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,971评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,132评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,193评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,934评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,376评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,687评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,846评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,537评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,175评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,887评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,134评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,674评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,741评论 2 351