OkHttp的ConnectInterceptor连接拦截器剖析

OkHttp的CacheInterceptor缓存拦截器剖析传送门:https://www.jianshu.com/p/3c07a12dc8ed
在RetryAndFollowUpInterceptor里初始化了一个StreamAllocation对象,这个StreamAllocation对象里初始化了一个Socket对象用来做连接,但是并没有真正的连接,等到处理完hader和缓存信息之后,才调用ConnectInterceptor来进行真正的连接。
这个类源码很少,如下:

/** Opens a connection to the target server and proceeds to the next interceptor.
 *  OkHttp当中的真正的网络请求都是通过网络连接器来实现的
 * */
public final class ConnectInterceptor implements Interceptor {
    public final OkHttpClient client;

    public ConnectInterceptor(OkHttpClient client) {
        this.client = client;
    }

    @Override public Response intercept(Chain chain) throws IOException {
        RealInterceptorChain realChain = (RealInterceptorChain) chain;
        Request request = realChain.request();
        // 建立Http网络请求所有需要的网络组件 ,在RetryAndFollowUpInterceptor创建了StreamAllocation,在这里使用
        StreamAllocation streamAllocation = realChain.streamAllocation();

        // We need the network to satisfy this request. Possibly for validating a conditional GET.
        // 我们需要网络来满足这个要求。 可能用于验证条件GET
        boolean doExtensiveHealthChecks = !request.method().equals("GET");
        // HttpCodec用来编码Request,解码Response
        HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
        // RealConnection用来进行实际的网络io传输的即建立连接
        RealConnection connection = streamAllocation.connection(); // 很关键的

        return realChain.proceed(request, streamAllocation, httpCodec, connection);
    }
}

在这里分析一下我们在RetryAndFollowUpInterceptor提到的StreamAllocation对象;
StreamAllocation相当于是个管理类,维护了Connections、Streams和Calls之间的管理,该类初始化一个Socket连接对象,获取输入/输出流对象。
顺着上面连接拦截器的方法点进去看看下面这个方法:

streamAllocation.newStream

// 创建HttpCodec
    public HttpCodec newStream(OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {

        int connectTimeout = chain.connectTimeoutMillis(); // 设置的连接超时时间
        int readTimeout = chain.readTimeoutMillis();  // 读取超时
        int writeTimeout = chain.writeTimeoutMillis(); // 写入超时
        int pingIntervalMillis = client.pingIntervalMillis(); // Web socket ping 间隔 (毫秒) 定时通知服务器,为心跳连接做准备,如果pingIntervalMillis 设置为0的时候 心跳executor是不会执行的
        boolean connectionRetryEnabled = client.retryOnConnectionFailure();  // 连接失败是否重试

        try {
            // 生成实际的网络连接类 ,RealConnection利用Socket建立连接
            RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
                    writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
            // 通过网络连接的实际类生成网络请求和网络响应的编码类
            HttpCodec resultCodec = resultConnection.newCodec(client, chain, this);

            synchronized (connectionPool) {
                codec = resultCodec;
                return resultCodec;
            }
        } catch (IOException e) {
            throw new RouteException(e);
        }
    }

这是一个创建HttpCodec的方法,这是一个接口,有两个实现类,Http1Codec和Http2Codec,是根据协议版本去分别创建的。Http1.x和Http2.x协议。

方法findHealthyConnection():

 /**
     * 找到一个连接,如果它是健康的,则返回它.
     * 如果不正常(健康),则重复该过程,直到找到正常连接为止
     */
    private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
                                                 int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled,
                                                 boolean doExtensiveHealthChecks) throws IOException {
        while (true) {
            RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
                    pingIntervalMillis, connectionRetryEnabled);

            // If this is a brand new connection, we can skip the extensive health checks.
            synchronized (connectionPool) {
                if (candidate.successCount == 0) { // 等于0的时候表示整个网络请求已经结束了
                    return candidate;
                }
            }

            // Do a (potentially slow) check to confirm that the pooled connection is still good. If it
            // isn't, take it out of the pool and start again.
            // 不健康,网络链接没及时关闭,输入输出流没有及时关闭,这时候就认为不健康
            if (!candidate.isHealthy(doExtensiveHealthChecks)) { // 当这个网络连接类不健康
                noNewStreams(); // 回收网络请求资源
                continue; // 跳出这次循环,接着下一次循环
            }

            return candidate;
        }
    }

方法findConnection():

/**
     * Returns a connection to host a new stream. This prefers the existing connection if it exists,
     * then the pool, finally building a new connection.
     * 返回一个连接来托管一个新的流。 可以复用现有的连接(如果存在的话),然后是池,最后建立一个新的连接
      * 调用该方法的RealConnection.connect()方法建立连接,connect-->connectSocket()进行socket连接-->Platform.get().connectSocket()-->socket.connect(address, connectTimeout);(此时进行了三次握手),握手完成后调用establishProtocol()。
     */
    private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
                                          int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
        boolean foundPooledConnection = false;
        RealConnection result = null;
        Route selectedRoute = null;
        Connection releasedConnection;
        Socket toClose;
        synchronized (connectionPool) {
            if (released) throw new IllegalStateException("released");
            if (codec != null) throw new IllegalStateException("codec != null");
            if (canceled) throw new IOException("Canceled");

            // Attempt to use an already-allocated connection. We need to be careful here because our
            // already-allocated connection may have been restricted from creating new streams.
            // 翻译上面的注释:尝试使用已分配的连接。 我们在这里需要小心,因为我们已经分配的连接可能已经被限制在创建新的流中
            releasedConnection = this.connection; // 直接复用
            toClose = releaseIfNoNewStreams();
            // 查看是否有完好的连接
            if (this.connection != null) {
                // We had an already-allocated connection and it's good.
                result = this.connection;
                releasedConnection = null;
            }
            if (!reportedAcquired) {
                // If the connection was never reported acquired, don't report it as released!
                releasedConnection = null;
            }
            // 连接池中是否用可用的连接,有则使用
            if (result == null) {
                // Attempt to get a connection from the pool. 从连接池中返回一个RealConnection
                Internal.instance.get(connectionPool, address, this, null);
                if (connection != null) {
                    foundPooledConnection = true;
                    result = connection;
                } else {
                    selectedRoute = route;
                }
            }
        }
        closeQuietly(toClose);

        if (releasedConnection != null) {
            eventListener.connectionReleased(call, releasedConnection);
        }
        if (foundPooledConnection) {
            eventListener.connectionAcquired(call, result);
        }
        if (result != null) {
            // If we found an already-allocated or pooled connection, we're done.
            // 如果我们找到了已经分配或者连接的连接,我们就完成了,直接返回
            return result;
        }

        // If we need a route selection, make one. This is a blocking operation.
        // 如果我们需要路线选择,请选择一个。 这是一项阻止操作。
        // 线程的选择,多IP操作
        boolean newRouteSelection = false;
        if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
            newRouteSelection = true;
            routeSelection = routeSelector.next();
        }

        // 如果没有可用连接,则自己创建一个
        synchronized (connectionPool) {
            if (canceled) throw new IOException("Canceled");

            if (newRouteSelection) {
                // Now that we have a set of IP addresses, make another attempt at getting a connection from
                // the pool. This could match due to connection coalescing.
                // 现在我们有一组IP地址,再次尝试从池中获取连接。 这可能由于连接合并而匹配
                List<Route> routes = routeSelection.getAll();
                for (int i = 0, size = routes.size(); i < size; i++) {
                    Route route = routes.get(i);
                    Internal.instance.get(connectionPool, address, this, route);
                    if (connection != null) {
                        foundPooledConnection = true;
                        result = connection;
                        this.route = route;
                        break;
                    }
                }
            }

            if (!foundPooledConnection) {
                if (selectedRoute == null) {
                    selectedRoute = routeSelection.next();
                }

                // Create a connection and assign it to this allocation immediately. This makes it possible
                // for an asynchronous cancel() to interrupt the handshake we're about to do.
                // 创建一个连接并立即将其分配给该分配。 这使得异步cancel()可以中断我们即将进行的握手
                route = selectedRoute;
                refusedStreamCount = 0;
                result = new RealConnection(connectionPool, selectedRoute); // 创建连接
                acquire(result, false);
            }
        }

        // If we found a pooled connection on the 2nd time around, we're done.
        // 如果我们第二次发现一个连接池,我们就完成了
        if (foundPooledConnection) {
            eventListener.connectionAcquired(call, result);
            return result;
        }

        // Do TCP + TLS handshakes. This is a blocking operation. 进行实际的网络连接
        // TODO 连接具体方法 开始TCP以及TLS握手操作,这是阻塞操作
        result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
                connectionRetryEnabled, call, eventListener);
        routeDatabase().connected(result.route());

        // 将新创建的连接,放在连接池中
        Socket socket = null;
        synchronized (connectionPool) {
            reportedAcquired = true;

            // Pool the connection. 紧接着把这个RealConnection放入连接池中
            Internal.instance.put(connectionPool, result);

            // If another multiplexed connection to the same address was created concurrently, then
            // release this connection and acquire that one.
            // 如果同时创建了到同一地址的另一个多路复用连接,则释放此连接并获取该连接
            if (result.isMultiplexed()) {
                socket = Internal.instance.deduplicate(connectionPool, address, this);
                result = connection;
            }
        }
        closeQuietly(socket);

        eventListener.connectionAcquired(call, result);
        return result;
    }

上面方法中标记:TODO 连接具体方法 result.connect()点击进去。
RealConnection类中的方法connect(),这里注意RealConnection继承Http2Connection.Listener,说明走的是http2.x协议,效率更高。
下面这几个方法都是出自RealConnection类,介绍怎么创建隧道建立连接。

public void connect(int connectTimeout, int readTimeout, int writeTimeout,
                        int pingIntervalMillis, boolean connectionRetryEnabled, Call call,
                        EventListener eventListener) {
        if (protocol != null) throw new IllegalStateException("already connected");
        //线路选择
        RouteException routeException = null;
        List<ConnectionSpec> connectionSpecs = route.address().connectionSpecs();
        ConnectionSpecSelector connectionSpecSelector = new ConnectionSpecSelector(connectionSpecs);

        if (route.address().sslSocketFactory() == null) {
            if (!connectionSpecs.contains(ConnectionSpec.CLEARTEXT)) {
                throw new RouteException(new UnknownServiceException(
                        "CLEARTEXT communication not enabled for client"));
            }
            String host = route.address().url().host();
            if (!Platform.get().isCleartextTrafficPermitted(host)) {
                throw new RouteException(new UnknownServiceException(
                        "CLEARTEXT communication to " + host + " not permitted by network security policy"));
            }
        }
        //开始连接
        while (true) {
            try {
                //建立隧道连接
                if (route.requiresTunnel()) {
                    connectTunnel(connectTimeout, readTimeout, writeTimeout, call, eventListener);
                    if (rawSocket == null) {
                        // We were unable to connect the tunnel but properly closed down our resources.
                        //我们无法连接隧道,但正确关闭了我们的资源。
                        break;
                    }
                } else {
                    //建立普通连接
                    connectSocket(connectTimeout, readTimeout, call, eventListener);
                }
                // 建立协议
                //不管是建立隧道连接,还是建立普通连接,都少不了 建立协议 这一步。
                // 这一步是在建立好了TCP连接之后,而在该TCP能被拿来收发数据之前执行的。
                // 它主要为数据的加密传输做一些初始化,比如TLS握手,HTTP/2的协议协商等
                establishProtocol(connectionSpecSelector, pingIntervalMillis, call, eventListener);
                eventListener.connectEnd(call, route.socketAddress(), route.proxy(), protocol);
                break;
                //完成连接
            } catch (IOException e) {
                closeQuietly(socket);
                closeQuietly(rawSocket);
                socket = null;
                rawSocket = null;
                source = null;
                sink = null;
                handshake = null;
                protocol = null;
                http2Connection = null;

                eventListener.connectFailed(call, route.socketAddress(), route.proxy(), null, e);

                if (routeException == null) {
                    routeException = new RouteException(e);
                } else {
                    routeException.addConnectException(e);
                }

                if (!connectionRetryEnabled || !connectionSpecSelector.connectionFailed(e)) {
                    throw routeException;
                }
            }
        }

        if (route.requiresTunnel() && rawSocket == null) {
            ProtocolException exception = new ProtocolException("Too many tunnel connections attempted: "
                    + MAX_TUNNEL_ATTEMPTS);
            throw new RouteException(exception);
        }

        if (http2Connection != null) {
            synchronized (connectionPool) {
                allocationLimit = http2Connection.maxConcurrentStreams();
            }
        }
    }

方法connectSocket()

/**
     * Does all the work necessary to build a full HTTP or HTTPS connection on a raw socket.
     * 完成在原始套接字上构建完整的HTTP或HTTPS连接所需的所有工作。
     */
    private void connectSocket(int connectTimeout, int readTimeout, Call call,
                               EventListener eventListener) throws IOException {
        Proxy proxy = route.proxy();
        Address address = route.address();
        //根据代理类型的不同处理Socket
        rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
                ? address.socketFactory().createSocket()
                : new Socket(proxy);

        eventListener.connectStart(call, route.socketAddress(), proxy);
        rawSocket.setSoTimeout(readTimeout);
        try {
            //建立Socket连接
            Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
        } catch (ConnectException e) {
            ConnectException ce = new ConnectException("Failed to connect to " + route.socketAddress());
            ce.initCause(e);
            throw ce;
        }

        // The following try/catch block is a pseudo hacky way to get around a crash on Android 7.0
    // More details:
    // https://github.com/square/okhttp/issues/3245
    // https://android-review.googlesource.com/#/c/271775/
        try {
            //获取输入/输出流 使用的Okio库
            source = Okio.buffer(Okio.source(rawSocket));
            sink = Okio.buffer(Okio.sink(rawSocket));
        } catch (NullPointerException npe) {
            if (NPE_THROW_WITH_NULL.equals(npe.getMessage())) {
                throw new IOException(npe);
            }
        }
    }

// Platform.get().connectSocket
public class Platform {
  public void connectSocket(Socket socket, InetSocketAddress address,
      int connectTimeout) throws IOException {
    //最终调用java的connect
    socket.connect(address, connectTimeout);
  }
}

connectTunnel()隧道链接

/**
     * Does all the work to build an HTTPS connection over a proxy tunnel. The catch here is that a
     * proxy server can issue an auth challenge and then close the connection.
     * 是否通过代理隧道建立HTTPS连接的所有工作。 这里的问题是代理服务器可以发出一个验证质询,然后关闭连接。
     */
    private void connectTunnel(int connectTimeout, int readTimeout, int writeTimeout, Call call,
                               EventListener eventListener) throws IOException {
        // 构造一个 建立隧道连接 请求。
        Request tunnelRequest = createTunnelRequest();
        HttpUrl url = tunnelRequest.url();
        for (int i = 0; i < MAX_TUNNEL_ATTEMPTS; i++) {
            // 与HTTP代理服务器建立TCP连接。
            connectSocket(connectTimeout, readTimeout, call, eventListener);
            // 创建隧道。这主要是将 建立隧道连接 请求发送给HTTP代理服务器,并处理它的响应
            tunnelRequest = createTunnel(readTimeout, writeTimeout, tunnelRequest, url);

            if (tunnelRequest == null) break; // Tunnel successfully created.

            // The proxy decided to close the connection after an auth challenge. We need to create a new
            // connection, but this time with the auth credentials.
            closeQuietly(rawSocket);
            rawSocket = null;
            sink = null;
            source = null;
            eventListener.connectEnd(call, route.socketAddress(), route.proxy(), null);
            // 重复上面的代码,直到建立好了隧道连接,当然最多21次,      
            // MAX_TUNNEL_ATTEMPTS=21,在类中写死的常量。
        }
    }

隧道创建方法createTunnel(),返回一个Request对象:

/**
     * To make an HTTPS connection over an HTTP proxy, send an unencrypted CONNECT request to create
     * the proxy connection. This may need to be retried if the proxy requires authorization.
     * 要通过HTTP代理建立HTTPS连接,请发送未加密的CONNECT请求以创建代理连接。 如果代理需要授权,则可能需要重试。
     */
    private Request createTunnel(int readTimeout, int writeTimeout, Request tunnelRequest,
                                 HttpUrl url) throws IOException {
        // Make an SSL Tunnel on the first message pair of each SSL + proxy connection.
        // 在每个SSL +代理连接的第一个消息对上创建一个SSL隧道。
        String requestLine = "CONNECT " + Util.hostHeader(url, true) + " HTTP/1.1";
        while (true) {
            Http1Codec tunnelConnection = new Http1Codec(null, null, source, sink);
            source.timeout().timeout(readTimeout, MILLISECONDS);
            sink.timeout().timeout(writeTimeout, MILLISECONDS);
            tunnelConnection.writeRequest(tunnelRequest.headers(), requestLine);
            //sink.flush();
            tunnelConnection.finishRequest();
            Response response = tunnelConnection.readResponseHeaders(false)
                    .request(tunnelRequest)
                    .build();
            // The response body from a CONNECT should be empty, but if it is not then we should consume
            // it before proceeding.
            long contentLength = HttpHeaders.contentLength(response);
            if (contentLength == -1L) {
                contentLength = 0L;
            }
            Source body = tunnelConnection.newFixedLengthSource(contentLength);
            Util.skipAll(body, Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
            body.close();

            switch (response.code()) {
                case HTTP_OK:
                    // Assume the server won't send a TLS ServerHello until we send a TLS ClientHello. If
                    // that happens, then we will have buffered bytes that are needed by the SSLSocket!
                    // This check is imperfect: it doesn't tell us whether a handshake will succeed, just
                    // that it will almost certainly fail because the proxy has sent unexpected data.
                    if (!source.buffer().exhausted() || !sink.buffer().exhausted()) {
                        throw new IOException("TLS tunnel buffered too many bytes!");
                    }
                    return null;

                case HTTP_PROXY_AUTH:
                    tunnelRequest = route.address().proxyAuthenticator().authenticate(route, response);
                    if (tunnelRequest == null)
                        throw new IOException("Failed to authenticate with proxy");

                    if ("close".equalsIgnoreCase(response.header("Connection"))) {
                        return tunnelRequest;
                    }
                    break;

                default:
                    throw new IOException(
                            "Unexpected response code for CONNECT: " + response.code());
            }
        }
    }

到这里把连接拦截器的具体过程剖析完了,当然只是把核心的代码过了一下,详细的代码太多了,有兴趣的可以自己去官网下载下来看:https://github.com/square/okhttp/blob/master/okhttp/src/main/java/okhttp3/internal/connection/ConnectInterceptor.java
下一篇把ConnectionPool连接池也说一下,这个类本身不大,但在连接这里起到很大的作用,具体我们下篇再讲。

感谢阅读,欢迎纠错

连接拦截器中用到的复用连接池ConnectionPool传送门:https://www.jianshu.com/p/522b3c7bf333

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

友情链接更多精彩内容