kafka源码愫读(2)、broker初始化流程分析

Kafka 集群由多个 broker 节点构成,每个节点上都运行着一个 Kafka 实例,这些实例之间基于 ZK 来发现彼此,并由集群控制器 KafkaController 统筹协调运行,彼此之间基于 socket 连接进行通信。

1、KafkaServer 类主要字段:

KafkaServer 为kafka的启动类,其中包含了kafka的所有组件,如KafkaController、groupCoordinator、replicaManager等;

class KafkaServer(val config: KafkaConfig, //配置信息
time: Time = Time.SYSTEM, threadNamePrefix: Option[String] = None,
                  kafkaMetricsReporters: Seq[KafkaMetricsReporter] = List() //监控上报
                  ) extends Logging with KafkaMetricsGroup {
  //标识节点已经启动完成
  private val startupComplete = new AtomicBoolean(false)
  //标识节点正在执行关闭操作
  private val isShuttingDown = new AtomicBoolean(false)
  //标识节点正在执行启动操作
  private val isStartingUp = new AtomicBoolean(false)
  //阻塞主线程等待 KafkaServer 的关闭
  private var shutdownLatch = new CountDownLatch(1)

  private val jmxPrefix: String = "kafka.server"

  //日志上下文
  private var logContext: LogContext = null

  var metrics: Metrics = null
  //记录节点的当前状态
  val brokerState: BrokerState = new BrokerState
  //API接口类,用于处理数据类请求
  var dataPlaneRequestProcessor: KafkaApis = null
  //API接口,用于处理控制类请求
  var controlPlaneRequestProcessor: KafkaApis = null

  //权限管理
  var authorizer: Option[Authorizer] = None
  //启动socket,监听9092端口,等待接收客户端请求 
  var socketServer: SocketServer = null
  //数据类请求处理线程池
  var dataPlaneRequestHandlerPool: KafkaRequestHandlerPool = null
  //命令类处理线程池
  var controlPlaneRequestHandlerPool: KafkaRequestHandlerPool = null
  //日志管理器    
  var logDirFailureChannel: LogDirFailureChannel = null
  var logManager: LogManager = null

  //副本管理器
  var replicaManager: ReplicaManager = null
  
  //topic增删管理器
  var adminManager: AdminManager = null
  
  //token管理器
  var tokenManager: DelegationTokenManager = null

  //动态配置管理器
  var dynamicConfigHandlers: Map[String, ConfigHandler] = null
  var dynamicConfigManager: DynamicConfigManager = null
  var credentialProvider: CredentialProvider = null
  var tokenCache: DelegationTokenCache = null
  
  //分组协调器
  var groupCoordinator: GroupCoordinator = null

  var transactionCoordinator: TransactionCoordinator = null

  //集群控制器
  var kafkaController: KafkaController = null

  //定时任务调度器
  var kafkaScheduler: KafkaScheduler = null
  
  //集群分区状态信息缓存
  var metadataCache: MetadataCache = null
  
  //配额管理器
  var quotaManagers: QuotaFactory.QuotaManagers = null

  //zk客户端配置
  val zkClientConfig: ZKClientConfig = KafkaServer.zkClientConfigFromKafkaConfig(config).getOrElse(new ZKClientConfig())
  private var _zkClient: KafkaZkClient = null
  val correlationId: AtomicInteger = new AtomicInteger(0)
  val brokerMetaPropsFile = "meta.properties"
  val brokerMetadataCheckpoints = config.logDirs.map(logDir => (logDir, new BrokerMetadataCheckpoint(new File(logDir + File.separator + brokerMetaPropsFile)))).toMap

  private var _clusterId: String = null
  private var _brokerTopicStats: BrokerTopicStats = null

  def clusterId: String = _clusterId

  // Visible for testing
  private[kafka] def zkClient = _zkClient

  private[kafka] def brokerTopicStats = _brokerTopicStats
  }

2、服务启动过程

kafka的启动过程,主要是对当前节点各个模块进行初始化工作;

def startup(): Unit = {
  try {
    info("starting")

    if (isShuttingDown.get)
      throw new IllegalStateException("Kafka server is still shutting down, cannot re-start!")

    if (startupComplete.get)
      return

    val canStartup = isStartingUp.compareAndSet(false, true)
    if (canStartup) {
      //初始化broker状态为starting  
      brokerState.newState(Starting)

      //初始化zk连接,并创建根节点
      initZkClient(time)

      //从ZK获取或创建集群id,规则:UUID的mostSigBits、leastSigBits组合转base64
      _clusterId = getOrGenerateClusterId(zkClient)
      info(s"Cluster ID = $clusterId")

      //获取brokerId及log存储路径,brokerId通过zk生成或者server.properties配置broker.id
       //规则:/brokers/seqid的version值 + maxReservedBrokerId(默认1000),保证唯一性
      val (preloadedBrokerMetadataCheckpoint, initialOfflineDirs) = getBrokerMetadataAndOfflineDirs

      /* check cluster id */
      if (preloadedBrokerMetadataCheckpoint.clusterId.isDefined && preloadedBrokerMetadataCheckpoint.clusterId.get != clusterId)
        throw new InconsistentClusterIdException(
          s"The Cluster ID ${clusterId} doesn't match stored clusterId ${preloadedBrokerMetadataCheckpoint.clusterId} in meta.properties. " +
          s"The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong.")

      /* generate brokerId */
      config.brokerId = getOrGenerateBrokerId(preloadedBrokerMetadataCheckpoint)
      logContext = new LogContext(s"[KafkaServer id=${config.brokerId}] ")
      this.logIdent = logContext.logPrefix

      // initialize dynamic broker configs from ZooKeeper. Any updates made after this will be
      // applied after DynamicConfigManager starts.
      config.dynamicConfig.initialize(zkClient)

      //初始化定时任务调度器
      kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
      kafkaScheduler.startup()

      //创建及配置监控,默认使用JMX及Yammer Metrics
      val reporters = new util.ArrayList[MetricsReporter]
      reporters.add(new JmxReporter(jmxPrefix))
      val metricConfig = KafkaServer.metricConfig(config)
      metrics = new Metrics(metricConfig, reporters, time, true)

      /* register broker metrics */
      _brokerTopicStats = new BrokerTopicStats
      
      //初始化配额管理器
      quotaManagers = QuotaFactory.instantiate(config, metrics, time, threadNamePrefix.getOrElse(""))
      notifyClusterListeners(kafkaMetricsReporters ++ metrics.reporters.asScala)
      //用于保证kafka-log数据目录的存在
      logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)

      //启动日志管理器,kafka的消息以日志形式存储
      logManager = LogManager(config, initialOfflineDirs, zkClient, brokerState, kafkaScheduler, time, brokerTopicStats, logDirFailureChannel)
      logManager.startup()

      metadataCache = new MetadataCache(config.brokerId)
      // SCRAM认证方式的token缓存enabled dynamically.
      tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
      credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)

      //启动socket,监听9092端口,等待接收客户端请求 
      socketServer = new SocketServer(config, metrics, time, credentialProvider)
      socketServer.startup(startupProcessors = false)

      //启动副本管理器,高可用相关
      replicaManager = createReplicaManager(isShuttingDown)
      replicaManager.startup()
      
      //将broker信息注册到ZK上
      val brokerInfo = createBrokerInfo
      val brokerEpoch = zkClient.registerBroker(brokerInfo)

      //校验broker信息
      checkpointBrokerMetadata(BrokerMetadata(config.brokerId, Some(clusterId)))

      //启动token管理器
      tokenManager = new DelegationTokenManager(config, tokenCache, time , zkClient)
      tokenManager.startup()

      //启动Kafka控制器,只有leader会与ZK建连
      kafkaController = new KafkaController(config, zkClient, time, metrics, brokerInfo, brokerEpoch, tokenManager, threadNamePrefix)
      kafkaController.startup()
      
      //admin管理器
      adminManager = new AdminManager(config, metrics, metadataCache, zkClient)

      /* start group coordinator */
       //启动集群群组协调器
      groupCoordinator = GroupCoordinator(config, zkClient, replicaManager, Time.SYSTEM, metrics)
      groupCoordinator.startup()

     //启动事务协调器
      transactionCoordinator = TransactionCoordinator(config, replicaManager, new KafkaScheduler(threads = 1, threadNamePrefix = "transaction-log-manager-"), zkClient, metrics, metadataCache, Time.SYSTEM)
      transactionCoordinator.startup()

      //ACL
      authorizer = config.authorizer
      authorizer.foreach(_.configure(config.originals))
      val authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = authorizer match {
        case Some(authZ) =>
          authZ.start(brokerInfo.broker.toServerInfo(clusterId, config)).asScala.mapValues(_.toCompletableFuture).toMap
        case None =>
          brokerInfo.broker.endPoints.map { ep => ep.toJava -> CompletableFuture.completedFuture[Void](null) }.toMap
      }

      //创建拉取管理器
      val fetchManager = new FetchManager(Time.SYSTEM,
        new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots,
          KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))

      //初始化数据类请求的KafkaApis,负责数据类请求逻辑处理
      dataPlaneRequestProcessor = new KafkaApis(socketServer.dataPlaneRequestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,
        kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
        fetchManager, brokerTopicStats, clusterId, time, tokenManager)
        
      //初始化数据类请求处理的线程池  
      dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
        config.numIoThreads, s"${SocketServer.DataPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.DataPlaneThreadPrefix)

      //初始化控制类请求的KafkaApis
      socketServer.controlPlaneRequestChannelOpt.foreach { controlPlaneRequestChannel =>
        controlPlaneRequestProcessor = new KafkaApis(controlPlaneRequestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,
          kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
          fetchManager, brokerTopicStats, clusterId, time, tokenManager)
        
        //初始化控制类请求的线程池
        controlPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.controlPlaneRequestChannelOpt.get, controlPlaneRequestProcessor, time,
          1, s"${SocketServer.ControlPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.ControlPlaneThreadPrefix)
      }

      Mx4jLoader.maybeLoad()

      //启动动态配置处理器
      config.dynamicConfig.addReconfigurables(this)

      /* start dynamic config manager */
      dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager, config, quotaManagers, kafkaController),
                                                         ConfigType.Client -> new ClientIdConfigHandler(quotaManagers),
                                                         ConfigType.User -> new UserConfigHandler(quotaManagers, credentialProvider),
                                                         ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers))

      // Create the config manager. start listening to notifications
      dynamicConfigManager = new DynamicConfigManager(zkClient, dynamicConfigHandlers)
      dynamicConfigManager.startup()

      //启动请求处理线程
      socketServer.startControlPlaneProcessor(authorizerFutures)
      socketServer.startDataPlaneProcessors(authorizerFutures)
      
      //更新broker状态
      brokerState.newState(RunningAsBroker)
      shutdownLatch = new CountDownLatch(1)
      startupComplete.set(true)
      isStartingUp.set(false)
      AppInfoParser.registerAppInfo(jmxPrefix, config.brokerId.toString, metrics, time.milliseconds())
      info("started")
    }
  }
  catch {
    case e: Throwable =>
      fatal("Fatal error during KafkaServer startup. Prepare to shutdown", e)
      isStartingUp.set(false)
      shutdown()
      throw e
  }
}

3、broker节点状态

broker状态定义:

sealed trait BrokerStates { def state: Byte }
case object NotRunning extends BrokerStates { val state: Byte = 0 }
case object Starting extends BrokerStates { val state: Byte = 1 }
case object RecoveringFromUncleanShutdown extends BrokerStates { val state: Byte = 2 }
case object RunningAsBroker extends BrokerStates { val state: Byte = 3 }
case object PendingControlledShutdown extends BrokerStates { val state: Byte = 6 }
case object BrokerShuttingDown extends BrokerStates { val state: Byte = 7 }
  • NotRunning :初始状态,标识当前 broker 节点未运行。
  • Starting :标识当前 broker 节点正在启动中。
  • RecoveringFromUncleanShutdown :标识当前 broker 节点正在从上次非正常关闭中恢复。
  • RunningAsBroker :标识当前 broker 节点启动成功,可以对外提供服务。
  • PendingControlledShutdown :标识当前 broker 节点正在等待 controlled shutdown 操作完成。
  • BrokerShuttingDown :标识当前 broker 节点正在执行 shutdown 操作。

节点状态转换图:

节点状态转换图.png
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,242评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,769评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,484评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,133评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,007评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,080评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,496评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,190评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,464评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,549评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,330评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,205评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,567评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,889评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,160评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,475评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,650评论 2 335

推荐阅读更多精彩内容