本文主要介绍kafka producer的可靠性,包括ack、batch、重试机制等
消息发布
KafkaProducer的send是异步发送方法,一旦消息存储到发送队列缓冲区,方法就会立即返回,实际处理消息发送的是一个后台Sender线程。send方法返回一个Future对象(包含消息的元数据信息如分区、消息存储的offset),并接收回调函数参数,当消息ack后会调用回调函数;调用Future的get方法,将阻塞直到相关联的请求完成并返回该消息的元数据或在消息发送过程中抛出异常。
如果是简单的阻塞调用,可以使用get方法:
完全非阻塞的用法是使用回调参数,在消息发送请求完成后调用该函数:
忽略返回参数,客户端感知不到消息发送状态:
如果应用和Kafka集群间的网络质量太差,那么阻塞的方式发送每条消息后需要等待较长时间才能收到应答。这对高并发、海量消息发送简直就是灾难,因为等待应答的时间远超过消息发送时间。如果一些消息不太注重可靠性,发送失败了只需要记录下日志,可以用回调函数方式。
Acker
Kafka使用push模式把消息发布到broker,消息后发布后,producer又是怎么确定消息已经成功持久化?这是通过acker机制实现的,broker反馈给客户端消息已经收到并写入到日志文件(基于性能考虑,broker并没有把数据落盘而是放到内存)。通过配置不同acks值,对应不同级别:
acks=0:消息会立即添加到socket缓冲区,producer不会等待broker的任何确认消息,就认为消息已经发送了。这种模式下数据传输效率是最高的,但不能保证broker已经收到了消息,所以数据可靠性也是最低的。
acks=1:意味着leader成功把消息写到本地日志后就反馈给producer,而不关心同步节点上该消息是否已写到本地。如果leader宕机但同步节点还没有及时拉取到该消息,则数据就丢失了。
acks=all/-1:leader会等待ISR中所有的同步节点都确认接收到了消息,才反馈给客户端。这种可靠性是最高的,保证了只要至少有一个in-sync副本还活着,消息就不会丢失,acks=all需要leader等待所有同步节点ack,这种延迟取决于最慢节点。但是这样也不能保证数据不丢失,比如当ISR中只有leader时,这样就变成了acks=1的情况。
当acks=all/-1时,min.insync.replicas这个参数指定了ISR中的最少副本数,默认值为1。如果ISR中的副本数小于min.insync.replicas时,客户端会返回异常:org.apache.kafka.common.errors.NotEnoughReplicasExceptoin: Messages are rejected since there are fewer in-sync replicas than required。通过这两个参数配合使用,能保证消息发送的可靠性。
重试机制
Kafka Producer一般有两类错误,可重试错误会通过重试发送消息解决,比如连接重连可解决连接错误、partition重新选举leader可解决“NotLeaderForPartitionException”错误。Kafka Producer能配置重试次数,超过重试次数还不能解决的会抛出错误。另外一类就是不能通过重试处理的错误,比如,消息大小太大,这种情况下Kafka Producer会立即报错。
如果producer发送数据给broker后,遇到的网络问题而造成通信中断,那producer就无法判断该条消息是否已经提交(commit)。虽然Kafka无法确定网络故障期间发生了什么,但是producer可以retry多次,确保消息已经正确传输到broker中
Batch
当多个消息被发送到同一分区时,producer会尝试将多个消息合并到一个批次,就是把多个消息打包在一起发送到 Broker。在一次请求中发送大批量数据,提高producer和broker性能,广泛用于大数据场景。
batch.size:配置批量发送的最大字节数,如果batch.size=0,会禁用batch。producer是在内存中积累数据,batch size越大占用内存越多,因会始终分配指定大小的缓冲区。
linger.ms:通常是在消息的到达速度比发送速度快,producer才会把多个消息打包成一个批次,然而,在某些场景下,客户端希望降低请求次数,这可以通过增加延迟发送功能来实现:producer不是立即发送消息,而是等待给定的延迟时间,以积累更多的消息批量发送,达到节省网络资源的目的。linger.ms配置项就是让Producer在发送消息前等待一定时间,以积累更多的消息打包发送,默认配置为0
消息重复性
acks=-1的情况下,消息发送到leader后 ,当只有部分ISR副本完成了消息同步,leader此时挂掉,客户端会认为消息发送失败,就会重新发送数据(设置了retries),数据就可能会重复。比如follower1同步了leader的消息,follower2没有同步到,leader挂掉后,producer会得到异常,认为消息发送失败了,而follower1被选举为leader,producer又重新发送消息,这样消息就重复了。
本文首发于公众号:data之道