SocketServer.scala

class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time) extends Logging with KafkaMetricsGroup {
    // 服务器配有多个网卡,可以配置多个ip,这样可以同时监听多个端口
    // endpoint封装了host、port和网络协议
    // 每个endpoint创建一个对应的Acceptor
    private val endpoints = config.listeners
    // 在RequestChannel.requestQueue中缓存的最大请求个数
    private val maxQueuedRequests = config.queuedMaxRequests
    // 每个endpoint的Processor线程个数
    private val numProcessorThreads = config.numNetworkThreads
    // Processor线程总数
    private val totalProcessorThreads = numProcessorThreads * endpoints.size
    // 每个ip上能创建的最大连接数
    private val maxConnectionsPerIp = config.maxConnectionsPerIp
    // 手动覆盖上面指定ip的最大连接数
    private val maxConnectionsPerIpOverrides = config.maxConnectionsPerIpOverrides
    // Processor线程与Handler线程之间交换数据的队列
    // 创建totalProcessorThreads个responseQueue队列
    val requestChannel = new RequestChannel(totalProcessorThreads, maxQueuedRequests)
    // Processor线程的集合
    private val processors = new Array[Processor](totalProcessorThreads)
    // Acceptor集合
    private[network] val acceptors = mutable.Map[EndPoint, Acceptor]()
    // 控制每个ip上的连接数,底层维护一张 ip->连接数 的map
    // 存在多个Acceptor线程并发访问底层map的场景,需要synchronized同步
    private var connectionQuotas: ConnectionQuotas = _

    // 当Handler线程向某个responseQueue写入数据时,唤醒对应的Processor处理
    requestChannel.addResponseListener(id => processors(id).wakeup())
    
    // 服务端网络层启动入口
    def startup() {
      this.synchronized {

        connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)
        // Socket的sendBuffer和recvBuffer大小
        val sendBufferSize = config.socketSendBufferBytes
        val recvBufferSize = config.socketReceiveBufferBytes
        var processorBeginIndex = 0

        // 遍历endPoints集合
        endpoints.values.foreach { endpoint =>
          val protocol = endpoint.protocolType
          val processorEndIndex = processorBeginIndex + numProcessorThreads

          // 每个endpoint创建numProcessThread个processor
          for (i <- processorBeginIndex until processorEndIndex)
            processors(i) = newProcessor(i, connectionQuotas, protocol)
          // 每个endpoint创建一个acceptor
          val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,
            processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas)
          acceptors.put(endpoint, acceptor)
          // 创建acceptor线程并启动
          Utils.newThread("kafka-socket-acceptor-%s-%d".format(protocol.toString, endpoint.port), acceptor, false).start()
          // 主线程阻塞等待acceptor线程启动完成
          // 主线程只负责启动的初始化工作,后面干活的还是acceptor线程
          acceptor.awaitStartup()

          processorBeginIndex = processorEndIndex
        }
      }
    }
}

private[kafka] class Acceptor(val endPoint: EndPoint,
                              val sendBufferSize: Int,
                              val recvBufferSize: Int,
                              brokerId: Int,
                              processors: Array[Processor],
                              connectionQuotas: ConnectionQuotas) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
  // 服务端监听连接的channel和selector由Acceptor来创建
  private val nioSelector = NSelector.open() // 这个selector专门处理OP_ACCEPT
  val serverChannel = openServerSocket(endPoint.host, endPoint.port)

  // 为这个Acceptor创建并启动关联的Processor线程集合
  // 启动Acceptor的同时就已经启动了Processor线程了
  this.synchronized {
    processors.foreach { processor =>
      Utils.newThread("kafka-network-thread-%d-%s-%d".format(brokerId, endPoint.protocolType.toString, processor.id), processor, false).start()
    }
  }

  def run() {
    serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
    startupComplete()
    try {
      var currentProcessor = 0
      while (isRunning) {
          val ready = nioSelector.select(500)
          if (ready > 0) {
            val keys = nioSelector.selectedKeys()
            val iter = keys.iterator()
            while (iter.hasNext && isRunning) {
                val key = iter.next
                iter.remove()
                if (key.isAcceptable)
                  accept(key, processors(currentProcessor))
                // round robin 负载均衡分配 Processor
                currentProcessor = (currentProcessor + 1) % processors.length
            }
          }
      }
    } finally {
      shutdownComplete()
    }
  }

  def accept(key: SelectionKey, processor: Processor) {
    val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
    val socketChannel = serverSocketChannel.accept()
    try {
      connectionQuotas.inc(socketChannel.socket().getInetAddress)
      socketChannel.configureBlocking(false)
      socketChannel.socket().setTcpNoDelay(true)
      socketChannel.socket().setKeepAlive(true)
      socketChannel.socket().setSendBufferSize(sendBufferSize)

      processor.accept(socketChannel)
    } catch {
        close(socketChannel)
    }
  }

  private[kafka] class Processor(val id: Int, time: Time, maxRequestSize: Int, requestChannel: RequestChannel,
                               connectionQuotas: ConnectionQuotas, connectionsMaxIdleMs: Long, protocol: SecurityProtocol,
                               channelConfigs: java.util.Map[String, _],
                               metrics: Metrics) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
    // 保存此processor处理的新建客户端连入的SocketChannel
    private val newConnections = new ConcurrentLinkedQueue[SocketChannel]()
    // 收到客户端的请求,还未发送响应的缓存
    private val inflightResponses = mutable.Map[String, RequestChannel.Response]()
    // 就是KafkaSelector
    private val selector = new KSelector( maxRequestSize, connectionsMaxIdleMs, metrics, time,
                            "socket-server", metricTags, false,
                            ChannelBuilders.create(protocol, Mode.SERVER, LoginType.SERVER, channelConfigs, null, true))

    def accept(socketChannel: SocketChannel) {
      newConnections.add(socketChannel)
      // 底层通过selector.wakeup实现,唤醒processor线程处理newConnections队列
      wakeup()
    }

    override def run() {
      startupComplete()
      while (isRunning) {
        try {
          configureNewConnections() // 处理新的客户端请求注册OP_READ
          processNewResponses() // 从队列里获取response放入待发送缓存
          poll() // channel阻塞select等待发送响应
          // selector的pollSelectionKeys会读取客户端的请求,这里把请求放入RequestChannel队列里 
          processCompletedReceives() 
          processCompletedSends()
          processDisconnected()
        } catch {}
      }
      shutdownComplete()
    }

    private def configureNewConnections() {
      while (!newConnections.isEmpty) {
        // 从newConnections队列里获取新连接的客户端channel
        val channel = newConnections.poll()
        try {
          // 生成connectionId,注册OP_READ; 创建KafkaChannel,加入到selector.channels里
          val connectionId = ConnectionId(localHost, localPort, remoteHost, remotePort).toString
          selector.register(connectionId, channel)
        } catch {
            close(channel)
        }
      }
  }

  private def processNewResponses() {
    // 获取该processor对应的responseQueue
    var curr = requestChannel.receiveResponse(id)
    while (curr != null) {
      try {
        curr.responseAction match {
          case RequestChannel.NoOpAction =>
            // 不需要发送响应,这个channel重新注册OP_READ
            selector.unmute(curr.request.connectionId)
          case RequestChannel.SendAction =>
            sendResponse(curr)
          case RequestChannel.CloseConnectionAction =>
            // 关闭连接
            close(selector, curr.request.connectionId)
        }
      } finally {
        // 上一条处理失败,继续获取下一条
        curr = requestChannel.receiveResponse(id)
      }
    }
  }

  protected[network] def sendResponse(response: RequestChannel.Response) {
    val channel = selector.channel(response.responseSend.destination)
    // channel注册OP_WRITE,response写入对应channel的send缓存待发送,
    selector.send(response.responseSend)
    // 加入inflightResspones队列
    inflightResponses += (response.request.connectionId -> response)
  }

  private def processCompletedReceives() {
    // 遍历selector.completedReceives队列
    selector.completedReceives.asScala.foreach { receive =>
      try {
        val channel = selector.channel(receive.source)
        // 创建KafkaChannel对应的session对象,和权限有关
        val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName), channel.socketAddress)
        // 创建RequestChannel.Request请求对象
        val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = protocol)
        // 放入RequestChannel.requestQueue请求队列里
        requestChannel.sendRequest(req)
        // 取消关注OP_READ
        selector.mute(receive.source)
      } catch {
          close(selector, receive.source)
      }
    }
  }

  private def processCompletedSends() {
    selector.completedSends.asScala.foreach { send =>
      // 发送完response后从在途响应队列里移除
      val resp = inflightResponses.remove(send.destination)
      // 添加关注OP_READ
      selector.unmute(send.destination)
    }
  }

  private def processDisconnected() {
    selector.disconnected.asScala.foreach { connectionId =>
      val remoteHost = ConnectionId.fromString(connectionId)
      // 连接断开从在途响应队列里移除
      inflightResponses.remove(connectionId)
      // 管理连接数
      connectionQuotas.dec(InetAddress.getByName(remoteHost))
    }
  }
}

// Acceptor和Processor的父类,主要是操作启动和关闭
private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQuotas) extends Runnable with Logging {
  // startup是否完成
  private val startupLatch = new CountDownLatch(1)
  // shutdown是否完成
  private val shutdownLatch = new CountDownLatch(1)
  // 线程是否存活,shutdown会置为false
  private val alive = new AtomicBoolean(true)
}

class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMetricsGroup {
  // 在SocketServer里添加listener,监听response队列有响应就唤醒processor,返回response给client
  private var responseListeners: List[(Int) => Unit] = Nil
  // 所有Processor把请求都放在这个队列,保证线程安全
  // queueSize:请求缓存最大个数
  private val requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize)
  // 每个Processor对应一个response队列
  private val responseQueues = new Array[BlockingQueue[RequestChannel.Response]](numProcessors)
  for(i <- 0 until numProcessors)
    responseQueues(i) = new LinkedBlockingQueue[RequestChannel.Response]()

  // 返回response到队列里,通过listener唤醒Processor取走响应返回给clinet
  def sendResponse(response: RequestChannel.Response) {
    responseQueues(response.processor).put(response)
    for(onResponse <- responseListeners)
      onResponse(response.processor)
  }
}

case class Request(processor: Int, connectionId: String, session: Session, private var buffer: ByteBuffer, startTimeMs: Long, securityProtocol: SecurityProtocol) {
    // 涉及到跨线程比较,所以声明volatile
    @volatile var requestDequeueTimeMs = -1L
    @volatile var apiLocalCompleteTimeMs = -1L
    @volatile var responseCompleteTimeMs = -1L
    @volatile var responseDequeueTimeMs = -1L
    @volatile var apiRemoteCompleteTimeMs = -1L

    val header: RequestHeader =
        buffer.rewind
        RequestHeader.parse(buffer)

    val body: AbstractRequest =
        if (header.apiKey == ApiKeys.API_VERSIONS.id && !Protocol.apiVersionSupported(header.apiKey, header.apiVersion))
          new ApiVersionsRequest
        else
          AbstractRequest.getRequest(header.apiKey, header.apiVersion, buffer)
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,616评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,020评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,078评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,040评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,154评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,265评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,298评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,072评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,491评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,795评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,970评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,654评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,272评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,985评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,223评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,815评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,852评论 2 351

推荐阅读更多精彩内容