1.复制
kafka使用多副本来确保数据的持久性,是典型的主备复制模型。
每个topic分区包含一个leader副本和若干个follower副本,leader副本对外提供读写服务,follower副本只同步数据,不对外提供服务。
1.1 日志+Pull复制模型
leader维护本地最新日志偏移leo,和已提交日志偏移hw(所有副本均写入成功,则认为可提交)。
follwer基于pull方式从leader同步数据,数据写入本地日志文件(内存pagecache)后,向leader确认offset。
RPC:ReplicaFetchReqeust,设置offset字段指示待消费的日志偏移,也是对offset之前消息的确认。
这种简单的日志+Pull复制模型,在编程实现上非常简单,并且,
1)当某个follower发生故障,或者网络中断后,可以丢掉不完整的日志,从指定offset重新开始同步。
2)leader和follower副本上的消息顺序是完全一致的。
1.2 Isr和慢副本检测
在kafka中,每个分区的多个副本组成了一个Isr(同步副本)队列。当某个follower的复制速度落后于leader太多的时候,leader从Isr队列中删除这个follower。这个follower可以继续复制,直到追赶上leader的日志进度后,再把它重新加入Isr队列。
leader通过比较hw和follower的offset,当两者的差距超过replica.lag.time.max.ms的时候,就判定为慢副本,从Isr队列中删除该副本。
如果当前分区的leader挂了,队列里的每个副本都有机会成为新的leader(由controller来选择新的leader,下面会讲到)。
1.3 leader切换
当发生leader切换时,controller从Isr中选择一个副本作为新的leader,
1)新的leader:设置hw=leo。
2)follower:从hw截断日志文件,并开始从新的leader同步。
从以上过程可以看出,已commit消息保证不会丢失,未commit的消息可能会丢失。
例如,Isr中有副本a和b, hw是6, a的leo是10,b的leo是9,假如b被选择为新的leader,则新的hw=9,则消息10就会丢失。
1.4 持久性
kafka可以提供的保证是commit消息不会丢失,未commit消息对consumer不可见。
持久性保证的前提是topic分区Isr队列至少有一个副本是可用的,
1)如果Isr副本全部宕掉,这时有一个不在Isr中的副本,如果它被选为leader,可能会导致提交的消息丢失。应用如果追求高可用性,必须容忍部分commit数据丢失的风险;如果追求持久性,则可以设置unclean.leader.election.enable=false,禁止未同步副本当选leader。
2)设置min.insync.replicas,来避免因同步副本数较少,而带来的数据丢失风险。
producer可以通过指定request.required.acks=N(总副本数),来和leader约定持久性级别。
0:producer不等待来自borker的ack,继续发送下一条(批)消息。消息是否写入成功没有保证,甚至broker可能都没收到消息。
1:producer等待leader写入本地日志完成的ack,再继续发送下一条(批)消息。假如在消息commit之前leader宕机,则消息可能丢失。
-1:producer等待当前Isr中的所有副本写入成功的ack,再继续发送下一条(批)消息。除非Isr终端所有副本都出现数据丢失的故障,否则数据的持久性是有保证。
这里有一个隐藏问题:如果在写消息的过程中,leader宕机,或者发给producer的ack消息丢失,producer没有办法判断消息是否已经写入成功。producer只能选择重试,这就导致消息可能会被重复写入,消息乱序。针对这个问题,kafka引入了幂等消息发送机制,确保在producer的一次会话期间,多次重试不会导致消息重复和乱序。
2.集群管理
kafka使用一个全局的controller管理集群分区和副本的状态,选择分区的leader副本,创建删除主题,执行分区迁移等管理操作。
集群中的每个broker都可以充当controller,通过zk来选举controller。当contoller所在的borker故障时,会有新的broker充当controller的角色。
controller在zk保存集群的分区和副本状态,当发生故障恢复时,controller可以从zk加载数据。
controller通过RPC:LeaderAndIsr 、UpdateMetadata 控制分区中所有副本所在的broker更新分区和副本状态。直接的RPC交互降低了通信成本,也降低了zk的访问压力(想象一下每个broker都通过监控zk,来更新各个分区和副本的状态(P*N),会是一个什么情形),使得集群可以容纳更多的的分区和副本。
当leader副本所在的broker宕机,或者发生分区自动再平衡,或者执行分区迁移命令时,controller在分区isr中重新选择一个副本作为分区leader(leader选择算法,例如OfflinePartitionLeaderSelector,PreferredReplicaPartitionLeaderSelector)。
【参考】:
[0]https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Replication
[1]https://cwiki.apache.org/confluence/display/KAFKA/Kafka+replication+detailed+design+V2#KafkareplicationdetaileddesignV2-Messagereplication
[2]https://cwiki.apache.org/confluence/display/KAFKA/kafka+Detailed+Replication+Design+V3
[3]https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Controller+Internals
[4]https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Controller+Redesign