02. KafkaConsumer订阅topic数据,是如何实现加入组以及再平衡原理

02. KafkaConsumer订阅topic数据,是如何实现加入组以及再平衡原理

上一篇在分析Consumer不通订阅方式的时候,提到了同一消费组下有多个消费者时,会触发再平衡,本篇将通过源码详细的分析是如何加入组以及再平衡的实现原理。

本篇源码分析基于kafka版本:kafka-0.10.1

首先介绍一下kafka的服务端有一个GroupCoordinator组件,该组件主要负责消费者组相关信息的维护,主要有以下功能:

    确定当前消费组改由哪个broker提供coordinator服务

    消费组成员加入

    消费组成员离开

    消费组成员心跳

    消费组成员已消费offset保存

    消费组整体状态维护和再平衡

对GroupCoordinator组件有了大概了解之后,下面开始我们的正文


前提有如下:

    topic名称:test.kafka

    topic分区:2个(test.kafka-0、test.kafka-1

    消费者group.id:test.kafka_group

02.01 第一个消费者加入消费者组过程

我们创建一个KafkaConsumer消费topic数据的代码大体是如下步骤:

KafkaConsumer加入消费者组(test.kafka_group)过程都是在consumer.poll方法实现的,我们接下来参考poll方法的实现

poll方法调用了pollOnce方法

02.01.01第一步:确认coordinator是否就绪,主要作用和是服务器端的GroupCoordinator组件通信

以下是关于ensureCoordinatorReady方法的分析

向服务器发送GroupCoordinator请求,根据group.id获取给该组提供coordinator服务的broker

第一步:获取请求数量最小的node(包括broker的ip、port)

第二步:向第一步获取的broker发送GroupCoordinator请求,参数为group.id

第三步:处理服务器响应结果

第一步:获取服务器返回的为当前gourp.id提供coordinator服务的broker(服务器是通过group.id的hashcode % __consumer_offsets topic的partition 数量,该partition

leader所在服务器为返回的broker),并且初始化coordinator变量

第二步:和broker建立连接

02.01.02第二步:主要是加入消费者组,以及同步分配给当前消费者的topic partitions

以下是关于ensurePartitionAssignment方法的分析

ensurePartitionAssignment方法调用了ensureActiveGroup方法

向服务器发送joinGroup请求

第一步:创建JoinGroupRequest请求,重点介绍以下几个参数:

        String groupId:group.id 参数

        String memberId:消费组的成员ID,此时值为空

        String protocolType:固定值”consumer”

        ListgroupProtocols

通过以上可以分析出参数groupProtocols的值

第二步:通过JoinGroupResponseHandler对服务器返回结果进行处理

joinResponse返回参数介绍

joinResponse.memberId:服务器给当前消费组生成的ID

joinResponse.generationId:服务器生成的一个自增ID

joinResponse.groupProtocol:客户端如果不做任何配置,默认值是”range”

joinResponse. leaderId:如果是第一个加入消费组,leaderId和当前memberId相同

joinResponse.

members:当前消费组中所有的消费成员,只有leader会有这个参数,非leader消费者该值为空

第三步:调用onJoinLeader方法,根据members的数量和订阅的topic:test.kafka的partition数采用rang的分配策略进行分配(调用的方式是performAssignment,因为当前只有一个消费成员,所以该成员分配到两个partition),然后把分配的结果通过SyncGroupRequest请求发送给服务器

第四步:对SyncGroupRequest请求返回结果的处理

该行最后会调用ConsumerCoordinator.onJoinComplete方法如下图

把给该消费者分配的top partition设置给当前消费者,调用该消费这订阅topic时设定的再平衡监听器(此处正好回答了第一篇的问题

到此,消费者组的第一个消费者加入组的过程介绍完成了

02.02 第二个消费者加入消费者组过程

当第二个消费者加入消费组的时候,第一个消费者会跟着联动,最终实现每一个消费者成员消费topic:test.kafka的一个partition,具体实现流程如下分析

在kafka的服务器端通过GroupMeta维护者消费者组的状态,当我们第二个消费者在没有启动之前,GroupMeta的状态处于Stable,members只有一个成员。

当第二个消费者启动时同样第一步发送GroupCoordinatorRequest(分析过程见上图)获取GroupCoordinator所在的broker,第二步发送JoinGroupRequest(分析过程见上图)加入消费组,此时会将服务器GroupMeta的状态由Stable变成PreparingRebalance再变为AwaitingSync,第二个消费者接收到了服务器生成的memberId和Leader memberId信息,由于它本身不是leader进入onJoinFollower的处理如下图

在onJoinFollower方法中,接着发送SyncGroupRequest等待从服务器获取给自己分配的topic partition(等待消费组中的leader完成重新的分配)

那么时如何触发leader重新再分配的呢?接着看下面的分析

每个KafkaConsumer和服务器一直都维护着心跳,正式通过心跳感知到了服务器GroupMeta状态的变化,然后触发leader的重新分配(源码如下)

KafkaConsumer在通过poll方法拉取数据过程会通过延时任务的方式(HeartbeatTask实现了延时接口DelayedTask)触发发送心跳包如下图

Kafka服务器收到心跳包之后由于该GroupMeta的状态是AwaitingSync,会向客户端返回Errors.REBALANCE_IN_PROGRESS的错误码,然后客户端通过重置rejoinNeeded=true,触发leader 重新发送JoinGroupRequest请求完成重新分发,然后把分配结果发送SyncGroupRequest发送给kafka服务器,kafka服务器把GroupMeta的状态由AwaitingSync变成Stable,此时非leader的第二个消费组通过SyncGroupRequest获取给自己分配需要消费的topic partition,这样就完成了再平衡回复到正常消费状态

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,294评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,780评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,001评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,593评论 1 289
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,687评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,679评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,667评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,426评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,872评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,180评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,346评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,019评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,658评论 3 323
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,268评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,495评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,275评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,207评论 2 352

推荐阅读更多精彩内容