6.消息投递
我们已经了解了一些生产者和消费者是如何工作的,现在让我们讨论在生产者和消费者之间,kafka提供的语义上的保证,显然kafka提供了多种可能的消息投递保证:
- At most once:最多一次,消息可能丢失,而且也不会被再次投递;
- At least once:最少一次,消息不会丢失,但是有可能会被再次投递;
- Exactly once:恰好一次,这是用户实际需要的,每条消息会且仅会被投递一次。
这会被分解为两个问题:发送消息时的持久化保证和消费消息时的保证。
许多系统申明提供了"恰好一次"投递语义,但是注意阅读细节,许多的这些申明是误导(他们没有考虑消费者或者生产者出错的情况,也没有考虑有多个消费者的情况,也没有考虑数据写到磁盘可能丢失的情况)。
kafka的语义非常直接,当发送消息的时候,我们有一个消息被"提交"到日志中的概念,一旦一个发送的消息被提交,只要消息写入的分区有一个broker存活,消息就不会丢失。
现在让我们假设一个完美的,没有任何损坏的broker,然后试图理解对生产者和消费者的保证。如果一个生产者试图发送一个消息并且碰到网络错误,那么它不能确认错误发生在消息提交前还是后。
在0.11.0.0版本之前,如果生产者没有收到消息已提交的响应,它只能选择重新发送消息。这就提供了"至少一次"投递语义。因为原来请求如果实际成功了,那么重发可能将消息再次写入日志中。
从0.11.0.0开始,kafka生产者也支持幂等投递选项,从而保证重发消息不会在日志中重复写入。为了实现这种特性,broker分配给每个生产者一个ID,并且使用生产者发送每条消息的序列数字,实现重复消息删除。
同时从0.11.0.0开始,生产者支持使用类事务语义向多个topic分区发送消息的能力:要么所有消息成功写入所有分区,要么不写入任何分区。这个特性的主要使用场景就是为了kafka topic的"恰好一次"处理。
并不是我们所有的使用场景都需要如此强的保证,对那些延迟敏感的用户,我们允许生产者指定它期望的持久化级别。生产者可以指定它们要等待消息提交,但是生产者也能指定执行完全异步发送,或者等待只要leader(而不是所有的副本)确认消息即可。
现在让我们从消费者的角度描述这个语义。所有副本具有相同的offset,相同的日志,消费者控制其在日志中的位置。如果消费者从来没有出问题,它能保存这个位置信息在内存中,但是如果消费者故障,我们要topic分区被另一个消费者接管,新的处理需要选择一个开始处理的合适的位置,它有几个处理消息并更新位置的选项:
它能读取消息,然后在日志中保存它的位置,最终处理消息。在这种情况下,消费者进程可能在保存位置之后,但在消息处理之前崩溃。这种情况下,进程接管并从已经保存的位置开始处理,即使这个位置之前少量消息还没有被处理。这就是"最多一次"语义,如果消费故障,消息可能没有被处理。
它能读取消息,处理消息,最终在日志中保存它的位置。在这种情况下,消费者进程在处理消息之后,但是在保存位置之前崩溃。这种场景,当新的进程接管并接受少量已经被处理过的消息。这就对应于"至少一次"的语义,许多情况下,消息有主键因此更新是幂等的(两次收到相同的消息,只是用另一个自身的副本覆盖记录)。
那么(用户实际需要的)"恰好一次"语义呢?当从kafka的topic消费并产生到另一个topic时(即kafka stram应用场景),我们可以利用上面提到的0.11.0.0中的新的事务性的生产者的能力。
消费者的位置以topic的消息形式保存。因此我们能在接收topic处理数据结果的相同事务中把offset写到kafka中。如果事务中止,消费者的位置将回到老的地方,在topic上生成的数据对其他消费者不可见,当然这取决于它们的隔离级别(参考参数:isolation.level
)。默认是read_uncommitted
隔离级别,即所有消息对所有消费者可见,即使它们是中止事务的一部分。但是如果设置read_committed
,消费者仅能消费从已提交事务中返回的消息。
当写到外部系统时,这限制是需要协调消费者的位置与实际存储输出。实现这个目标的经典方法就是在消费者位置存储和消费者输出存储之间引入二阶提交。但是可以通过让消费者将offset存储在与其输出相同的位置来更简单地处理,这是更好的,因为消费者可能想要写入的许多输出系统可能不支持两阶段提交。 例如,一个Kafka Connect连接器,它填充HDFS中的数据以及它读取的数据的偏移量,以确保数据和偏移都被更新或两者都不更新。 我们遵循许多其他数据系统的类似模式,这些数据系统需要这些更强的语义,并且消息没有主键以允许重复数据删除。
因此,kafka在kafka stream中支持消息"恰好一次"投递。事务性的生产者和消费者通常被用来在kafka topic之间传输数据或者处理数据时,提供"恰好一次"投递。
其他目标系统中"恰好一次"投递通常需要与此类系统合作,kafka提供offset,使得实现这点是可行的。因此,kafka默认保证至少一次投递,并允许用户通过关闭生产者的重试机制并在消费者处理一批消息之前提交offset来实现"最多一次"投递。
7.复制
kafka通过服务器配置数值复制每个topic的分区日志。当集群中某台服务器故障后,允许自动故障转移到其他副本,因此碰到这种故障时,消息任然可用。
其他消息系统提供一些相关的复制特性,但是,在我们看来,没有很大的用处,并且有很大的缺点:它们是不活动的,吞吐量严重受到影响,需要复杂的手动配置等。kafka默认具备复制功能,事实上我们把复制因子为1的topic当做没有复制的topic来实现的(这时候每个分区只有1个leader,没有任何follower)。
复制单元是topic的分区。在没有故障的情况下,每个kafka的分区有一个leader,以及0个或者多个follower。包括leader在内的总副本数就是副本因子(例如1个leader,2个follower,那么副本因子为3),所有的读和写在分区的leader上执行。整个kafka集群通常有很多的分区,这些分区的leader均匀分布在集群的所有broker上。follower的日志完全和leader的日志一样:有完全一样的offset,消息顺序也完全一样(当然,某些时刻,leader可能有一些还没有被复制到follower的消息在最新的日志中,但是这些日志对客户端是不可见的)。
follower从leader消费消息,和一个普通的kafka消费者一样。然后将消息保存在它们自己的日志中。follower从leader拉取日志,这是非常好的特性,这让follower可以很自然的批量处理日志。
和许多分布式系统一样,自动处理故障需要一个节点存活的明确的定义。对kafka节点来说,存活有两个条件:
- 节点必须能通过zookeeper的心跳机制与其保持回话;
- 如果是follower,它必须复制发生在leader上的写入,不能落后太多。
我们认为节点满足这两个条件就是"In Sync"(处于同步中),从而避免混淆存活和故障两个概念。In-Sync的副本集合被称为ISR,leader持有ISR信息。如果follower死亡,卡住,或者落后,leaer就会把他从ISR列表中移除。卡住和落后副本由配置参数replica.lag.time.max.ms
决定。
在分布式系统术语中,我们只尝试处理真正有故障的“故障/恢复”模型,例如其中节点突然停止工作,然后恢复(可能不知道它们已经死亡)。 Kafka没有处理所谓的“拜占庭(Byzantine)”故障,即节点产生任意或恶意的响应(可能是由于错误或犯规)。
拜占庭将军问题:拜占庭是东罗马帝国的首都。由于当时拜占庭罗马帝国国土辽阔,为了防御目的,因此每个军队都分隔很远,将军与将军之间只能靠信差传消息。 在战争的时候,拜占庭军队内所有将军和副官必须达成一致的共识,决定是否有赢的机会才去攻打敌人的阵营。但是,在军队内有可能存有叛徒和敌军的间谍,左右将军们的决定又扰乱整体军队的秩序。在进行共识时,结果并不代表大多数人的意见。这时候,在已知有成员谋反的情况下,其余忠诚的将军在不受叛徒的影响下如何达成一致的协议,拜占庭问题就此形成。
我们现在能更加精确的定义消息被认为提交,即当分区所有ISR都同步了该消息到日志中。并且只有已经提交的消息才会被投递到消费者,这就意味着消费者不需要担心可能看到那些如果leader故障而丢失的消息。反过来说,生产者可以决定是否等待消息被提交,这取决于它们在延迟和可靠性之间平衡的偏向性。这个偏向性通过生产者使用acks
参数来控制。需要注意的是,topic还有一个ISR最小数量的设置(参数min.insync.replicas
),用于检查生产者请求确认一个消息已经被写入ISR集合的最小数量,这个参数只有在acks=-1
时才能生效。所以如果生产者请求一个不太严格的确认机制(例如acks=0,或者acks=1),即使ISR数量比参数指定的数量还低,消息仍能被提交,也能被消费。
kafka提供的保护机制的前提是在任何时候只要还有一个存活的ISR,那么已经被提交的消息就不会丢失。
复制日志: 法定人数(Quorums),ISR,状态机(State Machines)
kafka分区的核心是复制日志。在分布式数据系统中,复制日志是最基本的原语之一,也有许多方法可以实现一个。
复制日志模拟了对一系列值的顺序达成共识的过程(通常将日志条目编号为0,1,2,...),有许多方法实现它,但是最简单和最快的方法是leader选择提供给它值的顺序。只要leader存在,所有副本只能拷贝leader的值和顺序。
当然如果leader没有故障,我们不需要复制。但是当leader死亡了,我们需要从follower中选择一个新的leader。但是那些follower本身可能落后,或者故障。因此我们必须确保选择一个最新的follower。一个日志复制算法必须提供这样的基本保护,如果我们告诉客户端消息被提交了,然后leader故障,那么我们选举的新的leader必须有这个消息。这就带来了一个trade-off:如果leader在申明消息已提交前,等待更多follower确认消息,那么就会有更多潜在的leader。
这种权衡的一种通用做法就是用多数投票从而达成提交决定和leader选举。这不是kafka的行为,而是让我们抛出这个问题来理解trade-off。假设我们有2n+1个副本,leader申明消息为提交前必须有n+1个副本收到消息,如果我们选举了一个新的leader,那么它必须是n+1个副本中拥有最完整的日志的follower。然后,只要不超过n个副本出现故障,就一定能保证leader有所有完整的已提交消息。
这是因为在n+1个副本之间,一定有至少一个副本包含了所有已提交消息,这个副本的日志是最完整的,因此将被选举为新的leader。每种选举算法还有许多其他细节需要处理,我们现在先忽略它(因为这不是这里讨论的重点)。
多数投票方法有一个非常好的属性:延迟只取决于更快的服务器。意思是,如果副本因子为3,延迟由更快的follower决定,而不是最慢的那个(比如有3个副本R1,R2和R3,三个响应时间分别是1ms,2ms,4ms,那么只会延迟2ms,而不是4ms)。
这些实现有丰富的算法,例如zookeeper的Zab,Raft,和Viewstamped Replication。我们了解到的与Kafka实际实施的最相似的学术算法是微软的PacificA 。感兴趣的同学可以戳下面的链接链接了解这些方法更多的细节:
- Zab:http://web.archive.org/web/20140602093727/http://www.stanford.edu/class/cs347/reading/zab.pdf
- Raft:https://ramcloud.stanford.edu/wiki/download/attachments/11370504/raft.pdf
- PacificA :http://research.microsoft.com/apps/pubs/default.aspx?id=66814
- Viewstamped Replication:http://pmg.csail.mit.edu/papers/vr-revisited.pdf
多数投票的缺点是它不能承受许多故障,让你选择不出leader。容忍一个故障,需要3份数据拷贝。容忍2个故障,需要5份数据拷贝。根据我们的经验,对于一个实际的系统来说,仅能承受1个故障是不够的,但是每个都写5次,需要5倍的磁盘空间,只有1/5的吞吐量,对大容量数据来说,又不太实用。这可能就是为什么法定人数仲裁算法更多的出现在分片的集群配置上,例如ZooKeeper。而很少出现在主要数据存储上。例如HDFS中,namenode的高可用特性是建立在基于多数投票算法,但是却没有用在数据本身(datanode)。ISR集合持久化保存在ZooKeeper中。正因为如此,ISR中的任何副本都可以被选举为leader。
kafka采用了一个略有不同的方法选择它的仲裁集合。没有使用大多数投票,kafka动态维护了一个跟上leader的ISR集合,只有这个集合里的成员能够参与选举。kafka分区的写操作,必须直到所有ISR接收到了这个请求,才能认为已提交。对于kafka用法而言,这是一个重要因素。因为有许多分区,并确保leader均衡是非常重要的。有了ISR模型和n+1副本,kafka的topic能承受n个副本故障而不会丢失已经提交的消息(只要还有1个ISR就能正常运行)。
对于很多我们希望处理的使用场景,我们认为trade-off是合理的。在实践中,为了容忍n个副本故障,大多数投票和ISR方法都要在提交消息之前等待相同数量副本确认(为了一个节点故障后集群还能正常工作,大多数法定人数需要3个副本和1个节点确认,而ISR方式需要2个副本和1个节点确认),提交能力与最慢服务器无关是大多数投票方法的优点。我们认为ISR方法能通过允许客户端选择是否阻塞消息提交(acks参数)来改善,由于所需的复制因子较低,额外的吞吐量和磁盘空间是值得的。
另一个重要的设计是kafka不需要崩溃的节点在所有它们的数据完整的情况下才恢复。这里有两个主要的问题,首先,磁盘错误是持久化数据系统操作时最常见的问题,它们经常不会留下完整的数据。其次,即使这不是问题,但是我们不需要为了我们一致性保证,每次写操作都fsync,这可能降低2~3个数量级的性能。我们的协议是允许一个副本重新加入ISR,但是在重新加入前,需要确保它必须完全再次同步所有数据,即使在崩溃时丢失的没有刷到磁盘上的数据。
Unclean leader election: What if they all die?
注意kafka保护数据不丢失是需要ISR中至少还有一个副本的前提。如果复制分区所有节点都死掉了,保护机制不再适用。
所以一个实际的系统当所有副本都死掉后需要做一些合理的事情,如果你恰好不幸碰到这种的事情,考虑将要发生的事情就非常重要了,这里有两种方法可以实施:
- 等待ISR中的一个副本活过来,并选择这个副本为leader(上帝保佑希望它有完整的数据)。
- 选择第一个活过来的副本为leader(不一定非要是ISR中的副本)。
这是非常简单的在可用性和一致性之间权衡,如果我们等待ISR中的副本,那么只要这些副本故障,我们就处于不可用状态。如果这些副本被破坏,或者它们的数据丢失,那么我们将永久不可用。如果一个非ISR副本活过来,我们也允许它成为leader,那么即使它不能保证有每一条已提交消息,但是所有副本恢复后还是得以它的日志为准。
kafka从0.11.0.0以后,默认选择第一种策略,宁愿等待一个一致性的副本。当然这个可以通过配置unclean.leader.election.enable
改变这个行为,为了支持可用性比一致性更重要的用户场景。
这个问题不止在kafka中才有,基于法定人数选举方案也会遇到这样的问题。例如,在一个多数投票方案中,如果大多数服务器永久性故障,那么你也必须做出抉择,如果选择高可用,那么数据就可能丢失。
Availability and Durability Guarantees
当消息写入kafka时,生产者能选择等待消息被0,1或者 all (-1)个副本确认。需要注意的是,被所有副本确认(acks=all)并不保证所有分配的副本收到消息,而是ISR都收到消息,这一点需要特别注意。
例如,如果一个topic创建时有两个副本,然后其中一个故障了,这时候ISR只有一个副本,那么即使指定acks=all,并且写入成功。然而,如果这剩下的一个副本如果也出现故障,数据仍然可能丢失。
确保分区最大可用性,这种行为可能不是那些把持久性(数据不能丢失)看的比可用性更重要的用户所需要的。因此,kafka提供了两个topic级别的配置,被用在消息持久性比可用性更重要的场景:
Disable unclean leader election - 关闭unclean leader选举(unclean.leader.election.enable)。如果所有副本不可用,那么分区也不可用,直到最后的leader再次恢复为止,宁愿不可用也不愿冒消息丢失的风险。
Specify a minimum ISR size - 指定ISR最小数(min.insync.replicas)。只有ISR数量超过这个最小数(可以等于),分区才能写入成功。为了防止消息只写入一个副本而导致数据丢失,这个配置只有在生产者配置acks=all的情况下才生效,从而保证消息将被ISR确认的最小数量(例如min.insync.replicas=2,那么至少需要2个ISR确认,生产者才会收到写入成功的响应)。这个配置提供了在持久性和可用性之间的权衡取舍。设置更大的min.insync.replicas,能更好的保证一致性,因为消息能被保证写入更多的副本中,从而减少数据丢失的可能性。不过,它也会降低可用性,当ISR数量下降到低于min.insync.replicas的阈值时,这个分区将不再能写入消息。
复制管理
上面关于复制日志的讨论实际上只涵盖了一个日志,即一个主题分区。然而kafka集群一般会管理成百上千分区,我们试图用round-robin方式平衡集群里的分区,避免高容量topic的所有分区集中在几个节点上(而不是尽可能分布在集群各个节点上)。同样的,我们试图均衡leader,以便每个节点成为其分区的比例份额的leader(假设集群有3个broker,总计有2个topic,每个topic有3个分区,3个副本,那么每个broker应该有2个leader)。
优化leader选举过程非常重要,因为这是分区不可用的关键窗口。一个幼稚的leader选举行为是leader故障后,每个分区都尝试选举。
相反,kafka的做法是选举某个broker为controller,controller发现broker级别故障后,承担改变这个故障broker上所有受影响分区的leader。这样做的结果是,我们能批量处理leader变更通知,即使很多分区我们的选举过程代价也很低,而且更快。如果controller故障,某个存活的broker将成为新的controller(这个竞争发生的概率很低,选举代价不大,毕竟不可能broker经常故障)。
8.日志压缩
说明:kafka中的日志压缩和我们平常通过tar或者zip命令将一个afei.log日志文件压缩为afei.log.tar.gz或者afei.log.zip是完全不一样的原理。
日志压缩确保kafka总是保留单个topic分区的日志数据中,每个消息key的最新的、最后已知的值。它对应的用户场景是,应用崩溃或者系统故障后的状态恢复,或者应用重启后,重新加载缓存。接下来让我们深入了解这些用户场景更多的细节,然后描述kafka日志压缩是如何工作的。
到目前为止,我们仅描述了更简单的数据保留方法,其中旧日志数据在固定的时间周期后或者当日志达到某个预定大小时被丢弃。这适用于时间事件数据,例如每个独立的日志记录。 但是,还有一类重要的数据流是对KEY的可变数据的更改日志(例如,对数据库表的更改)。
让我们讨论一个具体例子,我们有一个topic,包含了用户的邮箱地址信息,每次用户更新它们的邮箱地址,我们发送一个消息到这个topic上,并且这个消息的key是用户ID。现在用户ID为123的用户在一段时间内发送了下面这些消息,每条消息对应修改后新的邮箱地址(其他用户ID修改的消息暂时忽略):
123 => afei@sina.com
.
.
.
123 => afei@163.com
.
.
.
123 => afei@gmail.com
kafka的日志压缩机制能提供给我们一个更细粒度的保留机制,因此我们能保证保留下每个KEY的最后一次更新(上面的例子就是afei@gmail.com)。
通过这样做后,我们能保证日志包含每个KEY的最终值的完整快照,而不只是最近改变的KEY。这意味着下游消费者能恢复它们的状态,而并不需要保留所有改变的完整的日志。让我们从一些有用的用例开始,从而了解如何使用它。
- 数据库变更订阅。通常在多个数据系统中有一个数据集,并且这些系统中的某个系统就是一种数据库(例如关系型数据库系统,或者KV存储系统)。例如,你可能有一个数据库,一个缓存,一个搜索集群,一个hadoop集群。每一个数据库的变更,需要反映到缓存 ,搜索集群中,并最终反映到hadoop中。在只处理实时更新的情况下,只需要最近的日志即可。 但是,如果您希望能够重新加载缓存或恢复失败的搜索节点,则可能需要完整的数据集。
- 事件源。这是一种应用程序设计风格,它使用变更日志作为应用程序的主存储。
- 高可用日志(Journaling )。 通过记录对本地状态的改变,本地计算过程能达到容错的目的。如果它失败的话,另一个进程可以重新加载这些更改并继续执行。一个具体的例子是在流查询系统中处理计数,聚合和其他分组处理。
这些用例中,每种场景都需要处理变化的实时反馈数据,但是偶尔一台服务器崩溃,或者数据需要被重新加载,或者被重新处理,都需要完全加载。
一般的想法很简单,如果我们需要保留无限多的日志,我们记录上面这些场景每次变更,然后,我们捕获系统从一开始的每次状态。用这些完整的日志,我们通过应用日志中前N条能恢复到任意点。这样假象的完整的日志,对系统来说不是很符合实际,对于一个稳定的数据集来说,多次更新一条记录导致日志增长而不受限制。简单的日志保留机制将抛弃旧的更新,从而限制空间。但是日志不再能恢复当前状态,因为现在从日志的开始位置恢复,可能不再能重新创建当前的状态。
日志压缩是一个对于每条记录来说更细粒度的保留机制,而不是基于时间的粗粒度的保留机制。这种选择删除记录的想法,让我们保留每个KEY的最近更新,日志能保证至少有每个KEY的最新状态。
保留策略能针对每个topic设置,因此单个集群中,一些topic的保留策略是尺寸或者时间,而其他topic的保留策略可以是日志压缩。
这个功能的灵感来自一个LinkedIn的最古老,最成功的基础架构,一个叫做databus的数据库变更日志缓存服务。不像许多日志结构存储系统系统,kafka为了订阅和组织数据而生,以便能更快速的线性读和写。也不像databus,kafka扮演了一个真实的存储,所以即使在上游数据源无法重放的情况下它也很有用。
基本概念
下面是一张图片,显示了Kafka日志的逻辑结构以及每条消息的偏移量:
日志头部和传统的kafka日志是相同的。它有密集且有序的offset,并保留所有消息。日志压缩提供了一个处理日志尾部的选项。这张图片展示了一个已压缩尾部的日志。需要注意的是,日志尾部的消息保留了它们第一次写入时的原始的offset,并且这个offset从来不会改变。并且所有offset都是日志中有效的位置,即使消息已经被压缩处理过的offset。例如,如上图所示,36,37,38这三个offset是完全等价的位置,在这3个offset上的读都会返回offset为38位置的消息(即使36和37两个offset指定的日志已经被删除)。
压缩在后台完成,通过周期性的重新复制日志段。清理不会阻塞读,并且为了避免影响生产者和消费者,可以限制使用不超过配置的I / O吞吐量。
kafka压缩一个日志段的实际过程更像这样:
日志压缩提供什么保证?
日志压缩提供如下保证:
- 任何消费者都会看到所写的每条消息,这些消息将具有连续的偏移量。 topic的参数min.compaction.lag.ms可用于保证消息写入后必须经过的最小时间才能被压缩。即 它提供了每条消息保留不压缩状态的时间下限。
- 消息顺序总是不变,压缩不会重新对消息排序,只是删除一些消息。
- 消息的offset不会改变,offset永远是日志中位置的唯一标识符。
日志压缩细节
日志压缩被一个日志清理程序控制,它是一个后台线程池。它会重新拷贝日志段文件,并删除那么KEY已经在日志头部中出现的记录。每个压缩线程按照如下方式工作:
- 选择日志头与日志尾比率最高的日志;
- 创建一个简单的摘要,日志头部中的每个KEY的最后一个偏移量。
- 从头到尾重新拷贝日志,并删除那些在日志更后面的地方出现过的KEY。新的,干净的段被立即交换到日志中,因此所需的额外磁盘空间只是一个额外的日志段,而不是日志的完整副本。
- 日志头概要实际上只是一个空间紧凑的hash表,每个entry恰好用24个字节,这样做的结果就是一个8G的清理缓冲区,一次迭代清理大概能清理366G的日志头(假设每个消息是1K)。
配置日志清理
日志清理默认被开启,这将开启一个清理线程池。为了在一个特定的topic上打开日志清理,你可以增加一些属性:
log.cleanup.policy=compact
配置压缩(compact)日志的清理策略,还可以配置删除(delete)日志的清理策略,delete是默认策略;
log.cleaner.min.compaction.lag.ms=5000
其含义是,消息在日志中保持不压缩的最短时间,仅适用于正在被压缩的日志。如果没有配置,则除了最新的日志段(当前正被写入,即活动段),其他所有日志段都可以压缩。活动段不能被压缩,即使这个段里所有的日志比参数log.cleaner.min.compaction.lag.ms
配置的更老。