绍圣--kafka之服务端处理LeaderAndIsr请求

代理节点处理控制器发送的LeaderAndIsr请求,服务端会交给副本管理器来处理。

创建分区

服务端处理LeaderAndIsr请求,根据主题名称和分区编号创建分区对象(Partition),并更新副本管理器的allPartitions集合。LeaderAndIsr请求记录了分区和分区状态的映射关系。

分区是TopicPartition对象,包括:主题名称,分区编号。

分区状态是PartitionState对象,包括:分区的主副本,ISR,AR。

服务端创建的分区是一个Partition对象,包括:主题名称,分区编号,分区状态信息,副本信息(Replica)。Partition对象综合了TopicPartition对象和PartitionState对象的数据,根据分区状态的副本信息(ISR,AR),生成Replica对象。分区状态的ISR和AR在Partition对象中对应了副本对象集合:inSyncReplicas和assignedReplicaMap。assignedReplicaMap记录了副本编号到副本对象的映射关系。

Partition对象在服务端真正用于分区数据的读写,涉及副本和日志文件的读写用Replica对象。

副本管理器管理了代理节点上的所有分区。针对同一个分区,不管是主副本还是备份副本,存储在代理节点数据目录中的文件夹名称都一样:主题名称+分区编号。

来之《Kafka技术内幕:图文详解Kafka源码设计与实现》:副本管理器管理了代理节点上的所有分区,分区以目录形式存储在代理节点上

创建主副本和备份副本

副本管理器处理LeaderAndIsr请求,具体步骤:

1,创建分区对象,如果分区已经存在,则使用LeaderAndIsr请求中的最新分区状态。

2,对成为主副本的分区调用makeLeaders()方法,为这些分区创建主副本。

3,对成为备份副本的分区调用makeFollowers()方法,为这些分区创建备份副本。

4,如果代理节点是第一次收到LeaderAndIsr请求,则启动最高水位的检查点线程。

5,移除空闲的拉取线程,并调用onLeadershipChange()回调方法。

分区在当前节点上,要么成为主副本,要么成为备份副本。请求中的分区对象TopicPartition在服务端要转化成Partition对象。每个代理节点接收的LeaderAndIsr请求包含多个分区,副本管理器会将请求的所有分区按照分区状态的主副本(leaderId)是否等于当前代理节点的编号(brokerId)分成两种:成为主副本的分区,成为备份副本的分区。

分区分组

假设集群有三个代理节点、主题有三个分区、每个分区有三个副本,每个代理节点都会管理三个分区。分区P1的状态为{leader:1,ar:[1,2,3]},分区P2的状态为{leader:1,ar:[1,2,3]},分区P2的状态为{leader:3,ar:[3,1,2]},从代理节点的角度来看,第一个代理节点将[P1,P2]作为“成为主副本的分区”,将[P3]作为“成为备份副本的分区”,其他代理节点类似。从分区的角度来看,分区P3在第三个代理节点上是主副本,在其他两个代理节点上就是备份副本。

副本管理器将LeaderAndIsr请求的分区按照主副本和备份副本划分后分别存储到对应的集合中,partitionsToBeLeader和partitionsToBeFollower两个集合是互斥的,不会存在一个分区在同一个节点同时作为主副本和备份副本的情况。并且,如果一个分区在某个节点上是主副本,在其他节点上只能是备份副本。

一个分区可以有多个备份副本,但只允许有一个主副本。对于同一个分区而言,如果LeaderAndIsr请求的主副本编号(leaderId)和当前代理节点的编号(brokerId)相等,则调用分区的makeLeader()方法,否则调用分区的makeFollower()方法(一个分区在一个代理节点上只允许存在一个副本,所以同一个分区在同一个代理节点上,不可能被同时加入partitionsToBeLeader和partitionsToBeFollower两个集合)

加入分区集合

副本管理器针对partitionsToBeLeader集合调用makeLeaders()方法,返回partitionsBecomeLeader集合。针对partitionsToBeFollower集合调用makeFollowers()方法,返回partitionsBecomeFollower集合。副本管理器在调用分区的makeLeaders()方法和makeFollowers()方法,这两个方法只有返回true的时候相对应的分区才需要加入对应的分区集合。什么情况下返回true?

1,分区对象的主副本不存在;2,分区对象的主副本已经存在,但和分区状态对象的主副本不同。满足其中一个条件都会返回true。

第一次创建分区的主副本和备份副本时,分区对象的主副本编号还没有定义。因此分区的makeLeaders()和makeFollowers()方法都返回true,都加入对应的分区集合:partitionsBecomeLeader和partitionsBecomeFollower集合。

控制器第二次发送LeaderAndIsr请求,分区的主副本没有变化,makeLeaders()和makeFollowers()方法都返回false,不需要将分区加入对应的分区集合。如果控制器多次下发LeaderAndIsr请求的内容都一样,代理节点实际上只处理一次,代理节点处理LeaderAndIsr请求,不只是通过分区状态的对象(PartitionState)更新分区对象(Partition)的信息,还需要处理分区对象的其他组件:拉取管理器管理的分区集合。

分区与拉取管理器

代理节点的副本管理器会管理所有的分区,拉取管理器会管理所有的备份副本对应的分区。调用分区的makeLeaders()和makeFollowers()方法时,拉取管理器都需要处理分区的变化。

处理主副本,备份副本的逻辑

调用makeLeaders()方法,如果分区之前是备份副本,拉取管理器有这个分区。当分区的备份副本转为主副本,拉取管理器需要将分区移除;反之拉取管理器需要添加分区。

调用makeFollowers()转为备份副本时,需要将日志文件截断到副本的最高水位。调用调用makeLeaders()转为主副本时,需要将副本的最高水位作为日志的最新偏移量。

创建副本

副本管理器创建分区对象(Partition)时,分区对象中的主副本,AR,ISR等信息都为空,当分区创建副本时(不管是主副本还是备份副本),才会开始更新分区对象的相关信息。创建或删除副本只是更新分区的assignedReplicaMap集合(分配给分区的副本集合:AR)。

makeLeaders()和makeFollowers()处理的是多个分区,针对每个分区,分别调用makeLeader()和makeFollower()方法更新分区信息。

makeLeader()方法返回值为true表示新的主副本:分区已有主副本编号与新的主副本编号是否相同。调用makeLeader()的场景:

1,原先没有主副本,调用makeLeader()方法的副本就会转为主副本 。

2,原先有主副本,而且新的主副本和原来的主副本一致。

3,原先有主副本,但是新的主副本和原来的主副本不一致,比如备份副本转为主副本 。

makeLeader()和makeFollower()方法都会设置分区的主副本和AR,创建分区的主副本时,分区对象中有ISR,创建备份副本时,分区对象没有ISR。并且并不是只是创建一个对应的副本,而是根据分区的AR集合创建多个副本。创建的多个副本按照是否在当前代理节点上分为:本地副本和远程副本。

本地副本和远程副本

创建的本地副本有日志文件,创建的远程副本没有日志文件。分区对象的makeLeader()和makeFollower()方法,首先都会从分区状态信息对象中解析出分配给分区的副本集,然后调用方法创建所有副本。

每个分区对象都会创建分区的所有副本。 分区对象从分区状态信息对象中读取所有的副本集(AR),并为每个副本编号创建一个对应的副本对象,分区创建出来的副本不一定都有日志文件。 日志是真正存储在代理节点上的物理介质,只有本地副本才有日志。

本地副本只有一个,远程副本可以有多个。 但本地副本和主副本、备份副本没有必然的联系,本地副本只有结合本地代理节点才有意义 。分区在当前代理节点上是主副本,那么本地副本就是主副本,其他远程副本就是备份副本。分区在当前代理节点上是备份副本,那么本地副本是备份副本,远程副本有一个是主副本,其他都是备份副本。

分区的每个副本(主副本和备份副本)在自己的代理节点上,都有一个对应的本地日志文件。对于同一个分区,备份副本会同步主副本的日志文件数据,并写入到备份副本自己的本地日志文件中。分区的多个副本数据保持了同步。一旦主副本挂掉,控制器会在备份副本中选举一个作为主副本。 因为备份副本的日志文件和旧的主副本已经保持数据同步,所以选举新的主副本,并不会丢失数据。

消费者元数据迁移

协调者处理消费者请求,有两种数据是以内部主题(_consumer_offsets)的形式存储在服务端的协调者节点的内存缓存中:1,消费者提交的偏移量(key:GroupTopicPartition,value:消费者提交的偏移量)。2,消费组分配的状态数据(key:消费组编号,value:分配给每个消费者的分区结果)。当节点宕机时,服务端要处理主副本的故障转移,还要在其他节点上恢复缓存数据。处理内部主题的LeaderAndIsr请求,服务端也会创建分区的主副本和备份副本。 

回顾:消费者要和协调者通信,必须首先找到消费组所属的协调者。消费者通过GROUP_COORDINATOR请求,向任意一个节点获取消费组对应的协调者,然后再和协调者联系。每个消费组都有唯一对应的协调者,协调者会保存消费组相关的元数据。消费者提交分区的偏移量给协调者,更新消费者元数据,除了追加消息到内部主题对应的本地日志文件,也会更新缓存的内容。缓存是为了更快地查询,当需要查询分区的提交偏移量,或者查询消费者的元数据,可以直接查询缓存,而不需要读取日志文件。

消费者联系的协调者节点属于内部主题某个分区的主副本。这个主副本的代理节点上就保持的有对应的消费组元数据。所以只要这个分区的主副本发生改变时(变化到其他代理节点上),消费者联系的节点也会随之变化。如果是成为主副本,消费组的元数据管理器(GroupMetadataManager)会加载分区的消费组元数据到缓存中,如果是转为备份副本,消费组的元数据管理器会移除消费组的缓存内容。

消费组的协调者收到主消费者发送的分区分配结果(同步组请求),并存储到内部主题。然后协调者会发送“同步组响应”给消费组的所有消费者,并且“尝试完成并调度下一次心跳”。最后,协调者才会将消费组的状态改为“稳定”。协调者在调用onGrouploaded()方法加载消费组的元数据时,如果数据有在内部主题中,说明消费组已经处于“稳定状态”,协调者也需要“尝试完成并调度下一次心跳” 。协调者加载完消费组元数据后,会开始监控每个消费者成员的心跳请求 。协调者调用onGroupUnloaded()方法会卸载消费组,并将消费组的状态转为“失败” 。在消费组的状态是失败的情况下,如果消费组之前的状态是“准备再平衡,则返回“加入组响应”给所有消费者,如果消费组之前的状态是“稳定”或是“等待同步”,则返回“同步组响应”给所有消费者,这两种返回响应的错误码都是:“当前节点不是消费组的协调者”。消费者在收到这样的错误码后,会重新连接正确的协调者节点,并重新发送加入组请求或同步组请求。

举例:主题test有三个分区,有三个代理节点,消费组group1的三个消费者分配到不同的分区,并向分区的主副本拉取数据。消费组对应内部主题的分区是_consumer_offset_1,内部分区的主副本在代理节点1上,内部分区保存的数据有两种:三个消费者提交的分区偏移量 、主消费者发送的分区分配结果 。代理节点1的 “消费组元数据管理器”会保存这两种数据的缓存内容(数据写到内部分区后,更新消费组元数据管理器的缓存)。其他两个代理节点因为不是消费组的协调者,所以他们的消费组元数据管理器并不会保存以上两种缓存数据,但是他们的内部分区作为备份副本,会向内部分区的主副本同步数据。如果内部分区_consumer_offset_1的主副本转移到代理节点3上,那么代理节点3的协调者会处理内部分区的数据迁移:加载内部分区的所有数据到缓存中。原先的代理节点1不是消费组的协调者,如果消费者连接到代理节点1上,服务端会返回错误码给消费者。消费者必须重新连接新的协调者,即代理节点3。

当分区的主副本发生故障转移,代理节点处理控制器发送的LeaderAndIsr请求,会对分区调用makeLeader()和makeFollower()方法更新分区的状态信息:更新主副本,AR,ISR。当消费组的协调者发生故障转移,消费组的每个消费者都需要连接新的协调者。代理节点要恢复缓存数据,当消费者需要获取分配的分区,或者根据分区读取提交偏移量,就可以直接从缓存内容中读取,减少磁盘的读取操作。

总结:

1,创建普通主题test和内部主题_consumer_offsets,控制器发送LeaderAndIsr请求,不同代理节点上的不同分区分别成为主副本和备份副本,备份副本会同步主副本的数据。

2,控制器发送UpdateMetadata请求给每个代理节点,元数据缓存保存了每个主题的元数据。

3,消费组的每个消费者向任意一个代理节点发送GROUP_COORDINATOR请求,获取消费组的协调者。

4,消费组的每个消费者从步骤2的元数据缓存获取分配分区的主副本,它向分区的主副本拉取数据。

5,消费组的每个消费者提交分区的偏移量,发送心跳给步骤3的协调者节点。

6,代理节点1出现故障,控制器选举分区test_1和_consumer_offsets_1的主副本,并发送LeaderAndIsr请求给存活的代理节点。代理节点处理请求,更新分区的状态信息。

7,控制器发送UpdateMetadata请求给每个代理节点,更新元数据缓存。

8,消费组的每个消费者原先连接的协调者是代理节点1,它会返回“不是消费组的协调者”给每个消费者。消费者重新发送GROUP_COORDINATOR请求获取新的协调者。因为步骤7中已经更新了元数据缓存,所以消费者查询到的协调者就是管理消费组的最新协调者。

9,消费者1分配分区test_1的主副本转移到代理节点2上,消费者1会从代理节点2拉取数据。

10,消费组的每个消费者连接新的协调者,并提交分区的偏移盘、发送心跳给新的协调者节点。

消费组元数据缓存:存储:内部主题和协调者节点的内存;来源:消费者提交分区的偏移量,消费者保存分区的分配结果;存储内容:偏移量缓存(OffsetAndMetadata)和消费组元数据(GroupMetadata);每个节点数据都不同。

主题元数据:存储:ZK;来源:控制器更新分区的状态信息;存储内容:主题元数据(TopicMetadata);每个节点数据都相同。

参考资料:

Kafka技术内幕:图文详解Kafka源码设计与实现

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 225,208评论 6 524
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 96,502评论 3 405
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 172,496评论 0 370
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 61,176评论 1 302
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 70,185评论 6 401
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 53,630评论 1 316
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 41,992评论 3 431
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 40,973评论 0 280
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 47,510评论 1 325
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 39,546评论 3 347
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 41,659评论 1 355
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 37,250评论 5 351
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 42,990评论 3 340
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 33,421评论 0 25
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 34,569评论 1 277
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 50,238评论 3 382
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 46,732评论 2 366

推荐阅读更多精彩内容

  • 消费者发送加入组请求和同步组请求给服务端,服务端将请求的处理交给消费组的协调者(GroupCoordinator)...
    绍圣阅读 445评论 0 0
  • 关于Mongodb的全面总结 MongoDB的内部构造《MongoDB The Definitive Guide》...
    中v中阅读 31,969评论 2 89
  • ORA-00001: 违反唯一约束条件 (.) 错误说明:当在唯一索引所对应的列上键入重复值时,会触发此异常。 O...
    我想起个好名字阅读 5,356评论 0 9
  • 消费者客户端向协调者发送了加入组请求和同步组请求,那么在协调者在接收到两种请求之后,协调者是怎么处理的喃? kaf...
    绍圣阅读 231评论 0 0
  • 服务端在处理客户端的请求,针对不同的请求,可能不会立即返回响应结果给客户端。在处理这类请求时,服务端会为这类请求创...
    绍圣阅读 9,127评论 0 0