在一个Kafka集群中,某个broker会被选举出来承担特殊的角色,即控制器(下称controller)。就是管理集群中所有分区的状态并执行相应的管理操作。
每个Kafka集群任意时刻都只能有一个controller。当集群启动时,所有broker都会参与controller的竞选,但最终只能由一个broker胜出。一旦controller在某个时刻崩溃,集群中剩余的broker会立刻得到通知,然后开启新一轮的controller选举。
管理状态
controller维护的状态分为两类:
- 每台broker上的分区副本。
- 每个分区的leader副本信息。
从维度上看,这些状态又可分为副本状态和分区状态。controller为了维护这两个状态专门引入了两个状态机,分别管理副本状态和分区状态。
(1)副本状态机(Replica State Machine)
Kafka为副本定义了7种状态:
- NewReplica:controller创建副本时的最初状态。当处在这个状态时,副本只能成为fol-lower副本。
- OnlineReplica:启动副本后变更为该状态。在该状态下,副本既可以成为follower副本也可以成为leader副本。
- OfflineReplica:一旦副本所在broker崩溃,该副本将变更为该状态。
- ReplicaDeletionStarted:若开启了topic删除操作,topic下所有分区的所有副本都会被删除。此时副本进入该状态。
- ReplicaDeletionSuccessful:若副本成功响应了删除副本请求,则进入该状态。
- ReplicaDeletionIneligible:若副本删除失败,则进入该状态。
当创建某个topic后,该topic下所有分区的所有副本都是NonExistent状态的,此时controller加载ZooKeeper中该topic每个分区的所有副本信息到内存中,同时将副本状态变更为New,之后controller选择该分区副本列表中的第一个副本作为分区的leader副本并设置所有副本进入ISR,然后在ZooKeeper中持久化该决定。
一旦确定了分区的leader和ISR之后,controller会将这些信息以请求的方式发送给所有副本,同时将这些副本状态同步到集群的所有broker上以便让它们知晓。当这些都做完后,controller会将分区的所有副本状态置为Online,这也是副本正常工作的状态。
(2)分区状态机(Partition State Machine)
分区状态共4个:
- NonExistent:表明不存在的分区或已删除的分区。
- NewPartition:一旦被创建,分区便处于该状态。此时,Kafka已经为分区确定了副本列表,但尚未选举出leader和ISR。
- OnlinePartition:一旦该分区的leader被选出,则进入此状态。这也是分区正常工作时的状态。
- OfflinePartition:在成功选举出leader后,若leader所在的broker宕机,则分区将进入该状态,表明无法正常工作了。
controller职责
- 更新集群元数据信息
client 能够向任意一台 broker 查询topic的分区信息(比如topic有多少个分区、每个分区的leader在哪台broker上以及分区的副本列表)。随着集群的运行,这部分信息可能会发生变化,当有分区信息发生变更时,controller将变更后的信息发送给集群中的每个broker。
- 创建topic
controller启动时会创建一个ZooKeeper的监听器,监控 /brokers/topics 下子节点的变更情况,一旦有新增znode,controller会为新建topic的每个分区确定leader和ISR,然后更新集群的元数据信息。之后,还会创建一个新的监听器监听 /brokers/topics/<新增topic> 节点内容的变更,当topic分区发生变化的时候controller也能马上得到通知。
- 删除topic
kafka删除topic是向zk的 /admin/delete_topics 下新建一个znode,controller启动时会创建一个监听器专门监听该路径下的子节点变更情况,一旦发现有新增节点,controller立即停止所有副本运行、删除所有副本的日志数据,然后移除 /admin/delete_topics/<待删除topic> 节点。
- 分区重分配
- preferred leader副本选举
- topic分区扩展
分区扩展与创建topic一样会向 /brokers/topics/<topic> 节点下写入新的分区目录,controller 监听器会得到通知,执行分区创建任务。
- broker加入集群
每当有新broker加入集群时,该监听器会感知到变化,执行对应的broker启动任务,之后更新集群元数据信息并广而告之。
- broker崩溃
broker崩溃后,监控器会得到通知,从而开启broker退出逻辑,更新集群元数据并同步到其他broker上。
- 受控关闭
通过 kafka-server-stop 脚本关闭Kafka broker是受控关闭,而掉电、kill-9 的方式是不受控关闭。受控关闭能够最大限度地降低broker的不一致性。受控关闭是由即将关闭的broker向controller发送请求,待关闭broker将一直处于阻塞状态,controller在处理完必要的leader重选举和ISR收缩调整之后,会给broker发送ControlledShutdownResponse表明该broker现在可以正常退出。
- controller leader选举
集群首次启动时所有broker都会抢着创建 /controller 节点,ZooKeeper保证了最终只能有一个broker胜出,胜出的那个broker即成为controller,其他broker继续监听/controller节点的存活情况并随时准备竞选。