一、前言
生产者客户端(Producer API)是负责发送消息流到Kafka集群不同Topics的应用程序。
Kafka0.9版本之前,使用的是Scala语言编写的客户端,从Kafka 0.9x版本开始,官方推出了Java版客户端。(虽然Kafka是用Java/Scala语言编写的,但也有C/C++、Python、Go等其他语言客户端,这些语言客户端并非由Kafka社区维护),下面是对Kafka Java版生产者客户端的相关介绍。
二、整体结构及流程
- 上图是生产者客户端的整体结构图,主要包括客户端核心类KafkaProducer、消息累加器类RecordAccumulator、处理发送请求器类Sender以及网络I/O类Selector。
- 客户端核心类KafkaProducer是一个线程安全的类,构建KafkaProducer同时会创建消息累加器RecordAccumulator、创建并启动请求发送Sender线程。
- KafkaProducer也是发送信息的入口,消息首先经过KafkaProducer,然后通过拦截器、序列化器、分区器、消息大小验证、事务处理等流程后追加到消息累加器RecordAccumulator中。
- 消息累加器RecordAccumulator内部为每一个主题分区维护着一个双端队列Deque,每个Deque存放的是ProducerBatch类型数据,ProducerBatch是由多个较小ProducerRecord拼凑而成,这样可以减少网络请求次数,提高传输速率。消息累加器存储批量消息数据类型如下:
# 批量消息数据存储容器,其中TopicPartition表示主题分区对象
private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
- RecordAccumulator 主要作用是缓存消息、组装为消息批量对象,并压缩消息,然后提供给Sender线程批量发送,进而减少网络传输资源,提升系统吞吐量。RecordAccumulator的缓存大小可以通过参数进行配置,默认值为32M。压缩类型包括GZIP、SNAPPY、LZ4、ZSTD这4种。
- Sender接收到消息之后,会将数据转换为<NodeId, List<ProducerBatch>>的形式。其中NodeId代表broker节点Id,List<ProducerBatch>代表要发往这个节点的消息数据集合,但不是最终的请求对象。Sender还会进一步封装为ClientRequest,一个ClientRequest发往一个Node节点,并且发送到集群之前,还会添加到InFlightRequests,最后通过Selector进行网络I/O层发送。
- Selector是一个非阻塞支持多连接的网络I/O类。包括NetworkSend和NetworkReceive两部分,分别负责向Kafka集群发送网络请求以及接收Kafka集群的响应。
三、重要配置参数
bootstrap.servers
- 描述:用于建立初始连接到Kafka集群(主机/端口对)的配置列表。这个配置不需要包含整个集群的服务器。不论这个参数配置了哪些服务器来初始化连接,客户端都是会均衡地与集群中的所有服务器建立连接。如果集群变化,元数据会动态更新。为了避免单节点风险,最好配置多台主机。
- 类型:list。
- 默认值:无。
max.request.size
- 描述:请求消息最大值(单位byte),序列化的消息字节数大于这个配置参数时,会抛出RecordTooLargeException。
- 类型:int。
- 默认值:1048576。
buffer.memory
- 描述:Producer 用来缓冲等待被发送到服务器的记录的总字节数。如果记录发送的速度比发送到服务器的速度快,Producer 就会阻塞,如果阻塞的时间超过 max.block.ms 配置的时长,则会抛出一个异常。这个配置与 Producer 的可用总内存有一定的对应关系,但并不是完全等价的关系,因为 Producer 的可用内存并不是全部都用来缓存。一些额外的内存可能会用于压缩(如果启用了压缩),以及维护正在运行的请求。 序列化的消息字节数大于这个配置参数时,会抛出RecordTooLargeException
- 类型:long。
- 默认值:33554432。
acks
- 描述:此配置是 Producer 在确认一个请求发送完成之前需要收到的反馈信息的数量。 这个参数是为了保证发送请求的可靠性。以下配置方式是允许的:
(1) acks=0 如果设置为0,则 producer 不会等待服务器的反馈。该消息会被立刻添加到 socket buffer 中并认为已经发送完成。在这种情况下,服务器是否收到请求是没法保证的,并且参数retries也不会生效(因为客户端无法获得失败信息)。每个记录返回的 offset 总是被设置为-1。
(2)acks=1 如果设置为1,leader节点会将记录写入本地日志,并且在所有 follower 节点反馈之前就先确认成功。在这种情况下,如果 leader 节点在接收记录之后,并且在 follower 节点复制数据完成之前产生错误,则这条记录会丢失。
(3)acks=all 如果设置为all,这就意味着 leader 节点会等待所有同步中的副本确认之后再确认这条记录是否发送完成。只要至少有一个同步副本存在,记录就不会丢失。这种方式是对请求传递的最有效保证。acks=-1与acks=all是等效的。 - 类型:string。
- 默认值:1。
compression.type
- 描述:Producer 生成数据时可使用的压缩类型。默认值是none(即不压缩)。可配置的压缩类型包括:none, gzip, snappy, 或者 lz4 。压缩是针对批处理的所有数据,所以批处理的效果也会影响压缩比(更多的批处理意味着更好的压缩)。
- 类型:string。
- 默认值:none。
retries
- 描述:若设置大于0的值,则客户端会将发送失败的记录重新发送,尽管这些记录有可能是暂时性的错误。请注意,这种 retry 与客户端收到错误信息之后重新发送记录并无区别。允许 retries 并且没有设置max.in.flight.requests.per.connection 为1时,记录的顺序可能会被改变。比如:当两个批次都被发送到同一个 partition ,第一个批次发生错误并发生 retries 而第二个批次已经成功,则第二个批次的记录就会先于第一个批次出现。
- 类型:int。
- 默认值:0。
batch.size
- 描述:当将多个记录被发送到同一个分区时, Producer 将尝试将记录组合到更少的请求中。这有助于提升客户端和服务器端的性能。这个配置控制一个批次的默认大小(以字节为单位)。当记录的大小超过了配置的字节数, Producer 将不再尝试往批次增加记录。发送到 broker 的请求会包含多个批次的数据,每个批次对应一个 partition 的可用数据小的 batch.size 将减少批处理,并且可能会降低吞吐量(如果 batch.size = 0的话将完全禁用批处理)。 很大的 batch.size 可能造成内存浪费,因为我们一般会在 batch.size 的基础上分配一部分缓存以应付额外的记录。
- 类型:int。
- 默认值:16384。
client.id
- 描述:发出请求时传递给服务器的 ID 字符串。这样做的目的是为了在服务端的请求日志中能够通过逻辑应用名称来跟踪请求的来源,而不是只能通过IP和端口号跟进。
- 类型:string。
- 默认值:""。
connections.max.idle.ms
- 描述:在此配置指定的毫秒数之后,关闭空闲连接。
- 类型:long。
- 默认值:540000。
security.protocol
- 描述:与 brokers 通讯的协议。可配置的值有: PLAINTEXT, SSL, SASLPLAINTEXT, SASLSSL。
- 类型:string。
- 默认值:PLAINTEXT。
上面列出了优先级较高的一些配置,更多配置可以参考Kafka官方网站。