原文地址:https://kafka.apache.org/0101/documentation.html#replication
Kafka为Topic的每个Partition日志进行备份,备份数量可以由用户进行配置。这保证了系统的自动容错,如果有服务器宕机,消息可以从剩余的服务器中读取。
其他消息系统提供了备份相关的功能,但在我们看来,这是一个附加的功能,不能被大量使用,并且伴随着大量的缺点:Slave是不活跃的(这里应该是指Slave只提供了备份,并不可以被消费等等)、吞吐受到很大的影响、需要手动配置等等。在Kafka中,我们默认就提供备份,实际上我们认为没有备份的Topic是一种特殊的备份,只是备份数为1。
备份的单位是Topic的分区。在没有发生异常的情况下,Kafka中每个分区都会有一个Leader和0或多个Follower。备份包含Leader在内(也就是说如果备份数为3,那么有一个Leader Partition和两个Follower Partition)。所有的读写请求都落在Leader Partition上。通常情况下分区要比Broker多,Leader分区分布在Broker上。Follower上的日志和Leader上的日志相同,拥有相同的偏移量和消息顺序(当然,在特定时间内,Leader上日志会有一部分数据还没复制到Follower上)。
Follower作为普通的Consumer消费Leader上的日志,并应用到自己的日志中。Leader允许Follower自然的,成批的从服务端获取日志并应用到自己的日志中。
大部分分布式系统都需要自动处理故障,需要对节点“alive”进行精确的定义。例如在Kafka中,节点存活需要满足两个条件:
- 1.节点需要保持它和ZooKeeper之间的Session(通过ZK的心跳机制)
- 2.如果是Follower,需要复制Leader上的写事件,并且复制进度没有“落后太多”
我们称满足这两个条件的节点为“同步的”来避免使用“alive”或“failed”这样模糊的概念。Leader节点保存同步中的Follower节点。如果一个Follower宕机或复制落后太多,Leader将从同步的Follower List中将其移除。通过replica.lag.time.max.ms配置来定义“落后太多”。
在分布式系统的术语中,我们只尝试处理“失败/恢复”模型——节点突然停止工作之后恢复的情况。Kafka不处理“拜占庭”问题。
一条消息在被应用到所有的备份上之后被认为是“已经提交的”。只有提交了的消息会被Consumer消费。这意味着Consumer不需要担心Leader节点宕机后消息会丢失。另一方面,Producer可以配置是否等待消息被提交,这取决于他们在延迟和可用性上的权衡。这个可以通过Producer的配置项中设置。
Kafka提供了一条消息被提交之后,只要还有一个备份可用,消息就不会丢失的保证。
Kafka保证在节点故障后依旧可用,但是无法再网络分区的情况下保持可用。
Replicated Logs: Quorums, ISRs, and State Machines (Oh my!)
Kafka分区机制的核心是日志复制。日志复制是分布式系统中最基础的东西,有很多方式可以实现。日志复制可以作为基于状态机的分布式系统的基础设置。
日志复制模型用于处理连续、有序的输入(例如给log entry添加0、1、2这样的编号)。有很多方式实现日志复制,最简单的方式是Leader选择和提供这个顺序之。只要Leader节点存活,Follower只需要按照Leader选择的值和顺序来复制即可。
当然,如果Leader不会宕机,那我们也不需要Follower了!在Leader宕机之后,我们需要在Follower中选择一个节点成为新的Leader。Follower可能会宕机或者日志落后较多,所以我们必须确保选择一个“及时同步”(复制进度和Leader最近的节点)成为新的Leader。复制算法必须提供这样的保证:如果Client收到一条消息已经被Commit了,如果Leader宕机,新Leader必须包含这条已经被Commit的消息。这是一个权衡:Leader在确认消息Commit之前需要等待更多的Follower来确认复制了消息来保证在Leader宕机后有更多可以成为Leader的Follower节点。
如果你选择了所需要的ACK的数量以及选择Leader时需要比较的日志数以确保能重合,这个叫做Quorum。
一个通用的来权衡的方式是提交日志和选择Leader时都采用大多数投票的原则。这不是Kafka使用的方式,但是无所谓,让我们去理解这种方式来了解实现原理。假设一共有2f+1个备份,那么f+1的副本必须在Leader提交commit之前接收到消息,这样就可以从f+1个节点中选择出新节点作为Leader。因为任何f+1个节点,必然有一个节点包含最全的日志。还有很多关于这个算法的细节需要处理(如何定义日志更全、在Leader节点宕机时保持日志一致性等)在这里先忽略。
大多数选票的方法有非常好的特性:延迟取决于同步最快的Server节点。这说明,如果备份数为3,那么延迟取决于两个备份节点中较快的节点。
有很多类似的算法变体,例如ZooKeeper的Zab,Raft,Viewstamped Replication等。和Kafka最相似的学术刊物是微软的PacificA。
大多数选票方式的取消是它不能容忍很多的故障,导致你没有可以被选为新Leader的节点。为了容忍一个节点故障,需要3分数据备份,容忍两个节点故障则需要5个节点。在我们的经验中,只有足够的冗余才能容忍单一的故障在实际系统中是不够的,每次写5次副本,使用5倍的存储空间,和1/5的带宽,在大体量的数据存储上不是很可行。这就是为什么quorum算法多多应用在像ZK这样存储配置的集群中,而不是数据存储系统中。例如HDFS的namenode的高可用建立在大多数选票的机制上,但是数据存储缺不是。
Kafka使用一个明显不同的方式来选择quorum集合。代替大多数选票,Kafka动态的维护一个“同步的备份(in-sync replicas ISR)”的集合。只有这个集合中的成员能被选举为Leader。一个写入请求需要同步到所有的同步中的备份才能认为是提交的。ISR集合在变更时会被持久化到ZK。因此,任何ISR中的备份都可以被选举为新的Leader。这对于Kafka这种拥有多分区并且需要保证这节点负载均衡的模型来说非常重要。使用ISR模型和f+1个副本,Kafka可以容忍f个备份不可用的情况。
对于大多数的场景,我们认为这样的妥协是合理的。在实践中,为了容忍f个节点故障,大多数选票原则和ISR方式都需要等待相同的备份在提交消息前进行确认(如需要容忍一个节点故障,大多数选票的选择需要3个节点,并且提交消息需要至少一个备份的确认;ISR只需要两个节点,需要确认的副本数一样是一个)。相对于大多数选票的原则,ISR方式不需要等待最慢的服务器确认消息是一个优势。尽管如此,我们进行改善,让客户端决定是否等待消息提交,使用较小的副本数,这样带来的吞吐和更小的磁盘空间要求是有价值的。
另一个重要的设计是Kafka不需要故障的节点恢复所有的数据。这是不常见的,复制算法依赖于存储介质在任何故障的情况下都不丢失数据并且不违反一致性原则。这个假设有两个主要的问题。第一,磁盘故障是持久化数据系统中最常见的问题,并且它通常导致数据不完整。第二,即使这不是一个问题,我们也不希望在每一次写入之后都使用fsync来保证一致性,这会使性能下降两三个数量级。我们的协议中允许一个副本重新加入到ISR集合中,在重新加入之前,它需要从新同步在故障时丢失的数据。
Unclean leader election: What if they all die?
Kafka保证的数据不丢失,在至少有一个备份保持同步的情况下。如果一个分区所有的备份的节点都故障,那么就不能提供这个保障了。
但是实践系统中需要一些合理的事情,在所有备份故障时。如果不巧遇上这个问题,去考虑哪些情况会发生是非常重要的。有两种方式去做:
- 1.等待一个ISR中的副本恢复并将其选举为新的Leader(期望它拥有所有的数据)。
- 2.选择第一个副本(无需在ISR中)作为Leader。
这是在可用性和一致性之间的权衡。如果我们等待ISR中的备份恢复,那么会在这个期间一直不可用。如果这样的副本被损坏,那么我们将永久性的失效。另一方便,如果使用不在ISR中的备份成为Leader,尽管它可能不包含所有的日志。默认情况下,Kafka使用第二种策略,当所有ISR中的备份不可用时,倾向于选择可能不一致的备份。这个方式可以通过unclean.leader.election.enable配置禁用,在哪些停机时间优于不一致的场景。
这种困境不是kafka特有的,这存在于任何基于quorum方式的结构中。例如,多数投票算法,如果大多数的服务器都永久性失效了,你必须选择丢失全部的数据或者接受某一台可能数据不一致的服务器上的数据。
Availability and Durability Guarantees
在向Kafka写入数据时,Producer可以选择是否等待0,1或(-1)个备份响应。注意,这里说的“被所有备份响应”不是说被所有分配的备份响应,默认情况下只的时所有ISR集合中的备份响应。例如,如果一个Topic配置成只需要两个备份,并且一个备份故障了,那么写入一个备份即认为收到了所有响应。但是,如果这个备份也故障了,那么数据会丢失。这样保证了分区的最大可用,但是可能不是那些相对于可用性更需要可靠性的用户的需求。因此,我们提供两种Topic级别的配置,相对于可用性,优先保证可靠性:
- 1.禁用unclean leader election;如果所有备份不可用,那么分区保持不可用,直到最近的Leader重新恢复可用。这可能导致不可用,但是不会丢失数据。
- 2.配置一个最小的ISR大小;分区只会在满足最小ISR的情况下接受请求,这样可以避免数据只写入一个备份,而这个备份后续故障导致数据丢失。这个配置只在Producer使用acks=all的配置时有效。这个配置在一致性和可用性上做了权衡。更大的ISR提供了更好的一致性,但是降低了可用性,如果同步备份数小于最小ISR配置时将不可用。
Replica Management
以上的讨论都是基于一个日志,即一个Topic的分区考虑的。但是Kafka集群拥有成百上千这样的分区。我们尝试使用轮训的方式来平衡分区,避免高数量的Topic的分区集中在一部分少量的节点上。同样我们要平衡所有Leader分区,这样每个节点上承载的主分区都有一定的比例。
优化Leader的选举过程也是非常重要的,因为这是系统不可用的窗口期。一个直观的实现是,如果一个节点故障了,为这个节点上所有的分区都独立的执行一次选举。代替这种方式,我们选择一个Broker作为Controller,Controller负责一个故障节点影响的所有分区的Leader变更。这样的好处是我们可以批量处理,减少独立选举时大量的通知,这使得大量分区需要选举时变得更快,代价更小。如果Controller故障了,剩余的Broker中会有一个节点成为新的Controller。