31 )服务端代码设计-观察-kafka源码的包 31} 0:7
重点代码包:
broker:
D:\Git_SRC\JavaProjects\kafka-0.10.1.0-nx-src\core\src\main\scala\kafka\controller
消费者:
D:\Git_SRC\JavaProjects\kafka-0.10.1.0-nx-src\core\src\main\scala\kafka\coordinator
工具类:
D:\Git_SRC\JavaProjects\kafka-0.10.1.0-nx-src\core\src\main\scala\kafka\network
D:\Git_SRC\JavaProjects\kafka-0.10.1.0-nx-src\core\src\main\scala\kafka\tools
32 ) 服务端代码设计-acceptor线程是如何启动的 32} 0:23
D:\Git_SRC\JavaProjects\kafka-0.10.1.0-nx-src\core\src\main\scala\kafka\Kafka.scala
——》main()
//启动服务的时候 会传递一些参数,这个地方应该就是去解析一些参数。
val serverProps = getPropsFromArgs(args)
val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)
//TODO 核心代码
kafkaServerStartable.startup
kafkaServerStartable.awaitShutdown
——》KafkaServerStartable.startup()
//TODO 启动服务
server.startup()
——》KafkaServer.startup()
//整个Kafka 服务端的功能 都是在这个里面。
//NIO的服务端
socketServer = new SocketServer(config, metrics, kafkaMetricsTime)
socketServer.startup()
——》 SocketServer.startup() ↓▲
▼
// 接收和发送 请求的时候一些缓存的大小
val sendBufferSize = config.socketSendBufferBytes
val recvBufferSize = config.socketReceiveBufferBytes
//当前broker主机的id
val brokerId = config.brokerId
//核心的线程
//在Acceptor类的主构造函数里面,启动了3个Processor线程
val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,
// Utils是一个工具类。里面有newThread这样一个方法
//这个方法的作用就是用来帮我们启动线程使用的。
//我们阅读代码的时候要知道,如果一个线程被执行start方法
//那我们接下来就是要去看他的run方法
Utils.newThread("kafka-socket-acceptor-%s-%d".format(protocol.toString, endpoint.port), acceptor, false).start()
acceptor.awaitStartup()
——》SocketServer.Acceptor.构造函数()
——》SocketServer.Acceptor.openServerSocket()
//如果大家看不懂这儿的这几句代码
//一定要下去补一下javaNIO方面的知识。
val serverChannel = ServerSocketChannel.open()
——》SocketServer.Acceptor.run()
▼
-
serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)服务端启动,客户端(生产者)发送过来请求,服务端对请求进行处理,服务端给客户端发送响应。 * 客户端接受到响应以后 -》 下一个请求的发送 */
//服务一直就在不断的循环
while (isRunning) {
//selecotr 查看是否有 事件注册上来。
val ready = nioSelector.select(500)
//如果是客户端发送过来 要进行网络连接的请求。
if (key.isAcceptable)
//到这个方法里面去处理
accept(key, processors(currentProcessor))
33 ) 服务端代码设计-processor线程是如何启动的? 33 1 } 0:13
入口:
——》SocketServer.Acceptor.run()
//processors(线程一,线程二,线程三)
//我们发端Acceptor线程启动起来以后
//如果有请求发送过来,会把这些请求轮询的发送给不同的
//Processor线程去处理。
//processors(0) = 第一个线程处理
//processors(1) = 第二个线程处理
//prodcessrs(2) = 第三个线程处理
accept(key, processors(currentProcessor))
33 ) 服务端代码设计-processor线程是如何启动的? 33 2 } 0:7
入口:
——》SocketServer.Acceptor.run()
▼
while (iter.hasNext && isRunning) {
try {
val key = iter.next
iter.remove()
//如果是客户端发送过来 要进行网络连接的请求。
if (key.isAcceptable)
//到这个方法里面去处理
//processors(线程一,线程二,线程三)
//我们发端Acceptor线程启动起来以后
//如果有请求发送过来,会把这些请求轮询的发送给不同的
//Processor线程去处理。
//processors(0) = 第一个线程处理
//processors(1) = 第二个线程处理
//prodcessrs(2) = 第三个线程处理
accept(key, processors(currentProcessor))
else
throw new IllegalStateException("Unrecognized key state for acceptor thread.")
// round robin to the next processor thread
currentProcessor = (currentProcessor + 1) % processors.length
34 ) 服务端代码设计-processor线程是如何启动的? 33 3 } 0:12
入口:
——》SocketServer.Acceptor.run()
accept(key, processors(currentProcessor))
——》SocketServer.accept()
//TODO processor调用accept方法对socketChannel进行处理
processor.accept(socketChannel)
——》Processor.accept()
//把获取到SocketChannel存入到了自己的队列
newConnections.add(socketChannel)
0:5:18
35) 服务端代码设计-网络-processor是如何接收请求的(1) -34}0:12
接上回:
——》SocketServer.Processor.run()
// setup any new connections that have been queued up
//读取每个SocketChannel,把每个SocketChannel
//都往Selector上面注册OP_READ事件。
configureNewConnections()
——》SocketServer.configureNewConnections
//TODO 注册OP_READ事件
selector.register(connectionId, channel)
——》 Selector.register()
//往自己的Selector上面注册OP_READ事件
//这样的话,Processor线程就可以 读取客户端发送过啦id连接。
SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_READ);
//kafka里面对SocketChannel自己进行了封装
//封装了一个KakaChannel
KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);
//key和channel
key.attach(channel);
//所以我们服务端这儿代码 跟我们客户端的网络部分的代码是复用
//channels里面维护了多个网络连接。
this.channels.put(id, channel);
36) 服务端代码设计-网络-processor是如何接收请求的(2) -34}0:7
回到上回的:
——0:》SocketServer.Processor.run()
// register any new responses for writing
//TODO 看起来像是处理响应的。绑定 OP_WRITE
processNewResponses()
//我们大胆的猜测,根据我们之前的了解
//读取和发送请求的代码应该都是在这个方法里面完成的。
//TODO 再次进去
poll()
——》SocketServer.Processor.poll()
——1:》Selector.poll()
//从Selector上找到有多少个key注册了
int readyKeys = select(timeout);
//立马就要对这个Selector上面的key要进行处理。
pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);
pollSelectionKeys(immediatelyConnectedKeys,
——1: 》Selector.pollSelectionKeys()
//根据key找到对应的KafkaChannel
KafkaChannel channel = channel(key);
//去最后完成网络的连接
//如果我们之前初始化的时候,没有完成网络连接的话,这儿一定会帮你
//完成网络的连接。
if (channel.finishConnect()) {
//网络连接已经完成了以后,就把这个channel存储到
this.connected.add(channel.id());
//里面不断的读取数据,读取数据的代码我们之前就已经分析过
//里面还涉及到粘包和拆包的一些问题。
while ((networkReceive = channel.read()) != null)
addToStagedReceives(channel, networkReceive);
——》Selector.addToStagedReceives()
//channel代表的就是一个网络的连接,一台kafka的主机就对应了一个channel连接。
if (!stagedReceives.containsKey(channel))
stagedReceives.put(channel, new ArrayDeque<NetworkReceive>());
//往队列里面存放接受到响应
deque.add(receive);
▲ 回退到
——1: 》Selector.poll()
//TODO 对stagedReceives里面的数据要进行处理
addToCompletedReceives();
37) 服务端代码设计-网络-processor线程是如何处理stagedreceives里的请求 -35}0:10"
接上回:
—— 》Selector.addToCompletedReceives()方法
//获取到每个连接对应的 请求队列
Deque<NetworkReceive> deque = entry.getValue();
//获取到响应
//对于我们服务端来说,这儿接收到的是请求
NetworkReceive networkReceive = deque.poll();
//把响应存入到completedReceives 数据结构里面
this.completedReceives.add(networkReceive);
▲ 回退到
——0: 》SocketServer.Processor.run()
//TODO 用来处理接收到当的请求
processCompletedReceives()
——》SocketServer.Processor.processCompletedReceives()
//遍历每一个请求 Scala,函数式编程
selector.completedReceives.asScala.foreach { receive =>
//对于获取到的请求按照协议进行解析,解析出来就是一个一个Request
val req = RequestChannel.Request(processor = id, connectionId = receive.source, session
//TODO 把request请求放入队列
requestChannel.sendRequest(req)
//TODO 取消OP_READ事件
selector.mute(receive.source)
综上几节: 请求处理示意图
38) 服务端代码设计-网络-requestqueue里的请求是如何被处理的 -36}0:10"
入口
——》KafkaServer类startup方法()
//TODO 就是它去处理的队列里面的请求的
requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
——》KafkaRequestHandlerPool 构造函数
▼
val threads = new ArrayThread
val runnables = new ArrayKafkaRequestHandler
//默认启动8个线程,一般情况下,生产环境里面我们是要去设置这个参数。
for(i <- 0 until numThreads) {
//创建线程
runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis)
threads(i) = Utils.daemonThread("kafka-request-handler-" + i, runnables(i))
//线程启动起来了
threads(i).start()
——》KafkaRequestHandler.run()
//TODO 获取request对象
req = requestChannel.receiveRequest(300)
//TODO 交给KafkaApis进行最终的处理
apis.handle(req)
——》KafkaApis.handle()
39) 服务端代码设计-网络-request是如何被处理的 -37}0:23
接上回:
入口
——》 KafkaApis类 handle ()
/因为我们用的是场景驱动的方式去分析的源码
//我们之前一直分析的是 生产者那儿会发送过来请求
//TODO 处理生产者发送过来的请求
case ApiKeys.PRODUCE => handleProducerRequest(request)
——》 KafkaApis类 handleProducerRequest()
//按照分区的方式去遍历数据。
val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) =
produceRequest.partitionRecords.asScala.partition {
//对方发送过来的数据进行一些判断
//主要就是针对权限等等之类的事进行判断。
case (topicPartition, _) => authorize(request.session, Describe, new
//acks = 0
//意味着生产者那儿不关心数据处理的结果。
//所以我们不需要返回响应。
if (produceRequest.acks == 0) {......
} else {
requestChannel.noOperation(request.processor, request)
}
} else {
//如果代码要走这儿,说明我们把数据处理完了以后
//需要给客户端(生产者)返回响应
//封装一个请求头
val respHeader = new ResponseHeader(request.header.correlationId)
//封装一个请求体(响应消息)
val respBody = request.header.apiVersion match {
//TODO 返回响应最重要的代码是在这儿
//这儿给我们封装了一个Response的对象
//这个对象就是服务端发送回给客户端(生产者)的
requestChannel.sendResponse(new RequestChannel.Response(request,
——》RequestChannel。sendResponse()
//把响应存入到了一个队列里面。
//先从数组里面取出对应Processor一个队列,然后把这个响应放入到这个队列里面。
responseQueues(response.processor).put(response)
40) 服务端代码设计-网络-服务端给客户端发送响应做哪些准备工作(1) -38}0:12
接上回:
▲ 回退到
——0: 》SocketServer.Processor.run()
//TODO 看起来像是处理响应的。绑定 OP_WRITE
processNewResponses()
—— 》 SocketServer.processNewResponses()
selector.unmute(curr.request.connectionId)
//如果我们是想发送请求,那么代码应该走的是这个分支。
case RequestChannel.SendAction =>
//TODO 发送请求
sendResponse(curr)
——》SocketServer.Processor. sendResponse(curr)
//正常情况走的是这个分支
selector.send(response.responseSend)
41) 服务端代码设计-网络-服务端给客户端发送响应做哪些准备工作(2) -38}0:7
接上回:
再次回到
——0: 》SocketServer.Processor.run()
//TODO 看起来像是处理响应的。绑定 OP_WRITE
processNewResponses()
—— 》 SocketServer.processNewResponses()
selector.unmute(curr.request.connectionId)
//如果我们是想发送请求,那么代码应该走的是这个分支。
case RequestChannel.SendAction =>
//TODO 发送请求
sendResponse(curr)
——》SocketServer.Processor. sendResponse(curr)
//正常情况走的是这个else分支
selector.send(response.responseSend)
——》selector.send()
//TODO
channel.setSend(send);
——》 KafakChannel.setSend( )
//往KafkaChannel里面绑定一个发送出去的请求。
this.send = send;
//关键的代码来了
//这儿绑定了一个OP_WRITE事件。
//一旦绑定了这个事件以后,我们就可以往服务端发送请求了。
this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
42) 服务端代码设计-网络-响应消息是如何发送给客户端的 -39}0:7
接上回:
再次回到
——0: 》SocketServer.Processor.run()
//TODO 再次进去
poll()
——》SocketServer.Processor.poll()
——》Selector .poll()
——》Selector .pollSelectionKeys()
//已经完成响应消息的发送
if (send != null) {
this.completedSends.add(send);
// this.completedSends. 是Selector 类的成员变量,代表已经完成发送的请求
▲ 回退到 ——0: 》SocketServer.Processor.run()
//todo 处理我们已经发送出去的响应
processCompletedSends()
—— 》SocketServer. processCompletedSends()
selector.unmute(send.destination)
—— 》Selector.unmute()
——》KafkaChannel..unmute()
//重新监听 OP_READ
transportLayer.addInterestOps(SelectionKey.OP_READ);
43) 服务端代码设计-网络-支持超过并发的kafka网络设计 -40}0:13
三层网络架构,多层缓冲