Apache Kafka 是一个分布式的流数据平台,最初由LinkedIn公司开发,后成为 Apache 基金会的开源项目。Kafka 主要用于解决高吞吐量、低延迟的实时数据传输和处理问题,特别适用于日志收集、流式处理、事件驱动等场景。
特点
kafka有如下特点:
- 高吞吐量:Kafka每秒可以处理数十万条消息的写入和读取。
- 分布式:Kafka采用分布式架构,可以将消息分布在多个节点上进行存储和处理。
- 持久化:Kafka使用磁盘持久化消息,可以将消息持久保存在磁盘上,即使消费者不在线也不会丢失消息。
- 发布-订阅模型:Kafka采用发布-订阅模型,允许生产者将消息发布到一个或多个主题(Topic),而消费者可以订阅一个或多个主题来接收消息。
- 分区和副本:Kafka将每个主题分成多个分区(Partition),每个分区都是一个独立的存储单元,可以在不同的节点上进行分布,目的是为了将数据分散存储、提高并行处理能力。Kafka 副本 是 Kafka 中为了实现数据冗余和高可用性而存在的。每个 Kafka 分区都有多个副本
- 实时处理:Kafka支持实时数据处理,可以与流处理框架(如Apache Flink和Apache Spark)集成。
- 消息保留:Kafka可以根据配置的策略保留消息的时间或大小。
- 可扩展性:Kafka可以水平扩展,通过添加更多的节点来增加处理能力和存储容量,而不会中断现有的消息流。
缺点:
- 不支持死信队列,可能造成消息丢失
- 不支持定时消息
- 没有完整的顺序消息机制
集群
Kafka 集群由多个Broker组成,每个 Broker 负责处理消息的存储、分发和管理。在 Kafka 集群中,节点的工作可以分为几个主要部分:
- Broker:Kafka 的核心组成部分,负责处理消息的生产和消费请求、存储消息,以及管理消息的分区和副本。
- ZooKeeper 或 KRaft 模式:用于管理集群的元数据、选举 Controller,以及协调集群中的各种任务。早期使用 ZooKeeper 进行集群管理,而2.8.0版本中的 KRaft 模式(Kafka Raft Metadata Mode)则移除了对 ZooKeeper 的依赖。
- Controller:Kafka 集群的中央管理者,负责管理分区的副本状态、分配 Leader 副本、处理节点的上下线等。
Broker管理:
- Broker 的启动与注册
- 每个 Kafka Broker 节点启动后,会向 ZooKeeper(或 KRaft 元数据管理器)注册自己。ZooKeeper 保存所有 Broker 的信息,包括其 ID、地址和端口等。
- 在 KRaft 模式下,Broker 节点直接与 KRaft 元数据日志通信,不再通过 ZooKeeper。
- Broker 上下线的处理
- 当一个 Broker 节点启动或加入集群时,它会通知 Controller 节点,Controller 会将该 Broker 添加到元数据中,并为它分配分区和副本。
- 如果某个 Broker 节点失效(网络故障、机器宕机等),Controller 会检测到该 Broker 的下线,并将该 Broker 上存储的分区副本重新分配到其他正常运行的 Broker 上。这个过程称为 副本重分配。
Kafka 的 server.properties 示例(KRaft 模式):
# 表示该节点既是 Broker,也是 Controller。如果只想让某个节点充当 Controller,可以配置为 controller。
process.roles=broker,controller
node.id=1
# 指定元数据管理的控制器节点及其端口。这些节点组成 Kafka 元数据的管理层,负责元数据的存储和同步。1@kafka1:9093 表示第 1 个控制器节点运行在 kafka1 主机的 9093 端口。
controller.quorum.voters=1@kafka1:9093,2@kafka2:9093,3@kafka3:9093
# 定义 Kafka 节点的监听端口,包括用于 Broker 的 PLAINTEXT 协议和用于 Controller 的 CONTROLLER 协议。
listeners=PLAINTEXT://kafka1:9092,CONTROLLER://kafka1:9093
# 指定 Kafka 对外暴露的 IP 或域名,供客户端连接使用。
advertised.listeners=PLAINTEXT://<外部访问的IP或域名>:9092
# Kafka 的元数据日志存储位置
log.dirs=/var/lib/kafka/logs
# 指定 Kafka 使用 CONTROLLER 监听器处理集群元数据。
controller.listener.names=CONTROLLER
# KRaft 模式的集群元数据目录
metadata.log.dir=/var/lib/kafka/metadata
# 每个分区的默认副本数
default.replication.factor=3
Partition
在 Kafka 中,分区(Partition)是一种逻辑上的数据划分方式,它将一个主题(Topic)的数据分成多个部分,每个分区都是一个有序、持久化的日志队列。每个分区都可以独立地存储和管理数据,Kafka 使用分区来实现高吞吐量和水平扩展。
分区的特点
- 水平扩展: 分区允许将一个主题的数据划分为多个分区,每个分区可以在不同的 broker 节点上存储。这样可以实现数据的水平扩展,提高了整个系统的吞吐量。
- 顺序性: 在一个分区内,数据的顺序是有保障的,即使在不同分区之间数据的顺序不保证,但在同一分区内的数据会按照写入的顺序存储和读取。
- 容错性: Kafka 使用分区的复制机制来实现数据的冗余备份,保证数据的可靠性。一个分区可以有多个副本(Replica),每个副本位于不同的 broker 节点上,如果某个副本不可用,可以通过其他副本来恢复数据。
- 负载均衡: 通过调整分区的数量和分布,可以实现对消息消费者的负载均衡。消费者可以同时从多个分区中读取数据,从而提高消费的并行度。
副本因子
副本因子(Replication Factor)是指 Kafka 中每个分区的副本数。副本因子是 Kafka 实现高可用性和数据冗余的关键机制之一。通过为每个分区设置多个副本,可以确保分区数据的冗余存储,即使某个 Broker发生故障,也能够从其他拥有该分区副本的 Broker上继续提供服务,保证数据的可用性。
副本因子的类型:
- Leader 副本:每个 Kafka 分区有一个 leader 副本,它负责处理该分区的所有读写操作。
- Follower 副本:其他副本被称为 follower 副本,它们从 leader 副本那里同步数据,但不参与直接的读写操作。当 leader 副本出现故障时,Kafka 会从 follower 副本中选举一个新的 leader 副本。
副本因子的设置在创建主题(Topic)时进行,可以根据需要设置不同的副本因子。一般来说,副本因子的设置涉及到可用性、性能、资源开销等方面的权衡。常见的副本因子设置有以下几种情况:
- 副本因子为1:这意味着每个分区只有Leader副本,没有冗余。适用于单点故障没有太大影响的场景,或者测试环境等不要求高可用性的情况。
- 副本因子大于1:常见的设置为2或3。其中有一个被选为 leader 副本,其余为 follower 副本。比如设置2个副本时,数据有两个拷贝,一个在Leader上,一个在Follower副本上;设置3个副本时,数据有三个拷贝,一个在Leader上,另外两个在Follower副本分区上。这种设置提供了基本的冗余和故障转移能力,能够抵御一台 Broker 节点的故障。
- 更高的副本因子:如果对可用性和数据保护要求较高,可以设置更多的副本。但副本因子越高,数据的冗余和复制开销就越大,需要更多的存储空间和网络带宽。
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 3 --topic mytopic
上述命令会创建一个名为 "mytopic" 的主题,设置分区数为 3,副本因子为 2,表示每个分区都有两个副本。
需要注意的是,分区数的设置一旦确定后,就不能直接修改。如果需要修改分区数,可以考虑创建一个新的主题,并将数据从旧主题迁移到新主题。
分区数的合理设置需要根据实际的业务需求和数据规模来进行调优,合适的分区数能够充分利用 Kafka 集群的资源,提高消息的并行处理能力。
消息保留策略
在kafka中消费者消费消息后,消息仍然可以在 Kafka中存在一段时间,以便满足后续需求,如重新消费或审计等。Kafka 提供了两种常见的消息保留策略:
- 时间保留策略(Time-based Retention):根据消息的时间戳进行保留。可以配置一个保留时间段,超过该时间的消息将被删除。
- 大小保留策略(Size-based Retention):根据消息的大小进行保留。可以配置一个保留消息总大小的阈值,当消息大小超过该阈值时,较早的消息将被删除。
二次消费
根据kafka的消息保留策略可以看出来,kafka支持重新消费,我们可以通过重置/修改消费者组的偏移量进行重新消费,或者创建新的消费者来重新消费。
例如,如果你想让消费者从 offset 10 开始重新消费:
kafka-consumer-groups.sh --bootstrap-server <broker_host:port> --group <consumer_group_name> --reset-offsets --to-offset 10 --topic <your_topic> --execute
消息的有序性
在kafka中,同一个分区内的消息是有序的,但是不同分区的消息之间无法保证顺序,而且消费者组通常不会配置单线程,这就导致topic中的消息不是完全有序消费的。
如果想要实现消息的有序性,可以考虑以下几点:
- 单个分区:首先,确保每个主题只有一个分区。
- 消费者线程:为了保证顺序,每个分区应该只被一个消费者线程消费。
- 消息的生产和分区键:如果消息的顺序非常重要,可以考虑在生产者端使用消息的分区键来确保相关消息被分配到同一个分区。
- 应用层面:在应用层面做额外的处理,例如在消费者端根据消息的时间戳来排序。
什么是分区键
Kafka 的分区键是一个用于指定消息被分配到哪个分区的值。分区键是由生产者在发送消息时指定的,它决定了消息被分配到哪个主题的哪个分区中。一些常见的选择分区键的策略:
- 根据消息的关键属性:根据消息的某个关键属性,例如用户 ID、产品 ID 等,来作为分区键。这样,具有相同属性的消息会被分配到同一个分区,便于保证有序性和数据的聚合。
- 轮询分区:如果希望负载均衡,可以采用轮询分区的策略,让每个生产者按照轮询顺序选择分区,确保消息均匀分布。
- 随机分区:对于一些场景,如果消息的顺序不是特别关键,可以采用随机分区的策略,将消息随机分配到分区中。
ack机制
Kafka 使用了一种 ack(Acknowledgment)机制来确保消息的可靠性传递。有以下几种设置:
- acks = 0:生产者不需要等待任何服务器的确认。这种方式存在最高的风险,因为如果发送的消息在传输过程中出现问题,生产者将无法得知。消息可能会丢失或发送失败。
- acks = 1:生产者会等待消息被主题的 leader 分区成功写入。如果 leader 成功写入消息,生产者将收到一个确认响应。但在这种模式下,仍然有可能出现副本同步失败,因此消息仍然有丢失的风险。
- acks = all:生产者会等待消息被主题的所有副本成功写入才会收到确认响应。这是最安全的模式,确保了消息的高可靠性。但这也会增加一定的延迟,因为需要等待所有副本写入成功。
简单应用
我们可以在 Spring 项目中通过 Kafka 集群 来存储日志,使用 Logback 进行日志记录时,可以配置 Logback 将日志发送到 Kafka。我们可以通过 Logback Kafka Appender(如 logstash-logback-encoder 提供的 Kafka Appender)实现这个功能。
首先,在你的 pom.xml 中添加相关依赖。我们可以使用 logstash-logback-encoder 提供的 Kafka Appender:
<dependencies>
<!-- Logback Core -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
</dependency>
<!-- Logback Classic -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
<!-- Logstash Logback Encoder (includes Kafka Appender) -->
<dependency>
<groupId>net.logstash.logback</groupId>
<artifactId>logstash-logback-encoder</artifactId>
<version>7.2</version>
</dependency>
</dependencies>
配置 logback-spring.xml
<configuration>
<!-- 定义 Kafka Appender -->
<appender name="KAFKA" class="net.logstash.logback.appender.LoggingEventAsyncDisruptorAppender">
<appender-ref ref="KAFKA_APPENDER"/>
</appender>
<appender name="KAFKA_APPENDER" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
<!-- Kafka 集群的 brokers -->
<destination>ip1:port1,ip2:port2,ip3:port3</destination>
<!-- 发送到 Kafka 的 topic -->
<topic>log_topic</topic>
<!-- 编码日志为 JSON 格式 -->
<encoder class="net.logstash.logback.encoder.LogstashEncoder">
<!-- 格式化日志为 JSON -->
<includeMdcKeyName>requestId</includeMdcKeyName>
<includeCallerData>true</includeCallerData>
</encoder>
</appender>
<!-- 配置日志级别 -->
<root level="INFO">
<!-- 将日志写到 Kafka -->
<appender-ref ref="KAFKA"/>
</root>
</configuration>
通过这种方式,Spring 项目的日志可以被发送到 Kafka 集群,方便日志的集中收集和分析。