Kafka的初始化启动流程
- 由KafkaServer::startup来负责;
- KafkaServer::startup主要是创建并启动各种Manager;
- 上图:
KafkaHealthcheck: core/src/main/scala/kafka/server/KafkaHealthcheck.scala,其作用是在broker info注册到zk的/brokers/id路径下, 且监听zk的session expiration事件,触发时重新注册;
上图中的各个启动的组件我们慢慢都会介绍到, 先从请求的接收与响应开始~~~
请求处理
- SocketServer: 负责处理网络连接, 数据的接收和发送, 其中的RequestChannel负责向应用层转递请求,也负责把应用层的response传回网络层后发送出去;
详细见:Kafka源码分析-网络层-1 Kafka源码分析-网络层-2 Kafka源码分析-网络层-3 - KafkaRequestHandlerPool: 线程池, 每个线程里跑一个
KafkaRequestHandler
- KafkaRequestHandler: 循环调用
RequestChannel::receiveRequest
来poll到新的request交给KafkaApis
处理; - KafkaApis: 处理request的分发
request.requestId match {
case RequestKeys.ProduceKey => handleProducerRequest(request)
case RequestKeys.FetchKey => handleFetchRequest(request)
case RequestKeys.OffsetsKey => handleOffsetRequest(request)
case RequestKeys.MetadataKey => handleTopicMetadataRequest(request)
case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request)
case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)
case RequestKeys.UpdateMetadataKey => handleUpdateMetadataRequest(request)
case RequestKeys.ControlledShutdownKey => handleControlledShutdownRequest(request)
case RequestKeys.OffsetCommitKey => handleOffsetCommitRequest(request)
case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request)
case RequestKeys.GroupCoordinatorKey => handleGroupCoordinatorRequest(request)
case RequestKeys.JoinGroupKey => handleJoinGroupRequest(request)
case RequestKeys.HeartbeatKey => handleHeartbeatRequest(request)
case RequestKeys.LeaveGroupKey => handleLeaveGroupRequest(request)
case RequestKeys.SyncGroupKey => handleSyncGroupRequest(request)
case RequestKeys.DescribeGroupsKey => handleDescribeGroupRequest(request)
case RequestKeys.ListGroupsKey => handleListGroupsRequest(request)
case requestId => throw new KafkaException("Unknown api code " + requestId