弄清楚kafka的网络模型原理,能很好的帮助理解和优化kafka服务。kafka底层的网络通信,没有使用第三方rpc实现,如netty等,而是使用了java的NIO实现的一套自己的通信框架协议。本文主要描述kafka基础网络通信的相关实现原理,版本为1.1.0。
java NIO具体细节不再描述,主要包含3个部分:
- Channel:连接,如FileChannel、SocketChannel等,表示连接通道,阻塞和非阻塞(FileChannel不支持)模式
- Buffer:缓存,可在读写两种模式中切换
- Selector:选择器,可实现一个线程处理多个连接
kafka实现网络模型主要涉及到3个类:
-
SocketServer
:实现监听listeners
,并构建Acceptor
、Processor
、ConnectionQuotas
等类,用于接收、处理、解析request和response。此处重要监控指标:NetworkProcessorAvgIdlePercent
、RequestQueueSize
、ResponseQueueSize
-
KafkaApis
:负责处理broker支持的各种通信协议,如PRODUCE/FETCH/LIST_OFFSETS/LEADER_AND_ISR/HEARTBEAT
等 -
KafkaRequestHandlerPool
:负责接收消息,处理SocketServer
接收的请求,并构建response返回给SocketServer
。此处重要监控指标:RequestHandlerAvgIdlePercent
通信概况
kafka接收消息、处理并返回,主要有以下步骤:
Acceptor:监听
OP_ACCEPT
,将连接的channel以round-robin的方式选择processor进行处理-
Processor:监听连接的
OP_READ
和OP_WRITE
,主要负责读写和解析数据- 读取客户端消息解析后放入
requestQueue
,解析头部信息 - 将
responseQueue
中的消息发送回客户端
- 读取客户端消息解析后放入
KafkaRequestHandler
:从requestQueue
中获取连接,根据头部信息获取对应的KafkaApi
协议进行相关处理,并通过回调,将处理后的结果通过RequestChannel
写到对应Processor
的responseQueue
中,等待Processor
线程处理
通信处理详细步骤
如图所示,其中,蓝色线表示请求处理及流向,绿色组件表示消息缓存。
-
SocketServer
中,主要功能有:- 通过配置的
listeners
可以监听多个interface/port,构建对应的Acceptor
- 每个
Acceptor
构建num.network.threads
个Processor
,用于处理连接请求 - 构建
RequestChannel
,保存解析好的连接请求
# 主要逻辑 private def addProcessors(acceptor: Acceptor, endpoint: EndPoint, newProcessorsPerListener: Int): Unit = synchronized { ... for (_ <- 0 until newProcessorsPerListener) { val processor = newProcessor(nextProcessorId, connectionQuotas, listenerName, securityProtocol, memoryPool) listenerProcessors += processor requestChannel.addProcessor(processor) nextProcessorId += 1 } listenerProcessors.foreach(p => processors.put(p.id, p)) acceptor.addProcessors(listenerProcessors) }
- 通过配置的
-
Acceptor
为线程,主要功能有:- 监听
OP_ACCEPT
事件,不断循环获取已经accept的连接 - 判断每个ip的连接数是否超过quota限制
- 通过round-robin的方式,选择
Processor
,放入Processor
对应的newConnections
缓存中 - 代码逻辑主要看
run()
方法
- 监听
-
Processor
为线程,主要功能为缓存新建连接、接收并缓存数据、缓存返回信息、处理断开连接等-
configureNewConnections()
: 处理新建的连接,监听OP_READ
事件,等待读取数据 -
poll()
- 真正读取数据,并放入接收缓存队列
stagedReceives
,缓存所有channel的请求 - 拿出每个channel的第一个请求,解析协议头部,放入
completedReceives
缓存中 - 如果channel写出ready,则进行write,将response返回给客户端
- 真正读取数据,并放入接收缓存队列
-
processCompletedReceives()
:将请求解析为Request
并放入requestQueue
缓存
# 主要代码逻辑 override def run() { startupComplete() try { while (isRunning) { try { // setup any new connections that have been queued up configureNewConnections() // register any new responses for writing processNewResponses() poll() processCompletedReceives() processCompletedSends() processDisconnected() ...
-
-
KafkaRequestHandlerPool
初始化num.io.threads
个KafkaRequestHandler
- 从
requestQueue
中拿去request,并根据协议头,选择对应的KafkaApi
进行处理 - 使用回调,将处理完成的response通过
KafkaChannel
放入当前channel对应的Processor
的responseQueue
中
- 从
由此可见,kafka内部通信使用了NIO+缓存+异步,从而极大提升了kafka的单机并发能力。
至此,整个网络处理大概步骤完成,其中很多细节内容,这里不再赘述,感兴趣的同学可以看源码或一起交流。
通信协议
kafka接收到的数据内容格式如上图所示:
- 4B: 整个数据的长度,最大为
2^31 - 1
,超过socket.request.max.bytes
则报错,默认是100M - Header
- 2B: api_key
- 2B: api_version,api_key+api_version,可以决定当前消息的协议,如Producer、Fetcher、LeaderAndISR等
- 4B: correlation_id,返回给客户端时携带
- clientId: string
- 2B: string长度
- remain: string具体内容
- Body(Payload),根据不同的协议使用不同的解析方法,此处以
Produce
协议为例- 2B: acks,写入的ack值,-1、0、1等,分别表示写入的可靠性要求
- 4B: timeout,表示此Produce请求的超时时间
- topic_data[]: Array
- 4B: array长度
- topic_data
- topic_name: string
- 2B: string长度
- remain: string具体内容
- partition_data[]: Array
- 4B: array长度
- partition_data
- 4B: partition_id
- 4B: 消息体长度
- remain: MemoryRecords
- topic_name: string
KafkaApi支持各种协议,具体可参见源码,这里只大概描述Produce协议的主要内容。基于此,kafka各版本之间可以保证很好的协议兼容性。
优化
基于以上分析,我们通过分析相关监控指标,进行了相关调优工作,主要调优内容为(仅供参考):
zookeeper.session.timeout.ms=30000
num.network.threads=12
num.io.threads=16
num.replica.fetchers=3
replica.fetch.max.bytes=2097152
replica.fetch.response.max.bytes=20971520
调整后,cpu的负载明显提升,更加有效的利用机器资源,processor和handler的idle比例明显提高,isr expand/shrink出现频次和梳理也明显降低,大大提升的集群的稳定性。
结尾
本文主要介绍kafka内部的通信模型,kafka组件还有很多模块,后续会不断深入学习和理解,欢迎大家一起交流和学习。最近Pulsar也有兴起之势,后续也需要学习,真是学无止境啊~~