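A Kafka broker usually goes offline in one of two ways:
- Its process is killed, for example because the host it runs on crashes or fails.
- It is stopped gracefully: the broker sends a ControlledShutdownRequest to the controller, for example when it is stopped with the script shipped with Kafka, or in any other scenario that issues a ControlledShutdownRequest.
In both cases, if the offline broker happens to be the leader of a topic partition, that partition must elect a new leader, and the two cases use different election algorithms. This article uses HDP-2.6.0.3, which is based on Kafka 0.10.x, to explain how the two mechanisms differ.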
The five partition leader selectors
Kafka has five ways to elect a partition leader: ReassignedPartitionLeaderSelector, PreferredReplicaPartitionLeaderSelector, ControlledShutdownLeaderSelector, NoOpLeaderSelector and OfflinePartitionLeaderSelector. This article covers only ControlledShutdownLeaderSelector and OfflinePartitionLeaderSelector.
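All five selectors share one contract: given a partition's current leader and ISR, return the new leader and ISR plus the replicas that should receive the LeaderAndIsr request. The sketch below is a simplified, hypothetical model of that contract; the names `LeaderAndIsr`, `LeaderSelector` and `NoOpSelector` are ours. Kafka 0.10.x's actual trait is `PartitionLeaderSelector`, which reads the assigned replicas from the controller context instead of taking them as a parameter. NoOpLeaderSelector serves as the trivial instance because it leaves the leader and ISR unchanged.

```scala
// Simplified, hypothetical model of a partition's leadership state
// (Kafka's real LeaderAndIsr also carries a ZooKeeper version).
final case class LeaderAndIsr(leader: Int, leaderEpoch: Int, isr: List[Int])

// Shared contract: (current state, assigned replicas) =>
// (new state, replicas that should receive the LeaderAndIsr request).
trait LeaderSelector {
  def selectLeader(current: LeaderAndIsr, assigned: Seq[Int]): (LeaderAndIsr, Seq[Int])
}

// Trivial selector modelled on NoOpLeaderSelector: keep leader and ISR as they are
// and send the request to every assigned replica.
object NoOpSelector extends LeaderSelector {
  def selectLeader(current: LeaderAndIsr, assigned: Seq[Int]): (LeaderAndIsr, Seq[Int]) =
    (current, assigned)
}

// Example input: partition [bar,0] before broker 1001 is stopped (epoch 0 is a placeholder).
val before = LeaderAndIsr(leader = 1001, leaderEpoch = 0, isr = List(1003, 1002, 1001))
val (after, receivers) = NoOpSelector.selectLeader(before, Seq(1001, 1003, 1002))
```

The two selectors discussed in the rest of the article fit the same shape; they differ only in how they compute the new leader and ISR.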
The broker process is killed
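ReplicaStateMachine contains a BrokerChangeListener that watches the children of /brokers/ids in ZooKeeper. When a broker is killed, its node under this path is deleted (it is an ephemeral node, removed once the broker's ZooKeeper session expires), which fires BrokerChangeListener's handleChildChange method. The key code is: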
if (deadBrokerIds.size > 0)
  controller.onBrokerFailure(deadBrokerIdsSorted)
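The `deadBrokerIds` checked above is essentially the set of brokers the controller already knew about minus the ids still registered under /brokers/ids. A minimal sketch of that set difference, using a hypothetical `diffBrokers` helper (not a Kafka function) and plain `Set[Int]` values in place of the controller context:

```scala
// Hypothetical helper mirroring what BrokerChangeListener does with the children
// of /brokers/ids: compare them with the brokers the controller already knows.
def diffBrokers(known: Set[Int], registered: Set[Int]): (Seq[Int], Seq[Int]) = {
  val newBrokerIds = (registered -- known).toSeq.sorted
  val deadBrokerIdsSorted = (known -- registered).toSeq.sorted
  (newBrokerIds, deadBrokerIdsSorted)
}

// Broker 1002 was killed, so its ephemeral znode is gone from /brokers/ids.
val (newIds, deadIds) = diffBrokers(known = Set(1001, 1002, 1003), registered = Set(1001, 1003))
if (deadIds.nonEmpty)
  println(s"onBrokerFailure(${deadIds.mkString(",")})") // stand-in for controller.onBrokerFailure
```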
Then KafkaController.onBrokerFailure runs; its key code is:
// trigger OnlinePartition state changes for offline or new partitions
partitionStateMachine.triggerOnlinePartitionStateChange()
This leads into PartitionStateMachine.handleStateChange, with controller.offlinePartitionSelector, i.e. OfflinePartitionLeaderSelector, passed in as the leader selector:
handleStateChange(topicAndPartition.topic, topicAndPartition.partition, OnlinePartition,
                  controller.offlinePartitionSelector,
                  (new CallbackBuilder).build)
So in this case the partition elects its new leader with OfflinePartitionLeaderSelector. The source code documents this strategy as follows:
Select the new leader, new isr and receiving replicas (for the LeaderAndIsrRequest):
- If at least one broker from the isr is alive, it picks a broker from the live isr as the new leader and the live isr as the new isr.
- Else, if unclean leader election for the topic is disabled, it throws a NoReplicaOnlineException.
- Else, it picks some alive broker from the assigned replica list as the new leader and the new isr.
- If no broker in the assigned replica list is alive, it throws a NoReplicaOnlineException.
Replicas to receive LeaderAndIsr request = live assigned replicas
Once the leader is successfully registered in zookeeper, it updates the allLeaders cache.
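This is clear enough that it needs no paraphrase, and the code logic is simple enough to read directly in the source. One point deserves emphasis: the unclean.leader.election.enable parameter. In short, if no broker in the ISR is alive and this parameter is true, a live replica from the AR (Assigned Replicas) list can be elected leader even though it is not in the ISR, at the cost of possibly losing messages it had not yet replicated; if the parameter is false, the partition stays offline. In Kafka 0.10.x the parameter defaults to true.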
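The rules quoted above condense into a small pure function. This is a simplified sketch, not Kafka's code: `selectOfflineLeader` is a hypothetical name, errors are returned as `Left` where Kafka throws NoReplicaOnlineException, the ZooKeeper version is ignored, and the unclean-election flag is passed in rather than read from the topic config. The clean branch picks the first replica in AR order that is alive and in the ISR; that ordering is consistent with the [bar,1] result in the verification below, where ISR order would have picked 1003 but the controller log shows 1001.

```scala
// Simplified model of a partition's leadership state (no ZooKeeper version).
final case class LeaderAndIsr(leader: Int, leaderEpoch: Int, isr: List[Int])

// Sketch of OfflinePartitionLeaderSelector's rules. Right((newState, receivers))
// on success; Left(reason) where Kafka would throw NoReplicaOnlineException.
def selectOfflineLeader(
    current: LeaderAndIsr,
    assigned: Seq[Int],                 // AR, in assignment order
    liveBrokers: Set[Int],
    uncleanLeaderElectionEnable: Boolean
): Either[String, (LeaderAndIsr, Seq[Int])] = {
  val liveAssigned = assigned.filter(liveBrokers.contains) // also the LeaderAndIsr receivers
  val liveIsr = current.isr.filter(liveBrokers.contains)
  if (liveIsr.nonEmpty) {
    // Clean election: first live AR replica that is in the ISR (the ISR is a subset
    // of the AR, so .head is safe); the new ISR is the live part of the old ISR.
    val newLeader = liveAssigned.filter(r => liveIsr.contains(r)).head
    Right((LeaderAndIsr(newLeader, current.leaderEpoch + 1, liveIsr), liveAssigned))
  } else if (!uncleanLeaderElectionEnable) {
    Left("no live broker in ISR and unclean leader election is disabled")
  } else if (liveAssigned.isEmpty) {
    Left("no live broker in the assigned replica list")
  } else {
    // Unclean election: a live non-ISR replica becomes leader and sole ISR member.
    val newLeader = liveAssigned.head
    Right((LeaderAndIsr(newLeader, current.leaderEpoch + 1, List(newLeader)), liveAssigned))
  }
}

// Partition [bar,1] after broker 1002 is killed (epoch 2 inferred from the logged leader_epoch 3).
val result = selectOfflineLeader(
  LeaderAndIsr(leader = 1002, leaderEpoch = 2, isr = List(1002, 1003, 1001)),
  assigned = Seq(1002, 1001, 1003),
  liveBrokers = Set(1001, 1003),
  uncleanLeaderElectionEnable = false
)
println(result)
```

For [bar,1] this yields leader 1001, leader epoch 3 and ISR [1003,1001], the same values as the OfflinePartitionLeaderSelector line in the verification section.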
Stopping the broker gracefully
KafkaServer.shutdown calls controlledShutdown, which executes the following code:
// send the controlled shutdown request
val requestHeader = networkClient.nextRequestHeader(ApiKeys.CONTROLLED_SHUTDOWN_KEY)
In KafkaApis, this request is dispatched by:
ApiKeys.CONTROLLED_SHUTDOWN_KEY => handleControlledShutdownRequest(request)
which in turn calls:
val partitionsRemaining = controller.shutdownBroker(controlledShutdownRequest.brokerId)
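The `partitionsRemaining` returned here tells the broker whether the controller has moved all of its leaderships away. As far as we can tell from the 0.10.x source, the broker treats the controlled shutdown as successful only when the request succeeds and this set is empty; otherwise it backs off and retries, up to `controlled.shutdown.max.retries` times with `controlled.shutdown.retry.backoff.ms` between attempts. A simplified sketch of that loop, with a hypothetical `ShutdownResponse` type and a `send` function standing in for the network call:

```scala
// Hypothetical stand-in for ControlledShutdownResponse: success flag plus
// the partitions this broker still leads.
final case class ShutdownResponse(ok: Boolean, partitionsRemaining: Set[String])

// Sketch of the broker-side retry loop; maxRetries and backoffMs play the roles
// of controlled.shutdown.max.retries and controlled.shutdown.retry.backoff.ms.
def controlledShutdown(send: () => ShutdownResponse, maxRetries: Int, backoffMs: Long): Boolean = {
  var remainingRetries = maxRetries
  var succeeded = false
  while (!succeeded && remainingRetries > 0) {
    remainingRetries -= 1
    val response = send()
    if (response.ok && response.partitionsRemaining.isEmpty) succeeded = true
    else if (remainingRetries > 0) Thread.sleep(backoffMs) // wait before the next attempt
  }
  succeeded // false: the real broker logs a warning and shuts down uncleanly
}

// First attempt: [bar,0] is still led by this broker; second attempt: nothing left.
val replies = Iterator(ShutdownResponse(ok = true, Set("bar-0")), ShutdownResponse(ok = true, Set.empty))
val done = controlledShutdown(() => replies.next(), maxRetries = 3, backoffMs = 100L)
println(s"controlled shutdown succeeded: $done")
```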
Inside KafkaController.shutdownBroker, the following call passes ControlledShutdownLeaderSelector as the leader selector:
partitionStateMachine.handleStateChanges(Set(topicAndPartition), OnlinePartition,
                                         controlledShutdownPartitionLeaderSelector)
The source code documents ControlledShutdownLeaderSelector as follows:
New leader = replica in isr that's not being shutdown;
New isr = current isr - shutdown replica;
Replicas to receive LeaderAndIsr request = live assigned replicas
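As the description (or the source) shows, this strategy is simple: the first broker in the ISR that is not being shut down becomes the partition's new leader, and the shutting-down broker is removed from the ISR.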
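A sketch of this rule as a pure function. The names are hypothetical, errors are returned as `Left` instead of an exception, and the ZooKeeper version is ignored. It picks the first ISR member that is not shutting down, following the description above. For [bar,0] in the verification below, ISR order and AR order both give 1003, so that output alone does not pin down the tie-breaking order; check the selector source for your exact Kafka version if it matters.

```scala
// Simplified model of a partition's leadership state (no ZooKeeper version).
final case class LeaderAndIsr(leader: Int, leaderEpoch: Int, isr: List[Int])

// Sketch of ControlledShutdownLeaderSelector as described in the text:
// new ISR = current ISR minus shutting-down brokers; new leader = first remaining ISR member.
def selectControlledShutdownLeader(
    current: LeaderAndIsr,
    assigned: Seq[Int],
    liveBrokers: Set[Int],    // still includes the broker being shut down
    shuttingDown: Set[Int]
): Either[String, (LeaderAndIsr, Seq[Int])] = {
  val liveAssigned = assigned.filter(liveBrokers.contains) // LeaderAndIsr receivers
  val newIsr = current.isr.filterNot(shuttingDown.contains)
  newIsr.headOption match {
    case None =>
      Left(s"no replica in ISR ${current.isr.mkString(",")} besides shutting-down brokers")
    case Some(newLeader) =>
      Right((LeaderAndIsr(newLeader, current.leaderEpoch + 1, newIsr), liveAssigned))
  }
}

// Partition [bar,0] while broker 1001 is stopped via Ambari (epoch 0 is a placeholder).
val result = selectControlledShutdownLeader(
  LeaderAndIsr(leader = 1001, leaderEpoch = 0, isr = List(1003, 1002, 1001)),
  assigned = Seq(1001, 1003, 1002),
  liveBrokers = Set(1001, 1002, 1003),
  shuttingDown = Set(1001)
)
println(result)
```

The result, leader 1003 with ISR [1003,1002], matches the post-shutdown describe output and the DEBUG line `current leader = 1001, new leader = 1003`.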
Verification
- Broker 1001 stopped gracefully through the Ambari UI, before and after:
/usr/hdp/2.6.0.3-8/kafka/bin/kafka-topics.sh --describe --zookeeper hostA:2181 --topic bar
Topic:bar PartitionCount:3 ReplicationFactor:3 Configs:
Topic: bar Partition: 0 Leader: 1001 Replicas: 1001,1003,1002 Isr: 1003,1002,1001
Topic: bar Partition: 1 Leader: 1002 Replicas: 1002,1001,1003 Isr: 1002,1003,1001
Topic: bar Partition: 2 Leader: 1003 Replicas: 1003,1002,1001 Isr: 1003,1002,1001
/usr/hdp/2.6.0.3-8/kafka/bin/kafka-topics.sh --describe --zookeeper hostA:2181 --topic bar
Topic:bar PartitionCount:3 ReplicationFactor:3 Configs:
Topic: bar Partition: 0 Leader: 1003 Replicas: 1001,1003,1002 Isr: 1003,1002
Topic: bar Partition: 1 Leader: 1002 Replicas: 1002,1001,1003 Isr: 1002,1003
Topic: bar Partition: 2 Leader: 1003 Replicas: 1003,1002,1001 Isr: 1003,1002
The Kafka controller log contains the following line:
DEBUG [ControlledShutdownLeaderSelector]: Partition [bar,0] : current leader = 1001, new leader = 1003 (kafka.controller.ControlledShutdownLeaderSelector)
- Broker 1002 stopped by killing its process, before and after:
/usr/hdp/2.6.0.3-8/kafka/bin/kafka-topics.sh --describe --zookeeper hostA:2181 --topic bar
Topic:bar PartitionCount:3 ReplicationFactor:3 Configs:
Topic: bar Partition: 0 Leader: 1003 Replicas: 1001,1003,1002 Isr: 1003,1001
Topic: bar Partition: 1 Leader: 1002 Replicas: 1002,1001,1003 Isr: 1002,1003,1001
Topic: bar Partition: 2 Leader: 1003 Replicas: 1003,1002,1001 Isr: 1003,1001
/usr/hdp/2.6.0.3-8/kafka/bin/kafka-topics.sh --describe --zookeeper hostA:2181 --topic bar
Topic:bar PartitionCount:3 ReplicationFactor:3 Configs:
Topic: bar Partition: 0 Leader: 1001 Replicas: 1001,1003,1002 Isr: 1003,1001
Topic: bar Partition: 1 Leader: 1001 Replicas: 1002,1001,1003 Isr: 1003,1001
Topic: bar Partition: 2 Leader: 1003 Replicas: 1003,1002,1001 Isr: 1003,1001
The Kafka controller log contains the following line:
INFO [OfflinePartitionLeaderSelector]: Selected new leader and ISR {"leader":1001,"leader_epoch":3,"isr":[1003,1001]} for offline partition [bar,1] (kafka.controller.OfflinePartitionLeaderSelector)
Summary
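This article has analyzed only the two mechanisms a partition uses to re-elect its leader in the two broker-offline scenarios. Real production environments present more, and more complicated, situations, so when a partition has no leader, check the controller log to determine which election mechanism was used in that scenario; that is how to find the root cause.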
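Each selector tags its log lines with its own name, so a quick way to see which mechanism handled a partition is to count those tags in the controller log. A minimal sketch; `selectorsUsed` is our own helper, and in practice you would feed it the lines of your controller log file, whose location depends on your installation. Note that the ControlledShutdownLeaderSelector line quoted above is logged at DEBUG level, so it only appears when the controller logger is set to DEBUG.

```scala
import scala.util.matching.Regex

// Matches the "[XxxLeaderSelector]" prefix that the selectors put on their log lines.
val SelectorTag: Regex = """\[(\w+LeaderSelector)\]""".r

// Count how many log lines each leader selector produced.
def selectorsUsed(lines: Seq[String]): Map[String, Int] =
  lines
    .flatMap(line => SelectorTag.findFirstMatchIn(line).map(_.group(1)))
    .groupBy(identity)
    .map { case (selector, hits) => selector -> hits.size }

// The two controller log lines from the verification section; for a real file use e.g.
// scala.io.Source.fromFile(path).getLines().toSeq (path depends on your installation).
val sample = Seq(
  """DEBUG [ControlledShutdownLeaderSelector]: Partition [bar,0] : current leader = 1001, new leader = 1003 (kafka.controller.ControlledShutdownLeaderSelector)""",
  """INFO [OfflinePartitionLeaderSelector]: Selected new leader and ISR {"leader":1001,"leader_epoch":3,"isr":[1003,1001]} for offline partition [bar,1] (kafka.controller.OfflinePartitionLeaderSelector)"""
)
println(selectorsUsed(sample))
```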