1、客户端开发
下面是构造的消息对象ProducerRecord.
public class ProducerRecord<K,V>(
private final String topic; // 主题
private final Integer partition; // 分区号
private final Headers headers; // 消息头部
private final K key; // 键
private final V value; // 值
private final Long timestamp; // 消息时间戳
// 省略其他成员方法和构造函数
)
它并不是但存意义上的消息,而是包含了多个属性,原本业务上与业务相关的消息体只是其中一个value属性。
1.1、消息的发送
消息发送主要有三种模式:发后即忘 、同步以及异步。
public Future<RecordMetadata> send (ProducerRecord<K,V) record)
public Future<RecordMetadata> send (ProducerRecord<K,V) record, Callback callback)
send方法重载了两个方法。
第一个send就是发后即忘,它只管往kafka中发送消息而不关心消息是否正确到达。这种一帮情况下其实不会出现什么问题,但是可能会造成消息丢失。这种方法效率最高,但是可靠性最差。
同步:利用返回的Future对象实现,比如producer.send(record).get()。执行send()方法之后直接链式调用了get()方法来阻塞等待kafka的响应,直到发送成功,或者发送异常。同步发送的可靠性最高,但是性能也最差,需要阻塞等待一条消息发送完才可以发送下一条。
异步:一般是使用send()方法里面的callback回掉函数,Kafka在返回相应的时调用该函数来实现异步的发送确认。
KafkaProducer是线程安全的,可以在多个线程共享单个KafkaProducer实例,也可以将KafkaProcuder进行池化来供其他线程使用。
1.2、序列化
生产者需要用序列化器(Serializer)把对象转换成字节数组才能通过网络发送给Kafka。而在对侧,消费者需要用反序列化器(Deserializer)把从Kafka中收到的字节数组转换成相应的对象。
1.3、分区器
消息在通过send()发送到broker的过程中,又可能需要经过拦截器(Interceptor)、序列化器(Serializer)和分区器(Paratitioner)的一系列作用之后才会被真正地发往broker。
拦截器一般不是必需的,而序列化器是必须的。消息经过序列化之后,就要通过确定它发送的分区。如果消息ProducerRecord的partition字段没有指定,那么就需要依赖分区器。
如果key不为null,那么默认的分区器会对key进行哈希,最终得到的哈希值来计算分区号,拥有相同key的消息会被写入同一分区。如果key为null,那么消息会以轮询的方式发往一个可用的分区。
1.4、拦截器
拦截器(Interceptor)分为生产者拦截器和消费者拦截器。
生产者拦截器可以在发送之前做一些准备工作,比如按照某个规则过滤掉不符合要求的消息,修改消息的内容等。
2、原理分析
整个生产者客户端由两个线程协调运行,主线程和Sender线程。
主线程中,KafkaProducer创建消息,然后通过拦截器、序列化器和分区器之后,将消息累加到消息积累器(RecordAccumulator,也称消息收集器)。Sende线程负责从RecordAccumulator中获取消息,并将其发送到Kafka中。
RecordAccumulator作用:缓存消息,以便sender线程可以批量发送,进而减少网络传输的资源消耗以提升性能。RecordAccumulator的大小是通过buffer.memory来设置,默认32M。
主线程发送过来的消息都会被追加到RecordAccumulator的某个双端队列(Deque)中,在RecordAccumulator中,为每一个分区都维护了一个双端队列,队列中的内容就是Producerbatch。
Producerbatch不是ProducerRecord,Producerbatch中包含一到多个ProducerRecord,Producerbatch其实是一个消息的批次。将较小的ProducerRecord拼凑成一个较大的Producerbatch,也可以减少网络请求的次数来提供整体的吞吐量(这也是Kafka吞吐量高的原因之一)。
ProducerBatch的大小和batch.size的参数有这密切的关系。当一条消息发送到RecordAccumulator中的时候,会先寻找与消息分区对应的双端队列中,再从双端队列的尾部获取一个ProducerBatch,如果没有就新建。查看这个ProducerBatch是否还可以写入这个ProducerRecord,如果可以写入就写入,如果不行就新建ProducerBatch。
在创建ProducerBatch的时候,会评估这条消息是否超过了batch.size,如果没有超过,那么就以batch.size参数的大小来创建ProducerBatch。如果超过了,就会以评估的消息大小来创建ProducerBatch。
Sender从RecordAccumulator获取消息后,在将请求发送给Kafka之前还会将请求保存到InFlightRequest中,他的主要作用是缓存了已经发送出去但还没有得到响应的请求。
3、重要参数
3.1、acks
用来指定分区中必需要有多少个副本收到这条消息,之后生产者才会认为这条消息是成功写入的,这个参数涉及到吞吐量和可靠性之间的权衡。(字符串类型)。
1,默认值。代表leader成功写入即可,就会收到来自服务端的成功响应。。这种方案。有可能会造成发送消息丢失。比如在同步给其他follower的过程之前,leader突然崩溃。
0,不需要等待任何服务端的响应。最大的吞吐量。
-1或者all。等待ISR中所有副本都成功写入,最强的可靠性。
****问题****:如果leader crash时,ISR为空怎么办?
kafka在Broker端提供了一个配置参数:unclean.leader.election,这个参数有两个值:
true(默认):允许不同步副本成为leader,由于不同步副本的消息较为滞后,此时成为leader,可能会出现消息不一致的情况。false:不允许不同步副本成为leader,此时如果发生ISR列表为空,会一直等待旧leader恢复,降低了可用性。