kafka生产者生产的消息需要存储在服务端,那么服务端就需要保证消息的健壮性,需要保证其线性扩展,负载均衡,故障容错等。那么kafka是怎么来做的喃?
分区:指消息按照分区分布在kafka集群中所有的节点上;复制:指分区都会有多个副本存储在不同的节点上。消息存储:指新的消息总是以追加的方式进行文件存储。分区可以做到线性扩展和负载均衡。复制可以做到故障容错。消息追加的方式进行信息存储可以提供很高的写效率。
分区 副本
消息按照主题分类,为了提高消息的并行处理能力,每个主题会有多个分区,为了保证消息的可用性,每个分区都会有多个副本。主题以分区的形式存储在多个代理节点上,ZK记录了主题和分区的对应关系,集群中每个代理节点都会管理多个主题的多个分区。
主题采用多个分区,可控制消息写往不同的节点,从而分散每个节点的压力。客户端以分区作为最小的处理单位,生产者将消息同时写入不同的节点,多个消费者可以同时读取不同节点的不同分区数据,加快消费消息,降低消息的延迟。
为了保证分区的可用性,采用副本机制为一个分区备份多个副本,一个分区只有一个主副本(Leader),多个备份副本(Follower)。主副本负责客户端的读写,备份副本负责同步主副本的数据。当主副本挂掉之后,在多个备份副本中选择一个作为主副本,继续为客户端提供读写服务。分区有两个重要的集合:AR(分区的所有副本集合)和ISR(和主副本正在同步的副本集合)。副本是真正存储在消息代理节点上,持有日志文件对象。客户端访问分区,先获取分区的主副本,然后获取主副本所在的消息代理节点编号,最后从消息代理节点读写主副本对应的日志文件。
将分区数据存储到日志文件上时,每个分区对应一个目录,目录下有多个日志分段。同一个目录下的所有日志分段都属于同一个分区。每个日志分段在物理上由一个数据文件和一个索引文件组成。数据文件存储的是消息的真正内容,索引文件存储的是数据文件的索引信息。为数据文件建立索引文件目的是更快的访问数据文件。生产者采用追加的方式将消息写入日志文件,顺序写盘性能很高,同样消费者也是顺序读盘,但是消费者在异常恢复情况下可能需要重新处理消息,这个时候可以利用索引文件重新定位到数据文件中的消息。
消息写入日志文件
每个分区都有一个日志对象管理分区的所有日志分段。
生产者在发送消息时,会在客户端将属于同一个分区的一批消息作为一个生产请求发送给服务端。java版本的生产者产生的消息内容是字节缓冲区(ByteBuffer)。
消息集
消息集中的每条消息都会被分配一个相对偏移量,每一批消息集中消息的相对偏移量都从0开始:第一批消息:[0,1,2,3]。第二批消息:[0,1,2]。
消息集中的每条消息有三个部分组成:偏移量,大小,消息内容。消息内容包含:键值,键值的长度,校验值等数据(Record)。
kafka服务端在存储消息时,会为每条消息都指定一个唯一的偏移量。同一个分区的所有日志分段中消息的偏移量从0开始不断递增,不同分区的偏移量直接没关系。这也就是kafka只能保证同一个分区的消息有序性,不能保证跨分区消息的有序性。
客户端创建消息集中每条消息的偏移量,都是相对于本批次消息集的偏移量,每批消息的偏移量都是从0开始,但是这个偏移量不能直接存储在日志文件中。在服务端要对此偏移量进行转换,计算消息的偏移量时,采用下一个偏移量(nextOffsetMetadata)的值来替换消息中的相对偏移量,这时消息中偏移量就是保存在文件中的绝对偏移量。
疑问:既然要在服务端进行替换偏移量,为什么还要在客户端进行设置,而不在服务端直接设置喃?
说明:客户端生产的消息传到服务端时都是转化成了二进制内容保存在字节缓冲区中,假设我们在客户端没有设置偏移量写入字节缓冲区中,那么在服务端存储消息时,就需要在字节缓冲区每条消息前面添加偏移量,这样就需要修改字节缓冲区的大小。不能直接使用原来的缓冲区。假如我们在客户端设置了偏移量,消息格式是固定了。在服务端存储消息时,直接修改字节缓冲区中每条消息的偏移量的值就行了,其他数据内容不变,字节缓冲区的大小也不会发生变化。
服务端将每个分区的消息追加到日志中是以日志分段为单位的。日志分段中数据文件的大小的阀值1G,数据文件中存储的消息达到阀值后会创建一个新的日志分段文件来存储信息,分区的消息总是追加到最新的日志分段中,也就是说,一个分区的日志文件,在任何时刻都只有一个活动的日志分段。每个日志分段都有一个基准偏移量,在一个日志分段中是固定值,用它来计算出每条消息在当前这个日志分段中的绝对偏移量,最后把消息写到日志分段中。更新日志的下一个偏移量的值(用来设置下一个消息的绝对偏移量),满足条件时调用flush()把消息刷写到磁盘。
下一个偏移量(nextOffsetMetadata)声明类型是volatile,volatile类型的变量被修改时,其他所有使用到此变量的线程都能立即看到。服务端为每条消息指定绝对偏移量,会用nextOffsetMetadata的值作为起始偏移量,将消息写入日志分段中,获取到这批消息中最后一条消息的偏移量加上一后更新nextOffsetMetadata。消费者或者备份副本会根据nextOffsetMetadata最新的值拉取到新写入的消息。
为消息集分配绝对偏移量时(更新每条消息的偏移量数据:offset),以nextOffsetMetadata的偏移量作为起始偏移量。分配完后还要更新nextOffsetMetadata的偏移量值,因此获取nextOffsetMetadata的偏移量值并加一是一个原子操作。
消息的大小和消息内容都不变动,如何在字节缓冲区中定位到每条消息的偏移量所在的位置:先读取出消息大小的值,然后计算出下一条消息的起始偏移量,使用字节缓冲区提供的定位方法(position())直接定位到下一条消息的起始位置。并不需要按照顺序完整的读取每条消息的实际内容,这样代价太大。 每条消息的长度:8个字节+4个字节+消息大小。
日志分段
服务端处理每批追加到日志分段中的消息集,都是以nextOffsetMetadata作为起始的绝对偏移量。同一个分区的所有日志分段中,所有消息的偏移量都是递增的。
消息代理节点上的一个主题分区(TopicPartition)对应一个日志(Log)。每个日志有多个日志分段(LogSegment),一个日志管理该分区的所有日志分段。
多个日志分段中,只有一个活动日志分段(activeSegment)来存储当前接收的消息集。其中有几个重要的方法和变量:
1,activeSegment():获取segments(所有日志分段集合)中最后一个元素,作为日志最新的活动分段。如果有新的日志分段产生就会加入到segments的最后一个。
2,nextOffsetMetadata:下一个偏移量元数据,LogOffsetMetadata对象的实例,nextOffsetMetadata的构造方法中的值来源于活动日志分段:下一个偏移量的值(nextOffset),基准偏移量(baseOffset),活动日志分段的大小(size)。
3,logEndOffset:日志的最新偏移量表示下一条消息的偏移量,它的值是nextOffsetMetadata.nextOffset的值。实际上logEndOffset就是当前活动日志分段的下一个偏移量的值。
客户端对消息的读写操作会用到日志偏移量元数据(LogOffsetMetadata),写入消息集到日志,下一个偏移量元数据(nextOffsetMetadata)中的消息偏移量(nextOffset)会作为消息集的起始偏移量;从日志中读取消息时,不能超过日志结束偏移量元数据(logEndOffsetMetadata)中的结束偏移量(logEndOffset)或日志最高水位偏移量元数据(highWatermarkMetadata)的最高水位(highWatermark)。
日志偏移量元数据(LogOffsetMetadata)包含:消息偏移量(messageOffset),日志分段的基准偏移量(segmentBaseOffset),消息在日志分段中的物理位置(relativePositionInSement)。
下一个偏移量元数据(nextOffsetMetadata),日志结束偏移量元数据(logEndOffsetMetadata),日志最高水位偏移量元数据(highWatermarkMetadata)都继承自日志偏移量元数据(LogOffsetMetadata)。
nextOffsetMetadata其实和logEndOffsetMetadata的值是一样的。但是他们面向的使用对象不一样,nextOffsetMetadata是写入时使用面向生产者;logEndOffsetMetadata是读取时,但是是备份副本使用;highWatermarkMetadata读取时,但是是消费者读取使用。
追加消息集到活动日志分段,如果当前活动日志分段放不下新追加的消息集,日志会采用滚动方式创建一个新的日志分段,并将消息集追加到新的日志分段。activeSegment指向新创建的日志分段。
达到以下条件会创建新的日志分段
1,追加的消息集的大小加上原有消息集的大小超过了日志分段的阀值,也就是放不下新追加的消息集了。
2,离上次创建日志分段时间到达一定需要滚动创建的时间。
3,索引文件满了
新创建的日志分段的基准偏移量的值是nextOffsetMetadata中的消息偏移量的值,也是当前活动日志分段的下一个偏移量的值。
每个日志分段由数据文件和索引文件组成,数据文件保存消息集的具体内容,索引文件保存了消息偏移量到物理位置的索引。追加一批消息集到日志分段,数据文件实时保存消息集内容,而间隔一定的字节大小才会写入一个索引条目到索引文件中,索引文件的目的就是:快速定位指定偏移量消息在数据文件中的物理位置。基本思路是:建立消息绝对偏移量到消息在数据文件中的物理位置的映射关系。
索引文件存储的形式和特点:
1,不会为每条消息都建立索引。稀疏索引
2,索引条目的偏移量存储的是相对于基准偏移量的相对偏移量,不是消息的绝对偏移量。
3,索引条目的相对偏移量和物理位置各占4字节,即一个索引条目占用8个字节(消息集中的消息绝对偏移量占用8个字节)。
4,索引条目中的相对偏移量是有序的,查询指定偏移量时,使用二分法查找可以快速确定偏移量的位置。
5,如果指定的偏移量在索引文件中不存在,可以找小于指定偏移量的最大偏移量。
6,稀疏索引可以使用内存映射方式,将整个索引文件放入内存中,加快偏移量的查询。
总结:
1,一个日志由多个日志分段组成,日志管理了所有的日志分段。
2,日志用segments保存每个日志分段的基准偏移量到日志分段的映射关系。
3,日志分段的基准偏移量是分区级别的绝对偏移量。
4,日志分段中第一条消息的绝对偏移量等于日志分段的基准偏移量。
5,每个日志分段都由一个索引文件和一个数据文件组成。
6,日志分段的数据文件和索引文件的文件名称以基准偏移量命名。
7,数据文件保存每天消息的格式是:消息的绝对偏移量,消息的大小,消息的内容。
8,索引文件保存消息偏移量和消息在数据文件中的物理位置。
9,索引文件中索引条目的键存储的值是:消息的绝对偏移量减去基准偏移量。
10,索引文件通过内存映射方式,将整个索引文件加载到内存中,加快文件的读取。
读取日志
客户端拉取分区主副本的消息集,一定会指定拉取偏移量,发送拉取请求给服务端。服务端处理客户端的拉取请求,会返回从客户端指定偏移量开始读取的消息集,当然不能有多少消息集就返回多少消息集,客户端会指定拉取的消息量,默认大小为1M(max.partition.fetch.bytes),而日志分段对应的数据文件大小默认是1G,所以服务端在读取消息集的时候只需要选择其中一个分段,就可以满足客户端的一次拉取请求。
选择日志分段要结合客户端指定的偏移量和日志分段的基准偏移量,因为日志分段的基准偏移量是日志分段中第一条消息的偏移量,所以可以根据客户端指定的拉取偏移量从所有基准偏移量中选择出小于等于拉取偏移量的最大值的日志分段,这样的选择叫做floor。
读日志分段应该先读取索引文件再读取数据文件:根据起始偏移量(startOffset)到读取索引文件中对应的最近的物理位置(startPosition),根据起始位置直接定位到数据文件,开始读取数据文件消息,最多只能读到数据文件的结束位置(maxPosition)。以下图片来自于《Kafka技术内幕:图文详解Kafka源码设计与实现》
由于索引文件采用稀疏索引方式存储,所有给定一个偏移量到索引文件中去查找不一定能找到对应的索引条目。但是又因为索引文件中所有索引条目的偏移量都是递增的,可以找到离目标偏移量最近的索引条目偏移量(小于目标偏移量的最大值,和找日志分段类似)
从索引文件到数据文件的步骤:
1,起始偏移量为350,找到索引文件中偏移量为345的索引条目,对应的物理位置是328.
2,根据物理位置直接定位到数据文件的328文件位置。
3,读取每条消息的偏移量,但是不读取消息内容。
4,步骤3最终会找到偏移量为350的消息,得到物理位置448。
5,定位到数据文件的448位置,开始真正的读取起始偏移量为350的消息内容。
追加消息时,索引条目的偏移量是基于日志分段基准偏移量的相对偏移盘。 由于客户端读取消息给的是绝对偏移量,因此在查询索引文件之前,要先将绝对偏移量减去日志分段的基准偏移盘,转换为相对偏移量。 另外,数据文件每条消息的偏移量存储的是绝对偏移量,查找索引文件返回值也应该是绝对偏移量 。 但索引条目存储的是相对偏移量,最后返回的偏移量还要再加上基准偏移量 。
根据给定的起始偏移量,先调用索引文件的lookup()查询方法,获得离起始偏移量最近的物理位置,然后再调用数据文件的searchFor()方法,从指定的物理位置开始读取每条消息,直到找到起始偏移量对应的物理位置。
对索引文件进行二分查找会返回两个信息:偏移量和物理位置。偏移量不一定和起始偏移量对应,物理位置也不会和起始位置对应。还需要搜索数据文件才能确定起始偏移量对应的起始位置。索引文件中每个索引条目占用8字节,索引条目中偏移量和物理位置各向占用4字节。在查找过程中,需要读取出偏移量的值,然后和目标偏移量进行比较。
由于消息集中的每条消息都是由:绝对偏移量,消息大小,消息内容组成。要查询数据文件中的消息偏移量等于目标偏移量的消息,我们只需读取每条消息的前两部分数据,因为它们占用的字节缓冲区是固定的12个字节(8个字节的绝对偏移量+4个字节的消息大小)。这种方式不会读取消息内容,跳跃式的查找。读出绝对偏移量是为了和目标偏移量进行比较,读出消息大小是为了当绝对偏移量是为了和目标偏移量不相等的时候,需要跳到下一个消息,再进行比较。
文件消息集是数据文件的实现类,文件消息集的读取方法根据起始位置和读取大小,创建一个新的文件消息集视图,每次调用读取方法都会生成一个新的文件消息集对象。如果客户端每次拉取消息都在同一个日志分段中,数据文件是同一个,说明同一个文件消息集会调用多次读取方法,每次都会创建一个新的文件消息集,但所有的文件消息集都共用同一个文件和文件通道。和日志分段相关的文件消息集叫做原始文件消息集,调用原始文件消息集生成的新文件消息集叫做文件消息集视图。两种消息集有关的操作:
1,生产者生产的字节缓冲区消息集会追加到日志分段对应的原始文件消息集。
2,原始文件消息集会将自己缓冲区消息集写入到数据文件底层的文件通道中。
3,服务端处理客户端的拉取请求,读取日志分段,调用原始文件消息集的读取方法。
4,原始文件消息集读取方法会生成一个局部的文件消息集视图,和数据文件底层的文件通道相关。
5,局部文件消息集视图发送拉取响应结果给客户端,会将文件通道的字节直接传输给网络通道。
全局的可变的原始文件消息集可以接受消息集的追加,会在每次处理客户端的拉取请求时,生成不可变的局部的文件消息集视图。文件消息集视图的消息集表示一次拉取请求的分区数据,最终被封装到拉取响应中,通过服务端的网络通道发送给客户端。
参考资料:
Kafka技术内幕:图文详解Kafka源码设计与实现