Kafka Producer
Kafka Producer 是 kafka 提供的与 Kafka Broker 连接的客户端工具,其具备高吞吐量、高性能等特性。
目录
- Kafka Producer 核心组件
Kafka Producer 核心组件
- KafkaProducer:Kafka Producer 操作对象
- ProducerConfig:Kafka Producer 配置类
- ProducerMetadata:Kakfa Producer 客户端元信息
- ProducerInterceptors:Kafka Producer 拦截器
- Partitioner:分区选择器
- DefaultPartitioner:默认分区选择器
- RoundRobinPartitioner:轮训分区选择器
- UniformStickyPartitioner:随机分区选择器
- KeySerializer:ProducerRecord Key 序列化
- ValueSerializer:ProducerRecord Value 序列化
- RecordAccumulator:消息缓冲队列
- Metadata:Kafka Topic 元数据
- Sender:Kafka Sender,主要负责与 Kafka Broker 业务逻辑交互的组件
- KafkaClient:Kafka Producer 与 Node 通信的接口
- NetworkClient:提供异步的 Request/Response 的 IO 通信组件
- KafkaClient:Kafka Producer 与 Node 通信的接口
- Metrics:Kafka Producer Metric 指标收集组件
- ProducerBatch:带发送的的一个组合对象
- TopicPartition:Topic、Partition 信息
- MemoryReocrdBuilder:Record 内存建造对象
- Cluster:集群元数据
KafkaProducer 消息发送过程
- send(ProducerRecord<K,V> record) | send(ProducerRecord(K,V) record,Callback callback):Kafka 消息发送API
- 通过 interceptors 将 record 装饰成 interceptedRecord
- doSend(ProducerRecord(K,V) record,Callback callback)
- doSend(ProducerRecord(K,V) record,Callback callback)
- waitOnMetadata(String Topic, Integer Partition, long nowMs, long maxWaitMs) = ClusterAndWaitTime:获取 Topic 对应的 Cluster 的元信息
- 通过 metadata.fetch() 获取 Cluster 集群元信息
- 通过 int lastVersion = cluster.partitionCountForTopic(topic) 获取 Topic 对应的 Partition 个数,如果已有缓存则直接返回 ClusterAndWaitTime;
- 否则,将 Topic 添加到 metadata 中,metadata.requestUpdateForTopic(topic) 更新需要 version 的版本号,然后 Sender.waitup() 唤醒 Sender 线程进行获取元信息
- 然后通过 metadata.awaitUpdate(int Version,long remainingWaitMs),等待拉取元信息(其实现通过 Synchronized 的 wait(long waitMs) 方法),通过乐观锁比较 updateVersion 和 lastVersion,如果 updateVersion 大于 laestVersion 则说明更新元信息成功,否则更新元信息失败;
- 然后再通过 Cluster cluster = metadata.fectch() 获取集群元信息,然后通过 cluster.partitionCountForTopic(topic) 获取 Topic 对应的 Partition 的个数
- PS:waitOnMetadata() 是当前线程自旋获取 Topic 的元信息,除非执行时间大于 maxWaitMs 时间后抛出 TimeoutException,否则会一直轮训获取。
- PS:ClusterAndWaitTime 包括了 Cluster 的 Partition 和 ISR 等;
- 通过 byte[] serializedKey,serializedValue = Serializer 序列化;Kafka 默认是 ByteArraySerializer。
- partition(ProducerRecord record,byte[] serializedKey,byte[] serializedValue,Cluster cluster) -> int partiton:根据 ProducerRecord,serializedKey,serializedValue,Cluster 选择 Topic 对应的 Partition-Id
- 可根据ProducerConfig中指定的 Partitionor 策略或者在 KafkaProducer.send(ProducerRecord) 中指定PartitionId
- 根据 MagicVersion、CompressionType、SerializedKey,SerializedValue、Headers 计算整个请求字节大小(SerializedSize)
- 根据 {buffer.memory} 校验 SerializedSize 是否大于配置,如果大于则抛出异常
- 将 Callback、Interceptors、TopicPartition 组合成 InterceptorCallback
- 通过 RecordAccumulator.append(TopicPartition,TimeStamp、SerializedKey,SerializedValue,Headers,InterceptorCallback,...) -> RecordAppendResult:方法追加入缓冲区,
- 返回 RecordAppendResult.future() -> Future<RecordMetadata>
Metadata 获取 Topic 元信息
Metadata 是 Kafka 元信息类,在 KafkaProducer 中,ProducerMetadata 是 Metadata 的实现。
KafkaProducer 在构造函数初始化时,new ProducerMetadata 赋予给 Metadata,然后通过 Metadata.bootstrap(List<InetSocketAddress> address) (PS:Metadata 会构建一个属性为 Empty 的 MetadataCache)。
KafkaProducer 通过 metadata.fetch() 获取 Cluster 集群信息。实际上是通过 MetadataCache 获取 Cluster 集群信息缓存。
KafkaProducer 将需要获取 Topic 的元信息添加到 metadata 集合,然后通过 cluster.partitionCountForTopic(Topic) 获取 Topic 对应的 Partition 分区个数。如果在 ProducerRecord 有指定 PartitionId,则判断 PartitionId 是否小于 Partition 分区个数并立刻返回 ClusterAndWaitTime,否则抛出异常。
通过 do...while 轮训获取 Topic 的元信息。
- 先将 Topic 添加到 Metadata,然后通过 metadata.requestUpdateForTopic() 返回当前拉取元信息的 LastVersion 版本号。
- 通过 sender.wakeup() 唤醒 Sender 线程,并通过 metadata.awaitUpdate(LastVersion, remainingWaitMs) 配合乐光锁拉取 Topic 的元信息
- 底层通过 Synchronized 的 wait(remainingWaitMs) 释放锁并等待 Sender 唤醒
返回封装最新的 Cluster 的 ClusterAndWaitTime
RecordAccumulator
类介绍
/**
* This class acts as a queue that accumulates records into {@link MemoryRecords}
* instances to be sent to the server.
* <p>
* The accumulator uses a bounded amount of memory and append calls will block when that memory is exhausted, unless
* this behavior is explicitly disabled.
*/
- RecordAccumulator 提供类似队列的数据结构,来存储 Accumualtor Record 数据
- Accumualtor Record 的 ProducerBatch 是基于 MemoryRecords 存储在堆外内存的(NIO)
- Accumualtor Record 是用于存储待 Sender 发送给 Broker 的缓冲队列
- Accumualtor 通过累计方式来使用有限的内存存储,如果累计内存大于阈值则会阻塞(block)调用者,除非在配置中禁止此阻塞
RequestBatch
一个 RequestBatch 对应一块内存空间,batch.size 默认的大小为16kb,如果 ProducerRecord Size 大于 batch.size,则使用 ProducerRecord Size 的大小分配一块内存空间。如果小于 batch.size,则基于 batch.size 分配内存空间
ByteBuffer 组成
Kafka 消息实际上是二进制协议,严格遵循二进制的规范写入 buffer 里
offset | size | crc | magic | attibutes | timestamp | key size | key | value size | value