Kafka灵魂30问 - 核心篇

1、 Kafka只是消息引擎系统吗?

除了作为消息引擎,Kafka 能够被用作流处理平台和分布式存储系统。

2、Kafka 未来的演进路线是怎么样的?

3、为什么 Kafka 不像 MySQL 那样允许追随者副本对外提供读服务?

1、场景不适用。读写分离适用于那种读负载很大,而写操作相对不频繁的场景,可 Kafka 不属于这样的场景。
2、同步机制。Kafka 采用 PULL 方式实现 Follower 的同步,因此,Follower 与 Leader 存在不一致性窗口。如果允许读 Follower 副本,就势必要处理消息滞后(Lagging)的问题。

4、Kafka线上集群部署方案怎么做?

5、Kafka无消息丢失配置怎么实现?

  1. 不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。记住,一定要使用带有回调通知的 send 方法。

  2. 设置 acks = all。acks 是 Producer 的一个参数,代表了你对“已提交”消息的定义。如果设置成 all,则表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”。这是最高等级的“已提交”定义。

  3. 设置 retries 为一个较大的值。这里的 retries 同样是 Producer 的参数,对应前面提到的 Producer 自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失。

  4. 设置 unclean.leader.election.enable = false。这是 Broker 端的参数,它控制的是哪些 Broker 有资格竞选分区的 Leader。如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader,必然会造成消息的丢失。故一般都要将该参数设置成 false,即不允许这种情况的发生。

  5. 设置 replication.factor >= 3。这也是 Broker 端的参数。其实这里想表述的是,最好将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余。

  6. 设置 min.insync.replicas > 1。这依然是 Broker 端参数,控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1。

  7. 确保 replication.factor > min.insync.replicas。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成 replication.factor = min.insync.replicas + 1。

  8. 确保消息消费完成再提交。Consumer 端有个参数 enable.auto.commit,最好把它设置成 false,并采用手动提交位移的方式。就像前面说的,这对于单 Consumer 多线程处理的场景而言是至关重要的。

6、假设那些保存数据比较多的副本都挂了怎么办?

7、你曾踩过Kafka哪些关于参数配置的“坑”?(讲一下你比较熟悉和重要的Kafka参数配置)

8、有哪些好的法则来评估 Kafka 对内存的使用呢?

9、Kafka为什么要分区?

Kafka 的消息组织方式实际上是三级结构:主题 - 分区 - 消息。

其实分区的作用就是提供负载均衡的能力,或者说对数据进行分区的主要原因,就是为了实现系统的高伸缩性(Scalability)。

还可以通过添加新的节点机器来增加整体系统的吞吐量。

10、你是使用哪种消息分区策略?

轮询策略、随机策略、按消息键保序策略、基于地理位置的分区策略。如果追求数据的均匀分布,还是使用轮询策略比较好。

11、Kafka 是如何压缩消息的呢?何时压缩?何时解压缩?支持哪几个压缩算法?

1、何时压缩?

压缩可能发生在两个地方:生产者端和 Broker 端

  1. Producer 启动后生产的每个消息集合都是经压缩过的,故而能很好地节省网络传输带宽以及 Kafka Broker 端的磁盘占用。

有两种例外情况就可能让 Broker 重新压缩消息?

  1. 情况一:Broker 端指定了和 Producer 端不同的压缩算法。
  2. 情况二:Broker 端发生了消息格式转换。

何时解压缩?

Producer 发送压缩消息到 Broker 后,Broker 照单全收并原样保存起来。当 Consumer 程序请求这部分消息时,Broker 依然原样发送出去,当消息到达 Consumer 端后,由 Consumer 自行解压缩还原成之前的消息。

Consumer 怎么知道这些消息是用何种压缩算法压缩的呢?

其实答案就在消息中。Kafka 会将启用了哪种压缩算法封装进消息集合中

支持哪几个压缩算法?

在 Kafka 2.1.0 版本之前,Kafka 支持 3 种压缩算法:GZIP、Snappy 和 LZ4。从 2.1.0 开始,Kafka 正式支持 Zstandard 算法(简写为 zstd)。它是 Facebook 开源的一个压缩算法,能够提供超高的压缩比(compression ratio)。

总结:Producer 端压缩、Broker 端保持、Consumer 端解压缩

12、Broker 要对压缩消息集合执行解压缩操作,然后逐条对消息进行校验,如何规避了broker端为执行校验而做的解压缩操作? 如何去做优化?

  • 前面我们提到了Broker要对压缩消息集合执行解压缩操作,然后逐条对消息进行校验,有人提出了一个方案:把这种消息校验移到Producer端来做,Broker直接读取校验结果即可,这样就可以避免在Broker端执行解压缩操作。

  • 刚刚看到4天前京东提的那个jira已经修复了,看来规避了broker端为执行校验而做的解压缩操作,代码也merge进了2.4版本。有兴趣的同学可以看一下:https://issues.apache.org/jira/browse/KAFKA-8106

  • 不认同,因为网络传输也会造成丢失,但是我建议可以在消息里面使用一种消耗较小的签名方式sign,比如多使用位移等方式,broke端也这么操纵,如果签名不一致证明有数据丢失,同时签名的方式可以避免CPU大量消耗

13、Kafka 拦截器作用是什么?

Kafka 拦截器分为生产者拦截器和消费者拦截器。生产者拦截器允许你在发送消息前以及消息提交成功后植入你的拦截器逻辑;而消费者拦截器支持在消费消息前以及提交位移后编写特定逻辑。

Kafka 拦截器可以应用于包括客户端监控、端到端系统性能检测、消息审计等多种功能在内的场景

14、Java生产者和消费者是如何管理TCP连接的?何时创建 TCP 连接?何时关闭 TCP 连接?

为何采用 TCP?

在开发客户端时,人们能够利用 TCP 本身提供的一些高级功能,比如多路复用请求以及同时轮询多个连接的能力。

何时创建 TCP 连接?

Producer 端
1、KafkaProducer 实例创建时启动 Sender 线程,从而创建与 bootstrap.servers 中所有 Broker 的 TCP 连接。
2、KafkaProducer 实例首次更新元数据信息之后,还会再次创建与集群中所有 Broker 的 TCP 连接。
3、如果 Producer 端发送消息到某台 Broker 时发现没有与该 Broker 的 TCP 连接,那么也会立即创建连接。

Consumer 端
1、TCP 连接是在调用 KafkaConsumer.poll 方法时被创建的

何时关闭 TCP 连接?

Producer 端
Producer 端关闭 TCP 连接的方式有两种:一种是用户主动关闭;一种是 Kafka 自动关闭。

  1. 调用 producer.close() 方法
  2. 在 Producer 端参数 connections.max.idle.ms 内没有任何请求“流过”某个 TCP 连接,那么 Kafka 会主动帮你把该 TCP 连接关闭。默认9分钟。

Consumer 端
Consumer 端关闭 TCP 连接的方式有两种:一种是用户主动关闭;一种是 Kafka 自动关闭。

  1. 调用 KafkaConsumer.close() 方法
  2. 在 消费者端参数 connections.max.idle.ms 内没有任何请求“流过”某个 TCP 连接,那么 Kafka 会主动帮你把该 TCP 连接关闭。默认9分钟。

15、幂等生产者和事务生产者是一回事吗?

幂等性 Producer 只能保证单分区、单会话上的消息幂等性;而事务能够保证跨分区、跨会话间的幂等性。从交付语义上来看,自然是事务型 Producer 能做的更多。

幂等性

指的是某些操作或函数能够被执行多次,但每次得到的结果都是不变的。

其最大的优势在于我们可以安全地重试任何幂等性操作,反正它们也不会破坏我们的系统状态。

enable.idempotence 被设置成 true 后,Producer 自动升级成幂等性 Producer,其他所有的代码逻辑都不需要改变。Kafka 自动帮你做消息的重复去重。

事务

事务型 Producer 能够保证将消息原子性地写入到多个分区中。这批消息要么全部写入成功,要么全部失败。
设置事务型 Producer 的方法也很简单,满足两个要求即可:

  1. 和幂等性 Producer 一样,开启 enable.idempotence = true。
  2. 设置 Producer 端参数 transctional. id。最好为其设置一个有意义的名字。

16、消费者组到底是什么?有什么缺点?

1、Consumer Group 是 Kafka 提供的可扩展且具有容错性的消费者机制。
2、Kafka 仅仅使用 Consumer Group 这一种机制,却同时实现了传统消息引擎系统的两大模型:

  • 如果所有实例都属于同一个 Group,那么它实现的就是消息队列模型;
  • 如果所有实例分别属于不同的 Group,那么它实现的就是发布 / 订阅模型。

3、理想情况下,Consumer 实例的数量应该等于该 Group 订阅主题的分区总数。

有什么缺点?

一是限制了并发度消费者个数最大只能和kafka的分数数一样;如果kafka的数据再分区中不平衡就导致了只能等一个消费者慢慢消费;多个消费者组那就要保存多个消费组的位移信息,

17、了解位移主题吗?和普通主题什么区别?储存了什么信息?什么时候创建?分区多少?副本多少?

了解位移主题吗?

将 Consumer 的位移数据作为一条条普通的 Kafka 消息,提交到 __consumer_offsets 中。可以这么说,__consumer_offsets 的主要作用是保存 Kafka 消费者的位移信息。位移主题就是普通的 Kafka 主题。

储存了什么信息?

1、位移主题的 Key 中保存 3 部分内容:<Group ID,主题名,分区号 >
2、用于保存 Consumer Group 信息的消息。
3、用于删除 Group 过期位移甚至是删除 Group 的消息。

什么地方会用到位移主题呢?

Consumer 提交位移时会写入该主题。如 consumer.commitSync 等。当调用这些方法时,Kafka 会向位移主题写入相应的消息。

分区多少?副本多少?

如果位移主题是 Kafka 自动创建的,那么该主题的分区数是 50,副本数是 3。也可以自己设置:

  • Broker 端参数 offsets.topic.num.partitions
  • Broker 端参数offsets.topic.replication.factor

18、消费者组重平衡能避免吗?

1、第一类非必要 Rebalance 是因为未能及时发送心跳,导致 Consumer 被“踢出”Group 而引发的。因此,你需要仔细地设置session.timeout.ms 和 heartbeat.interval.ms的值。

  • 设置 session.timeout.ms = 6s。
  • 设置 heartbeat.interval.ms = 2s。
  • 要保证 Consumer 实例在被判定为“dead”之前,能够发送至少 3 轮的心跳请求,即 session.timeout.ms >= 3 * heartbeat.interval.ms。

2、第二类非必要 Rebalance 是 Consumer 消费时间过长导致的。

  • 如果要避免非预期的 Rebalance,你最好将该参数值设置得大一点,比你的下游最大处理时间稍长一点。

19、Rebalance 的触发条件是那几个?

20、Rebalance 发生的频率、原因,你是怎么定位问题并解决的?

21、假设 Consumer 在处理完消息和提交位移前出现故障,下次重启后依然会出现消息重复消费的情况。如何解决该问题?

22、多线程开发消费者方案的原理?

多线程 + 多KafkaConsumer实例

1、消费者程序启动多个线程,每个线程维护专属的 KafkaConsumer 实例,负责完整的消息获取、消息处理流程。


单线程 + 单KafkaConsumer实例 + 消息处理Worker线程池

2、消费者程序使用单或多线程获取消息,同时创建多个消费线程执行消息处理逻辑。


区别

23、如何监控消费者组消费进度?

消费者 Lag 或 Consumer Lag,指消费者当前落后于生产者的程度。

比方说,Kafka 生产者向某主题成功生产了 100 万条消息,你的消费者当前消费了 80 万条消息,那么我们就说你的消费者滞后了 20 万条消息,即 Lag 等于 20 万。

我们应该怎么监控它呢?简单来说,有 3 种方法。

  1. 使用 Kafka 自带的命令行工具 kafka-consumer-groups 脚本。
  2. 使用 Kafka Java Consumer API 编程。
  3. 使用 Kafka 自带的 JMX 监控指标。

24、Kafka 的副本机制实现的原理,有什么好处?

好处

  1. 提供数据冗余。即使系统部分组件失效,系统依然能够继续运转,因而增加了整体可用性以及数据持久性。
  2. 提供高伸缩性。支持横向扩展,能够通过增加机器的方式来提升读性能,进而提高读操作吞吐量。
  3. 改善数据局部性。允许将数据放入与用户地理位置相近的地方,从而降低系统延时。

原理

所谓副本(Replica),本质就是一个只能追加写消息的提交日志。

如何确保副本中所有的数据都是一致的呢?

Kafka基于领导者(Leader-based)的副本机制。

所有的读写请求都必须发往领导者副本所在的 Broker,由该 Broker 负责处理。追随者副本不处理客户端请求,它唯一的任务就是从领导者副本异步拉取消息,并写入到自己的提交日志中,从而实现与领导者副本的同步。

25、如果允许 Follower 副本对外提供读服务,你觉得应该如何避免或缓解因 Follower 副本与 Leader 副本不同步而导致的数据不一致的情形?

  1. 消费者可以降低读取消息的频率,给予Follower副本一定同步的时间
  2. 消费者在读取消息是优先读取Follower副本中的信息,如果读取不到再转到Leader副本中进行读取。

26、Kafka Broker 请求处理流程?


当网络线程拿到请求后,它不是自己处理,而是将请求放入到一个共享请求队列中。Broker 端还有个 IO 线程池,负责从该队列中取出请求,执行真正的处理。如果是 PRODUCE 生产请求,则将消息写入到底层的磁盘日志中;如果是 FETCH 请求,则从磁盘或页缓存中读取消息。

27、消费者组重平衡全流程?重平衡过程是如何通知到其他消费者实例的?消费者组状态机?

消费者组重平衡全流程?

重平衡分为两个步骤:分别是加入组和等待领导者消费者(Leader Consumer)分配方案。这两个步骤分别对应两类特定的请求:JoinGroup 请求和 SyncGroup 请求。

  1. 当组内成员加入组时,它会向协调者发送 JoinGroup 请求。在该请求中,每个成员都要将自己订阅的主题上报,这样协调者就能收集到所有成员的订阅信息。一旦收集了全部成员的 JoinGroup 请求后,协调者会从这些成员中选择一个担任这个消费者组的领导者。
  2. 选出领导者之后,协调者会把消费者组订阅信息封装进 JoinGroup 请求的响应体中,然后发给领导者,由领导者统一做出分配方案后,进入到下一步:发送 SyncGroup 请求。领导者向协调者发送 SyncGroup 请求,将刚刚做出的分配方案发给协调者。

重平衡过程是如何通知到其他消费者实例的?

靠消费者端的心跳线程

当协调者决定开启新一轮重平衡后,它会将“REBALANCE_IN_PROGRESS”封装进心跳请求的响应中,发还给消费者实例。当消费者实例发现心跳响应中包含了“REBALANCE_IN_PROGRESS”,就能立马知道重平衡又开始了,这就是重平衡的通知机制。

消费者组状态机

Kafka 设计了一套消费者组状态机(State Machine),来帮助协调者完成整个重平衡流程。


28、我们是否能允许部分消费者在重平衡过程中继续消费,以提升消费者端的可用性以及吞吐量?

29、Kafka控制器的作用是什么?控制器是如何被选出来的?控制器保存了什么数据?

控制器的作用

一方面,它要为集群中的所有主题分区选举领导者副本;另一方面,它还承载着集群的全部元数据信息,并负责将这些元数据信息同步到其他 Broker 上。

控制器是如何被选出来的?

Broker 在启动时,会尝试去 ZooKeeper 中创建 /controller 节点。Kafka 当前选举控制器的规则是:第一个成功创建 /controller 节点的 Broker 会被指定为控制器。

控制器是做什么的?

  1. 主题管理(创建、删除、增加分区)
  2. 分区重分配
  3. Preferred 领导者选举
  4. 集群成员管理(新增 Broker、Broker 主动关闭、Broker 宕机)
  5. 数据服务

控制器保存了什么数据?

30、高水位是什么?有什么缺点? LEO是什么?更新机制是什么?Leader Epoch是什么?

在 Kafka 中,高水位的作用主要有 2 个。

  1. 定义消息可见性,即用来标识分区下的哪些消息是可以被消费者消费的。
  2. 帮助 Kafka 完成副本同步。

更新机制是什么?

高水位有什么缺点?

Follower 副本的高水位更新需要一轮额外的拉取请求才能实现。如果把上面那个例子扩展到多个 Follower 副本,情况可能更糟,也许需要多轮拉取请求。也就是说,Leader 副本高水位更新和 Follower 副本高水位更新在时间上是存在错配的。这种错配是很多“数据丢失”或“数据不一致”问题的根源。

Leader Epoch是什么?

所谓 Leader Epoch,我们大致可以认为是 Leader 版本。它由两部分数据组成。

  1. Epoch。一个单调增加的版本号。每当副本领导权发生变更时,都会增加该版本号。小版本号的 Leader 被认为是过期 Leader,不能再行使 Leader 权力。
  2. 起始位移(Start Offset)。Leader 副本在该 Epoch 值上写入的首条消息的位移。

Kafka Broker 会在内存中为每个分区都缓存 Leader Epoch 数据,同时它还会定期地将这些信息持久化到一个 checkpoint 文件中。当 Leader 副本写入消息到磁盘时,Broker 会尝试更新这部分缓存。如果该 Leader 是首次写入消息,那么 Broker 会向缓存中增加一个 Leader Epoch 条目,否则就不做更新。这样,每次有 Leader 变更时,新的 Leader 副本会查询这部分缓存,取出对应的 Leader Epoch 的起始位移,以避免数据丢失和不一致的情况。

31、Kafka如何调优?从哪几个方面去进行调优?

1、操作系统调优

  1. 在挂载(Mount)文件系统时禁掉 atime 更新,避免 inode 访问时间的写入操作,减少文件系统的写操作数。
  2. 文件系统选择 ext4 或 XFS,XFS具有高性能、高伸缩性等特点,特别适用于生产服务器
  3. 将swap 空间 swappiness 设置成一个很小的值,以防止 Linux 的 OOM Killer 开启随意杀掉进程
  4. 给 Kafka 预留的页缓存越大越好,最小值至少要容纳一个日志段的大小

2、JVM 层调优

  1. 将你的 JVM 堆大小设置成 6~8GB
  2. 建议你使用 G1 收集器,主要原因是方便省事,至少比 CMS 收集器的优化难度小得多。

3、Broker 端调优

  1. 即尽力保持客户端版本和 Broker 端版本一致。版本间的不一致会丧失很多性能收益。

4、应用层调优

  1. 不要频繁地创建 Producer 和 Consumer 对象实例。构造这些对象的开销很大,尽量复用它们。
  2. 用完及时关闭。
  3. 合理利用多线程来改善性能。Kafka 的 Java Producer 是线程安全的,你可以放心地在多个线程中共享同一个实例;而 Java Consumer 虽不是线程安全的

性能指标调优

如何提高吞吐量?

如何减少延时?

32、Kafka 集群到底需要多大的存储空间?

这是一个非常经典的规划问题。Kafka 需要将消息保存在底层的磁盘上,这些消息默认会被保存一段时间然后自动被删除。虽然这段时间是可以配置的,但你应该如何结合自身业务场景和存储需求来规划 Kafka 集群的存储容量呢?

我举一个简单的例子来说明该如何思考这个问题。假设你所在公司有个业务每天需要向 Kafka 集群发送 1 亿条消息,每条消息保存两份以防止数据丢失,另外消息默认保存两周时间。现在假设消息的平均大小是 1KB,那么你能说出你的 Kafka 集群需要为这个业务预留多少磁盘空间吗?

我们来计算一下:每天 1 亿条 1KB 大小的消息,保存两份且留存两周的时间,那么总的空间大小就等于 1 亿 * 1KB * 2 / 1000 / 1000 = 200GB。一般情况下 Kafka 集群除了消息数据还有其他类型的数据,比如索引数据等,故我们再为这些数据预留出 10% 的磁盘空间,因此总的存储容量就是 220GB。既然要保存两周,那么整体容量即为 220GB * 14,大约 3TB 左右。Kafka 支持数据的压缩,假设压缩比是 0.75,那么最后你需要规划的存储空间就是 0.75 * 3 = 2.25TB。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,490评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,581评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,830评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,957评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,974评论 6 393
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,754评论 1 307
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,464评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,357评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,847评论 1 317
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,995评论 3 338
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,137评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,819评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,482评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,023评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,149评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,409评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,086评论 2 355