KafkaController分析2-NetworkClient分析

  • NetworkClient: 顾名思义哈, 用于网络连接,消息发送的客户端封装, 源码中的注释:

A network client for asynchronous request/response network i/o. This is an internal class used to implement the user-facing producer and consumer clients.

  • 用在何处:
1. kafka本身实现了java版的producer和consumer,里面的网络连接,请求发送均使用NetworkClient实现;
2. KafkaController中controller与其他broker的通讯,使用NetworkClient实现;

InFlightRequests类

  • 所在文件: clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
  • 实现了request的集合, 包括正在发送的和已经发送的但还没有接收到response的request;
  • 主要成员变量: private final Map<String, Deque<ClientRequest>> requests = new HashMap<String, Deque<ClientRequest>>();
    针对每个连接使用Deque<ClientRequest>数据结构来保存所有的request;Deque<ClientRequest> 是个双端队列;
  • 添加新的request, 新的reqeust总是通过addFirst放到队首
public void add(ClientRequest request) {
        Deque<ClientRequest> reqs = this.requests.get(request.request().destination());
        if (reqs == null) {
            reqs = new ArrayDeque<>();
            this.requests.put(request.request().destination(), reqs);
        }
        reqs.addFirst(request);
    }
  • 取出最早发送的request, 通过pollLast()取出
public ClientRequest completeNext(String node) {
        return requestQueue(node).pollLast();
    }
  • public boolean canSendMore(String node)决定是否可以通过NetworkClient来发送请求
    对于通过NetworkClient来发送的request, 如果之前发送的请求并没有通过底层socket实际发送完成, 是不允许发送新的request的
public boolean canSendMore(String node) {
        Deque<ClientRequest> queue = requests.get(node);
        return queue == null || queue.isEmpty() ||
               (queue.peekFirst().request().completed() && queue.size() < this.maxInFlightRequestsPerConnection);
    }
```

# ClusterConnectionStates
* 所在文件:clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
* 记录到各个broker node的连接状态:
`private final Map<String, NodeConnectionState> nodeState`
* 对同一node的两次连接有一定的时间间隔限制, 即采用延迟连接:
`private final long reconnectBackoffMs`
* 连接状态有如下三种:
```
ConnectionState.DISCONNECTED -- 未连接
ConnectionState.DISCONNECTING -- 正在连接
ConnectionState.CONNECTED -- 已连接
```
* `canConnect`: 判断是否允许连接到node:如果从未连接过或者连接当前是断开的并且距离上次连接的间隔大于`reconnectBackoffMs`, 则允许连接;
```
public boolean canConnect(String id, long now) {
        NodeConnectionState state = nodeState.get(id);
        if (state == null)
            return true;
        else
            return state.state == ConnectionState.DISCONNECTED && now - state.lastConnectAttemptMs >= this.reconnectBackoffMs;
    }
```

# NetworkClien类
* 所在文件: clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
* 非线程安全
* 继承自 `KafkaClient`
* 使用了 `org.apache.kafka.common.network.Selector`来处理网络IO, [详情点这里 => Kafka源码分析-网络层](http://www.jianshu.com/p/8cbc7618abcb)
* 简单讲这个类用来管理一个到broker node的连接,请求发送和响应接收:
>A network client for asynchronous request/response network i/o. This is an internal class used to implement the user-facing producer and consumer clients.
* 核心函数 `poll`
使用`selector.poll`来处理实现的socket读写事件;
```
        long metadataTimeout = metadataUpdater.maybeUpdate(now);
        try {
            this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
        } catch (IOException e) {
            log.error("Unexpected error during I/O", e);
        }
```
经过`selector.poll`的调用,所有**发送完成的requet**, **接收完成的response**, **所有断开的连接**, **所有新建成功的连接**都将放到`selector`中相应的队列里;
* 处理发送完成的request
```
private void handleCompletedSends(List<ClientResponse> responses, long now) {
        // if no response is expected then when the send is completed, return it
        for (Send send : this.selector.completedSends()) {
            ClientRequest request = this.inFlightRequests.lastSent(send.destination());
            if (!request.expectResponse()) {
                this.inFlightRequests.completeLastSent(send.destination());
                responses.add(new ClientResponse(request, now, false, null));
            }
        }
    }
```
对于不需要回应response的请求,将从`ifFlightRequests`中删除;
* 处理接收到的response
```
private void handleCompletedReceives(List<ClientResponse> responses, long now) {
        for (NetworkReceive receive : this.selector.completedReceives()) {
            String source = receive.source();
            ClientRequest req = inFlightRequests.completeNext(source);
            ResponseHeader header = ResponseHeader.parse(receive.payload());
            // Always expect the response version id to be the same as the request version id
            short apiKey = req.request().header().apiKey();
            short apiVer = req.request().header().apiVersion();
            Struct body = ProtoUtils.responseSchema(apiKey, apiVer).read(receive.payload());
            correlate(req.request().header(), header);
            if (!metadataUpdater.maybeHandleCompletedReceive(req, now, body))
                responses.add(new ClientResponse(req, now, false, body));
        }
    }
```
如果是`metadata`的更新response,则调用`metadataUpdater.maybeHandleCompletedReceive` 处理metadata的更新;
* 处理新建的连接
```
 private void handleConnections() {
        for (String node : this.selector.connected()) {
            log.debug("Completed connection to node {}", node);
            this.connectionStates.connected(node);
        }
    }
```
* 处理所有的`handle***`函数返回的responses
```
        List<ClientResponse> responses = new ArrayList<>();
        handleCompletedSends(responses, updatedNow);
        handleCompletedReceives(responses, updatedNow);
        handleDisconnections(responses, updatedNow);
        handleConnections();
        handleTimedOutRequests(responses, updatedNow);

        // invoke callbacks
        for (ClientResponse response : responses) {
            if (response.request().hasCallback()) {
                try {
                    response.request().callback().onComplete(response);
                } catch (Exception e) {
                    log.error("Uncaught error in request completion:", e);
                }
            }
        }
```

# NetworkClientBlockingOps
* 所在文件: core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala
* 利用非阻塞的`NetworkClient`的方法, 实现了阻塞的方法;
* 阻塞直到`Client.ready`
```
def blockingReady(node: Node, timeout: Long)(implicit time: JTime): Boolean = {
    client.ready(node, time.milliseconds()) || pollUntil(timeout) { (_, now) =>
      if (client.isReady(node, now))
        true
      else if (client.connectionFailed(node))
        throw new IOException(s"Connection to $node failed")
      else false
    }
  }
```
* 阻塞发送request直到收到response
```
def blockingSendAndReceive(request: ClientRequest, timeout: Long)(implicit time: JTime): Option[ClientResponse] = {
    client.send(request, time.milliseconds())

    pollUntilFound(timeout) { case (responses, _) =>
      val response = responses.find { response =>
        response.request.request.header.correlationId == request.request.header.correlationId
      }
      response.foreach { r =>
        if (r.wasDisconnected) {
          val destination = request.request.destination
          throw new IOException(s"Connection to $destination was disconnected before the response was read")
        }
      }
      response
    }
  }
```

##### [Kafka源码分析-汇总](http://www.jianshu.com/p/aa274f8fe00f)
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,332评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,508评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,812评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,607评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,728评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,919评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,071评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,802评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,256评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,576评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,712评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,389评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,032评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,798评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,026评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,473评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,606评论 2 350

推荐阅读更多精彩内容