1. 背景
- Kafka系列《一》-- 生产者Producer流程及Partition详解
- Kafka系列《二》-- 生产者Producer中的请求及源码详解
- Kafka系列《三》-- 生产者Producer中的幂等性
- Kafka系列《四》-- 生产者Producer中的事务性
- Kafka系列《五》-- 消费者Consumer流程概览
- Kafka系列《六》-- 消费者Consumer中的消费方案分配算法解析
- Kafka系列《七》-- 消费者Consumer中的重平衡解析
- Kafka系列《八》-- 消费者Consumer中的消费过程解析
- Kafka系列《九》-- 消费者Consumer中的消费session会话和transaction事务
对上面的铺垫文章理解后,再看本篇文章有事半功倍的效果
先来看看Kafka中的幂等性解决的是什么问题?
设想这样一个场景:producer同时往broken发送了5个批次的数据,由于网络等原因,出现了异常case:
broken收到某个批次的请求数据后,成功写入了日志,但是ack给producer的时候包丢了,此时producer超时重传,broken以为是新的批次,再次写入了数据,造成了数据重复写入
批次1的数据传丢了,后面的批次已经成功写入了,此时批次1的数据再次重传,broken也成功写入;但是批次的接收顺序乱了
为了解决这些异常case,才引入了幂等性,确保producer的消息能够按序唯一写入。
2. max.in.flight.requests
如果Kafka没有支持幂等性呢?这些问题有办法解决吗?
答案是只能部分解决!
我们知道producer有个配置max.in.flight.requests
用来设置producer最多可以发送多少个没收到响应的请求,默认值是5;
但是如果我们设置max.in.flight.requests = 1
,那么相当于producer最多只能发送一个请求,模式变成了:请求->响应->请求
这种情况下也能解决乱序问题,但是还是不能有效解决重复问题
这也是producer代码里把max.in.flight.requests = 1
称为guaranteeMessageOrder
的原因~
它的实现原理相对简单一点,producer中会维护一个集合:
private final Set<TopicPartition> muted;
对于已经从消息缓存中遍历过批次的topic partition,会加入到上述集合中
if (guaranteeMessageOrder) {
// Mute all the partitions drained
for (List<ProducerBatch> batchList : batches.values()) {
for (ProducerBatch batch : batchList)
this.accumulator.mutePartition(batch.topicPartition);
}
}
然后再次从消息缓存中遍历批次时,如果已经存在集合中的topic partition,则会直接跳过
if (muted.contains(tp))
continue;
直到第一个批次处理完成后,才从集合中移除这个topic partition;这样就能实现在一个批次处理完成前不会处理下个批次的数据,这样也就只会有这个topic partition的一个请求了。
3. 幂等性
producer的幂等性在较新的版本中默认就是支持的,除非你自己配置关闭;并且使用幂等性不需要是调用额外的接口来完成,普通的send方法就会保证幂等性。
而它的实现主要依赖producer id和序号来实现;即每个producer都由broken分配一个唯一的id,而producer发送的每批数据都带上一个从0开始单调递增的序号;
broken默认会缓存每个producer id发送的最近的5批次数据,并且会严格按照序号顺序接收数据;再次考虑文章最开始的两个异常case:
producer没收到broken的ack消息,出现了重传;重传时broken检查缓存里已经有了这个序号的数据,因此直接响应一个异常
DUPLICATE_SEQUENCE_NUMBER
;producer收到这个异常后即可正常的完成这个批次broken收到的数据序号不是严格+1的,则认为这是个乱序的数据,直接丢弃,并响应一个异常
OUT_OF_ORDER_SEQUENCE_NUMBER
,producer收到这个异常后,再次将这个数据加入到缓存中方便后续重试这个批次;直到这个批次被broken正常按序接收
可以看到这需要通过幂等性就能实现单个topic partition的数据按序不重复的效果!
3.1整体流程
- 初始化producer id:通过向broken发送
INIT_PRODUCER_ID
请求,由broken分批一个唯一的producer id;需要注意的是INIT_PRODUCER_ID
请求属于事务类的请求,因此在这个请求完成前不会开始发送数据;这个请求和之前讲到的producer请求的实现有细微差别,但是仍然是通过NIO线程完成的,因此流程也是大差不差
Sending INIT_PRODUCER_ID request with header RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=1, clientId=producer-1, correlationId=3, headerVersion=1) and timeout 30000 to node -1: InitProducerIdRequestData(transactionalId=null, transactionTimeoutMs=2147483647, producerId=-1, producerEpoch=-1)
NIO线程每次循环都会先处理事务类型的请求,包括上述的
INIT_PRODUCER_ID
请求,事务类型的请求完成前不会开始发送数据通过
INIT_PRODUCER_ID
请求,broken会返回给producer一个唯一的id,以及一个版本标识epoch,初始都是0
Received INIT_PRODUCER_ID response from node -1 for request with header RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=1, clientId=producer-1, correlationId=3, headerVersion=1): InitProducerIdResponseData(throttleTimeMs=0, errorCode=0, producerId=251864, producerEpoch=0)
-
INIT_PRODUCER_ID
请求完成后,幂等性的准备工作就已经完成了, 可以开始发送数据了;数据加入到缓存中的流程是完全一样的,不同的是在数据发送前,会给每个批次初始化一个基础序号,每一批次数据额外维护这些信息
public void setProducerState(long producerId, short producerEpoch, int baseSequence, boolean isTransactional) {
this.producerId = producerId;
this.producerEpoch = producerEpoch;
this.baseSequence = baseSequence;
}
- 缓存中的批次初始时是都没有序号的, 当NIO线程从缓存中获取批次时,如果该批次没有序号,则会从0开始初始化批次的基础序号;需要注意的是,缓存中的数据是以topic partition维度组织的,因此序号也是每个topic partition维度的,
即每个topic partition的序号是相互独立的,并且都是从0开始,也就是幂等性只能确保单个topic partition的按序唯一特性
public final ConcurrentMap<Integer /partition/, Deque<ProducerBatch>> batches = new CopyOnWriteMap<>();
该批次数据从缓存中取出后,则会更新下一个批次的基础序号,下一个批次的基础序号为:
前一批次的基础序号+前一批次中消息的条数
;每一批次中可能包括多条消息,因为分配批次大小的时候都是按照16k分配的,能够容纳多条消息的时候,多条消息会放到一个批次里一起发送;比如,前一批次基础序号为0,包括10条消息,那么下一批次的基础序号就是10,因为序号是从0开始的;然后也是通过send方法发送请求,我们知道通过send方法发送请求前会写入批次的头部信息,而写入的批次头部信息时就会包括producer id和版本epoch、以及基础序号
buffer.putLong(position + PRODUCER_ID_OFFSET, producerId);
buffer.putShort(position + PRODUCER_EPOCH_OFFSET, epoch);
buffer.putInt(position + BASE_SEQUENCE_OFFSET, sequence);
producer正常收到响应后,更新最后被ack的序号,直接计算即可得到:基础序号+消息条数-1;更新最后被ack的偏移量offset:响应中的基础偏移量+消息条数-1;比如,当前批次的基础序号为0,消息条数为10,那么最后被ack的序号就是9;假设响应中的基础偏移量是300,那么最后被ack的偏移量就是309
当一个批次数据成功完成一次发送和响应后,在producer端正常ack这个批次的最后的序号和偏移量;在broken上收到了这个批次的基础序号和消息条数,也明确了下个批次的基础序号;
如果broken收到的下个批次的基础序号不对,则直接丢弃这个批次,并响应
OUT_OF_ORDER_SEQUENCE_NUMBER
错误;producer收到这个异常后,再将这个批次重新插入到缓存中,等待下次轮询时候重试;这里重新插入缓存的时候需要注意,正常情况下,缓存中的数据都是没有序号的,但是出现重试时候,重试的批次是已经携带了序号的,因此插入时候应该优先插入到队首,队首的批次会被优先发送;
但是如果队首已经存在多个等待重试的批次时,则最新插入的这个批次需要按照序号顺序插入到队列中,确保序号更小的批次优先发送;
如果broken收到了重复的基础序号,也直接丢弃这个批次,并响应
DUPLICATE_SEQUENCE_NUMBER
错误;producer收到这个异常后,则知道了这个批次已经被正确接收了,不需要其它处理了,正常完成这个批次即可只要producer收到的异常是可重试的,producer都会自动重试;但是对于极端情况,broken响应的异常是producer无法重试的,那么producer则会丢弃这个批次,批次序号这样就断了,那后续的消息不是都没法发送了吗?为了解决这个问题,producer会请求开始新的版本,即增加epoch;
-
NIO线程在下次循环开始的时候,就会处理这个请求,将producer的epoch增加1;这时候情况就变得复杂了,因为在producer端epoch已经为1了(初始为0),而所有topic partition已经发出去的请求的epoch都还是0;并且broken也还是不知道producer的epoch已经变化了,因为epoch增加完全是producer端的行为,没有与broken交互的;这时候的一些异常处理:
对于触发epoch自增的topic partition:将这个topic partition上所有已经发出去的请求的序号都重置;因为这个topic partition出现了序号的断裂,不重置无法继续下去了;也是从0开始递增序号;同时将这些请求的epoch都改为1;这样,这些数据再重试时候就会使用新的序号和epoch,如此,才能被broken正确接收
对于其它正常的topic partition:在所有已经发出去的请求完成之前不再发送新的批次;也就是等待所有epoch=0的请求都处理完成后再开始处理新的批次,这样新的批次都会使用epoch=1
通过epoch机制,能够有效的让producer从某个异常批次中快速恢复,而不需要重新启动producer
总的来说,幂等性的实现简单,但是如何从异常中恢复才是代码上的难点;
上面的这些异常处理流程值得我们学习和深思~
4. 一些需要反复看才能懂的代码
由于producer的幂等性实现和事务的实现代码混在一起,理解上可能有一点难度,我们带着学习的心态看看幂等性的源码实现~
要完全理解幂等性的异常恢复流程,重点在于理解send发送数据之后的回调handleProduceResponse
在这个回调中对可重试的异常、不可重试的异常是分开处理的,我们分别看下
4.1 可重试的异常
第一种就是broken响应的异常是
RetriableException
的子类:比如超时、网络等原因都是可重试的异常;这个比较好理解-
第二种是
OUT_OF_ORDER_SEQUENCE_NUMBER
,这个异常是不属于RetriableException
的子类,因此不属于简单的重试,它只在特定场景下才重试条件1是这个批次的重试还没达到超时时间(默认120s),同时这个批次被重置过了或者这个批次不是broken期待的下一个批次;对应的就是broken提前收到了乱序的请求,这种情况必须producer需要进行重试
条件2是非事务状态,即幂等性相关的
OUT_OF_ORDER_SEQUENCE_NUMBER
异常都必须重试
if (error == Errors.OUT_OF_ORDER_SEQUENCE_NUMBER) {
if (!hasUnresolvedSequence(batch.topicPartition) &&
(batch.sequenceHasBeenReset() || !isNextSequence(batch.topicPartition, batch.baseSequence()))) {
return true;
} else if (!isTransactional()) {
return true;
}
}
-
第三种是
UNKNOWN_PRODUCER_ID
,这个异常也不属于RetriableException
的子类,也不属于简单的重试,这个异常通常对应的是broken端丢失了producer的状态信息,即broken收到了无法识别的producer id或者epoch;它也只在特定场景下才重试- 条件1对应的是broken响应丢失了offset,需要进行重试
- 条件2对应的是批次被重置了,需要进行重试
- 条件3对应的是批次没被重置,但是最后ack的offset比响应的offset要小,也需要重试
- 条件4对应的是非事务状态,即幂等性相关的
UNKNOWN_PRODUCER_ID
异常都需要重试
if (error == Errors.UNKNOWN_PRODUCER_ID) {
if (response.logStartOffset == -1) {
return true;
}
if (batch.sequenceHasBeenReset()) {
return true;
} else if (lastAckedOffset(batch.topicPartition).orElse(TxnPartitionEntry.NO_LAST_ACKED_SEQUENCE_NUMBER) < response.logStartOffset) {
return true;
}
if (!isTransactional()) {
return true;
}
}
总的来说,幂等性条件下,除了可重试的异常之外,OUT_OF_ORDER_SEQUENCE_NUMBER
异常和UNKNOWN_PRODUCER_ID
异常都必须重试!
4.2 epoch增加及批次重置
除了可重试的异常外,对于一些不可重试的异常,则需要重置批次确保后续这个topic partition的数据仍然能够正常发送
当遇到不可重试的异常时,甚至可重试异常里的一些场景,都会请求增加epoch并重置批次;请求增加epoch很简单,只需要在遇到不可重试的异常时设置epochBumpRequired = true
即可,同时将这个topic partition加入到集合
synchronized void requestEpochBumpForPartition(TopicPartition tp) {
epochBumpRequired = true;
this.partitionsToRewriteSequences.add(tp);
}
当NIO线程下次循环开始时,就会检查epochBumpRequired
标志位,然后将epoch递增,并重置这个topic partition的所有已经发送出去的批次的epoch和序号,使用新的epoch和从0开始递增的序号;这批数据再重试时候就会使用新的epoch和序号了,broken发现epoch增加后,开始重新接收这个producer id的批次
if (epochBumpRequired) {
setProducerIdAndEpoch(new ProducerIdAndEpoch(this.producerIdAndEpoch.producerId, (short) (this.producerIdAndEpoch.epoch + 1)));
for (TopicPartition topicPartition : this.partitionsToRewriteSequences) {
this.txnPartitionMap.startSequencesAtBeginning(topicPartition, this.producerIdAndEpoch);
this.partitionsWithUnresolvedSequences.remove(topicPartition);
}
this.partitionsToRewriteSequences.clear();
epochBumpRequired = false;
}
需要注意的是,上面的代码其实只会重置有问题的topic partition中已经发送的批次;其它的批次不会重置,因为其它批次的序号并没有中断,broken仍然可以正常接收这些批次的数据;这时候producer处理epoch=0和epoch=1批次并存的阶段,对于epoch=0的topic partition,会等待那些已经发送出去的epoch=0的请求全部完成后,在开始处理新的批次,新的批次将会使用epoch=1开始发送
if (!first.hasSequence()) {
if (transactionManager.hasInflightBatches(tp) && !producerIdAndEpoch.equals(txnPartitionMap.getOrCreate(topicPartition).producerIdAndEpoch())) {
return true;
}
4.3 批次超时
producer发送数据时默认是没有限制重试次数的,但是限制了批次的最大超时时间是120s,那超过120s还没发出去的批次怎么处理的呢?
首先对于超过了120s并且重试过了的批次会加入到集合,标记这个topic partition出现了批次超时
for (ProducerBatch expiredBatch : expiredBatches) {
if (transactionManager != null && expiredBatch.inRetry()) {
int nextSequence = batch.lastSequence() + 1;
partitionsWithUnresolvedSequences.compute(expiredBatch.topicPartition, (k, v) -> v == null ? nextSequence : Math.max(v, nextSequence));
}
}
对于集合中的topic partition,则不会在发送新的批次了,等待所有已经发送出去的批次处理完毕
if (!first.hasSequence()) {
if (transactionManager.hasUnresolvedSequence(first.topicPartition))
return true;
}
等待所有已经发送出去的批次数据完毕后,则会增加epoch,触发epoch增加后的处理流程就和上面一样了
synchronized void maybeResolveSequences() {
for (Iterator<TopicPartition> iter = partitionsWithUnresolvedSequences.keySet().iterator(); iter.hasNext(); ) {
TopicPartition topicPartition = iter.next();
if (!hasInflightBatches(topicPartition)) {
// The partition has been fully drained. At this point, the last ack'd sequence should be one less than
// next sequence destined for the partition. If so, the partition is fully resolved. If not, we should
// reset the sequence number if necessary.
if (isNextSequence(topicPartition, sequenceNumber(topicPartition))) {
// This would happen when a batch was expired, but subsequent batches succeeded.
iter.remove();
} else {
else {
requestEpochBumpForPartition(topicPartition);
}
iter.remove();
}
}
}
这里会有两种情形:
其一是,这个超时批次之前的都正常按序接收到了,这种情况下只需要丢弃这个超时的批次即可;而不需要增加epoch了
其二是,这个超时批次之前仍有序号没收到,也就是出现了序号中断,这时候只能增加epoch重置序号了
接着阅读: