概括下本文的内容:
我们可以使用 Logstash 的持久化队列技术尽量保证数据可靠传输至 output;
适用场景:传输可靠性要求稍低的场景下(和 Kafka 类比),替换架构中的 Kafka 或者加固 Logstash 本身的可靠性,因为即使 queue.checkpoint.writes:1,也有可能因为磁盘故障(检查点文件 和 queue 文件同时损坏)丢至多 1 条数据,核心的问题是在于没有多副本和选举相关的实现;
Deliver 策略:at-least-once;
默认情况下,Logstash在 pipeline stages(inputs → pipeline workers)之间使用内存有界队列来缓冲事件。 这些内存中队列的大小是固定的,不可配置。 如果 Logstash 遇到临时机器故障,则内存中队列的内容将丢失。 临时机器故障是指 Logstash 或其主机异常终止但能够重新启动的情况。
为了防止异常终止期间的数据丢失,Logstash 具有持久性队列功能,将消息队列存储在磁盘上。 持久队列在 Logstash 中提供数据的持久性。
持久队列对于需要大型缓冲区的 Logstash 部署也很有用。 换言之,可以启用持久性队列来缓存磁盘上的事件而无需使用较重的组件(如 Redis,RabbitMQ 或 Apache Kafka)来实现缓冲的发布订户模型。
总而言之,启用持久队列的好处如下:
- 处理突发事件,而不需要像 Redis 或 Apache Kafka 这样的外部缓冲机制;
- 在正常关机期间以及 Logstash 异常终止时,提供 at-least-once 传输保证,防止消息丢失。 如果 Logstash 在事件传递时(我理解是事务执行过程中,in-flight)重新启动,Logstash 将尝试传递存储在持久性队列中的消息,直到传送成功至少一次。
持久队列的限制
以下是持久队列功能未解决的问题:
- 不使用 request-response 协议的 input 插件无法避免数据丢失。 例如:tcp,udp,zeromq push + pull和许多其他输入没有机制来确认 receipt 的发送方。 具有确认功能的插件(如 beats 和 http)受到此队列的良好保护。
- 它不会处理永久性的机器故障,如磁盘损坏,磁盘故障和机器丢失,持续到磁盘的数据不会被复制。
持久队列如何工作
队列位于同一进程的 input 和 filter 阶段之间:
input → queue → filter + output
当 input 有事件准备好处理时,它将事件写入队列。当写入队列成功时, input 可以向其数据源发送 ack。
Logstash 只会在 filter 和 output 完成事件处理后,确认事件已完成。队列保留 pipeline 已处理的事件的记录。事件被记录为已处理(acked),当且仅当事件已由 Logstash pipeline 完全处理。
Ack 这意味着事件已由所有配置的 filter 和 output 处理。例如,如果只有一个到 Elasticsearch的输出,当 Elasticsearch output 已成功将此事件发送到 Elasticsearch 时,事件将被 ack。
在正常关机(Kill 或 SIGTERM)期间,Logstash 将停止从队列中读取,并完成由 filter 和 output 处理的 in-flight 事件。重新启动后,Logstash 将恢复处理持久性队列中的事件以及从 input 接受新事件。
如果 Logstash 异常终止,任何 in-flight 事件将不会被 ack,并且当 Logstash 重新启动时将被 filter 和 output 重新处理。 Logstash 分批处理事件,因此对于任何给定的批处理,有可能已经成功完成了该批次,但是在发生异常终止时不能被记录为已确认,所以可以解释为什么会有重复发送。
如何配置持久化队列
要配置持久性队列,可以在Logstash设置文件中指定以下选项:
queue.type
:指定持久化以启用持久性队列。默认情况下,持久队列被禁用(默认:queue.type:memory)。
path.queue
:数据文件将被存储的目录路径。默认情况下,文件存储在path.data/queue中。
queue.page_capacity
:队列页面的最大大小(以字节为单位)。队列数据由仅附加文件称为“页面”组成。默认大小为250MB。更改此值不太可能具有性能优势。
queue.max_events
:队列中允许的最大事件数。默认值为0(无限制)。该值在内部用于Logstash测试。
queue.max_bytes
:队列的总容量,以字节为单位。默认值为1024MB(1GB)。确保磁盘驱动器的容量大于此处指定的值。
如果同时指定了queue.max_events
和queue.max_bytes
,则Logstash将使用首先达到的条件。。
还可以指定控制检查点文件何时更新的选项(queue.checkpoint.acks
,queue.checkpoint.writes
)。
示例配置:
queue.type:persisted
queue.max_bytes:4gb
处理 Back Pressure
当队列已满时,Logstash 会对 input 端施加压力,以阻止流入 Logstash 的数据。这种机制有助于 Logstash 在 input 阶段控制数据流量,而不会向比如 Elasticsearch 类似的 output 端疯狂输出。
使用queue.max_bytes
设置配置磁盘上队列的总容量。以下示例将队列的总容量设置为8gb:
queue.type:persisted
queue.max_bytes:8gb
指定这些设置后,Logstash 将缓存磁盘上的事件,直到队列的大小达到8gb。当队列充满 unACKed 的事件,并且已达到大小限制时,Logstash 将不再接受新的事件。
每个 input 单独处理 Back Pressure。例如,当 beats input 遇到 back pressure 时,它不再接受新连接并等待,直到队列有空间来接受更多的事件。filter 和 output 阶段完成处理队列中的现有事件并确认它们后,Logstash 会自动开始接受新的事件。
控制 Durability
Durability 是存储写入的一种特性,可确保数据在写入后可用。
当启用持久性队列功能时,Logstash 会将事件存储在磁盘上。 Logstash以称为检查点的机制提交到磁盘。
为了讨论 Durability,我们需要介绍一些关于如何实现持久队列的细节。
首先,队列本身是一组 pages。有两种 page:head page 和 tail page。head page 是新事件写入的地方,只有一个。当 head page 达到上限(参见queue.page_capacity
)时,它将成为一个 tail page,并创建一个新的 page。tail page 是不修改的,head page 是只能追加内容的。其次,队列将记录自己的详细信息(pages,ack 确认信息等)记录在一个名为checkpoint 文件的单独文件中。
记录检查点时,Logstash将:
- 在 head page 上调用 fsync。
- 原子地写入队列的当前状态至磁盘。
以下设置可用于调整持久性:
-
queue.checkpoint.writes
:Logstash 在写入指定条数事件后,进行 checkpoint。目前,一个事件计为一个写入,但这可能会在将来的版本中更改。 -
queue.checkpoint.acks
:Logstash 在指定条数事件 acked 后,进行 checkpoint。此配置用来控制 Logstash 处理(filter + output)环节的 Durability。
磁盘写入具有资源成本。调整上述值或更高值将会调整 durability。例如,如果您想要所有 input 事件具备最高 durability,可以设置queue.checkpoint.writes:1
。
检查点的过程是原子的,这意味着如果成功,将保存文件的任何更新。
如果 Logstash 终止,或者如果存在硬件级别故障,那么在持久性队列中缓存但尚未检查点的任何数据都将丢失。为了避免这种可能性,您可以设置 queue.checkpoint.writes:1
,但请记住,此设置会严重影响性能。
磁盘垃圾回收
在磁盘上,队列存储为一组 page,其中每个 page 是一个文件。 每个 page 最多可以是 queue.page_capacity
的大小。 在该 page 中的所有事件都被确认之后,page 被删除(垃圾回收)。 如果较旧的 page 至少有一个尚未确认的事件,则整个 page 将保留在磁盘上,直到该 page 中的所有事件成功处理。 包含未处理事件的每个 page 将对queue.max_bytes
字节大小进行计数。