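1. SocketServer module overview
SocketServer is Kafka's network communication module. It follows the Reactor pattern:
- one Acceptor per listener accepts client connections;
- N Processor threads read and write data on those connections;
- M handler threads (KafkaRequestHandler) run the business logic.

Queues buffer the hand-offs between the Acceptor and the Processors (newly accepted connections) and between the Processors and the handlers (requests, plus responses on the way back).
The SocketServer network communication flow is shown in the figure below:
Notes:
Acceptor: listens for incoming connections. When a client or another broker connects, it hands the new connection to one of the Processors, in round-robin order, for reading and writing.
Processor: takes over newly created connections and registers them for read events. It reads client Requests, sends the broker's Responses back, and handles client disconnects. It also enforces the maximum number of connections per listener, among other tasks.
RequestChannel: the request-processing channel. Every client request is enqueued into RequestChannel.requestQueue, where it waits for a KafkaRequestHandler to pick it up.
KafkaRequestHandler: the request-handling thread. It takes Requests from RequestChannel.requestQueue and passes them to KafkaApis.
KafkaApis: the protocol-handling class. It calls ReplicaManager, LogManager, GroupCoordinator and other components to process each request. It then enqueues the result as a Response to the Processor that received the request, and that Processor sends it back to the client.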
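To make the hand-offs above concrete, here is a minimal, single-threaded Scala sketch. It is not Kafka code. There are no sockets or threads, connections and payloads are plain strings, and MiniProcessor, MiniPipeline, MiniRequest and MiniResponse are hypothetical names. It shows only the three queues (new connections, the shared request queue, and per-Processor response queues) and the round-robin assignment.

```scala
import java.util.concurrent.{ArrayBlockingQueue, LinkedBlockingDeque}
import scala.collection.mutable.ArrayBuffer

// Hypothetical, single-threaded model of the SocketServer Reactor pipeline.
// No real sockets or threads: connections and payloads are plain strings.
final case class MiniRequest(processorId: Int, connectionId: String, payload: String)
final case class MiniResponse(request: MiniRequest, body: String)

final class MiniProcessor(val id: Int, connectionQueueSize: Int = 20) {
  // Acceptor -> Processor: bounded queue of newly accepted connections
  val newConnections = new ArrayBlockingQueue[String](connectionQueueSize)
  // Handler -> Processor: responses waiting to be written back to clients
  val responseQueue = new LinkedBlockingDeque[MiniResponse]()
  // Connections this Processor currently owns
  val connections = ArrayBuffer.empty[String]

  // Drain newConnections (a real Processor would register each one for OP_READ)
  def configureNewConnections(): Unit = {
    var conn = newConnections.poll()
    while (conn != null) {
      connections += conn
      conn = newConnections.poll()
    }
  }
}

final class MiniPipeline(numProcessors: Int, requestQueueSize: Int) {
  val processors: Vector[MiniProcessor] = Vector.tabulate(numProcessors)(i => new MiniProcessor(i))
  // Processor -> Handler: one bounded queue shared by all Processors (like RequestChannel.requestQueue)
  val requestQueue = new ArrayBlockingQueue[MiniRequest](requestQueueSize)
  private var nextProcessor = 0

  // Acceptor: hand each new connection to the Processors in round-robin order
  def accept(connectionId: String): Int = {
    val p = processors(nextProcessor % processors.size)
    nextProcessor += 1
    p.newConnections.put(connectionId)
    p.id
  }

  // Processor: a fully read request is tagged with this Processor's id and enqueued
  def receive(processorId: Int, connectionId: String, payload: String): Unit =
    requestQueue.put(MiniRequest(processorId, connectionId, payload))

  // Handler: take one request, "process" it, and route the response to the Processor that read it
  def handleOne(): Option[MiniResponse] = Option(requestQueue.poll()).map { req =>
    val resp = MiniResponse(req, req.payload.toUpperCase)
    processors(req.processorId).responseQueue.put(resp)
    resp
  }
}
```

The handler routes each response by processorId, not by looking up the connection. This mirrors why a Request records which Processor received it.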
2. Main classes
2.1 Request and Response
Request represents a client request. Its definition is as follows:
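class Request(
    // Index of the Processor thread that received this request. After an I/O (handler) thread has
    // processed the Request, that same Processor must send the Response back to the requester,
    // so the Request has to remember which Processor received it.
    val processor: Int,
    // Request context: request header, client connection id, client address, authentication info, etc.
    val context: RequestContext,
    // Time at which the Request object was created; used to compute the various timing metrics.
    val startTimeNanos: Long,
    // Non-blocking memory pool defined in the Kafka source; it keeps Requests from using unbounded memory.
    memoryPool: MemoryPool,
    // Byte buffer that actually holds the Request's contents.
    @volatile private var buffer: ByteBuffer,
    metrics: RequestChannel.Metrics) extends BaseRequest {
  // ...
}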
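Response is the broker's reply once it has processed a request. The abstract Response class is defined as follows:
// request: the Request this Response answers
abstract class Response(val request: Request) {
  // Record when the response was completed; if the API's local completion time was never set,
  // use the same timestamp for it
  locally {
    val nowNs = Time.SYSTEM.nanoseconds
    request.responseCompleteTimeNanos = nowNs
    if (request.apiLocalCompleteTimeNanos == -1L)
      request.apiLocalCompleteTimeNanos = nowNs
  }
  // Id of the Processor that received the corresponding Request
  def processor: Int = request.processor
  def responseString: Option[String] = Some("")
  // Callback invoked once the response has been sent
  def onComplete: Option[Send => Unit] = None
  override def toString: String
}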
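Response has five concrete subclasses:
- SendResponse: the subclass for results that must be sent back to the requester, which is the case for most requests. Its most important field is onCompleteCallback, the optional logic to run once the response has been sent.
- NoOpResponse: some requests need no response once processed (for example, a produce request with acks=0). NoOpResponse covers this case: nothing is sent, and the Processor simply resumes reading from the connection.
- CloseConnectionResponse: used when an error means the TCP connection must be closed. Instead of sending a reply, the Processor explicitly closes the connection to the requester.
- StartThrottlingResponse: tells the broker's SocketServer that a TCP connection has started being throttled.
- EndThrottlingResponse: the counterpart of StartThrottlingResponse; tells the broker's SocketServer that throttling on a TCP connection has ended.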
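The sketch below shows how a Processor draining its responseQueue might react to each kind of Response: send and track, resume reading, close, mute, or unmute. It uses simplified stand-in types; SketchResponse and the *Resp case classes are hypothetical, not Kafka's classes. The actions are summarized as strings rather than performed.

```scala
// Hypothetical stand-ins for the five Response kinds (not Kafka's classes).
sealed trait SketchResponse { def connectionId: String }
final case class SendResp(connectionId: String, payload: String,
                          onComplete: Option[String => Unit]) extends SketchResponse
final case class NoOpResp(connectionId: String) extends SketchResponse
final case class CloseConnectionResp(connectionId: String) extends SketchResponse
final case class StartThrottlingResp(connectionId: String) extends SketchResponse
final case class EndThrottlingResp(connectionId: String) extends SketchResponse

object ResponseDispatch {
  // Describes what a Processor does with each kind when it drains its responseQueue (simplified)
  def action(r: SketchResponse): String = r match {
    case SendResp(id, payload, _)  => s"send '$payload' to $id and keep it in inflightResponses"
    case NoOpResp(id)              => s"nothing to send; resume reading from $id"
    case CloseConnectionResp(id)   => s"close $id"
    case StartThrottlingResp(id)   => s"mute $id while it is throttled"
    case EndThrottlingResp(id)     => s"unmute $id if nothing else keeps it muted"
  }
}
```

The sealed trait makes the compiler check that the match covers all five kinds. This is one reason to model a closed set of response types this way.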
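2.2 Acceptor
The Acceptor listens for client connections. When a new connection is established, it hands it to one of the Processors in round-robin order. In other words, the Acceptor handles only the ACCEPT event, while READ and WRITE events are handled by the Processors. The Acceptor also exposes a few methods for managing its pool of Processor threads.
Acceptor definition: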
private[kafka] class Acceptor(
    // The broker endpoint this Acceptor serves, e.g. PLAINTEXT://localhost:9092.
    // The Acceptor uses the endpoint's host and port to create the server socket.
    val endPoint: EndPoint,
    // SO_SNDBUF socket option: size of the underlying buffer for outbound network I/O.
    // Defaults to the broker setting socket.send.buffer.bytes, i.e. 100 KB.
    val sendBufferSize: Int,
    // SO_RCVBUF socket option: size of the underlying buffer for inbound network I/O.
    // Defaults to the broker setting socket.receive.buffer.bytes, i.e. 100 KB.
    val recvBufferSize: Int,
    // broker id
    brokerId: Int,
    connectionQuotas: ConnectionQuotas,
    metricPrefix: String) extends AbstractServerThread(connectionQuotas)
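Main Acceptor members:
// The NIO Selector that performs the underlying I/O; in the Acceptor it only watches for
// new-connection (ACCEPT) events
private val nioSelector = NSelector.open()
// The ServerSocketChannel the broker opens on the endpoint's host and port; run() registers it
// with nioSelector for OP_ACCEPT so that incoming client connections can be accepted
val serverChannel = openServerSocket(endPoint.host, endPoint.port)
// Processor threads that handle the Requests/Responses of the connections accepted here
private val processors = new ArrayBuffer[Processor]()
Acceptor polling loop. The source is as follows: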
def run(): Unit = {
  // Register the ServerSocketChannel's ACCEPT event with the Selector
  serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
  startupComplete()
  try {
    var currentProcessorIndex = 0
    while (isRunning) {
      try {
        // Wait for Selector events, with a 500 ms timeout
        val ready = nioSelector.select(500)
        if (ready > 0) {
          // Get the SelectionKeys of all ready events
          val keys = nioSelector.selectedKeys()
          val iter = keys.iterator()
          // Iterate over the SelectionKeys
          while (iter.hasNext && isRunning) {
            try {
              val key = iter.next
              iter.remove()
              if (key.isAcceptable) {
                // Accept the client connection
                accept(key).foreach { socketChannel =>
                  // Assign the channel to the next processor (using round-robin) to which the
                  // channel can be added without blocking. If newConnections queue is full on
                  // all processors, block until the last one is able to accept a connection.
                  var retriesLeft = synchronized(processors.length)
                  var processor: Processor = null
                  do {
                    retriesLeft -= 1
                    // Pick the next Processor
                    processor = synchronized {
                      // adjust the index (if necessary) and retrieve the processor atomically for
                      // correct behaviour in case the number of processors is reduced dynamically
                      currentProcessorIndex = currentProcessorIndex % processors.length
                      processors(currentProcessorIndex)
                    }
                    currentProcessorIndex += 1
                    // Put the new connection into this Processor's newConnections queue and wake up its
                    // Selector; if the queue is full and this is not the last retry, try the next Processor
                  } while (!assignNewConnection(socketChannel, processor, retriesLeft == 0))
                }
              } else
                throw new IllegalStateException("Unrecognized key state for acceptor thread.")
            } catch {
              case e: Throwable => error("Error while accepting connection", e)
            }
          }
        }
      }
      catch {
        // We catch all the throwables to prevent the acceptor thread from exiting on exceptions due
        // to a select operation on a specific channel or a bad request. We don't want
        // the broker to stop responding to requests from other clients in these scenarios.
        case e: ControlThrowable => throw e
        case e: Throwable => error("Error occurred", e)
      }
    }
  } finally {
    debug("Closing server socket and selector.")
    CoreUtils.swallow(serverChannel.close(), this, Level.ERROR)
    CoreUtils.swallow(nioSelector.close(), this, Level.ERROR)
    shutdownComplete()
  }
}
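The round-robin loop above can be isolated from NIO. In this hypothetical sketch, each Processor is reduced to its bounded newConnections queue. Every attempt except the last uses a non-blocking offer, and only the final retry falls back to a blocking put, which is the behaviour the English comment in the loop describes. RoundRobinAssigner and the string connections are assumptions for illustration.

```scala
import java.util.concurrent.ArrayBlockingQueue

// Hypothetical model of the Acceptor's assignment loop: each Processor is just its bounded
// newConnections queue, and a "connection" is a plain string.
final class RoundRobinAssigner(queues: IndexedSeq[ArrayBlockingQueue[String]]) {
  require(queues.nonEmpty, "need at least one processor queue")
  private var currentIndex = 0

  // Returns the index of the queue that accepted the connection.
  def assign(conn: String): Int = {
    var retriesLeft = queues.length
    var assigned = -1
    while (assigned < 0) {
      retriesLeft -= 1
      currentIndex = currentIndex % queues.length
      val idx = currentIndex
      currentIndex += 1
      val mayBlock = retriesLeft == 0
      // Try a non-blocking offer first; only the last retry may block on a full queue.
      val ok =
        if (mayBlock) { queues(idx).put(conn); true }
        else queues(idx).offer(conn)
      if (ok) assigned = idx
    }
    assigned
  }
}
```

Trying the queues without blocking first keeps one slow Processor from stalling the Acceptor while others still have room. Blocking only on the last retry applies back-pressure once every queue is full.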
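2.3 Processor
Processor is the protocol-handling thread, and it is the thread that does the actual protocol work. It listens for read/write events on client connections, registers new connections, reads client requests, and sends responses back to clients.
2.3.1 Main fields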
private[kafka] class Processor(
    val id: Int,
    time: Time,
    maxRequestSize: Int,
    // Channel into which this Processor enqueues the Requests it reads
    requestChannel: RequestChannel,
    connectionQuotas: ConnectionQuotas,
    connectionsMaxIdleMs: Long,
    failedAuthenticationDelayMs: Int,
    listenerName: ListenerName,
    securityProtocol: SecurityProtocol,
    config: KafkaConfig,
    metrics: Metrics,
    credentialProvider: CredentialProvider,
    memoryPool: MemoryPool,
    logContext: LogContext,
    connectionQueueSize: Int = ConnectionQueueSize) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
  // Queue of newly created connections: when the Acceptor accepts a connection, it puts the new
  // SocketChannel here; the Processor then registers each queued SocketChannel with its Selector
  // and listens for its read events
  private val newConnections = new ArrayBlockingQueue[SocketChannel](connectionQueueSize)
  // Temporary Response store, keyed by connection id. When the Processor hands a Response to its
  // Selector for sending, it also keeps the Response here, because some Response callbacks can only
  // run after the Response has actually been sent back to the requester.
  private val inflightResponses = mutable.Map[String, RequestChannel.Response]()
  // Response queue: Responses waiting to be sent to their clients
  private val responseQueue = new LinkedBlockingDeque[RequestChannel.Response]()
  // This Processor's Selector, used to listen for read/write events on client connections
  private val selector = createSelector(
    ChannelBuilders.serverChannelBuilder(listenerName,
      listenerName == config.interBrokerListenerName,
      securityProtocol, config, credentialProvider.credentialCache,
      credentialProvider.tokenCache, time, logContext))
  // ...
}
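The lifecycle of inflightResponses can be sketched as a small map keyed by connection id. The InflightTracker below is hypothetical and is not the Processor's actual code. It assumes at most one in-flight response per connection, because the Processor stops reading from a connection while that connection's request is being handled. A completed send removes the entry and runs the callback; a disconnect removes it without running the callback.

```scala
import scala.collection.mutable

// Hypothetical model of Processor.inflightResponses (one pending response per connection id).
final class InflightTracker[R](onComplete: R => Unit) {
  private val inflight = mutable.Map.empty[String, R]

  // The response has been handed to the selector for sending; keep it until the send completes
  def sent(connectionId: String, response: R): Unit = inflight.put(connectionId, response)

  // The selector reports the send finished: drop the entry and run the callback
  def completed(connectionId: String): Boolean = inflight.remove(connectionId) match {
    case Some(r) => onComplete(r); true
    case None    => false
  }

  // The client disconnected: drop the pending response without running its callback
  def disconnected(connectionId: String): Unit = inflight.remove(connectionId)

  def size: Int = inflight.size
}
```
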
2.3.2 Main loop
override def run(): Unit = {
  startupComplete()
  try {
    while (isRunning) {
      try {
        // 1. Set up new connections
        // Take each SocketChannel from newConnections and register it with the Selector for OP_READ
        configureNewConnections()
        // 2. Process the Response queue
        // Drain responseQueue and handle each Response according to its type: e.g. send it to the
        // client (recording it in inflightResponses), or handle throttling
        processNewResponses()
        // 3. Handle low-level I/O events
        // Poll the Selector for I/O events and read request bytes from clients; once a request has
        // been read completely, it is added to completedReceives
        poll()
        // 4. Parse data and build Requests
        // For each entry in completedReceives, parse it according to the protocol into a Request
        // and enqueue it into RequestChannel's requestQueue
        processCompletedReceives()
        // 5. Handle Responses whose send has completed
        // For each entry in the Selector's completedSends, remove the Response from inflightResponses,
        // invoke its callback, and handle throttling
        processCompletedSends()
        // 6. Handle client disconnects
        // For each connection in the Selector's disconnected list, remove its Response from inflightResponses
        processDisconnected()
        // 7. Enforce the connection limit
        // When a listener has more connections than the configured maximum, close one connection to
        // free resources, choosing in this order: a connection already being closed, the least
        // recently used connection, or any connection
        closeExcessConnections()
      } catch {
        // We catch all the throwables here to prevent the processor thread from exiting. We do this because
        // letting a processor exit might cause a bigger impact on the broker. This behavior might need to be
        // reviewed if we see an exception that needs the entire broker to stop. Usually the exceptions thrown would
        // be either associated with a specific socket channel or a bad request. These exceptions are caught and
        // processed by the individual methods above which close the failing channel and continue processing other
        // channels. So this catch block should only ever see ControlThrowables.
        case e: Throwable => processException("Processor got uncaught exception.", e)
      }
    }
  } finally {
    debug(s"Closing selector - processor $id")
    CoreUtils.swallow(closeAll(), this, Level.ERROR)
    shutdownComplete()
  }
}
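Step 7's victim order can be sketched as follows. LruConnections is a hypothetical helper that uses a java.util.LinkedHashMap in access order, so its first key is the least recently used connection. pickVictim then applies the order from the comment: a connection already being closed, otherwise the least recently used one, otherwise any connection. Both names are illustrative, not Kafka's.

```scala
import java.util.{LinkedHashMap => JLinkedHashMap}

// Hypothetical activity tracker. With accessOrder = true, LinkedHashMap iterates from the least
// to the most recently accessed key, so the first key is the least recently used connection.
final class LruConnections {
  private val lru = new JLinkedHashMap[String, java.lang.Long](16, 0.75f, true)

  // Record activity on a connection (moves it to the most-recently-used end)
  def touch(connectionId: String, nowMs: Long): Unit =
    lru.put(connectionId, java.lang.Long.valueOf(nowMs))

  def remove(connectionId: String): Unit = lru.remove(connectionId)

  def leastRecentlyUsed: Option[String] = {
    val it = lru.keySet().iterator()
    if (it.hasNext) Some(it.next()) else None
  }
}

// Hypothetical victim choice when a listener is over its connection limit
object ExcessConnections {
  def pickVictim(closing: Seq[String], lruOrder: Seq[String], all: Seq[String]): Option[String] =
    closing.headOption.orElse(lruOrder.headOption).orElse(all.headOption)
}
```
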
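2.4 RequestChannel
RequestChannel is the channel that carries Requests and Responses. Its requestQueue buffers Requests: the Processors in the processor pool read data from their SocketChannels, wrap it into Requests, and enqueue them into requestQueue. In the other direction, each Response is handed to the Processor recorded in its Request.
Main RequestChannel parameters: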
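class RequestChannel(
    // Maximum capacity of requestQueue
    val queueSize: Int,
    val metricNamePrefix: String,
    time: Time) extends KafkaMetricsGroup {
  // Queue holding the Requests
  private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize)
  // Processors registered with this channel, keyed by processor id; used to route each Response
  // back to the Processor that received the Request
  private val processors = new ConcurrentHashMap[Int, Processor]()
}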
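Below is a stripped-down, hypothetical RequestChannel; SketchRequestChannel, SketchProcessor, ChannelRequest and ChannelResponse are illustrative names. It keeps the two fields shown above: a bounded request queue shared by all Processors, and a map of Processors by id used to route each Response back. The blocking put and the null-on-timeout poll are standard java.util.concurrent behaviour; Kafka's actual method bodies may differ.

```scala
import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap, LinkedBlockingDeque, TimeUnit}

// Hypothetical, simplified types (not Kafka's)
final case class ChannelRequest(processor: Int, connectionId: String, body: String)
final case class ChannelResponse(request: ChannelRequest, body: String)

final class SketchProcessor(val id: Int) {
  val responseQueue = new LinkedBlockingDeque[ChannelResponse]()
}

final class SketchRequestChannel(queueSize: Int) {
  private val requestQueue = new ArrayBlockingQueue[ChannelRequest](queueSize)
  private val processors = new ConcurrentHashMap[Int, SketchProcessor]()

  def addProcessor(p: SketchProcessor): Unit =
    if (processors.putIfAbsent(p.id, p) != null)
      throw new IllegalArgumentException(s"processor ${p.id} is already registered")

  // Processor side: blocks while the queue is full, which back-pressures the network threads
  def sendRequest(r: ChannelRequest): Unit = requestQueue.put(r)

  // Handler side: waits at most timeoutMs; returns null if nothing arrived
  def receiveRequest(timeoutMs: Long): ChannelRequest =
    requestQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)

  // Route the response to the Processor that read the request (it may have been removed)
  def sendResponse(resp: ChannelResponse): Unit = {
    val p = processors.get(resp.request.processor)
    if (p != null) p.responseQueue.put(resp)
  }
}
```
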
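2.5 KafkaRequestHandler and KafkaRequestHandlerPool
KafkaRequestHandler is the thread that actually processes Requests: it takes Requests from RequestChannel and hands them to KafkaApis. KafkaRequestHandlerPool is the thread pool that wraps the KafkaRequestHandler threads; its size is set by num.io.threads.
KafkaRequestHandler parameters: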
class KafkaRequestHandler(id: Int,
                          brokerId: Int,
                          val aggregateIdleMeter: Meter,
                          // Total number of threads in the handler pool
                          val totalHandlerThreads: AtomicInteger,
                          // RequestChannel this thread takes Requests from
                          val requestChannel: RequestChannel,
                          // KafkaApis instance that processes the Requests
                          apis: KafkaApis,
                          time: Time) extends Runnable with Logging { /* ... */ }
KafkaRequestHandler processing loop:
def run(): Unit = {
  while (!stopped) {
    // We use a single meter for aggregate idle percentage for the thread pool.
    // Since meter is calculated as total_recorded_value / time_window and
    // time_window is independent of the number of threads, each recorded idle
    // time should be discounted by # threads.
    val startSelectTime = time.nanoseconds
    // Take the next Request from the request queue, waiting at most 300 ms
    val req = requestChannel.receiveRequest(300)
    val endTime = time.nanoseconds
    val idleTime = endTime - startSelectTime
    aggregateIdleMeter.mark(idleTime / totalHandlerThreads.get)
    req match {
      // Shutdown request: this broker has started shutting down
      case RequestChannel.ShutdownRequest =>
        debug(s"Kafka request handler $id on broker $brokerId received shut down command")
        shutdownComplete.countDown()
        return
      // Regular request
      case request: RequestChannel.Request =>
        try {
          request.requestDequeueTimeNanos = endTime
          trace(s"Kafka request handler $id on broker $brokerId handling request $request")
          // KafkaApis.handle runs the processing logic for this request
          apis.handle(request)
        } catch {
          // On a fatal error, release the shutdown latch and exit the broker process immediately
          case e: FatalExitError =>
            shutdownComplete.countDown()
            Exit.exit(e.statusCode)
          case e: Throwable => error("Exception when handling request", e)
        } finally {
          // Release the memory buffer held by the Request
          request.releaseBuffer()
        }
      // receiveRequest timed out without a Request
      case null => // continue
    }
  }
  shutdownComplete.countDown()
}
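The handler loop can be reduced to a few lines. This is a hypothetical sketch:
- a case object stands in for RequestChannel.ShutdownRequest;
- the queue is a plain ArrayBlockingQueue;
- idle time is added to an AtomicLong after dividing by the thread count, as the comment in run() explains, instead of being marked on a Meter.

```scala
import java.util.concurrent.{ArrayBlockingQueue, CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}

// Hypothetical request types: a shutdown sentinel and a regular request
sealed trait BaseReq
case object ShutdownReq extends BaseReq
final case class WorkReq(body: String) extends BaseReq

final class SketchHandler(queue: ArrayBlockingQueue[BaseReq],
                          totalThreads: AtomicInteger,
                          idleNanos: AtomicLong,
                          handle: WorkReq => Unit) extends Runnable {
  val shutdownComplete = new CountDownLatch(1)

  def run(): Unit = {
    var stopped = false
    while (!stopped) {
      val start = System.nanoTime()
      val req = queue.poll(300, TimeUnit.MILLISECONDS)
      // Discount idle time by the pool size so one shared meter reflects the whole pool
      idleNanos.addAndGet((System.nanoTime() - start) / totalThreads.get)
      req match {
        case ShutdownReq => stopped = true
        case w: WorkReq =>
          try handle(w)
          catch { case e: Exception => System.err.println(s"Exception when handling request: $e") }
        case null => // timed out; poll again
      }
    }
    shutdownComplete.countDown()
  }
}
```
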
Main KafkaRequestHandlerPool parameters:
class KafkaRequestHandlerPool(val brokerId: Int,
                              val requestChannel: RequestChannel,
                              val apis: KafkaApis,
                              time: Time,
                              // Number of handler threads
                              numThreads: Int,
                              requestHandlerAvgIdleMetricName: String,
                              logAndThreadNamePrefix : String) extends Logging with KafkaMetricsGroup {
  // Current number of threads in the pool (it changes when the pool is resized)
  private val threadPoolSize: AtomicInteger = new AtomicInteger(numThreads)
  // The pool itself: each thread runs one KafkaRequestHandler
  val runnables = new mutable.ArrayBuffer[KafkaRequestHandler](numThreads)
  for (i <- 0 until numThreads) {
    createHandler(i)
  }
  // ...
}
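Shutting the pool down fits the handler loop: each handler exits after it dequeues a single shutdown request, so the pool can enqueue one per thread and then wait for all of them. The hypothetical SketchHandlerPool below does this with a sentinel object and Thread.join. It leaves out resizing, metrics, and KafkaApis.

```scala
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.ArrayBuffer

// Hypothetical pool: numThreads worker threads share one request queue.
final class SketchHandlerPool(numThreads: Int,
                              queue: ArrayBlockingQueue[AnyRef],
                              handle: AnyRef => Unit) {
  // Sentinel object playing the role of a shutdown request
  private val ShutdownSentinel: AnyRef = new Object
  // Number of regular requests handled so far, across all threads
  val handled = new AtomicInteger(0)
  private val threads = ArrayBuffer.empty[Thread]

  for (i <- 0 until numThreads) {
    val worker = new Runnable {
      def run(): Unit = {
        var running = true
        while (running) {
          val req = queue.poll(300, TimeUnit.MILLISECONDS)
          if (req eq ShutdownSentinel) running = false // each thread consumes exactly one sentinel
          else if (req != null) {
            handle(req)
            handled.incrementAndGet()
          }
        }
      }
    }
    val t = new Thread(worker, s"sketch-request-handler-$i")
    t.setDaemon(true)
    threads += t
    t.start()
  }

  // Enqueue one sentinel per thread, then wait for every thread to finish
  def shutdown(): Unit = {
    threads.foreach(_ => queue.put(ShutdownSentinel))
    threads.foreach(_.join())
  }
}
```

The sentinels go in after any requests that are already queued, so FIFO order guarantees those requests are handled before the threads stop.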
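2.6 KafkaApis
KafkaApis contains the actual protocol-handling logic. Depending on the request type, it calls LogManager, ReplicaManager and other components. It then wraps the result in a Response and enqueues it into the responseQueue of the Processor that received the request.
Main KafkaApis parameters: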
class KafkaApis(val requestChannel: RequestChannel, // request channel
                val replicaManager: ReplicaManager, // replica manager: manages the partition replicas hosted on this broker
                val adminManager: AdminManager, // handles admin operations such as topic and partition creation and config changes
                val groupCoordinator: GroupCoordinator, // consumer group coordinator
                val txnCoordinator: TransactionCoordinator, // transaction coordinator
                val controller: KafkaController, // controller component, which manages and stores cluster metadata
                val zkClient: KafkaZkClient, // ZooKeeper client; Kafka uses it to interact with ZooKeeper
                val brokerId: Int, // value of broker.id
                val config: KafkaConfig, // Kafka configuration
                val metadataCache: MetadataCache, // metadata cache
                val metrics: Metrics,
                val authorizer: Option[Authorizer],
                val quotas: QuotaManagers, // quota managers
                val fetchManager: FetchManager,
                brokerTopicStats: BrokerTopicStats, // per-topic statistics on this broker
                val clusterId: String, // cluster id
                time: Time,
                val tokenManager: DelegationTokenManager) extends Logging { /* ... */ }
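KafkaApis then handles each request according to its protocol type (API key).
2.7 Data plane and Control plane
Kafka separates connection handling for data requests from that for control requests. The Data plane handles data requests such as produce and fetch. The Control plane handles control requests sent by the controller, such as LeaderAndIsr, StopReplica and UpdateMetadata. The Control plane is created only when a dedicated listener is configured through control.plane.listener.name.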
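The dispatch is a pattern match on the request's API key. The sketch below is hypothetical: the ApiKey enumeration and SketchApis are not Kafka's types, and only four APIs are covered. The strings it sends merely name the component that, according to the parameter list above, would do the real work.

```scala
// Hypothetical, trimmed-down dispatcher in the style of KafkaApis.handle
sealed trait ApiKey
object ApiKey {
  case object Produce extends ApiKey
  case object Fetch extends ApiKey
  case object Metadata extends ApiKey
  case object LeaderAndIsr extends ApiKey
}

final case class ApiRequest(apiKey: ApiKey, connectionId: String)

// sendResponse stands in for handing a Response back through the RequestChannel
final class SketchApis(sendResponse: (ApiRequest, String) => Unit) {
  def handle(request: ApiRequest): Unit = request.apiKey match {
    case ApiKey.Produce      => sendResponse(request, "appended via ReplicaManager")
    case ApiKey.Fetch        => sendResponse(request, "read via ReplicaManager")
    case ApiKey.Metadata     => sendResponse(request, "served from MetadataCache")
    case ApiKey.LeaderAndIsr => sendResponse(request, "applied by ReplicaManager")
  }
}
```
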
class SocketServer(val config: KafkaConfig,
                   val metrics: Metrics,
                   val time: Time,
                   val credentialProvider: CredentialProvider)
  extends Logging with KafkaMetricsGroup with BrokerReconfigurable {
  // Implementing the BrokerReconfigurable trait means some SocketServer settings can be changed
  // dynamically, i.e. without restarting the broker
  // Length of SocketServer's request queue, set by the broker config queued.max.requests (default 500)
  private val maxQueuedRequests = config.queuedMaxRequests
  private val logContext = new LogContext(s"[SocketServer brokerId=${config.brokerId}] ")
  this.logIdent = logContext.logPrefix
  private val memoryPoolSensor = metrics.sensor("MemoryPoolUtilization")
  private val memoryPoolDepletedPercentMetricName = metrics.metricName("MemoryPoolAvgDepletedPercent", MetricsGroup)
  private val memoryPoolDepletedTimeMetricName = metrics.metricName("MemoryPoolDepletedTimeTotal", MetricsGroup)
  memoryPoolSensor.add(new Meter(TimeUnit.MILLISECONDS, memoryPoolDepletedPercentMetricName, memoryPoolDepletedTimeMetricName))
  // Bounded pool for request buffers when queued.max.request.bytes > 0; otherwise MemoryPool.NONE (no limit)
  private val memoryPool = if (config.queuedMaxBytes > 0) new SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false, memoryPoolSensor) else MemoryPool.NONE
  // data-plane
  // Processor threads for data requests, keyed by processor id
  private val dataPlaneProcessors = new ConcurrentHashMap[Int, Processor]()
  // Acceptor threads for data requests: one Acceptor thread per listener
  private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, Acceptor]()
  val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, DataPlaneMetricPrefix, time)
  // control-plane
  // Processor thread for control requests
  // Note: control requests currently get one dedicated Processor thread rather than a thread pool.
  // The Control plane's resources are just 1 Acceptor thread + 1 Processor thread + 1 request queue of depth 20.
  // Once the Control plane is enabled, it has exactly one Processor thread and one Acceptor thread. Note also that
  // its RequestChannel's request queue length is hard-coded to 20 rather than being configurable.
  // This reflects an assumption made by the community: control requests are far fewer than data requests,
  // so they need neither a thread pool nor a deep request queue.
  private var controlPlaneProcessorOpt : Option[Processor] = None
  private[network] var controlPlaneAcceptorOpt : Option[Acceptor] = None
  // RequestChannel dedicated to control requests
  val controlPlaneRequestChannelOpt: Option[RequestChannel] = config.controlPlaneListenerName.map(_ =>
    new RequestChannel(20, ControlPlaneMetricPrefix, time))
  // Under the data-plane and control-plane comments above, each plane gets its own set of variables:
  // a Processor thread pool, an Acceptor thread pool, and a RequestChannel instance.
  // Processor thread pool: the network threads, which quickly place requests into the request queue.
  // Acceptor thread pool: holds the Acceptor thread SocketServer creates for each listener; that thread
  // dispatches the incoming connection requests on its listener.
  // RequestChannel: the request-handling channel that holds the request queue.
}
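The memoryPool field above bounds how much memory queued Requests can hold, without ever blocking the network threads. The hypothetical SketchMemoryPool below illustrates the idea; its class and method names are not Kafka's API. tryAllocate returns null instead of waiting. In non-strict mode (the false argument passed above), any remaining memory is enough to grant a request, so the available amount may go negative.

```scala
import java.nio.ByteBuffer
import java.util.concurrent.atomic.AtomicLong

// Hypothetical non-blocking, bounded memory pool in the spirit of SimpleMemoryPool
final class SketchMemoryPool(capacity: Long, maxSingleAllocation: Int, strict: Boolean) {
  private val available = new AtomicLong(capacity)

  // Returns null instead of blocking when the pool cannot satisfy the request
  def tryAllocate(size: Int): ByteBuffer = {
    require(size > 0 && size <= maxSingleAllocation, s"invalid allocation size $size")
    // strict: need the full size; non-strict: any positive remainder is enough
    val threshold: Long = if (strict) size.toLong else 1L
    var done = false
    var granted = false
    while (!done) {
      val cur = available.get
      if (cur < threshold) done = true
      else if (available.compareAndSet(cur, cur - size)) { granted = true; done = true }
    }
    if (granted) ByteBuffer.allocate(size) else null
  }

  def release(buf: ByteBuffer): Unit = available.addAndGet(buf.capacity.toLong)

  def availableMemory: Long = available.get
}
```
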
During initialization, SocketServer sets up the two planes separately.
Creating the Data plane:
private def createDataPlaneAcceptorsAndProcessors(dataProcessorsPerListener: Int,
                                                  endpoints: Seq[EndPoint]): Unit = {
  // Iterate over the listeners
  endpoints.foreach { endpoint =>
    // Put the listener under connection quota management
    connectionQuotas.addListener(config, endpoint.listenerName)
    // Create the listener's Acceptor thread
    val dataPlaneAcceptor = createAcceptor(endpoint, DataPlaneMetricPrefix)
    // Create the listener's Processor threads; their number is set by num.network.threads
    addDataPlaneProcessors(dataPlaneAcceptor, endpoint, dataProcessorsPerListener)
    // Store the <listener, Acceptor thread> pair for central management
    dataPlaneAcceptors.put(endpoint, dataPlaneAcceptor)
    info(s"Created data-plane acceptor and processors for endpoint : ${endpoint.listenerName}")
  }
}
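Here is a hypothetical sketch of the resulting layout. Each data-plane listener gets its own Acceptor and num.network.threads Processors. The sketch assumes processor ids come from one broker-wide counter, as nextProcessorId does in the control-plane code, so that ids stay unique across listeners. ListenerThreads and DataPlaneLayout are illustrative names.

```scala
// Hypothetical layout: one Acceptor per listener, numNetworkThreads Processors each,
// with processor ids drawn from a single counter shared by all listeners.
final case class ListenerThreads(listener: String, processorIds: Seq[Int])

object DataPlaneLayout {
  def layout(listeners: Seq[String], numNetworkThreads: Int, firstId: Int = 0): Seq[ListenerThreads] = {
    var nextProcessorId = firstId
    listeners.toVector.map { listener =>
      val ids = nextProcessorId until nextProcessorId + numNetworkThreads
      nextProcessorId += numNetworkThreads
      ListenerThreads(listener, ids)
    }
  }
}
```
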
Creating the Control plane:
The Control plane consists of just one Acceptor thread, one Processor thread, and one request queue with a depth of 20.
private def createControlPlaneAcceptorAndProcessor(endpointOpt: Option[EndPoint]): Unit = {
  // Only if a listener has been configured for the Control plane
  endpointOpt.foreach { endpoint =>
    // Put the listener under connection quota management
    connectionQuotas.addListener(config, endpoint.listenerName)
    // Create the listener's Acceptor thread
    val controlPlaneAcceptor = createAcceptor(endpoint, ControlPlaneMetricPrefix)
    // Create the listener's single Processor thread
    val controlPlaneProcessor = newProcessor(nextProcessorId, controlPlaneRequestChannelOpt.get, connectionQuotas, endpoint.listenerName, endpoint.securityProtocol, memoryPool)
    controlPlaneAcceptorOpt = Some(controlPlaneAcceptor)
    controlPlaneProcessorOpt = Some(controlPlaneProcessor)
    val listenerProcessors = new ArrayBuffer[Processor]()
    listenerProcessors += controlPlaneProcessor
    // Register the Processor with the RequestChannel dedicated to control requests,
    // i.e. add it to the Processors tracked by that RequestChannel instance
    controlPlaneRequestChannelOpt.foreach(_.addProcessor(controlPlaneProcessor))
    nextProcessorId += 1
    // Also add the Processor to the Processor pool managed by the Acceptor thread
    controlPlaneAcceptor.addProcessors(listenerProcessors, ControlPlaneThreadPrefix)
    info(s"Created control-plane acceptor and processor for endpoint : ${endpoint.listenerName}")
  }
}
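The split between the two planes can be sketched as below. The sketch assumes the following, based on the code above:
- the single listener named by the broker setting control.plane.listener.name (config.controlPlaneListenerName) goes to the Control plane, which has 1 Acceptor, 1 Processor and a queue depth of 20;
- every other listener goes to the Data plane, whose queue depth is queued.max.requests (default 500).

SketchEndpoint and PlaneSplit are hypothetical names.

```scala
// Hypothetical listener description (not Kafka's EndPoint class)
final case class SketchEndpoint(listenerName: String, host: String, port: Int)

object PlaneSplit {
  // controlPlaneListenerName plays the role of control.plane.listener.name (None = not configured)
  def split(endpoints: Seq[SketchEndpoint],
            controlPlaneListenerName: Option[String]): (Option[SketchEndpoint], Seq[SketchEndpoint]) = {
    val control = controlPlaneListenerName.flatMap(name => endpoints.find(_.listenerName == name))
    val data = endpoints.filterNot(e => control.contains(e))
    (control, data)
  }

  // Control plane: fixed depth 20 (only if configured); data plane: queued.max.requests
  def requestQueueSizes(control: Option[SketchEndpoint], queuedMaxRequests: Int = 500): (Option[Int], Int) =
    (control.map(_ => 20), queuedMaxRequests)
}
```
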