[Kafka Source Code] KafkaController Startup Process

[TOC]


In earlier posts we covered a lot of what Kafka loads during startup. We also know that a broker can host many partitions, and that each partition has a leader and followers, with data replicated between them. So who manages all of these partitions? Is there any leader/follower relationship among the brokers themselves? That is the subject of this post, the KafkaController. Let's go through it in detail.

1. Entry Point

The entry point for starting the KafkaController is again very simple; it sits in KafkaServer's startup method.

/* start kafka controller */
kafkaController = new KafkaController(config, zkUtils, brokerState, kafkaMetricsTime, metrics, threadNamePrefix)
kafkaController.startup()

It first instantiates a KafkaController and then starts it.

2. Instantiating the Controller

The instantiation source, with comments inline:

this.logIdent = "[Controller " + config.brokerId + "]: "
private var isRunning = true
private val stateChangeLogger = KafkaController.stateChangeLogger
//instantiate the controller context
val controllerContext = new ControllerContext(zkUtils, config.zkSessionTimeoutMs)
//instantiate the partition state machine
val partitionStateMachine = new PartitionStateMachine(this)
//instantiate the replica state machine
val replicaStateMachine = new ReplicaStateMachine(this)
//instantiate the controller elector (elects a leader among the brokers)
private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover,
    onControllerResignation, config.brokerId)
// have a separate scheduler for the controller to be able to start and stop independently of the
// kafka server
//instantiate the auto leader rebalance scheduler
private val autoRebalanceScheduler = new KafkaScheduler(1)
//topic deletion manager
var deleteTopicManager: TopicDeletionManager = null
//leader selector for offline partitions
val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext, config)
//leader selector used during partition reassignment
private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext)
//leader selector used for preferred replica election (picks the preferred replica as the leader)
private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext)
//leader selector used when a broker undergoes controlled shutdown
private val controlledShutdownPartitionLeaderSelector = new ControlledShutdownLeaderSelector(controllerContext)

private val brokerRequestBatch = new ControllerBrokerRequestBatch(this)

//partition reassignment listener
private val partitionReassignedListener = new PartitionsReassignedListener(this)
//preferred replica election listener
private val preferredReplicaElectionListener = new PreferredReplicaElectionListener(this)
//ISR change notification listener
private val isrChangeNotificationListener = new IsrChangeNotificationListener(this)

3. Starting the Controller

Straight to the code:

def startup() = {
    inLock(controllerContext.controllerLock) {
        info("Controller starting up")
        registerSessionExpirationListener()
        isRunning = true
        controllerElector.startup
        info("Controller startup complete")
    }
}

Calling startup does not mean the current broker becomes the controller; it only registers the broker as a candidate in ZooKeeper. The actual election happens through ZooKeeper (the first broker to create the ephemeral /controller node wins), and once a controller has been elected, a series of actions runs on that broker, as we will see later.
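To make the election pattern concrete, here is a minimal, self-contained sketch of the "first broker to create the ephemeral znode wins" idea, using the same I0Itec ZkClient library that Kafka uses. This is not Kafka's actual code; the connection string, timeouts and broker id are made-up values.

import java.nio.charset.StandardCharsets

import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.exception.ZkNodeExistsException
import org.I0Itec.zkclient.serialize.ZkSerializer

// a plain UTF-8 string serializer, similar in spirit to Kafka's ZKStringSerializer
object StringSerializer extends ZkSerializer {
  override def serialize(data: Object): Array[Byte] = data.toString.getBytes(StandardCharsets.UTF_8)
  override def deserialize(bytes: Array[Byte]): Object =
    if (bytes == null) null else new String(bytes, StandardCharsets.UTF_8)
}

object ControllerElectionSketch {
  val ControllerPath = "/controller" // the same path the real controller races for

  def main(args: Array[String]): Unit = {
    val zkClient = new ZkClient("localhost:2181", 6000, 6000, StringSerializer) // made-up connection settings
    val brokerId = 1                                                            // made-up broker id

    try {
      // whoever creates the ephemeral node first wins the election;
      // the node disappears automatically when the winner's session dies
      zkClient.createEphemeral(ControllerPath, s"""{"version":1,"brokerid":$brokerId}""")
      println(s"broker $brokerId is now the controller")
    } catch {
      case _: ZkNodeExistsException =>
        // someone else won the race; just read who the controller is
        val data: String = zkClient.readData(ControllerPath, true)
        println(s"controller already elected: $data")
    }
  }
}

If the winning broker's ZooKeeper session expires, the ephemeral node is removed and the remaining brokers race again, which is exactly the situation the SessionExpirationListener and LeaderChangeListener below deal with.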

3.1 registerSessionExpirationListener

First, the broker registers a session expiration listener. Let's take a look at it.

private def registerSessionExpirationListener() = {
    zkUtils.zkClient.subscribeStateChanges(new SessionExpirationListener())
}
    
class SessionExpirationListener() extends IZkStateListener with Logging {
    this.logIdent = "[SessionExpirationListener on " + config.brokerId + "], "

    @throws(classOf[Exception])
    def handleStateChanged(state: KeeperState) {
        // do nothing, since zkclient will do reconnect for us.
    }
    /**
    * Called after the zookeeper session has expired and a new session has been created. You would have to re-create
    * any ephemeral nodes here.
    *
    * @throws Exception
    * On any error.
    */
    @throws(classOf[Exception])
    def handleNewSession() {
        info("ZK expired; shut down all controller components and try to re-elect")
        inLock(controllerContext.controllerLock) {
            onControllerResignation()
            controllerElector.elect
        }
    }

    override def handleSessionEstablishmentError(error: Throwable): Unit = {
        //no-op handleSessionEstablishmentError in KafkaHealthCheck should handle this error in its handleSessionEstablishmentError
    }
}

As you can see, when the broker's session to ZooKeeper expires, this listener does not initiate a reconnect itself, since zkclient reconnects automatically. Once a new session has been created, i.e. once the broker has rejoined ZooKeeper, two things happen:

  • onControllerResignation: the current broker steps down as controller and cleans up its controller state
  • controllerElector.elect: the controller election is run again

Let's look at what each of these does.

3.1.1 onControllerResignation

The code is fairly straightforward: it mainly cleans up the controller's internal data structures.

/**
* This callback is invoked by the zookeeper leader elector when the current broker resigns as the controller. This is
* required to clean up internal controller data structures
*/
def onControllerResignation() {
    debug("Controller resigning, broker id %d".format(config.brokerId))
    // de-register listeners
    deregisterIsrChangeNotificationListener()
    deregisterReassignedPartitionsListener()
    deregisterPreferredReplicaElectionListener()

    // shutdown delete topic manager
    if (deleteTopicManager != null)
        deleteTopicManager.shutdown()

    // shutdown leader rebalance scheduler
    if (config.autoLeaderRebalanceEnable)
        autoRebalanceScheduler.shutdown()

    inLock(controllerContext.controllerLock) {
        // de-register partition ISR listener for on-going partition reassignment task
        deregisterReassignedPartitionsIsrChangeListeners()
        // shutdown partition state machine
        partitionStateMachine.shutdown()
        // shutdown replica state machine
        replicaStateMachine.shutdown()
        // shutdown controller channel manager
        if (controllerContext.controllerChannelManager != null) {
            controllerContext.controllerChannelManager.shutdown()
            controllerContext.controllerChannelManager = null
        }
        // reset controller context
        controllerContext.epoch = 0
        controllerContext.epochZkVersion = 0
        brokerState.newState(RunningAsBroker) // switch the broker's state from controller back to plain broker

        info("Broker %d resigned as the controller".format(config.brokerId))
    }
}

3.1.2 controllerElector.elect

This is where the controller is re-elected.

def elect: Boolean = {
  val timestamp = SystemTime.milliseconds.toString
  val electString = Json.encode(Map("version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp))

  leaderId = getControllerID
  /*
   * We can get here during the initial startup and the handleDeleted ZK callback. Because of the potential race condition,
   * it's possible that the controller has already been elected when we get here. This check will prevent the following
   * createEphemeralPath method from getting into an infinite loop if this broker is already the controller.
   */
  if (leaderId != -1) {
    debug("Broker %d has been elected as leader, so stopping the election process.".format(leaderId))
    return amILeader
  }

  try {
    val zkCheckedEphemeral = new ZKCheckedEphemeral(electionPath,
                                                    electString,
                                                    controllerContext.zkUtils.zkConnection.getZookeeper,
                                                    JaasUtils.isZkSecurityEnabled())
    zkCheckedEphemeral.create()
    info(brokerId + " successfully elected as leader")
    leaderId = brokerId
    onBecomingLeader()
  } catch {
    case e: ZkNodeExistsException =>
      // If someone else has written the path, then
      leaderId = getControllerID

      if (leaderId != -1)
        debug("Broker %d was elected as leader instead of broker %d".format(leaderId, brokerId))
      else
        warn("A leader has been elected but just resigned, this will result in another round of election")

    case e2: Throwable =>
      error("Error while electing or becoming leader on broker %d".format(brokerId), e2)
      resign()
  }
  amILeader
}

This method performs the controller election. What we really care about is what happens after the current broker is elected controller, i.e. onBecomingLeader. To see what that callback is we have to go back to the instantiation code: it is onControllerFailover.

def onControllerFailover() {
    if (isRunning) {
        info("Broker %d starting become controller state transition".format(config.brokerId))
        //read controller epoch from zk
        readControllerEpochFromZookeeper()
        // increment the controller epoch
        incrementControllerEpoch(zkUtils.zkClient)
        // before reading source of truth from zookeeper, register the listeners to get broker/topic callbacks
        registerReassignedPartitionsListener()
        registerIsrChangeNotificationListener()
        registerPreferredReplicaElectionListener()
        partitionStateMachine.registerListeners()
        replicaStateMachine.registerListeners()
        initializeControllerContext()
        replicaStateMachine.startup()
        partitionStateMachine.startup()
        // register the partition change listeners for all existing topics on failover
        controllerContext.allTopics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic))
        info("Broker %d is ready to serve as the new controller with epoch %d".format(config.brokerId, epoch))
        brokerState.newState(RunningAsController)
        maybeTriggerPartitionReassignment()
        maybeTriggerPreferredReplicaElection()
        /* send partition leadership info to all live brokers */
        sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
        if (config.autoLeaderRebalanceEnable) {
            info("starting the partition rebalance scheduler")
            autoRebalanceScheduler.startup()
            autoRebalanceScheduler.schedule("partition-rebalance-thread", checkAndTriggerPartitionRebalance,
                5, config.leaderImbalanceCheckIntervalSeconds.toLong, TimeUnit.SECONDS)
        }
        deleteTopicManager.start()
    }
    else
        info("Controller has been shut down, aborting startup/failover")
}

A lot happens in this method; let's go through the steps one by one.

  • Read the controller epoch from ZooKeeper
  • Increment the epoch by 1 and write it back to ZooKeeper (a conditional update; see the sketch after this list)
  • Register a series of listeners
  • Initialize the controller context
  • Start the replica and partition state machines
  • Register a partition change listener for every existing topic
  • Schedule the periodic check that triggers partition leader rebalancing
  • Start the topic deletion manager

There is a lot packed in here; we will analyze the details in later posts.
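The epoch bump in the second bullet deserves a quick illustration. It is a version-checked (conditional) write to the /controller_epoch node, so that if some other broker bumps the epoch concurrently the write fails instead of silently leaving two controllers with the same epoch. Below is a minimal sketch of that pattern against a raw ZkClient; it is not the real incrementControllerEpoch implementation, and bootstrapping a missing node is left out.

import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.exception.ZkBadVersionException
import org.apache.zookeeper.data.Stat

object EpochBumpSketch {
  val ControllerEpochPath = "/controller_epoch"

  // read the current epoch together with the znode version, then write epoch + 1
  // back only if the version has not changed since the read
  def bumpEpoch(zkClient: ZkClient): Int = {
    val stat = new Stat()
    val current: String = zkClient.readData(ControllerEpochPath, stat)
    val newEpoch = current.toInt + 1
    try {
      zkClient.writeData(ControllerEpochPath, newEpoch.toString, stat.getVersion)
      newEpoch
    } catch {
      case _: ZkBadVersionException =>
        // somebody else updated the epoch first
        throw new IllegalStateException("controller epoch was bumped by another broker")
    }
  }
}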

3.2 controllerElector.startup

def startup {
  inLock(controllerContext.controllerLock) {
    controllerContext.zkUtils.zkClient.subscribeDataChanges(electionPath, leaderChangeListener)
    elect
  }
}

Here electionPath is /controller; below we will look at the leaderChangeListener.
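Before that, it is worth noting what actually lives at /controller: it is simply the electString JSON built in elect above. On a running cluster the node data looks roughly like the following (the broker id and timestamp are made-up example values):

{"version":1,"brokerid":1,"timestamp":"1506330207000"}

Whenever this node's data changes or the node is deleted, the listener below fires.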

3.2.1 leaderChangeListener

class LeaderChangeListener extends IZkDataListener with Logging {
  /**
   * Called when the leader information stored in zookeeper has changed. Record the new leader in memory
   * @throws Exception On any error.
   */
  @throws(classOf[Exception])
  def handleDataChange(dataPath: String, data: Object) {
    inLock(controllerContext.controllerLock) {
      val amILeaderBeforeDataChange = amILeader
      leaderId = KafkaController.parseControllerId(data.toString)
      info("New leader is %d".format(leaderId))
      // The old leader needs to resign leadership if it is no longer the leader
      if (amILeaderBeforeDataChange && !amILeader)
        onResigningAsLeader()
    }
  }

  /**
   * Called when the leader information stored in zookeeper has been delete. Try to elect as the leader
   * @throws Exception On any error.
   */
  @throws(classOf[Exception])
  def handleDataDeleted(dataPath: String) {
    inLock(controllerContext.controllerLock) {
      debug("%s leader change listener fired for path %s to handle data deleted: trying to elect as a leader"
        .format(brokerId, dataPath))
      if (amILeader)
        onResigningAsLeader()
      elect
    }
  }
}

The listener watches the corresponding ZooKeeper node. When the node's data changes, handleDataChange is called; it records the new leaderId from the node data. If the current broker was the leader before the change but the new leader is someone else, it calls onResigningAsLeader to clean up its old controller state.

If the node is deleted, handleDataDeleted is called. If the current broker is the leader, it first calls onResigningAsLeader and then starts a new leader election.
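handleDataChange relies on KafkaController.parseControllerId to pull the broker id back out of the /controller JSON payload. As a rough illustration of what that parsing amounts to (the real code goes through Kafka's JSON helpers rather than a regex), here is a tiny standalone sketch:

object ParseControllerIdSketch {
  // matches the "brokerid" field in the /controller payload, e.g. {"version":1,"brokerid":2,...}
  private val BrokerIdPattern = """"brokerid"\s*:\s*(\d+)""".r

  def parseControllerId(data: String): Int =
    BrokerIdPattern.findFirstMatchIn(data) match {
      case Some(m) => m.group(1).toInt
      case None    => throw new IllegalArgumentException(s"no brokerid found in: $data")
    }

  def main(args: Array[String]): Unit = {
    // made-up payload in the same shape as the electString written by elect
    println(parseControllerId("""{"version":1,"brokerid":2,"timestamp":"1506330207000"}""")) // prints 2
  }
}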

3.2.2 elect

This is the controller (i.e. leader) election method itself; it is the same code we walked through in section 3.1.2.
