Kafka幂等性实现简析

前言

在之前的《两阶段提交(2PC)与其在Flink exactly once中的应用》这篇文章中,笔者特别提到了Kafka从0.11版本开始支持幂等性与事务性,使得端到端exactly once语义成为可能。幂等性与事务性都是Kafka发展过程中非常重要的feature,本文将幂等性拿出来聊一聊。

幂等性

在正常情况下,Producer向Broker投递消息,Broker将消息追加写到对应的流(即某一Topic的某一Partition)中,并向Producer返回ACK信号,表示确认收到。

但是Producer和Broker之间的通信总有可能出现异常,如果消息已经写入,但ACK在半途丢失了,Producer就会再次发送该消息,造成重复。

在0.11版本之前,这个问题是无法靠Kafka本身解决的,所以只能得到at least once语义,下游要保证精确的话还得加上去重操作。而在0.11版本引入幂等性之后,只需要将Producer的enable.idempotence配置项设为true,就能保证消息就算重发也仅写入一次了。

究其原因,Kafka加入了以下两个标记值:

  • PID,在Producer初始化时分配,作为每个Producer会话的唯一标识;
  • 序列号(sequence number),Producer发送的每条消息(更准确地说是每一个消息批次,即ProducerBatch)都会带有此序列号,从0开始单调递增。Broker根据它来判断写入的消息是否可接受。

Broker会为每个TopicPartition组合维护PID和序列号。对每条接收到的消息,都会检查它的序列号是否比Broker所维护的值严格+1,只有这样才是合法的,其他情况都会丢弃。

如下图所示,加上PID和sequence number之后,Broker就会检测到有两条PID = 100且seq = 1的消息写入了Partition,并忽略掉重发的那一条,成功避免了重复。

Kafka的幂等性实现了对于单个Producer会话、单个TopicPartition级别的不重不漏,也就是最细粒度的保证。如果Producer重启(PID发生变化),或者写入是跨Topic、跨Partition的,单纯的幂等性就会失效,需要更高级别的事务性来解决了。当然事务性的原理更加复杂(需要专门的协调组件TransactionCoordinator做协调),下面不考虑事务性,看看幂等性在代码级别多了哪些实现。

Producer端的处理逻辑

首先列举出涉及到的各个组件。

  • KafkaProducer:即Producer实例;
  • Sender:KafkaProducer内置的发送消息到Broker的线程;
  • RecordAccumulator:消息批次ProducerBatch的累加器(缓存),当它满了就会唤醒Sender线程发送消息;
  • TransactionManager:如果启用了幂等性和/或事务性,Producer内的该组件就会记录PID、各个TopicPartition的序列号和事务状态等信息。

讲解Kafka消息生产全流程的文章数不胜数,本文就不再全部讲一遍,只是重点看看Producer一端和幂等性强相关的那些逻辑。

当客户端调用KafkaProducer.send()方法时,消息实际上是以批次形式(即ProducerBatch)存入了RecordAccumulator中,并且这些ProducerBatch都还没有PID和序列号标记。在Sender线程的run()方法中,会调用maybeWaitForProducerId()方法来为没有PID的Producer产生一个PID,其代码如下。

  private void maybeWaitForProducerId() {
    while (!transactionManager.hasProducerId() && !transactionManager.hasError()) {
      try {
        Node node = awaitLeastLoadedNodeReady(requestTimeout);
        if (node != null) {
          ClientResponse response = sendAndAwaitInitProducerIdRequest(node);
          InitProducerIdResponse initProducerIdResponse = (InitProducerIdResponse) response.responseBody();
          Errors error = initProducerIdResponse.error();
          if (error == Errors.NONE) {
            ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(
              initProducerIdResponse.producerId(), initProducerIdResponse.epoch());
            transactionManager.setProducerIdAndEpoch(producerIdAndEpoch);
            return;
          } else if (error.exception() instanceof RetriableException) {
            log.debug("Retriable error from InitProducerId response", error.message());
          } else {
            transactionManager.transitionToFatalError(error.exception());
            break;
          }
        } else {
          log.debug("Could not find an available broker to send InitProducerIdRequest to. " +
            "We will back off and try again.");
        }
      } catch (UnsupportedVersionException e) {
        transactionManager.transitionToFatalError(e);
        break;
      } catch (IOException e) {
        log.debug("Broker {} disconnected while awaiting InitProducerId response", e);
      }
      log.trace("Retry InitProducerIdRequest in {}ms.", retryBackoffMs);
      time.sleep(retryBackoffMs);
      metadata.requestUpdate();
    }
  }

其流程是:调用awaitLeastLoadedNodeReady()方法获取到当前连接数最少的Broker节点,并调用sendAndAwaitInitProducerIdRequest()方法向该节点发送一个InitProducerIdRequest请求。如果响应正常,会返回两个值,一是PID,二是Producer的纪元值(epoch)。后者用于在事务性开启时判断当前Producer是否过期,与幂等性无关。最后调用TransactionManager.setProducerIdAndEpoch()方法,将PID和纪元值保存在TransactionManager实例中。如果未分配成功,会隔一段时间继续重试。

PID的产生由专门的ProducerIdManager组件来管理,并且是与ZooKeeper交互的。原理比较简单,源码就不贴了。

接下来,Sender.run()还会调用sendProducerData()方法正式取出RecordAccumulator中缓存的消息,最终包装成ProduceRequest,即生产消息的请求,并向Broker发送出去。以下是sendProducerData()调用的RecordAccumulator.drain()方法的代码,其中有不少与幂等性相关的逻辑。

  public Map<Integer, List<ProducerBatch>> drain(Cluster cluster,
                                                 Set<Node> nodes,
                                                 int maxSize,
                                                 long now) {
    if (nodes.isEmpty())
      return Collections.emptyMap();

    Map<Integer, List<ProducerBatch>> batches = new HashMap<>();
    for (Node node : nodes) {
      int size = 0;
      List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
      List<ProducerBatch> ready = new ArrayList<>();
      int start = drainIndex = drainIndex % parts.size();
      do {
        PartitionInfo part = parts.get(drainIndex);
        TopicPartition tp = new TopicPartition(part.topic(), part.partition());
        if (!muted.contains(tp)) {
          Deque<ProducerBatch> deque = getDeque(tp);
          if (deque != null) {
            synchronized (deque) {
              ProducerBatch first = deque.peekFirst();
              if (first != null) {
                boolean backoff = first.attempts() > 0 && first.waitedTimeMs(now) < retryBackoffMs;
                if (!backoff) {
                  if (size + first.estimatedSizeInBytes() > maxSize && !ready.isEmpty()) {
                   break;
                  } else {
                    ProducerIdAndEpoch producerIdAndEpoch = null;
                    boolean isTransactional = false;
                    if (transactionManager != null) {
                      if (!transactionManager.isSendToPartitionAllowed(tp))
                        break;

                      producerIdAndEpoch = transactionManager.producerIdAndEpoch();
                      if (!producerIdAndEpoch.isValid())
                        break;

                      isTransactional = transactionManager.isTransactional();

                      if (!first.hasSequence() && transactionManager.hasUnresolvedSequence(first.topicPartition))
                        break;

                      int firstInFlightSequence = transactionManager.firstInFlightSequence(first.topicPartition);
                      if (firstInFlightSequence != RecordBatch.NO_SEQUENCE && first.hasSequence()
                        && first.baseSequence() != firstInFlightSequence)
                        break;
                    }

                    ProducerBatch batch = deque.pollFirst();
                    if (producerIdAndEpoch != null && !batch.hasSequence()) {
                      batch.setProducerState(producerIdAndEpoch, transactionManager.sequenceNumber(batch.topicPartition), isTransactional);
                      transactionManager.incrementSequenceNumber(batch.topicPartition, batch.recordCount);
                      log.debug("Assigned producerId {} and producerEpoch {} to batch with base sequence " +
                          "{} being sent to partition {}", producerIdAndEpoch.producerId,
                        producerIdAndEpoch.epoch, batch.baseSequence(), tp);

                      transactionManager.addInFlightBatch(batch);
                    }
                    batch.close();
                    size += batch.records().sizeInBytes();
                    ready.add(batch);
                    batch.drained(now);
                  }
                }
              }
            }
          }
        }
        this.drainIndex = (this.drainIndex + 1) % parts.size();
      } while (start != drainIndex);
      batches.put(node.id(), ready);
    }
    return batches;
  }

在生产一个批次之前,drain()方法执行了很多重判断:

  • 该批次没有在等待重试的间隔中;
  • 该批次允许被发送到对应的TopicPartition;
  • PID和纪元值有效;
  • 该批次的前面没有未发送成功的批次。TransactionManager.hasUnresolvedSequence()用于判断之前的序列号对应的消息状态是否已确定(即有没有乱序风险);
  • 该TopicPartition没有数据在发送。如果有批次是in-flight的,并且它的序列号与本批次的不同,说明本批次是重试的,需要等待in-flight的数据发送完成。

如果通过了上面的判断,才会继续执行下去。若当前批次没有序列号,表示它是第一次发送。此时就会将PID、纪元值、序列号写入该ProducerBatch,并调用TransactionManager.incrementSequenceNumber()增加维护的序列号的值,最后将其标记为in-flight,即待发送。

Broker端的处理逻辑

简述一下Broker端涉及到的数据结构。

  • BatchMetadata:存储批次的元数据,包括该批次中最后一条消息的序列号、offset,以及消息的偏移量(条数)等信息。
  • ProducerIdEntry:以队列的形式维护单个PID对应的最新BatchMetadata,序列号小的在前,大的在后,并且容量固定为5。
  • ProducerStateManager:管理所有TopicPartition的ProducerIdEntry。

Producer端的ProduceRequest请求发出后,由Broker通过KafkaApis.handleProduceRequest()方法处理,最终落盘时,会来到Log.append()方法。来看看它是怎么校验PID和序列号的。

  private def analyzeAndValidateProducerState(records: MemoryRecords, isFromClient: Boolean):
  (mutable.Map[Long, ProducerAppendInfo], List[CompletedTxn], Option[BatchMetadata]) = {
    val updatedProducers = mutable.Map.empty[Long, ProducerAppendInfo]
    val completedTxns = ListBuffer.empty[CompletedTxn]
    for (batch <- records.batches.asScala if batch.hasProducerId) {
      val maybeLastEntry = producerStateManager.lastEntry(batch.producerId)

      if (isFromClient)
        maybeLastEntry.flatMap(_.duplicateOf(batch)).foreach { duplicate =>
          return (updatedProducers, completedTxns.toList, Some(duplicate))
        }

      val maybeCompletedTxn = updateProducers(batch, updatedProducers, isFromClient = isFromClient)
      maybeCompletedTxn.foreach(completedTxns += _)
    }
    (updatedProducers, completedTxns.toList, None)
  }

可见,如果这个批次有PID,就会从ProducerStateManager维护的对应BatchMetadata中寻找是否有重复的(亦即该批次是否为重复发送)。对应的实现如下。

  def duplicateOf(batch: RecordBatch): Option[BatchMetadata] = {
    if (batch.producerEpoch() != producerEpoch)
       None
    else
      batchWithSequenceRange(batch.baseSequence(), batch.lastSequence())
  }

  def batchWithSequenceRange(firstSeq: Int, lastSeq: Int): Option[BatchMetadata] = {
    val duplicate = batchMetadata.filter { case(metadata) =>
      firstSeq == metadata.firstSeq && lastSeq == metadata.lastSeq
    }
    duplicate.headOption
  }

如果此批次的第一条消息的序列号和最后一条消息的序列号和缓存中的完全相同,表示它是重复发送。当该批次不是重发时,才会继续调用updateProducers()方法更新BatchMetadata信息。

校验序列号的逻辑则位于ProducerAppendInfo.checkSequence()方法。

  private def checkSequence(producerEpoch: Short, firstSeq: Int, lastSeq: Int): Unit = {
    if (producerEpoch != currentEntry.producerEpoch) {
      if (firstSeq != 0) {
        if (currentEntry.producerEpoch != RecordBatch.NO_PRODUCER_EPOCH) {
          throw new OutOfOrderSequenceException(s"Invalid sequence number for new epoch: $producerEpoch " +
            s"(request epoch), $firstSeq (seq. number)")
        } else {
          throw new UnknownProducerIdException(s"Found no record of producerId=$producerId on the broker. It is possible " +
            s"that the last message with the producerId=$producerId has been removed due to hitting the retention limit.")
        }
      }
    } else if (currentEntry.lastSeq == RecordBatch.NO_SEQUENCE && firstSeq != 0) {
      throw new OutOfOrderSequenceException(s"Out of order sequence number for producerId $producerId: found $firstSeq " +
        s"(incoming seq. number), but expected 0")
    } else if (isDuplicate(firstSeq, lastSeq)) {
      throw new DuplicateSequenceException(s"Duplicate sequence number for producerId $producerId: (incomingBatch.firstSeq, " +
        s"incomingBatch.lastSeq): ($firstSeq, $lastSeq).")
    } else if (!inSequence(firstSeq, lastSeq)) {
      throw new OutOfOrderSequenceException(s"Out of order sequence number for producerId $producerId: $firstSeq " +
        s"(incoming seq. number), ${currentEntry.lastSeq} (current end sequence number)")
    }
  }

  private def isDuplicate(firstSeq: Int, lastSeq: Int): Boolean = {
    ((lastSeq != 0 && currentEntry.firstSeq != Int.MaxValue && lastSeq < currentEntry.firstSeq)
      || currentEntry.batchWithSequenceRange(firstSeq, lastSeq).isDefined)
  }

  private def inSequence(firstSeq: Int, lastSeq: Int): Boolean = {
    firstSeq == currentEntry.lastSeq + 1L || (firstSeq == 0 && currentEntry.lastSeq == Int.MaxValue)
  }

可见,如果Producer的纪元值发生过变化,那么写入的批次序列号一定要是0(因为Producer不再是原来的那个了)。当维护的最近一条序列号为-1时,表示此PID对应的Producer还未生产过消息,写入的批次序列号也必须是0。最后一个合法条件就是序列号是严格+1,当其达到整形最大值时,就回滚到0重新开始计。

特别注意,由于ProducerIdEntry只能固定缓存5个BatchMetadata,所以在开启幂等性时,Producer的max.in.flight.requests.per.connection参数不能设为大于5的值。显然,若设为大于5的话,有可能会造成某一批次的元数据被挤出缓存,如果该批次又发生重试,就会因为永远找不到其对应的BatchMetadata而达到最大重试次数,效率大大降低。

The End

写得还是比较乱,但是Kafka的细节实在太多,今天又是周末,还是不push自己了。

切西瓜去,民那晚安。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,185评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,445评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,684评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,564评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,681评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,874评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,025评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,761评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,217评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,545评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,694评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,351评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,988评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,778评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,007评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,427评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,580评论 2 349