由图中可以看出,broker1和broker2都有topic A的partition0和partition1分区,其中broker1-topicA-partition1是leader,broker2-topicA-partition1是follower。这个在zk的存储结构也有体现。
kafka采取这种模式,可以实现负载均衡和可用性。
1、分片数量和副本数量在创建topic的时候设置,如:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic test
这条命令会创建一个名为test的topic,有3个分区,每个分区需分配3个副本。
具体消息发送到哪个partition,由producer进行消息路由,采用轮询round-robin或者hash算法。
2、请求首先被分配到kafka的Controller。Controller是从broker集群中选取的一个,负责消息Partition管理和副本状态管理。如果当前的Controller失败,会从其他正常的Broker中重新选举Controller。
3、Controller会根据请求中的哪个topic的哪个partition,由PartitionLeaderSelector确认leader,然后由handler将请求路由到leader所在的broker处理。
下面具体解析leader-follower同步和Leader election,以及broker宕机后如何恢复(broker failover)。
1、分区的leader-follower同步
每个partition都有一个唯一的leader,所有的读写操作都在leader上完成,而follower从leader复制写入的数据。
这样就存在一个问题,如果等他所有的follower都从leader上同步完数据才认为消息已提交的话,这样会影响消息发送的性能。如果follower不能及时同步leader上消息的话,则丧失副本的作用,影响高可用性。
因此,kafka维护了一个ISR(in-sync replicas ),如果一个follower宕机(broker宕机当然算),或者落后太多,leader将把它从”in sync” list中移除。ISR机制使得kafka在可用性和性能之间取得一个平衡。
“落后”阀值可以在$KAFKA_HOME/config/server.properties中配置:
#Follower复制的消息落后于Leader后的条数超过预定值
replica.lag.max.messages=4000
#Follower超过一定时间
replica.lag.time.max.ms=10000
另外,producer等待broker的ack有三个策略,通过request.required.acks设置。0--表示不等待确认(最低的延迟和最弱的持久化保证);1--leader接收数据后返回确认信息;-1---所有的in-sync replicas 接收。
2、Leader election
当一个leader宕机,kafka Controller从分区的ISR中选一个作为leader。controller会将Leader的改变直接通过RPC的方式(比Zookeeper Queue的方式更高效)通知需为此作出响应的Broker。同时controller也负责增删Topic以及Replica的重新分配。
(用这种方式选举leader,对于f+1个replica,一个Kafka topic能在保证不丢失已经commit的消息的前提下容忍f个replica的失败,而对于majority vote,需要2f+1个replica。)
问题一:如果partition的所有replica都非ative时候怎么处理?
在ISR中至少有一个follower时,Kafka可以确保已经commit的数据不丢失,但如果某一个partition的所有replica都挂了,就无法保证数据不丢失了。这种情况下有两种可行的方案:
等待ISR中的任一个replica“活”过来,并且选它作为leader
选择第一个“活”过来的replica(不一定是ISR中的)作为leader(默认采取)
(这就需要在可用性和一致性当中作出一个简单的平衡。如果一定要等待ISR中的replica“活”过来,那不可用的时间就可能会相对较长。而且如果ISR中的所有replica都无法“活”过来了,或者数据都丢失了,这个partition将永远不可用。选择第一个“活”过来的replica作为leader,而这个replica不是ISR中的replica,那即使它并不保证已经包含了所有已commit的消息,它也会成为leader而作为consumer的数据源)
Kafka Controller提供五中选举器:
(1)、ReassignedPartitionLeaderSelector
从可用的ISR中选取第一个作为leader,把当前的ISR作为新的ISR,将重分配的副本集合作为接收LeaderAndIsr请求的副本集合。
(2)、PreferredReplicaPartitionLeaderSelector
如果从assignedReplicas取出的第一个副本就是分区leader的话,则抛出异常,否则将第一个副本设置为分区leader。
(3)、ControlledShutdownLeaderSelector
将ISR中处于关闭状态的副本从集合中去除掉,返回一个新新的ISR集合,然后选取第一个副本作为leader,然后令当前AR作为接收LeaderAndIsr请求的副本。
(4)、NoOpLeaderSelector
原则上不做任何事情,返回当前的leader和isr。
(5)、OfflinePartitionLeaderSelector
从活着的ISR中选择一个broker作为leader,如果ISR中没有活着的副本,则从assignedReplicas中选择一个副本作为leader,leader选举成功后注册到Zookeeper中,并更新所有的缓存。
3、broker failover过程简介
(1)、Controller在Zookeeper注册Watch,一旦有Broker宕机(这是用宕机代表任何让系统认为其die的情景,包括但不限于机器断电,网络不可用,GC导致的Stop The World,进程crash等),其在Zookeeper对应的znode会自动被删除,Zookeeper会fire Controller注册的watch,Controller读取最新的幸存的Broker
(2)、Controller决定set_p,该集合包含了宕机的所有Broker上的所有Partition
(3)、对set_p中的每一个Partition
3.1 从/brokers/topics/[topic]/partitions/[partition]/state读取该Partition当前的ISR
3.2 决定该Partition的新Leader。如果当前ISR中有至少一个Replica还幸存,则选择其中一个作为新Leader,新的ISR则包含当前ISR中所有幸存的Replica。否则选择该Partition中任意一个幸存的Replica作为新的Leader以及ISR(该场景下可能会有潜在的数据丢失)。如果该Partition的所有Replica都宕机了,则将新的Leader设置为-1。
3.3 将新的Leader,ISR和新的leader_epoch及controller_epoch(选举版本号)写入/brokers/topics/[topic]/partitions/[partition]/state。注意,该操作只有其version在3.1至3.3的过程中无变化时才会执行,否则跳转到3.1。
(4)、直接通过RPC向set_p相关的Broker发送LeaderAndISRRequest命令。Controller可以在一个RPC操作中发送多个命令从而提高效率。