// A request received by the socket server and queued for the I/O (request handler) threads.
// (Excerpt: the class body is omitted; only the constructor is shown.)
class Request(
    // Id of the Processor thread that received this request. After an I/O thread finishes
    // processing the Request, the Response still has to be sent back to the requester through
    // that same Processor thread, so the Request must remember which Processor accepted it.
    val processor: Int,
    // Request context: request header, client connection id, client address,
    // authentication information, etc.
    val context: RequestContext,
    // Time at which this Request object was created; used to compute various timing metrics.
    val startTimeNanos: Long,
    // Non-blocking memory pool defined by Kafka; its purpose is to stop Request objects from
    // using unbounded memory.
    memoryPool: MemoryPool,
    // Byte buffer that actually holds the content of the Request.
    @volatile private var buffer: ByteBuffer,
    metrics: RequestChannel.Metrics)
// Base class of the responses that are handed back to a Processor thread for a given request.
// Constructing a Response stamps request.responseCompleteTimeNanos with the current time, and also
// stamps request.apiLocalCompleteTimeNanos if it is still unset (-1).
abstract class Response(val request: Request) {
  locally {
    val nowNs = Time.SYSTEM.nanoseconds
    request.responseCompleteTimeNanos = nowNs
    // -1 means the API-local completion time has not been recorded yet.
    if (request.apiLocalCompleteTimeNanos == -1L)
      request.apiLocalCompleteTimeNanos = nowNs
  }

  // Id of the Processor that received the request; the response goes back through it.
  def processor: Int = request.processor

  // Optional textual form of the response; Some("") by default.
  def responseString: Option[String] = Some("")

  // Optional completion callback taking the Send; None by default.
  def onComplete: Option[Send => Unit] = None

  override def toString: String
}
- SendResponse:Kafka 大多数 Request 处理完成后都需要执行一段回调逻辑,SendResponse 就是保存返回结果的 Response 子类。里面最重要的字段是 onCompleteCallback,即指定处理完成之后的回调逻辑。
- NoResponse:有些 Request 处理完成后无需单独执行额外的回调逻辑。NoResponse 就是为这类 Response 准备的。
- CloseConnectionResponse:用于出错后需要关闭 TCP 连接的场景,此时返回 CloseConnectionResponse 给 Request 发送方,显式地通知它关闭连接。
- StartThrottlingResponse:用于通知 Broker 的 SocketServer 组件,某个 TCP 连接通信通道开始被限流(throttling)。
- EndThrottlingResponse:与 StartThrottlingResponse 对应,通知 Broker 的 SocketServer 组件某个 TCP 连接通信通道的限流已结束。
// Acceptor thread (one per listener endpoint): accepts new TCP connections on the endpoint's
// server socket and hands each accepted SocketChannel to one of its Processor threads, round-robin.
// NOTE(review): this is an excerpt - several upstream lines and closing braces are omitted, so the
// code below does not compile as shown.
private[kafka] class Acceptor(
// The configured broker listener, e.g. PLAINTEXT://localhost:9092. The Acceptor uses the host
// name and port it contains to create the server socket.
val endPoint: EndPoint,
// SO_SNDBUF socket option: size of the underlying buffer for outbound network I/O. Per these
// notes it defaults to the broker setting socket.send.buffer.bytes, i.e. 100 KB.
val sendBufferSize: Int,
// SO_RCVBUF socket option: size of the underlying buffer for inbound network I/O. Per these
// notes it defaults to the broker setting socket.receive.buffer.bytes, i.e. 100 KB.
val recvBufferSize: Int,
// Broker node id.
brokerId: Int,
connectionQuotas: ConnectionQuotas,
metricPrefix: String) extends AbstractServerThread(connectionQuotas)
// Selector on which serverChannel is registered for OP_ACCEPT at the start of run().
private val nioSelector = NSelector.open()
// Broker-side ServerSocketChannel bound to endPoint's host and port; run() registers it with
// nioSelector so that client connection attempts can be detected.
val serverChannel = openServerSocket(endPoint.host, endPoint.port)
// Processor threads this Acceptor distributes connections to. Accessed under `synchronized` in
// run() because the number of processors may change dynamically.
private val processors = new ArrayBuffer[Processor]()
// Main loop: registers serverChannel for OP_ACCEPT, then repeatedly waits for ready keys, accepts
// connections and assigns each one to a Processor. An error on a single key is logged and the next
// key is processed; errors from the select loop are logged too, except ControlThrowables, which
// are rethrown. On exit the server socket and the selector are closed.
def run(): Unit = {
serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
try {
var currentProcessorIndex = 0
while (isRunning) {
try {
// Wait at most 500 ms for ready keys so isRunning is re-checked regularly.
val ready = nioSelector.select(500)
if (ready > 0) {
val keys = nioSelector.selectedKeys()
val iter = keys.iterator()
while (iter.hasNext && isRunning) {
try {
// NOTE(review): no iter.remove() is shown; a java.nio Selector does not clear its
// selected-key set by itself, so upstream presumably removes the key here - confirm.
val key = iter.next
if (key.isAcceptable) {
accept(key).foreach { socketChannel =>
// Assign the channel to the next processor (using round-robin) to which the
// channel can be added without blocking. If newConnections queue is full on
// all processors, block until the last one is able to accept a connection.
var retriesLeft = synchronized(processors.length)
var processor: Processor = null
do {
retriesLeft -= 1
processor = synchronized {
// adjust the index (if necessary) and retrieve the processor atomically for
// correct behaviour in case the number of processors is reduced dynamically
currentProcessorIndex = currentProcessorIndex % processors.length
// NOTE(review): as excerpted, this `synchronized` block is never closed and its last
// expression is `currentProcessorIndex += 1` (Unit), so the lookup of the selected
// Processor (presumably `processors(currentProcessorIndex)`) appears to be missing.
currentProcessorIndex += 1
// The third argument is true only on the last retry - presumably the "may block" flag
// described in the comment above.
} while (!assignNewConnection(socketChannel, processor, retriesLeft == 0))
} else
throw new IllegalStateException("Unrecognized key state for acceptor thread.")
} catch {
// Log and move on to the next selected key.
case e: Throwable => error("Error while accepting connection", e)
catch {
// We catch all the throwables to prevent the acceptor thread from exiting on exceptions due
// to a select operation on a specific channel or a bad request. We don't want
// the broker to stop responding to requests from other clients in these scenarios.
case e: ControlThrowable => throw e
case e: Throwable => error("Error occurred", e)
} finally {
debug("Closing server socket and selector.")
// Close both resources via CoreUtils.swallow with Level.ERROR - presumably logging a failure
// instead of throwing, so the selector is still closed if closing the channel fails.
CoreUtils.swallow(serverChannel.close(), this, Level.ERROR)
CoreUtils.swallow(nioSelector.close(), this, Level.ERROR)
// Processor (network) thread. Owns a Selector over the connections assigned to it: new
// SocketChannels arrive through newConnections, and responses to be sent arrive through
// responseQueue.
// NOTE(review): excerpt - the run loop's actual calls and several closing braces are omitted.
private[kafka] class Processor(
val id: Int,
time: Time,
maxRequestSize: Int,
requestChannel: RequestChannel,
connectionQuotas: ConnectionQuotas,
connectionsMaxIdleMs: Long,
failedAuthenticationDelayMs: Int,
listenerName: ListenerName,
securityProtocol: SecurityProtocol,
config: KafkaConfig,
metrics: Metrics,
credentialProvider: CredentialProvider,
memoryPool: MemoryPool,
logContext: LogContext,
connectionQueueSize: Int = ConnectionQueueSize) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
// New connections assigned to this Processor; bounded to connectionQueueSize entries.
private val newConnections = new ArrayBlockingQueue[SocketChannel](connectionQueueSize)
// Temporary response holding area. After the Processor sends a Response back to the requester,
// it also puts that Response here, because some Response callbacks may only run after the
// Response has actually been sent. Keyed by a String (presumably the connection id - confirm).
private val inflightResponses = mutable.Map[String, RequestChannel.Response]()
// Responses waiting to be processed by this Processor (presumably enqueued once the I/O threads
// finish a request).
private val responseQueue = new LinkedBlockingDeque[RequestChannel.Response]()
// Selector for this Processor's connections, built with a server-side channel builder for this
// listener; the second builder argument flags whether it is the inter-broker listener.
private val selector = createSelector( ChannelBuilders.serverChannelBuilder(listenerName,listenerName == config.interBrokerListenerName, securityProtocol, config, credentialProvider.credentialCache, credentialProvider.tokenCache,time,logContext))
override def run(): Unit = {
try {
while (isRunning) {
try {
// 1. Handle new connections:
//    for each SocketChannel in newConnections, register OP_READ for it with the Selector.
// 2. Process the Response queue:
//    for each Response in responseQueue, act according to its type, e.g. send it to the
//    client and put it into inflightResponses, or handle throttling messages.
// NOTE(review): only these step descriptions are shown; the corresponding calls (and any
// further steps) are omitted in this excerpt.
} catch {
// We catch all the throwables here to prevent the processor thread from exiting. We do this because
// letting a processor exit might cause a bigger impact on the broker. This behavior might need to be
// reviewed if we see an exception that needs the entire broker to stop. Usually the exceptions thrown would
// be either associated with a specific socket channel or a bad request. These exceptions are caught and
// processed by the individual methods above which close the failing channel and continue processing other
// channels. So this catch block should only ever see ControlThrowables.
case e: Throwable => processException("Processor got uncaught exception.", e)
} finally {
debug(s"Closing selector - processor $id")
// Close everything owned by this Processor; failures go through CoreUtils.swallow at ERROR level.
CoreUtils.swallow(closeAll(), this, Level.ERROR)
RequestChannel 是传输 Request/Response 的通道。processors 线程池中的 Processor 线程读取各个 SocketChannel 中的数据,将其封装为 Request 后放入 RequestChannel 的 requestQueue 中缓存;同时 RequestChannel 还通过 processors 字段保存了各个 Processor 线程,以便请求处理完成后,Response 能交还给最初接收该请求的 Processor 线程发送出去。
// Channel that carries Requests from the Processor threads to the request handler threads, and
// keeps track of the Processors so each Response can be handed back to the one that received
// the Request. (Excerpt: the rest of the class body is omitted.)
//
// queueSize        - capacity of the bounded request queue
// metricNamePrefix - prefix for this channel's metric names (e.g. data plane vs. control plane)
// time             - clock; defaults to Time.SYSTEM so two-argument construction still works,
//                    while the three-argument form used by SocketServer is also accepted
class RequestChannel(
    val queueSize: Int,
    val metricNamePrefix: String,
    time: Time = Time.SYSTEM) extends KafkaMetricsGroup {
  // Requests enqueued by the Processors; a bounded FIFO holding at most queueSize entries.
  private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize)
  // Processor threads registered with this channel, keyed by an Int (presumably Processor.id).
  private val processors = new ConcurrentHashMap[Int, Processor]()
}
2.5、KafkaRequestHandler 与 KafkaRequestHandlerPool说明
// Request handler (I/O) thread: repeatedly takes the next request from requestChannel and
// dispatches it, recording the time spent waiting in aggregateIdleMeter.
// NOTE(review): this excerpt does not compile as shown - the class body is the empty `{}` on the
// header line, so `run` below sits outside the class, and several statements are omitted.
class KafkaRequestHandler(id: Int,
brokerId: Int,
val aggregateIdleMeter: Meter,
val totalHandlerThreads: AtomicInteger,
val requestChannel: RequestChannel,
apis: KafkaApis,
time: Time) extends Runnable with Logging {}
def run(): Unit = {
while (!stopped) {
// We use a single meter for aggregate idle percentage for the thread pool.
// Since meter is calculated as total_recorded_value / time_window and
// time_window is independent of the number of threads, each recorded idle
// time should be discounted by # threads.
val startSelectTime = time.nanoseconds
// Take the next request to process from the request queue (waiting up to 300 - presumably ms).
val req = requestChannel.receiveRequest(300)
val endTime = time.nanoseconds
val idleTime = endTime - startSelectTime
aggregateIdleMeter.mark(idleTime / totalHandlerThreads.get)
req match {
// Shutdown request: this broker has started shutting down.
// NOTE(review): only the debug log is shown; the shutdown handling after it is omitted here.
case RequestChannel.ShutdownRequest =>
debug(s"Kafka request handler $id on broker $brokerId received shut down command")
// Ordinary request.
case request: RequestChannel.Request =>
try {
request.requestDequeueTimeNanos = endTime
trace(s"Kafka request handler $id on broker $brokerId handling request $request")
// The request is processed by KafkaApis.handle (the call itself is omitted in this excerpt).
} catch {
// On a fatal error the thread is meant to shut down immediately.
// NOTE(review): as shown this case has no body, so a FatalExitError would be silently swallowed.
case e: FatalExitError =>
case e: Throwable => error("Exception when handling request", e)
} finally {
// Release the memory buffer held by the request (the call is omitted in this excerpt).
// NOTE(review): `case null` below belongs to `req match` (presumably a receive timeout with no
// request), not to this finally block; the closing braces in between are omitted.
case null => // continue
// Pool of request handler threads serving one RequestChannel. The loop below runs numThreads
// times and presumably creates one KafkaRequestHandler (stored in `runnables`) per iteration.
// NOTE(review): excerpt - the loop body and the rest of the class are omitted.
class KafkaRequestHandlerPool(val brokerId: Int,
val requestChannel: RequestChannel,
val apis: KafkaApis,
time: Time,
numThreads: Int,
requestHandlerAvgIdleMetricName: String,
logAndThreadNamePrefix : String) extends Logging with KafkaMetricsGroup {
// Current number of handler threads, initialised to numThreads. It is an AtomicInteger,
// presumably so it can change when the pool is resized (cf. KafkaRequestHandler.totalHandlerThreads).
private val threadPoolSize: AtomicInteger = new AtomicInteger(numThreads)
// Handler runnables, with initial capacity numThreads.
val runnables = new mutable.ArrayBuffer[KafkaRequestHandler](numThreads)
for (i <- 0 until numThreads) {
/**
 * Request-handling logic invoked by the request handler threads (through `KafkaApis.handle`).
 * The class body is omitted in this excerpt.
 *
 * @param requestChannel   the request channel
 * @param replicaManager   replica manager; controls the state transitions of all replicas in the cluster
 * @param adminManager     manager for topics, partitions and their configuration
 * @param groupCoordinator consumer group coordinator
 * @param txnCoordinator   transaction coordinator
 * @param controller       controller component, which manages and stores metadata
 * @param zkClient         ZooKeeper client; Kafka relies on it to interact with ZooKeeper
 * @param brokerId         value of the broker.id setting
 * @param config           Kafka configuration
 * @param metadataCache    metadata cache
 * @param metrics          metrics registry
 * @param authorizer       optional authorizer
 * @param quotas           quota managers
 * @param fetchManager     fetch manager
 * @param brokerTopicStats topic statistics of this broker
 * @param clusterId        cluster id
 * @param time             clock
 * @param tokenManager     delegation token manager
 */
class KafkaApis(
    val requestChannel: RequestChannel,
    val replicaManager: ReplicaManager,
    val adminManager: AdminManager,
    val groupCoordinator: GroupCoordinator,
    val txnCoordinator: TransactionCoordinator,
    val controller: KafkaController,
    val zkClient: KafkaZkClient,
    val brokerId: Int,
    val config: KafkaConfig,
    val metadataCache: MetadataCache,
    val metrics: Metrics,
    val authorizer: Option[Authorizer],
    val quotas: QuotaManagers,
    val fetchManager: FetchManager,
    brokerTopicStats: BrokerTopicStats,
    val clusterId: String,
    time: Time,
    val tokenManager: DelegationTokenManager)
  extends Logging
2.7、Data plane 与 Control plane说明
Kafka 对数据类请求和控制类请求的连接处理通道进行了拆分:Data plane 处理数据类请求,Control plane 处理控制类请求。
// SocketServer: the broker's network layer. It keeps two separate groups of resources - a data
// plane for data requests and an optional control plane for control requests.
// NOTE(review): excerpt - the rest of the class body and its closing brace are omitted.
class SocketServer(val config: KafkaConfig,
val metrics: Metrics,
val time: Time,
val credentialProvider: CredentialProvider)
extends Logging with KafkaMetricsGroup with BrokerReconfigurable {
// Mixing in the BrokerReconfigurable trait indicates that some SocketServer settings may be
// changed dynamically, i.e. without restarting the broker.
// Length of SocketServer's request queue, taken from the broker setting queued.max.requests
// (default 500 per these notes).
private val maxQueuedRequests = config.queuedMaxRequests
private val logContext = new LogContext(s"[SocketServer brokerId=${config.brokerId}] ")
this.logIdent = logContext.logPrefix
// Metrics for memory-pool depletion: average depleted percentage and total depleted time.
private val memoryPoolSensor = metrics.sensor("MemoryPoolUtilization")
private val memoryPoolDepletedPercentMetricName = metrics.metricName("MemoryPoolAvgDepletedPercent", MetricsGroup)
private val memoryPoolDepletedTimeMetricName = metrics.metricName("MemoryPoolDepletedTimeTotal", MetricsGroup)
memoryPoolSensor.add(new Meter(TimeUnit.MILLISECONDS, memoryPoolDepletedPercentMetricName, memoryPoolDepletedTimeMetricName))
// Use a SimpleMemoryPool bounded by config.queuedMaxBytes only when that value is positive;
// otherwise fall back to MemoryPool.NONE (presumably meaning no limit).
private val memoryPool = if (config.queuedMaxBytes > 0) new SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false, memoryPoolSensor) else MemoryPool.NONE
// data-plane
// Data-plane Processor threads, keyed by an Int (presumably the processor id).
private val dataPlaneProcessors = new ConcurrentHashMap[Int, Processor]()
// Acceptor threads that handle data requests; one Acceptor thread per listener.
private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, Acceptor]()
// RequestChannel for data requests; its queue length is maxQueuedRequests.
val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, DataPlaneMetricPrefix, time)
// control-plane
// The Processor thread that handles control requests.
// Note: control requests currently get a dedicated Processor thread rather than a thread pool.
// The control plane's resources are only 1 Acceptor thread + 1 Processor thread + 1 request
// queue of depth 20. Once the control plane is enabled it has exactly one Processor thread and
// one Acceptor thread, and its RequestChannel's request queue length is hard-coded to 20 rather
// than configurable. This reflects the community's assumption that control requests are far
// fewer than data requests, so they need neither a thread pool nor a deep request queue.
private var controlPlaneProcessorOpt : Option[Processor] = None
private[network] var controlPlaneAcceptorOpt : Option[Acceptor] = None
// Dedicated RequestChannel for control requests; only created when a control-plane listener is
// configured (config.controlPlaneListenerName is defined).
val controlPlaneRequestChannelOpt: Option[RequestChannel] = config.controlPlaneListenerName.map(_ =>
new RequestChannel(20, ControlPlaneMetricPrefix, time))
// The "data-plane" and "control-plane" sections above each define a group of variables: Processor
// threads, Acceptor threads and a RequestChannel instance.
// Processor thread pool: the network threads, which put requests into the request queue quickly.
// Acceptor thread pool: holds the Acceptor thread SocketServer defines for each listener; that
// thread dispatches the inbound connection requests arriving on its listener.
// RequestChannel: the request-processing channel that holds the request queue.
创建Data plane:
// Creates the data-plane resources for every listener endpoint. For each one it registers the
// listener with connection quotas, creates its Acceptor and `dataProcessorsPerListener` Processors,
// and records the Acceptor in dataPlaneAcceptors.
private def createDataPlaneAcceptorsAndProcessors(dataProcessorsPerListener: Int,
                                                  endpoints: Seq[EndPoint]): Unit = {
  // Iterate over the listeners.
  endpoints.foreach { endpoint =>
    // Put the listener under connection-quota management.
    connectionQuotas.addListener(config, endpoint.listenerName)
    // Create the Acceptor thread for this listener.
    val dataPlaneAcceptor = createAcceptor(endpoint, DataPlaneMetricPrefix)
    // Create this listener's Processor threads; how many is determined by num.network.threads.
    addDataPlaneProcessors(dataPlaneAcceptor, endpoint, dataProcessorsPerListener)
    // Keep the <listener, Acceptor> pair so all Acceptors are managed in one place.
    dataPlaneAcceptors.put(endpoint, dataPlaneAcceptor)
    info(s"Created data-plane acceptor and processors for endpoint : ${endpoint.listenerName}")
  }
}
创建Control plane:
Control plane 的配套资源只有 1 个 Acceptor 线程 + 1 个 Processor 线程 + 1 个深度是 20 的请求队列
// Creates the control-plane resources when a control-plane listener is configured: one Acceptor,
// one Processor, and the registration of that Processor with the control-plane RequestChannel and
// the Acceptor. Does nothing when endpointOpt is None.
private def createControlPlaneAcceptorAndProcessor(endpointOpt: Option[EndPoint]): Unit = {
  // Only when a listener has been configured for the control plane.
  endpointOpt.foreach { endpoint =>
    // Put the listener under connection-quota management.
    connectionQuotas.addListener(config, endpoint.listenerName)
    // Create the Acceptor thread for this listener.
    val controlPlaneAcceptor = createAcceptor(endpoint, ControlPlaneMetricPrefix)
    // Create the single Processor thread for this listener.
    val controlPlaneProcessor = newProcessor(nextProcessorId, controlPlaneRequestChannelOpt.get, connectionQuotas, endpoint.listenerName, endpoint.securityProtocol, memoryPool)
    controlPlaneAcceptorOpt = Some(controlPlaneAcceptor)
    controlPlaneProcessorOpt = Some(controlPlaneProcessor)
    val listenerProcessors = new ArrayBuffer[Processor]()
    listenerProcessors += controlPlaneProcessor
    // Add the Processor to the control-plane RequestChannel, i.e. to the Processors that
    // RequestChannel keeps, so that responses can be handed back to the Processor that received
    // the request. (Previously only described by a comment; the call itself was missing.)
    controlPlaneRequestChannelOpt.foreach(_.addProcessor(controlPlaneProcessor))
    nextProcessorId += 1
    // Also add the Processor to the set managed by the Acceptor thread.
    controlPlaneAcceptor.addProcessors(listenerProcessors, ControlPlaneThreadPrefix)
    info(s"Created control-plane acceptor and processor for endpoint : ${endpoint.listenerName}")
  }
}