1.消息的保存路径
消息发送端发送消息到 broker 上以后,消息是如何持久化的呢?那么接下来去分析下消息的存储
首先我们需要了解的是,kafka 是使用日志文件的方式来保存生产者和发送者的消息,每条消息都有一个 offset 值来表示它在分区中的偏移量。Kafka 中存储的一般都是海量的消息数据,为了避免日志文件过大,Log 并不是直接对应在一个磁盘上的日志文件,而是对应磁盘上的一个目录,这个目录的明明规则是<topic_name>_<partition_id>比如创建一个名为 firstTopic 的 topic,其中有 3 个 partition,那么在 kafka 的数据目录(/tmp/kafka-log)中就有 3 个目录,firstTopic-0~3
2.多个分区在集群中的分配
如果我们对于一个 topic,在集群中创建多个 partition,那么 partition 是如何分布的呢?
1.将所有 N Broker 和待分配的 i 个 Partition 排序
2.将第 i 个 Partition 分配到第(i mod n)个 Broker 上
了解到这里的时候,大家再结合前面讲的消息分发策略,就应该能明白消息发送到 broker 上,消息会保存到哪个分区中并且消费端应该消费哪些分区的数
3.消息写入的性能
我们现在大部分企业仍然用的是机械结构的磁盘,如果把消息以随机的方式写入到磁盘,那么磁盘首先要做的就是寻址,也就是定位到数据所在的物理地址,在磁盘上就要找到对应的柱面、磁头以及对应的扇区;这个过程相对内存来说会消耗大量时间,为了规避随机读写带来的时间消耗,kafka 采用顺序写的方式存储数据。即使是这样,但是频繁的 I/O 操作仍然会造成磁盘的性能瓶颈所以 kafka还有一个性能策略
零拷贝
消息从发送到落地保存,broker 维护的消息日志本身就是文件目录,每个文件都是二进制保存,生产者和消费者使用相同的格式来处理。在消费者获取消息时,服务器先从硬盘读取数据到内存,然后把内存中的数据原封不动的通过 socket 发送给消费者。虽然这个操作描述起来很简单,
但实际上经历了很多步骤
▪ 操作系统将数据从磁盘读入到内核空间的页缓存
▪ 应用程序将数据从内核空间读入到用户空间缓存中
▪ 应用程序将数据写回到内核空间到 socket 缓存中
▪ 操作系统将数据从 socket 缓冲区复制到网卡缓冲区,以便将数据经网络发出
这个过程涉及到 4 次上下文切换以及 4 次数据复制,并且有两次复制操作是由 CPU 完成。但是这个过程中,数据完全没有进行变化,仅仅是从磁盘复制到网卡缓冲区。
通过“零拷贝”技术,可以去掉这些没必要的数据复制操作,同时也会减少上下文切换次数。现代的 unix 操作系统提供一个优化的代码路径,用于将数据从页缓存传输到 socket;在 Linux 中,是通过 sendfile 系统调用来完成的。Java 提供了访问这个系统调用的方法:FileChannel.transferTo API使用 sendfile,只需要一次拷贝就行,允许操作系统将数据直接从页缓存发送到网络上。所以在这个优化的路径中,只有最后一步将数据拷贝到网卡缓存中是需要的
4.消息的文件存储机制
前面我们知道了一个 topic 的多个 partition 在物理磁盘上的保存路径,那么我们再来分析日志的存储方式。通过如下命令找到对应 partition 下的日志内容
[root@localhost ~]# ls /tmp/kafka-logs/firstTopic-1/
00000000000000000000.index
00000000000000000000.log
00000000000000000000.timeindex
leader-epoch-checkpoint
kafka 是通过分段的方式将 Log 分为多个 LogSegment,LogSegment 是一个逻辑上的概念,一个 LogSegment 对应磁盘上的一个日志文件和一个索引文件,其中日志文件是用来记录消息的。索引文件是用来保存消息的索引。那么这个 LogSegment 是什么呢?
kafka 是通过分段的方式将 Log 分为多个 LogSegment,LogSegment 是一个逻辑上的概念,一个 LogSegment 对应磁盘上的一个日志文件和一个索引文件,其中日志文件是用来记录消息的。索引文件是用来保存消息的索引。那么这个 LogSegment 是什么呢?
LogSegment
假设 kafka 以 partition 为最小存储单位,那么我们可以想象当 kafka producer 不断发送消息,必然会引起 partition文件的无线扩张,这样对于消息文件的维护以及被消费的消息的清理带来非常大的挑战,所以 kafka 以 segment 为单位又把 partition 进行细分。每个 partition 相当于一个巨型文件被平均分配到多个大小相等的segment数据文件中(每个 segment 文件中的消息不一定相等),这种特性方便已经被消费的消息的清理,提高磁盘的利用率。
log.segment.bytes=107370 (设置分段大小),默认是1gb,我们把这个值调小以后,可以看到日志分段的效果
抽取其中 3 个分段来进行分析
segment file 由 2 大部分组成,分别为 index file 和 data file,此 2 个文件一一对应,成对出现,后缀".index"和“.log”分别表示为 segment 索引文件、数据文件.
segment 文件命名规则:partion 全局的第一个 segment,从 0 开始,后续每个 segment 文件名为上一个 segment,文件最后一条消息的 offset 值进行递增。数值最大为 64 位long 大小,20 位数字字符长度,没有数字用 0 填充
查看segment文件命名
通过下面这条命令可以看到 kafka 消息日志的内容
sh kafka-run-class.sh kafka.tools.DumpLogSegments -- files /tmp/kafka-logs/test-0/00000000000000000000.log --print-data-log
输出结果:
offset: 5376 position: 102124 CreateTime: 1531477349287 isvalid: true keysize: -1 valuesize: 12 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] payload: message_5376
第一个 log 文件的最后一个 offset 为:5376,所以下一个segment 的文件命名为: 00000000000000005376.log。对应的 index 为 00000000000000005376.index
segment 中 index 和 log 的对应
从所有分段中,找一个分段进行分析为了提高查找消息的性能,为每一个日志文件添加 2 个索引索引文件:OffsetIndex 和 TimeIndex,分别对应 *.index以及 *.timeindex, ,TimeIndex的索引文件格式:它是映射时间戳和相对offset
查 看 索 引 内 容 :
sh kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/test-0/00000000000000000000.index --print-data-log
如图所示,index 中存储了索引以及物理偏移量。 log 存储了消息的内容。索引文件的元数据执行对应数据文件中message 的物理偏移地址。举个简单的案例来说,以[4053,80899]为例,在 log 文件中,对应的是第 4053 条记录,物理偏移量(position)为 80899. position 是ByteBuffer 的指针位置
在 partition 中如何通过 offset 查找 message
查找的算法是
根据 offset 的值,查找 segment 段中的 index 索引文件。由于索引文件命名是以上一个文件的最后一个offset 进行命名的,所以,使用二分查找算法能够根据offset 快速定位到指定的索引文
找到索引文件后,根据 offset 进行定位,找到索引文件中的符合范围的索引。(kafka 采用稀疏索引的方式来提高查找性能)
得到 position 以后,再到对应的 log 文件中,从 position出开始查找 offset 对应的消息,将每条消息的 offset 与目标 offset 进行比较,直到找到消息
比如说,我们要查找 offset=2490 这条消息,那么先找到00000000000000000000.index, 然后找到[2487,49111]这个索引,再到 log 文件中,根据 49111 这个 position 开始查找,比较每条消息的 offset 是否大于等于 2490。最后查找到对应的消息以后返回
Log 文件的消息内容
前面我们通过 kafka 提供的命令,可以查看二进制的日志文件信息,一条消息,会包含很多的字段。
offset: 5371 position: 102124 CreateTime: 1531477349286 isvalid: true keysize: -1 valuesize: 12
magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional:
false header
offset 和 position 这两个前面已经讲过了、createTime 表示创建时间、keysize 和 valuesize 表示 key 和 value 的大小、 compresscodec 表示压缩编码、payload:表示消息的具体内容
5.日志的清除策略以及压缩策略
日志清除策略
前面提到过,日志的分段存储,一方面能够减少单个文件
内容的大小,另一方面,方便 kafka 进行日志清理。日志的清理策略有两个
- 根据消息的保留时间,当消息在 kafka 中保存的时间超过了指定的时间,就会触发清理过程
- 根据 topic 存储的数据大小,当 topic 所占的日志文件大小大于一定的阀值,则可以开始删除最旧的消息。kafka会启动一个后台线程,定期检查是否存在可以删除的消息
通过在启动配置文件中的log.retention.bytes 和 log.retention.hours
这两个参数来设置,当其中任意一个达到要求,都会执行删除。默认的保留时间是:7 天
-
日志压缩策略
Kafka 还提供了“日志压缩(Log Compaction)”功能,过这个功能可以有效的减少日志文件的大小,缓解磁盘紧张的情况,在很多实际场景中,消息的 key 和 value 的值之间的对应关系是不断变化的,就像数据库中的数据会不断被修改一样,消费者只关心 key 对应的最新的 value。因此,我们可以开启 kafka 的日志压缩功能,服务端会在后台启动Cleaner 线程池,定期将相同的 key 进行合并
,只保留最新的 value。因此,我们可以开启 kafka 的日志压缩功能,服务端会在后台启动启动Cleaner 线程池,定期将相同的 key 进行合并,只保留最新的 value 值。日志的压缩原理是