Kafka Partition重分配流程简析

节日快乐~

今天是属于广大程序员的节日,祝自己快乐hhhhhh

随着业务量的急速膨胀和又一年双11的到来,我们会对现有的Kafka集群进行扩容,以应对更大的流量和业务尖峰。当然,扩容之后的新Kafka Broker默认是不会有任何Topic和Partition的,需要手动利用分区重分配命令kafka-reassign-partitions将现有的Partition/Replica平衡到新的Broker上去。那么Kafka具体是如何执行重分配流程的呢?本文就来简单解读一下。

生成、提交重分配方案

我们知道,使用kafka-reassign-partitions命令分为三步,一是根据指定的Topic生成JSON格式的重分配方案(--generate),二是将生成的方案提交执行(--execute),三是观察重分配的进度(--verify),它们分别对应kafka.admin.ReassignPartitionsCommand类中的generateAssignment()、executeAssignment()和verifyAssignment()方法。

generateAssignment()方法会调用AdminUtils#assignReplicasToBrokers()方法生成Replica分配方案。源码就不再读了,其原则简述如下:

  • 将Replica尽量均匀地分配到各个Broker上去;
  • 一个Partition的所有Replica必须位于不同的Broker上;
  • 如果Broker有机架感知(rack aware)的信息,将Partition的Replica尽量分配到不同的机架。

executeReassignment()方法调用了reassignPartitions()方法,其源码如下。

def reassignPartitions(throttle: Throttle = NoThrottle, timeoutMs: Long = 10000L): Boolean = {
  maybeThrottle(throttle)
  try {
    val validPartitions = proposedPartitionAssignment.filter { case (p, _) => validatePartition(zkUtils, p.topic, p.partition) }
    if (validPartitions.isEmpty) false
    else {
      if (proposedReplicaAssignment.nonEmpty) {
        val adminClient = adminClientOpt.getOrElse(
          throw new AdminCommandFailedException("bootstrap-server needs to be provided in order to reassign replica to the specified log directory"))
        val alterReplicaDirResult = adminClient.alterReplicaLogDirs(
          proposedReplicaAssignment.asJava, new AlterReplicaLogDirsOptions().timeoutMs(timeoutMs.toInt))
        alterReplicaDirResult.values().asScala.foreach { case (replica, future) => {
            try {
              future.get()
              throw new AdminCommandFailedException(s"Partition ${replica.topic()}-${replica.partition()} already exists on broker ${replica.brokerId()}." +
                s" Reassign replica to another log directory on the same broker is currently not supported.")
            } catch {
              case t: ExecutionException =>
                t.getCause match {
                  case e: ReplicaNotAvailableException => // It is OK if the replica is not available
                  case e: Throwable => throw e
                }
            }
        }}
      }
      val jsonReassignmentData = ZkUtils.formatAsReassignmentJson(validPartitions)
      zkUtils.createPersistentPath(ZkUtils.ReassignPartitionsPath, jsonReassignmentData)
      true
    }
  } catch {
    // ......
  }
}

在进行必要的Partition校验之后,创建ZK持久节点/admin/reassign_partitions,并将JSON格式的重分配方案写进去。如果该节点存在,就表示已经在进行重分配,不能再启动新的重分配流程(相关的判断在executeReassignment()方法中)。

监听并处理重分配事件

在之前讲解Kafka Controller时,笔者提到Controller会注册多个ZK监听器,将监听到的事件投递到内部的事件队列,并由事件处理线程负责处理。监听ZK中/admin/reassign_partitions节点的监听器为PartitionReassignmentListener,并产生PartitionReassignment事件,处理逻辑如下。

case object PartitionReassignment extends ControllerEvent {
  def state = ControllerState.PartitionReassignment

  override def process(): Unit = {
    if (!isActive) return
    val partitionReassignment = zkUtils.getPartitionsBeingReassigned()
    val partitionsToBeReassigned = partitionReassignment.filterNot(p => controllerContext.partitionsBeingReassigned.contains(p._1))
    partitionsToBeReassigned.foreach { case (partition, context) =>
      if(topicDeletionManager.isTopicQueuedUpForDeletion(partition.topic)) {
        error(s"Skipping reassignment of partition $partition since it is currently being deleted")
        removePartitionFromReassignedPartitions(partition)
      } else {
        initiateReassignReplicasForTopicPartition(partition, context)
      }
    }
  }
}

该方法先取得需要重分配的Partition列表,然后从中剔除掉那些已经被标记为删除的Topic所属的Partition,再调用initiateReassignReplicasForTopicPartition()方法:

def initiateReassignReplicasForTopicPartition(topicAndPartition: TopicAndPartition,
                                              reassignedPartitionContext: ReassignedPartitionsContext) {
  val newReplicas = reassignedPartitionContext.newReplicas
  val topic = topicAndPartition.topic
  val partition = topicAndPartition.partition
  try {
    val assignedReplicasOpt = controllerContext.partitionReplicaAssignment.get(topicAndPartition)
    assignedReplicasOpt match {
      case Some(assignedReplicas) =>
        if (assignedReplicas == newReplicas) {
          throw new KafkaException("Partition %s to be reassigned is already assigned to replicas".format(topicAndPartition) +
            " %s. Ignoring request for partition reassignment".format(newReplicas.mkString(",")))
        } else {
          info("Handling reassignment of partition %s to new replicas %s".format(topicAndPartition, newReplicas.mkString(",")))
          // first register ISR change listener
          watchIsrChangesForReassignedPartition(topic, partition, reassignedPartitionContext)
          controllerContext.partitionsBeingReassigned.put(topicAndPartition, reassignedPartitionContext)
          // mark topic ineligible for deletion for the partitions being reassigned
          topicDeletionManager.markTopicIneligibleForDeletion(Set(topic))
          onPartitionReassignment(topicAndPartition, reassignedPartitionContext)
        }
      case None => throw new KafkaException("Attempt to reassign partition %s that doesn't exist"
        .format(topicAndPartition))
    }
  } catch {
    case e: Throwable => error("Error completing reassignment of partition %s".format(topicAndPartition), e)
    // remove the partition from the admin path to unblock the admin client
    removePartitionFromReassignedPartitions(topicAndPartition)
  }
}

该方法的执行逻辑如下:

  1. 判断Partition的原有Replica是否与即将重分配的新Replica相同,如果相同则抛出异常;
  2. 注册即将被重分配的Partition的ISR变化监听器;
  3. 把即将被重分配的Partition/Replica记录在Controller上下文中的partitionsBeingReassigned集合中;
  4. 把即将被重分配的Topic标记为不可删除;
  5. 调用onPartitionReassignment()方法真正触发重分配流程。

执行重分配流程

onPartitionReassignment()方法的代码如下。

def onPartitionReassignment(topicAndPartition: TopicAndPartition, reassignedPartitionContext: ReassignedPartitionsContext) {
  val reassignedReplicas = reassignedPartitionContext.newReplicas
  if (!areReplicasInIsr(topicAndPartition.topic, topicAndPartition.partition, reassignedReplicas)) {
    info("New replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) +
      "reassigned not yet caught up with the leader")
    val newReplicasNotInOldReplicaList = reassignedReplicas.toSet -- controllerContext.partitionReplicaAssignment(topicAndPartition).toSet
    val newAndOldReplicas = (reassignedPartitionContext.newReplicas ++ controllerContext.partitionReplicaAssignment(topicAndPartition)).toSet
    //1. Update AR in ZK with OAR + RAR.
    updateAssignedReplicasForPartition(topicAndPartition, newAndOldReplicas.toSeq)
    //2. Send LeaderAndIsr request to every replica in OAR + RAR (with AR as OAR + RAR).
    updateLeaderEpochAndSendRequest(topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition),
      newAndOldReplicas.toSeq)
    //3. replicas in RAR - OAR -> NewReplica
    startNewReplicasForReassignedPartition(topicAndPartition, reassignedPartitionContext, newReplicasNotInOldReplicaList)
    info("Waiting for new replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) +
      "reassigned to catch up with the leader")
  } else {
    //4. Wait until all replicas in RAR are in sync with the leader.
    val oldReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).toSet -- reassignedReplicas.toSet
    //5. replicas in RAR -> OnlineReplica
    reassignedReplicas.foreach { replica =>
      replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition,
        replica)), OnlineReplica)
    }
    //6. Set AR to RAR in memory.
    //7. Send LeaderAndIsr request with a potential new leader (if current leader not in RAR) and
    //   a new AR (using RAR) and same isr to every broker in RAR
    moveReassignedPartitionLeaderIfRequired(topicAndPartition, reassignedPartitionContext)
    //8. replicas in OAR - RAR -> Offline (force those replicas out of isr)
    //9. replicas in OAR - RAR -> NonExistentReplica (force those replicas to be deleted)
    stopOldReplicasOfReassignedPartition(topicAndPartition, reassignedPartitionContext, oldReplicas)
    //10. Update AR in ZK with RAR.
    updateAssignedReplicasForPartition(topicAndPartition, reassignedReplicas)
    //11. Update the /admin/reassign_partitions path in ZK to remove this partition.
    removePartitionFromReassignedPartitions(topicAndPartition)
    info("Removed partition %s from the list of reassigned partitions in zookeeper".format(topicAndPartition))
    controllerContext.partitionsBeingReassigned.remove(topicAndPartition)
    //12. After electing leader, the replicas and isr information changes, so resend the update metadata request to every broker
    sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(topicAndPartition))
    // signal delete topic thread if reassignment for some partitions belonging to topics being deleted just completed
    topicDeletionManager.resumeDeletionForTopics(Set(topicAndPartition.topic))
  }
}

官方JavaDoc比较详细,给出了3个方便解释流程的定义,列举如下:

  • RAR(Re-assigned replicas):重分配的Replica集合,记为reassignedReplicas;
  • OAR(Original assigned replicas):重分配之前的原始Replica集合,通过controllerContext.partitionReplicaAssignment()方法取得;
  • AR(Assigned replicas):当前的Replica集合,随着重分配的进行不断变化。

根据上文的代码和注释,我们可以很容易地梳理出重分配的具体流程:

(0) 检查RAR是否都已经在Partition的ISR集合中(即是否已经同步),若否,则计算RAR与OAR的差集,也就是需要被创建或者重分配的Replica集合;

(1) 计算RAR和OAR的并集,即所有Replica的集合,并将ZK中的AR更新;

(2) 增加Partition的Leader纪元值,并向AR中的所有Replica所在的Broker发送LeaderAndIsrRequest;

(3) 更新RAR与OAR的差集中Replica的状态为NewReplica,以触发这些Replica的创建或同步;

(4) 计算OAR和RAR的差集,即重分配过程中需要被下线的Replica集合;

(5) 等待RAR都已经在Partition的ISR集合中,将RAR中Replica的状态设置为OnlineReplica,表示同步完成;

(6) 将迁移现场的AR更新为RAR;

(7) 检查Partition的Leader是否在RAR中,如果没有,则触发新的Leader选举。然后增加Partition的Leader纪元值,发送LeaderAndIsrRequest更新Leader的结果;

(8~9) 将OAR和RAR的差集中的Replica状态设为Offline->NonExistentReplica,这些Replica后续将被删除;

(10) 将ZK中的AR集合更新为RAR;

(11) 一个Partition重分配完成,更新/admin/reassign_partitions节点中的执行计划,删掉完成的Partition;

(12) 发送UpdateMetadataRequest给所有Broker,刷新元数据缓存;

(13) 如果有一个Topic已经重分配完成并且将被删除,就将它从不可删除的Topic集合中移除。

The End

最后一个小问题:Partition重分配往往会涉及大量的数据交换,有可能会影响正常业务的运行,如何避免呢?ReassignPartitionsCommand也提供了throttle功能用于限流,在代码和帮助文档中都可以看到它,就不多讲了。当然,一旦启用了throttle,我们一定要定期进行verify操作,防止因为限流导致重分配的Replica一直追不上Leader的情况发生。

民那晚安晚安。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,558评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,002评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,036评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,024评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,144评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,255评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,295评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,068评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,478评论 1 305
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,789评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,965评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,649评论 4 336
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,267评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,982评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,223评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,800评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,847评论 2 351