文件目录
Kafka中的消息是存储在磁盘上的,一个分区副本对应一个日志(Log)。为了防止Log过大,Kafka又引入了日志分段(LogSegment)的概念,将Log切分为多个LogSegment ,相当于一个巨型文件被平均分配为多个相对较小的文件,这样也便于消息的维护和清理。事实上,Log和LogSegnient 也不是纯粹物理意义上的概念,Log 在物理上只以文件夹的形式存储,而每个LogSegment对应于磁盘上的一个日志文件和两个索引文件,以及可能的其他文件(比如以.txnindex ”为后缀的事务索引文件),下图为topic、partition、副本、log和logSegment之间的关系。
日志分段
虽然一个log被拆为多个分段,但只有最后一个LogSegment(当前活跃的日志分段)才能执行写入操作,在此之前所有的LogSegment都不能写入数据。当满足以下其中任一条件会创建新的LogSegment。
当前日志分段文件的大小超过了 broker 端参数 log.segment.bytes 配置的值,默认1GB。
当前日志分段中消息的最大时间戳与当前系统的时间戳的差值大于log.roll.m 或log.roll.hours 参数配置的值,默认为7天。
偏移量索引文件或时间戳索引文件的大小达到 broker 端参数 log.index.size .max.bytes 配置的值,默认为10M。
追加的消息的偏移量与当前日志分段的偏移量之间的差值大于 Integr.MAX_VALUE。
在索引文件切分的时候,Kafka 会关闭当前正在写入的索引文件并置为只读模式,同时以可读写的模式创建新的索引文件,默认大小为1GB。当下次索引切分时才会设置为实际大小。也就是说,之前的segment都是实际大小,活跃segment大小为1G。
日志索引
索引的主要目的是提高查找的效率。
Kafka采用稀疏索引(sparse index)的方式构造消息的索引,它并不保证每个消息在索引文件中都有对应的索引项。而是每当写入一定量(由 broker 端参数 log.index. interval.bytes 指定,默认4KB )的消息时,索引文件会增加一个索引项。
偏移量索引
一条偏移量索引包含两部分数据,如图:- relativeOffset:相对偏移量,表示消息相对于baseOffset的偏移量,当前索引文件的文件名即为baseOffset。
- position:物理地址,也就是消息在日志分段文件中对应的物理位置
消息查找过程
以上是比较简单的情况,如下图所示,如果要查找要查找偏移268的消息,那么应该怎么办呢?
那么如何查找 baseOffset 25的日志分段的呢?Kafka使用了跳跃表的结构 Kafka 的每个日志对象中使用了ConcurrentSkipListMap来保存各个日志分段,每个日志分段的 baseOffset 作为 key ,这样可以根据指定偏移量来快速定位到消息所在的日志分段。
时间戳索引
时间戳索引也是包含两部分数据,如图:- timestamp 当前日志分段最大的时间戳。
- relativeOffset :时间戳所对应的消息的相对偏移量,也就是偏移量索引中偏移量。
间戳索引文件中包含若干时间戳索引项,每个追加的时间戳索引项中的 timestamp 必须大于之前追加的索引项的 timestamp ,否则不予追加。
消息查找过程
- targetTimeStamp和每个日志分段中的最大时间戳对比,直到找到不小于 targetTimeStam所对应的日志分段。
注:日志分段中的最大时间戳的计算是先查询该日志分段所对应的时间戳索引文件,找到最后一条索引项,若最后一条索引项的时间戳字段值大于0 ,则取其值,否则取该日志分段的最近修改时间。 - 找到相应的日志分段之后,在时间戳索引文件中使用二分查找算法查找到不大于 targetTimeStamp 最大索引项,即[1526384718283, 28],如此便找到了相对偏移量28。
- 在偏移量索引文件中使用二分算法查找到不大于 28 的最大索引工页,即[26,838]。
- 从1中找到日志分段文件中的838的物理位置开始查找不小于targetTimeStamp 的消息。
日志清理
Kafka将消息存储在磁盘中,为了控制磁盘占用空间的不断增加就需要对消息做一定的清理操作。Kafka提供了两种日志清理策略。
- 日志删除:按照一定的保留策略直接删除不符合条件的日志分段,默认该策略。
- 日志压缩:针对每个消息的key进行整合,对于有相同key的不同value值,只保留最后一个版本。
日志删除
kafka有专门的任务来周期性删除不符合条件的日志分段文件,删除策略主要以下有3种。
基于时间
broker端可通过参数设置日志的最大保留时间,默认7天。定时任务会查看每个分段的最大时间戳(计算逻辑同上),若最大时间戳距当前时间超过7天,则需要删除。
删除日志分段时, 首先会先从跳跃表中移除待删除的日志分段,保证没有线程对这些日志分段进行读取操作。然后将日志分段所对应的所有文件添加上".delete"的后缀。最后由专门的定时任务来删除以".delete"为后缀的文件。-
基于日志大小
日志删除任务会检查当前日志的大小是否超过设定的阔值(retentionSize)来寻找可删除的日志分段的文件集合(deletableSegments)。
注意这里的日志的大小是指所有的segment的总和,不是单个segment。
首先计算日志文件的总大小和设定阈值的差值,即计算需要删除的日志总大小,然后从日志文件中的第一个日志分段开始进行查找可删除的日志分段,放入集合deletableSegments中 。之后进行删除,删除过程同1。 -
基于日志起始偏移量
一般情况下,日志文件的起始偏移logStartOffset等于第1个日志分段的baseOffset ,但logStartOffset是可以被修改的。
该策略会判断某日志分段的下一个日志分段的起始偏移量baseOffset是否小于等于 logStartOffset ,若是,则将其放入deletableSegments中。如下图所示。
后面的过程与1/2中相同,不再赘述。
日志压缩
对于有相同key的不同value值,只保留最后一个版本。如果应用只关心key对应的最新value值,则可以开启Kafka的日志压缩功能,Kafka会定期将相同key的消息进行合井,只保留最新的value值。