kafka消息幂等性

生产端幂等性

  • 什么是幂等性,为什么要实现幂等性?
    分布式系统中,一些不可控因素有很多,比如网络、OOM、FullGC等。在Kafka Broker确认Ack前,有可能出现网络异常、FullGC、OOM等问题时导致Ack超时,Producer会进行重复发送。注,在未达到最大重试次数前,会自动重试(非应用程序代码写的重试)

  • 消息重试对顺序消息的影响
    我们知道正常情况下,单个分区内的消息是按发送时间有序的,但发生消息重试时候(例如前一条消息发送失败,后一条消息发送成功,前一条消息重试后成功,造成数据乱序),kafka如果保证分区内顺序有序?


    图片.png

,这个找到最旧的非常关键:【Sender 线程发送时,在遍历 queue 中的 batch 时,会检查这个 batch 是否是重试的 batch,如果是的话,只有这个 batch 是最旧的那个需要重试的 batch,才允许发送,否则本次发送跳过这个 Topic-Partition 数据的发送等待下次发】。


幂等性的开启使用

需要在生产端配置参数enable.idempotence = true,当幂等性开启的时候acks即为all。如果显性的将acks设置为0或-1,那么将会报错 Must set acks to all in order to use the idempotent producer. Otherwise we cannot guarantee idempotence 。

Properties props = new Properties(); 
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); 
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); 
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); 
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); 
props.put(ProducerConfig.ACKS_CONFIG, "all"); props.put(ProducerConfig.RETRIES_CONFIG, "3"); 
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props); 
kafkaProducer.send(new ProducerRecord<String, String>("truman_kafka_center", "1", "hello world")).get(); kafkaProducer.close();

幂等性实现原理

Broker端也会为每个<PID, Topic, Partition>维护一个序号,并且每次Commit一条消息时将其对应序号递增。对于接收的每条消息,如果其消息序号比Broker维护的序号(即最后一次Commit的消息的序号)大一,则Broker会接受它,否则将其丢弃:
a. 如果消息序号比Broker维护的序号大于1以上,说明中间有数据尚未写入,也即乱序,此时Broker拒绝该消息,Producer抛出InvalidSequenceNumber
b. 如果消息序号小于等于Broker维护的序号,说明该消息已被保存,即为重复消息,Broker直接丢弃该消息,Producer抛出DuplicateSequenceNumber
这种设计解决的问题:
1: Broker 保存消息后,发送 ACK 前宕机,Producer 认为消息未发送成功并重试,造成数据重复
2: 前一条消息发送失败,后一条消息发送成功,前一条消息重试后成功,造成数据乱序


如前面所述,幂等性要解决的问题是,在设置了kafka的at least once 时,由于触发重试机制导致的重复问题简单来说 at least once + 幂等 = exactly once
kafka producer实现幂等性 的两个重要机制
1:producerID,用来表示每个producer Client
2:sequence number client发送的每条消息都会带有seqnum 。server端根据这个num来判断数据是否重复

  • producerID 在哪儿产生?
    在server端产生,Client 通过向 Server 发送一个 InitProducerIdRequest 请求获取 PID(幂等性时,是选择一台连接数最少的 Broker 发送这个请求)。
    【凡是开启幂等性都是需要生成PID,只不过未开启事务的PID可以在任意broker生成,而开启事务只能在TransactionCoordinator节点生成】。

  • sequence number:
    Kafka发送消息都是以batch的格式发送,batch包含了多条消息。所以Producer发送消息batch的时候,只会设置该batch的第一个消息的序列号,后面消息的序列号可以根据第一个消息的序列号计算出来

每一个topic-partition 都有一套自己的sequence number
producer初始化的时候回分配一个producerID。对于一个给定的pid他的sequence number会从0开始自增。每一个topic-partition都有自己的一套sequence number,client发送的每条消息都有seq num Server就是根据这个seqnum判断是否重复, 是一个从0开始单调递增的值。
但是这里的PID是全局唯一的,如果client挂掉重启优惠重写分配一个PID,这也是幂等性无法做到跨会话的原因。


下面代码摘要自网络,producerId的创建过程:


图片.png

调用了 TransactionCoordinator (Broker 在启动 server 服务时都会初始化这个实例)的 handleInitProducerId() 方法做了相应的处理,其实现如下(这里只关注幂等性的处理):


图片.png

幂等性时发送流程

正常流程:



异常流程:


图片.png

当Producer发送消息(x2,y2)给Broker时,Broker接收到消息并将其追加到消息流中。此时,Broker返回Ack信号给Producer时,发生异常导致Producer接收Ack信号失败。对于Producer来说,会触发重试机制,将消息(x2,y2)再次发送,但是,由于引入了幂等性,在每条消息中附带了PID(ProducerID)和SequenceNumber。相同的PID和SequenceNumber发送给Broker,而之前Broker缓存过之前发送的相同的消息,那么在消息流中的消息就只有一条(x2,y2),不会出现重复发送的情况。

幂等性处理细节

producer端处理细节:

  1. 应用通过 KafkaProducer 的 send() 方法将数据添加到 RecordAccumulator 中,添加时会判断是否需要新建一个 ProducerBatch,这时这个 ProducerBatch(消息集,批量发送) 还是没有 PID 和 sequence number 信息的

  2. Producer 后台发送线程 Sender,在 run() 方法中,会先根据 TransactionManager 的 判断当前的 PID 是否需要重置,重置的原因是因为:如果有 topic-partition 的 batch 重试多次失败最后因为超时而被移除,这时 sequence number 将无法做到连续,因为 sequence number 有部分已经分配出去,这时系统依赖自身的机制无法继续进行下去(因为幂等性是要保证不丢不重的),相当于程序遇到了一个 fatal 异常,PID 会进行重置,TransactionManager 相关的缓存信息被清空(Producer 不会重启),只是保存状态信息的 TransactionManager 做了 clear+new 操作,遇到这个问题时是无法保证 exactly once 的(有数据已经发送失败了,并且超过了重试次数);

  3. Sender 线程通过 maybeWaitForProducerId() 方法判断是否需要申请 PID,如果需要的话,这里会阻塞直 到获取到相应的 PID 信息;

  4. Sender 线程通过 sendProducerData() 方法发送数据,整体流程与之前的 Producer 流程相似,不同的地方是在 RecordAccumulator 的 drain() 方法中,在加了幂等性之后, drain() 方法多了如下几步判断:

  • 常规的判断:判断这个 topic-partition 是否可以继续发送(如果出现前面2中的情况是不允许发送的)、判断 PID 是否有效、如果这个 batch 是重试的 batch,那么需要判断这个 batch 之前是否还有 batch 没有发送完成,如果有,这里会先跳过这个 Topic-Partition 的发送,直到前面的 batch 发送完成。
  • 如果这个 ProducerBatch 还没有这个相应的 PID 和 sequence number 信息,会在这里进行相应的设置;
  1. 最后 Sender 线程再调用 sendProduceRequests() 方法发送 ProduceRequest 请求,后面的就跟之前正常。

幂等性时 Server 端如何处理 ProduceRequest 请求

幂等性服务端增加的校验,有了 PID 信息,并且不是重复 batch 时,在更新 producer 信息时,会做以下校验:

server 端会缓存 PID 对应这个 Topic-Partition 的最近5个 batch 信息

  1. 检查该 PID 是否已经缓存中存在(主要是在 ProducerStateManager 对象中检查);
  2. 如果不存在,那么判断 sequence number 是否 从0 开始,是的话,在缓存中记录 PID 的 meta(PID,epoch, sequence number),并执行写入操作。
    否则返回 UnknownProducerIdException(PID 在 server 端已经过期或者这个 PID 写的数据都已经过期了,但是 Client 还在接着上次的 sequence number 发送数据);
  3. 如果该 PID 存在,先检查 PID epoch 与 server 端记录的是否相同;
  4. 如果不同并且 sequence number 不从 0 开始,那么返回 OutOfOrderSequenceException 异常;
  5. 如果不同并且 sequence number 从 0 开始,那么正常写入;
  6. 如果相同,那么根据缓存中记录的最近一次 sequence number(currentLastSeq)检查是否为连续(会区分为 0、Int.MaxValue 等情况),不连续的情况下返回 OutOfOrderSequenceException 异常。

考两个问题:

  1. Producer 在设置幂等性时,为什么要求 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 小于等于 5,如果设置大于 5(不考虑 Producer 端参数校验的报错),会带来什么后果?

答案: Server 端的 ProducerStateManager 实例会缓存每个 PID 在每个 Topic-Partition 上发送的最近 5 个batch 数据.
先来分析一下,在什么情况下 Producer 会出现乱序的问题?没有幂等性时,乱序的问题是在重试时出现的,举个例子:client 依然发送了 6 个请求 1、2、3、4、5、6(它们分别对应了一个 batch),这 6 个请求只有 2-6 成功 ack 了,1 失败了,这时候需要重试,重试时就会把 batch 1 的数据添加到待发送的数据列队中),那么下次再发送时,batch 1 的数据将会被发送,这时候数据就已经出现了乱序,因为 batch 1 的数据已经晚于了 batch 2-6。

  1. Producer 在设置幂等性时,如果我们设置 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 大于 1,那么是否可以保证有序,如果可以,是怎么做到的?

当 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 设置为 1 时,是可以解决这个为题,因为同时只允许一个请求正在发送,只有当前的请求发送完成(成功 ack 后),才能继续下一条请求的发送,类似单线程处理这种模式,每次请求发送时都会等待上次的完成,效率非常差,但是可以解决乱序的问题(当然这里有序只是针对单 client 情况,多 client 并发写是无法做到的)


Kafka 2.0.0的优化:

简单来说,其实现机制概括为:
1:Server 端验证 batch 的 sequence number 值,不连续时,直接返回异常;
2:Client 端请求重试时,batch 在 reenqueue 时会根据 sequence number 值放到合适的位置(有序保证之一);
3:Sender 线程发送时,在遍历 queue 中的 batch 时,会检查这个 batch 是否是重试的 batch,如果是的话,只有这个 batch 是最旧的那个需要重试的 batch,才允许发送,否则本次发送跳过这个 Topic-Partition 数据的发送等待下次发送。

参考:
https://blog.csdn.net/qq_37923600/article/details/88583170

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容