流程
流程讲解
在我们通过代码send消息之后,这条消息就会发往拦截器Interceptor
-
Interceptor会对数据做处理
- 加解密/脱敏
- 过滤不满足条件的数据(ip白名单、错误编码、脏数据或者残缺数据)
- 统计消息投递成功率或结合第三方工具计算消息在Kafka存储的时间
- 在消息的header里放一个唯一标识,方便下游做去重
针对旧版本,新版本Kafka引入了幂等性来保证Once Exactly(刚好一次)
-
对数据进行序列化
- 无论是否存在key,都必须给key和value指定序列化方式
- 可通过实现Serializer自定义序列化规则
-
对数据进行分区
分区策略很重要,好的分区策略可以解决数据倾斜的问题
可通过实现Partitioner接口来自定义分区规则,否则规则如下- 如果发送send的时候指定了分区,则使用指定分区
- 如未指定,则根据key进行hash,然后对分区数取模
- 如未指定且没key,则轮询发送给分区(低版本采用随机)
-
临时存储
RecordAccumulator采用了双端队列数据结构Deque来临时存储
目的:提高发送数据的吞吐量- 确定消息发送的分区后,会在RecordAccumulator寻找对应的Deque
找不到对应的Deque则新建 - 从对应的Deque的尾巴中取出最后一个RecordBatch进行判断
如果该Batch加上当前消息的大小小于batch.size
,则追加进去;
否则创建新的Batch、将当前消息放进去并将Batch放到Deque队列 - 注:RecordBatch是写Kafka的最小单位
- 确定消息发送的分区后,会在RecordAccumulator寻找对应的Deque
Sender拉取数据
当满足linger.ms
和buffer.memory
任一个条件时,会进行数据的拉取排队发送
每一个Deque的数据都有一个对应的ClientRequest,负责携带RecordBatch
排队等待前一个RecordBatch的响应包装
将ClientRequest扔到KafkaChannel中,等到Selector的发送-
写Kafka
这一步骤是真正的往Kafka的Broker中写数据,回应的规则是- ack=0:发送出去就立马执行第10步,不等待响应
典型的fire and forget
, 性能最好,但也最容易丢数据 - ack=1:发送出去,等到那批数据被写到主副本上时,就成功响应
由于只是写到主副本的页缓存,因此存在丢数据的可能 - ack=-1:发送出去,直到ISR队列中包括主副本在内的
min.insync.replicas
个副本被写成功,才成功响应-
ack=-1
搭配min.insync.replicas
的结果
让kafka的副本复制策略游离在同步复制和异步复制之间
既避免了同步复制拖慢性能,又提高了异步复制的可靠性
-
- ack=0:发送出去就立马执行第10步,不等待响应
回复NetworkClient,开始下一个RecordBatch的发送
NetworkClient回复RecordAccumulator
概念
Kafka的生产者就是往Kafka写消息的程序
比如flume、spark、filebeat等,可以是一个进程也可以是一个线程
压缩
-
Kafka的压缩也是比较有意思的,特别是2.1版本引入的 ZStandard
在CPU相对空闲的情况可通过设置compression.type
来开启
使用压缩要注意以下几点:- 消息的格式需要保证一致(V0、V1和V2不要搭配使用,否则会导致Broker端多一次解压缩)
- Broker端不要设置跟
compression.type
不用的压缩类型,否则也会多一次解压缩甚至丧失零拷贝特性
好处:减小网络传输压力以及Broker存储数据的磁盘占用量
生产环境注意问题
-
kafka在运行期间可增加分区数,在增加分区数前,需注意以下几点:
- 数据乱序
由于消息一般都是进行hash然后对分区数取模,增加分区数会导致原来该放到1分区的消息被放到了2,从而无法保证数据的有序 - 数据丢失
分为两种情况讨论- 消费者指定分区消费
加入消费者A只消费1分区的数据,而分区数增加导致原本应该放到1分区的数据被放到了其他分区,从而导致消费者A无法消费到该条消息 - 消费者拉取的策略为last
当产生分区3时,如果生产者先感知到并往里边投递消息;
消费者隔一段时间后才感知到并且由于配置了last
,只能从最新的消息进行拉取,那么分区3里面就会被一部分消息不会被消费导致丢数据
- 消费者指定分区消费
- 数据乱序
-
调整消息大小
这个要重点提一下,生产环境下经常会遇到的坑
由于kafka默认消息大小message.max.bytes
还不到1M,因此会经常调整该值但是要切记!!!
调整message.max.bytes
之前,请先调整
replica.fetch.max.bytes
:确保副本之间能正常复制
fetch.message.max.bytes
:确保消息可以被消费者正常消费 生产者日志里面存在以下异常
NetworkException 表示网络异常,这个有可能是由于网络瞬时故障而导致的异常,可以通过重试解决
LeaderNotAvailableException表示分区的leader副本不可用,leader副本下线而新的leader副本选举完成之前,重试之后可以重新恢复
UnknownTopicOrPartitionException
NotEnoughReplicasException
NotCoordinatorException
解释:以上异常都是可通过重试机制来解决的,因此可通过设置以下两个参数来解决
retries
:遇到以上异常不会直接抛,而是尝试重试该参数设置的次数,若都不成功再抛异常
retry.backoff.ms
:表示两次重试间隔,一般根据异常情况来调整,网络不好情况可适当延长
既然有可重试的异常,自然也有不可重试的异常。如:
RecordTooLargeException异常,表示所发送的消息太大,KafkaProducer对此不会进行任何重试,直接抛出异常
像序列化反序列化失败、数据格式不对等异常也是不可重试的
注意
使用重试的话消息可能也会乱序,可通过设置以下参数进行避免
max.in.flight.requests.per.connection设置为1
但对性能会有一定的影响
疑问
- 新版生产者客户端(0.9以后引入的)相比旧版都有哪些优势?
- Kafka的消息为什么换了那么多种格式?(V0、V1、V2甚至V0之前还有其他的格式)
- KafkaProducer是怎么保证线程安全的?
小结
本篇博客一开始以图的形式,通过给大家描述消息写到Kafka的流程引入了生产者相关的角色和概念;
之后是简单介绍Kafka生产者的一些相关概念,最后是列出了一些生产环境需要注意的问题。
希望读者能够喜欢这种描述方式,同时相信读者也有不少的疑惑或者有觉得不对的地方,欢迎在下方进行留言讨论
写在最后
读者有没有曾经好奇过,Kafka为什么这么快 ???
在该篇内容中已埋下了一些伏笔,之后的博客中会跟大家一起进行探讨