原文地址:https://kafka.apache.org/0101/documentation.html#compaction
日志压缩确保Kafka会为一个Topic分区数据日志中保留至少message key的最后一个值。它解决了应用crash或系统故障或应用在操作期间重启来重新加载缓存的场景。让我们深入到细节中解释日志压缩是如何工作的。
到目前为止,我们只说明了在一断时间或达到特定大小的时候丢弃旧日志的简单方法。这适用于像日志这样每一条数据都是独立数据的情况。但是重要类别的数据是根据key处理的数据(例如DB中表的变更数据)。
让我们来讨论这样一个具体的流的例子。一个Topic包含了用户email address信息;每一次用户变更邮箱地址,我们都像这个topic发送一条消息,使用用户ID作为primay key。现在我们已经为用户ID为123的用户发送了一些消息,每条消息包含了email address的变更:
123 => bill@microsoft.com
.
.
.
123 => bill@gatesfoundation.org
.
.
.
123 => bill@gmail.com
日志压缩为我们提供了更精细的保留机制,至少保存每个key最后一个变更(如123 => bill@gmail.com)。这样做我们确保了这个日志包含了所有key最后一个值的快照。这样Consumer可以重建状态而不需要保留完成的变更日志。
让我们列一些日志压缩有用的场景,然后看他是如何被使用的。
- 1.DB变更订阅。这是很常见的,一个数据在多个数据系统中,而且其中一个系统是数据库类型的(如RDBMS或KV系统)。例如可能有一个数据库,一个户缓存系统,一个搜索集群,一个Hadoop集群。DB的任何一个变更需要反映到缓存、搜索集群,最终保存到Hadoop中。在这个场景中,你只需要实时系统最新的更新日志。但是如果需要重新加载缓存或恢复宕机的检索节点,就需要完整的数据。
- 2.事件源。这是一种应用设计风格,它将查询处理和应用程序设计结合到一起,并使用日志作为程序的主要存储。
- 3.高可用日志。一个本地集成程序可以通过变更日志来做到容错,这样另一个程序能够在当前程序故障时继续处理。例如,像流数据查询例子,如计数,汇总或其他的分组操作。实时系统框架如Samza,就是为了达到这个目的使用这个特性的。
在这些场景中,主要处理实时的变更,但有时需要重新加载或重新处理时,需要加载所有数据。日志压缩允许使用相同的Topic来支持这些场景,这种日志使用风格在后续的内容中会更详细的描述。
想法很简单,我们有无限的日志,以上每种情况记录变更日志,我们从一开始就捕获每一次变更。使用这个完整的日志,我们可以通过回放日志来恢复到任何一个时间点的状态。这种假设的情况下,完整的日志是不实际的,对于那些每一行记录会变更多次的系统,即使数据集很小,日志也会无限的增长下去。丢弃旧日志的简单操作可以限制空间的增长,但是无法重建状态——因为旧的日志被丢弃,可能一部分记录的状态会无法重建(这写记录所有的状态变更都在就日志中)。
日志压缩机制是更细粒度的,每个记录都保留的机制,而不是基于时间的粗粒度。这个想法是选择性的删除哪些有更新的变更的记录的日志。这样最终日志至少包含每个key的记录的最后一个状态。
这个策略可以为每个Topic设置,这样一个集群中,可以一部分Topic通过时间和大小保留日志,另外一些可以通过压缩保留。
这个功能的灵感来自于LinkedIn的最古老且最成功的基础设置——一个称为Databus的数据库变更日志缓存系统。不想大多数的日志存储系统,Kafka为了订阅而量身打造,用于线性的快速读写。和Databus不同,Kafka作为真实的存储,压缩日志是非常有用的,在上游数据源不能重放的情况下。
Log Compaction Basics
这里是一个展示Kafka日志的逻辑结构的图(每条消息包含了一个offset):
Log head中包含传统的Kafka日志。它包含了连续的offset和所有的消息。日志压缩增加了处理tail Log的选项。上图展示了日志压缩的的Log tail的情况。tail中的消息保存了初次写入时的offset。即使该offset的消息被压缩,所有offset仍然在日志中是有效的。在这个场景中,无法区分和下一个出现的更高offset的位置。如上面的例子中,36、37、38是属于相同位置的,从他们开始读取日志都将从38开始。
压缩允许删除。一条消息伴随着空的值被认为从日志中删除。这个删除标记将会引起所有之前拥有相同key的消息被移除(包括拥有key相同的新消息),但是删除标记比较特殊,它将在一定周期后被从日志中删除来释放空间。这个时间点被称为“delete retention point”。
压缩操作通过在后台周期性的拷贝日志段来完成。清除操作不会阻塞读取,并且可以被配置不超过一定IO吞吐来避免影响Producer和Consumer。实际的日志段压缩过程有点像如下:
What guarantees does log compaction provide?
日志压缩提供了如下的保证:
- 1.所有跟上消费的Consumer能消费到所有写入的消息;这些消息有连续的序列号。Topic的min.compaction.lag.ms可以用于保证消息写入多久后才会被压缩。这限制了一条消息在Log Head中的最短存在时间。
- 2.消息的顺序会被保留。压缩不会重排序消息,只是移除其中一部分。
- 3.消息的Offset不会变更。这是消息在日志中的永久标志。
- 4.任何从头开始处理日志的Consumer至少会拿到每个key的最终状态。另外,只要Consumer在小于Topic的delete.retention.ms设置(默认24小时)的时间段内到达Log head,将会看到所有删除记录的所有删除标记。换句话说,因为移除删除标记和读取是同事发生的,Consumer可能会因为落后超过delete.retention.ms而导致错过删除标记。
Log Compaction Details
日志压缩由Log Cleaner执行,后台线程池重新拷贝日志段,移除那些key存在于Log Head中的记录。每个压缩线程如下工作:
- 1.选择Log Head相对于Log Head在日志中占更高比例的日志
- 2.创建Log Head中每个Key最后一个offset的摘要
- 3.从头到尾的拷贝日志,并删除之后日志终于到相同key的记录。新的、干净的日志将会立即被交到到日志中,所以只需要一个额外的日志段空间
- 4.Log Head的摘要实际上是一个空间紧凑的哈希表。每个条目使用24个字节。所以如果有8G的整理缓冲区, 则能迭代处理大约366G的日志头部(假设消息大小为1k)。
Configuring The Log Cleaner
Log Cleaner默认启用。这会启动清理的线程池。如果要开始特定Topic的清理功能,可以开启特定的属性:
log.cleanup.policy=compact
这个可以通过创建Topic时配置或者之后使用Topic命令实现。
Log Cleaner可以配置保留最小的不压缩的日志头。可以通过配置压缩的延迟时间:
log.cleaner.min.compaction.lag.ms
这可以用于保证消息比在被压缩的消息大一段时间。如果没有设置,除了最后一个日志外,所有的日志都会被压缩。当前写入的自如端不会被压缩,即使所有的消息都落后于比配置的最小压缩时间。
更多的配置在这里