Kafka系列《三》-- 生产者Producer中的幂等性

1. 背景

对上面的铺垫文章理解后,再看本篇文章有事半功倍的效果

先来看看Kafka中的幂等性解决的是什么问题?

设想这样一个场景:producer同时往broken发送了5个批次的数据,由于网络等原因,出现了异常case:

  • broken收到某个批次的请求数据后,成功写入了日志,但是ack给producer的时候包丢了,此时producer超时重传,broken以为是新的批次,再次写入了数据,造成了数据重复写入

  • 批次1的数据传丢了,后面的批次已经成功写入了,此时批次1的数据再次重传,broken也成功写入;但是批次的接收顺序乱了

为了解决这些异常case,才引入了幂等性,确保producer的消息能够按序唯一写入。

2. max.in.flight.requests

如果Kafka没有支持幂等性呢?这些问题有办法解决吗?

答案是只能部分解决!

我们知道producer有个配置max.in.flight.requests用来设置producer最多可以发送多少个没收到响应的请求,默认值是5;

但是如果我们设置max.in.flight.requests = 1,那么相当于producer最多只能发送一个请求,模式变成了:请求->响应->请求

这种情况下也能解决乱序问题,但是还是不能有效解决重复问题

这也是producer代码里把max.in.flight.requests = 1称为guaranteeMessageOrder的原因~

它的实现原理相对简单一点,producer中会维护一个集合:

private final Set<TopicPartition> muted;

对于已经从消息缓存中遍历过批次的topic partition,会加入到上述集合中

if (guaranteeMessageOrder) {
            // Mute all the partitions drained
            for (List<ProducerBatch> batchList : batches.values()) {
                for (ProducerBatch batch : batchList)
                    this.accumulator.mutePartition(batch.topicPartition);
            }
        }

然后再次从消息缓存中遍历批次时,如果已经存在集合中的topic partition,则会直接跳过

if (muted.contains(tp))
                continue;

直到第一个批次处理完成后,才从集合中移除这个topic partition;这样就能实现在一个批次处理完成前不会处理下个批次的数据,这样也就只会有这个topic partition的一个请求了。

3. 幂等性

producer的幂等性在较新的版本中默认就是支持的,除非你自己配置关闭;并且使用幂等性不需要是调用额外的接口来完成,普通的send方法就会保证幂等性。

而它的实现主要依赖producer id和序号来实现;即每个producer都由broken分配一个唯一的id,而producer发送的每批数据都带上一个从0开始单调递增的序号;

broken默认会缓存每个producer id发送的最近的5批次数据,并且会严格按照序号顺序接收数据;再次考虑文章最开始的两个异常case:

  • producer没收到broken的ack消息,出现了重传;重传时broken检查缓存里已经有了这个序号的数据,因此直接响应一个异常DUPLICATE_SEQUENCE_NUMBER;producer收到这个异常后即可正常的完成这个批次

  • broken收到的数据序号不是严格+1的,则认为这是个乱序的数据,直接丢弃,并响应一个异常OUT_OF_ORDER_SEQUENCE_NUMBER,producer收到这个异常后,再次将这个数据加入到缓存中方便后续重试这个批次;直到这个批次被broken正常按序接收

可以看到这需要通过幂等性就能实现单个topic partition的数据按序不重复的效果!

3.1整体流程

  • 初始化producer id:通过向broken发送INIT_PRODUCER_ID请求,由broken分批一个唯一的producer id;需要注意的是INIT_PRODUCER_ID请求属于事务类的请求,因此在这个请求完成前不会开始发送数据;这个请求和之前讲到的producer请求的实现有细微差别,但是仍然是通过NIO线程完成的,因此流程也是大差不差

Sending INIT_PRODUCER_ID request with header RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=1, clientId=producer-1, correlationId=3, headerVersion=1) and timeout 30000 to node -1: InitProducerIdRequestData(transactionalId=null, transactionTimeoutMs=2147483647, producerId=-1, producerEpoch=-1)

  • NIO线程每次循环都会先处理事务类型的请求,包括上述的INIT_PRODUCER_ID请求,事务类型的请求完成前不会开始发送数据

  • 通过INIT_PRODUCER_ID请求,broken会返回给producer一个唯一的id,以及一个版本标识epoch,初始都是0

Received INIT_PRODUCER_ID response from node -1 for request with header RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=1, clientId=producer-1, correlationId=3, headerVersion=1): InitProducerIdResponseData(throttleTimeMs=0, errorCode=0, producerId=251864, producerEpoch=0)

  • INIT_PRODUCER_ID请求完成后,幂等性的准备工作就已经完成了, 可以开始发送数据了;数据加入到缓存中的流程是完全一样的,不同的是在数据发送前,会给每个批次初始化一个基础序号,每一批次数据额外维护这些信息
public void setProducerState(long producerId, short producerEpoch, int baseSequence, boolean isTransactional) {
        this.producerId = producerId;
        this.producerEpoch = producerEpoch;
        this.baseSequence = baseSequence;
    }
  • 缓存中的批次初始时是都没有序号的, 当NIO线程从缓存中获取批次时,如果该批次没有序号,则会从0开始初始化批次的基础序号;需要注意的是,缓存中的数据是以topic partition维度组织的,因此序号也是每个topic partition维度的,即每个topic partition的序号是相互独立的,并且都是从0开始,也就是幂等性只能确保单个topic partition的按序唯一特性

public final ConcurrentMap<Integer /partition/, Deque<ProducerBatch>> batches = new CopyOnWriteMap<>();

  • 该批次数据从缓存中取出后,则会更新下一个批次的基础序号,下一个批次的基础序号为:前一批次的基础序号+前一批次中消息的条数;每一批次中可能包括多条消息,因为分配批次大小的时候都是按照16k分配的,能够容纳多条消息的时候,多条消息会放到一个批次里一起发送;比如,前一批次基础序号为0,包括10条消息,那么下一批次的基础序号就是10,因为序号是从0开始的;

  • 然后也是通过send方法发送请求,我们知道通过send方法发送请求前会写入批次的头部信息,而写入的批次头部信息时就会包括producer id和版本epoch、以及基础序号

buffer.putLong(position + PRODUCER_ID_OFFSET, producerId);
buffer.putShort(position + PRODUCER_EPOCH_OFFSET, epoch);
buffer.putInt(position + BASE_SEQUENCE_OFFSET, sequence);
  • producer正常收到响应后,更新最后被ack的序号,直接计算即可得到:基础序号+消息条数-1;更新最后被ack的偏移量offset:响应中的基础偏移量+消息条数-1;比如,当前批次的基础序号为0,消息条数为10,那么最后被ack的序号就是9;假设响应中的基础偏移量是300,那么最后被ack的偏移量就是309

  • 当一个批次数据成功完成一次发送和响应后,在producer端正常ack这个批次的最后的序号和偏移量;在broken上收到了这个批次的基础序号和消息条数,也明确了下个批次的基础序号;

  • 如果broken收到的下个批次的基础序号不对,则直接丢弃这个批次,并响应OUT_OF_ORDER_SEQUENCE_NUMBER错误;producer收到这个异常后,再将这个批次重新插入到缓存中,等待下次轮询时候重试;这里重新插入缓存的时候需要注意,正常情况下,缓存中的数据都是没有序号的,但是出现重试时候,重试的批次是已经携带了序号的,因此插入时候应该优先插入到队首,队首的批次会被优先发送;但是如果队首已经存在多个等待重试的批次时,则最新插入的这个批次需要按照序号顺序插入到队列中,确保序号更小的批次优先发送;

  • 如果broken收到了重复的基础序号,也直接丢弃这个批次,并响应DUPLICATE_SEQUENCE_NUMBER错误;producer收到这个异常后,则知道了这个批次已经被正确接收了,不需要其它处理了,正常完成这个批次即可

  • 只要producer收到的异常是可重试的,producer都会自动重试;但是对于极端情况,broken响应的异常是producer无法重试的,那么producer则会丢弃这个批次,批次序号这样就断了,那后续的消息不是都没法发送了吗?为了解决这个问题,producer会请求开始新的版本,即增加epoch;

  • NIO线程在下次循环开始的时候,就会处理这个请求,将producer的epoch增加1;这时候情况就变得复杂了,因为在producer端epoch已经为1了(初始为0),而所有topic partition已经发出去的请求的epoch都还是0;并且broken也还是不知道producer的epoch已经变化了,因为epoch增加完全是producer端的行为,没有与broken交互的;这时候的一些异常处理:

    • 对于触发epoch自增的topic partition:将这个topic partition上所有已经发出去的请求的序号都重置;因为这个topic partition出现了序号的断裂,不重置无法继续下去了;也是从0开始递增序号;同时将这些请求的epoch都改为1;这样,这些数据再重试时候就会使用新的序号和epoch,如此,才能被broken正确接收

    • 对于其它正常的topic partition:在所有已经发出去的请求完成之前不再发送新的批次;也就是等待所有epoch=0的请求都处理完成后再开始处理新的批次,这样新的批次都会使用epoch=1

  • 通过epoch机制,能够有效的让producer从某个异常批次中快速恢复,而不需要重新启动producer

总的来说,幂等性的实现简单,但是如何从异常中恢复才是代码上的难点;

上面的这些异常处理流程值得我们学习和深思~

4. 一些需要反复看才能懂的代码

由于producer的幂等性实现和事务的实现代码混在一起,理解上可能有一点难度,我们带着学习的心态看看幂等性的源码实现~

要完全理解幂等性的异常恢复流程,重点在于理解send发送数据之后的回调handleProduceResponse

在这个回调中对可重试的异常、不可重试的异常是分开处理的,我们分别看下

4.1 可重试的异常

  • 第一种就是broken响应的异常是RetriableException的子类:比如超时、网络等原因都是可重试的异常;这个比较好理解

  • 第二种是OUT_OF_ORDER_SEQUENCE_NUMBER,这个异常是不属于RetriableException的子类,因此不属于简单的重试,它只在特定场景下才重试

    • 条件1是这个批次的重试还没达到超时时间(默认120s),同时这个批次被重置过了或者这个批次不是broken期待的下一个批次;对应的就是broken提前收到了乱序的请求,这种情况必须producer需要进行重试

    • 条件2是非事务状态,即幂等性相关的OUT_OF_ORDER_SEQUENCE_NUMBER异常都必须重试

if (error == Errors.OUT_OF_ORDER_SEQUENCE_NUMBER) {
          if (!hasUnresolvedSequence(batch.topicPartition) &&
                  (batch.sequenceHasBeenReset() || !isNextSequence(batch.topicPartition, batch.baseSequence()))) {
              return true;
          } else if (!isTransactional()) {
              return true;
          }
      }
  • 第三种是UNKNOWN_PRODUCER_ID,这个异常也不属于RetriableException的子类,也不属于简单的重试,这个异常通常对应的是broken端丢失了producer的状态信息,即broken收到了无法识别的producer id或者epoch;它也只在特定场景下才重试

    • 条件1对应的是broken响应丢失了offset,需要进行重试
    • 条件2对应的是批次被重置了,需要进行重试
    • 条件3对应的是批次没被重置,但是最后ack的offset比响应的offset要小,也需要重试
    • 条件4对应的是非事务状态,即幂等性相关的UNKNOWN_PRODUCER_ID异常都需要重试
if (error == Errors.UNKNOWN_PRODUCER_ID) {
            if (response.logStartOffset == -1) {
                return true;
            }

            if (batch.sequenceHasBeenReset()) {
                return true;
            } else if (lastAckedOffset(batch.topicPartition).orElse(TxnPartitionEntry.NO_LAST_ACKED_SEQUENCE_NUMBER) < response.logStartOffset) {
                return true;
            }

            if (!isTransactional()) {
                return true;
            }
        }

总的来说,幂等性条件下,除了可重试的异常之外,OUT_OF_ORDER_SEQUENCE_NUMBER异常和UNKNOWN_PRODUCER_ID异常都必须重试!

4.2 epoch增加及批次重置

除了可重试的异常外,对于一些不可重试的异常,则需要重置批次确保后续这个topic partition的数据仍然能够正常发送

当遇到不可重试的异常时,甚至可重试异常里的一些场景,都会请求增加epoch并重置批次;请求增加epoch很简单,只需要在遇到不可重试的异常时设置epochBumpRequired = true即可,同时将这个topic partition加入到集合

synchronized void requestEpochBumpForPartition(TopicPartition tp) {
        epochBumpRequired = true;
        this.partitionsToRewriteSequences.add(tp);
    }

当NIO线程下次循环开始时,就会检查epochBumpRequired标志位,然后将epoch递增,并重置这个topic partition的所有已经发送出去的批次的epoch和序号,使用新的epoch和从0开始递增的序号;这批数据再重试时候就会使用新的epoch和序号了,broken发现epoch增加后,开始重新接收这个producer id的批次

if (epochBumpRequired) {
       setProducerIdAndEpoch(new ProducerIdAndEpoch(this.producerIdAndEpoch.producerId, (short) (this.producerIdAndEpoch.epoch + 1)));
       for (TopicPartition topicPartition : this.partitionsToRewriteSequences) {
                  this.txnPartitionMap.startSequencesAtBeginning(topicPartition, this.producerIdAndEpoch);
                  this.partitionsWithUnresolvedSequences.remove(topicPartition);
        }
        this.partitionsToRewriteSequences.clear();

        epochBumpRequired = false;
            }

需要注意的是,上面的代码其实只会重置有问题的topic partition中已经发送的批次;其它的批次不会重置,因为其它批次的序号并没有中断,broken仍然可以正常接收这些批次的数据;这时候producer处理epoch=0和epoch=1批次并存的阶段,对于epoch=0的topic partition,会等待那些已经发送出去的epoch=0的请求全部完成后,在开始处理新的批次,新的批次将会使用epoch=1开始发送

if (!first.hasSequence()) {
                if (transactionManager.hasInflightBatches(tp) && !producerIdAndEpoch.equals(txnPartitionMap.getOrCreate(topicPartition).producerIdAndEpoch())) {
                    return true;
                }

4.3 批次超时

producer发送数据时默认是没有限制重试次数的,但是限制了批次的最大超时时间是120s,那超过120s还没发出去的批次怎么处理的呢?

首先对于超过了120s并且重试过了的批次会加入到集合,标记这个topic partition出现了批次超时

for (ProducerBatch expiredBatch : expiredBatches) {
            if (transactionManager != null && expiredBatch.inRetry()) {
                int nextSequence = batch.lastSequence() + 1;
                 partitionsWithUnresolvedSequences.compute(expiredBatch.topicPartition, (k, v) -> v == null ? nextSequence : Math.max(v, nextSequence));
            }
        }

对于集合中的topic partition,则不会在发送新的批次了,等待所有已经发送出去的批次处理完毕

if (!first.hasSequence()) {
                
                if (transactionManager.hasUnresolvedSequence(first.topicPartition))
                    return true;
            }

等待所有已经发送出去的批次数据完毕后,则会增加epoch,触发epoch增加后的处理流程就和上面一样了

synchronized void maybeResolveSequences() {
        for (Iterator<TopicPartition> iter = partitionsWithUnresolvedSequences.keySet().iterator(); iter.hasNext(); ) {
            TopicPartition topicPartition = iter.next();
            if (!hasInflightBatches(topicPartition)) {
                // The partition has been fully drained. At this point, the last ack'd sequence should be one less than
                // next sequence destined for the partition. If so, the partition is fully resolved. If not, we should
                // reset the sequence number if necessary.
                if (isNextSequence(topicPartition, sequenceNumber(topicPartition))) {
                    // This would happen when a batch was expired, but subsequent batches succeeded.
                    iter.remove();
                } else {
                    else {
                        requestEpochBumpForPartition(topicPartition);
                    }

                    iter.remove();
                }
            }
        }

这里会有两种情形:

  • 其一是,这个超时批次之前的都正常按序接收到了,这种情况下只需要丢弃这个超时的批次即可;而不需要增加epoch了

  • 其二是,这个超时批次之前仍有序号没收到,也就是出现了序号中断,这时候只能增加epoch重置序号了

接着阅读:

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,222评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,455评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,720评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,568评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,696评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,879评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,028评论 3 409
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,773评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,220评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,550评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,697评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,360评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,002评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,782评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,010评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,433评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,587评论 2 350

推荐阅读更多精彩内容