OkHttp源码(三)

简单的梳理一下五个拦截器的逻辑:

  • RetryAndFollowUpInterceptior
  • BridgeInterceptor
  • CacheInterceptor
  • ConnectionInterceptor
  • CallServerInterceptor

RetryAndFollowUpInterceptor

这是OkHttp代码中的第一个拦截器,也就是说除了自定义的应用拦截器外第一个处理request的拦截器,从名字来猜测它主要负责请求的重试操作和重定向操作。拦截器的重点都在于其intercept()的实现,所以来看RetryAndFollowUpInterceptor的intercept()是如何实现的。

初始

Request request = chain.request();
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Call call = realChain.call();
EventListener eventListener = realChain.eventListener();

从参数拦截器连中拿到request,将Chain强制准换成RealInterceptorChain,获取到call对象和监听回调,这个realChain是之后调用proceed()的。

StreamAllocation streamAllocation = new StreamAllocation(client.connectionPool(),
        createAddress(request.url()), call, eventListener, callStackTrace);
this.streamAllocation = streamAllocation;

这里创建了一个StreamAllocation对象,在 RetryAndFollowUpInterceptor中只是做了初始工作,并没有用到它,在之后会有拦截器用到。

重试循环

一个while(true)的循环,先看下面的代码块。

    boolean releaseConnection = true;
    try {
        response = realChain.proceed(request, streamAllocation, null, null);
        releaseConnection = false;
    } catch (RouteException e) {
        // The attempt to connect via a route failed. The request will not have been sent.
        if (!recover(e.getLastConnectException(), streamAllocation, false, request)) {
          throw e.getLastConnectException();
        }
        releaseConnection = false;
        continue;
    } catch (IOException e) {
        // An attempt to communicate with a server failed. The request may have been sent.
        boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
        if (!recover(e, streamAllocation, requestSendStarted, request)) throw e;
        releaseConnection = false;
        continue;
    } finally {
        // We're throwing an unchecked exception. Release any resources.
        if (releaseConnection) {
          streamAllocation.streamFailed(null);
          streamAllocation.release();
        }
    }

releaseConnection先设置为true,链接是释放的。try块调用了realChain的proceed(),调用了下一个拦截器,也传入了初始化过的streamAllocation对象。因为在之后的拦截器链中会有链接网络的工作,所以此时设置releaseConnection为false。接下来是对异常的捕获,捕获到异常说明在拦截器调用中的某个地方出现了问题导致,connection未释放,设置为false之后进入下一次循环。

但是在跳转到下一次循环之前还会执行finally代码块,我们来分析一下。如果在proced()中出现异常,则不会执行后一句releaseConnection = false;而在捕获异常中,如果是RouteException或IOException的话,操作之后就会赋值false,所以finally中主要是针对proceed()中出现其他异常的情况处理。

if (++followUpCount > MAX_FOLLOW_UPS) {
    streamAllocation.release();
    throw new ProtocolException("Too many follow-up requests: " + followUpCount);
}

MAX_FOLLOW_UPS这个常量定为20,循环每重试一次followUpCount就会自增,所以重试是有次数限制的。

BridgeInterceptor

BridgeInterceptor主要是负责添加头部的任务,和对返回来的response进行解压的工作。核心的intercept()如下,我们逐步分析。

下面代码主要为request添加Content-Type、Content-Length或Transfer-Encoding,从这里我们也可以发现其实这些头信息是不需要我们手动添加的,BridgeInterceptor都会根据body获取这些信息自动帮我们添加,就算我们自己添加了,它也会帮我们覆盖掉。

    if (body != null) {
      MediaType contentType = body.contentType();
      if (contentType != null) {
        requestBuilder.header("Content-Type", contentType.toString());
      }

      long contentLength = body.contentLength();
      if (contentLength != -1) {
        requestBuilder.header("Content-Length", Long.toString(contentLength));
        requestBuilder.removeHeader("Transfer-Encoding");
      } else {
        requestBuilder.header("Transfer-Encoding", "chunked");
        requestBuilder.removeHeader("Content-Length");
      }
    }

下面这段代码,如果我们没有手动添加Host、Connection和User-Agent字段,OkHttp会帮我们添加默认,也就是说不想之前的Content-Type等会帮我们覆盖,就算写错了也不影响,这些字段如果写了就不能写错。

if (userRequest.header("Host") == null) {
    requestBuilder.header("Host", hostHeader(userRequest.url(), false));
}

if (userRequest.header("Connection") == null) {
    requestBuilder.header("Connection", "Keep-Alive");
}
if (userRequest.header("User-Agent") == null) {
    requestBuilder.header("User-Agent", Version.userAgent());
}

默认支持gzip压缩

// If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
// the transfer stream.
boolean transparentGzip = false;
if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
    transparentGzip = true;
    requestBuilder.header("Accept-Encoding", "gzip");
}

再来看关于cookie的部分

List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
if (!cookies.isEmpty()) {
    requestBuilder.header("Cookie", cookieHeader(cookies));
}
  /** Returns a 'Cookie' HTTP request header with all cookies, like {@code a=b; c=d}. */
  private String cookieHeader(List<Cookie> cookies) {
    StringBuilder cookieHeader = new StringBuilder();
    for (int i = 0, size = cookies.size(); i < size; i++) {
      if (i > 0) {
        cookieHeader.append("; ");
      }
      Cookie cookie = cookies.get(i);
      cookieHeader.append(cookie.name()).append('=').append(cookie.value());
    }
    return cookieHeader.toString();
  }

cookieJar一直可以追踪到client的Builder中有一个默认赋值,cookieJar = CookieJar.NO_COOKIES;,所以如果在client初始时没有传入cookieJar此时这里给cookies赋值为一个空集合,就不会默认添加Cookie的header,如果初始传入过cookieJar,这里就会将cookies中的cookie写到header中。

CookieJar NO_COOKIES = new CookieJar() {
    @Override public void saveFromResponse(HttpUrl url, List<Cookie> cookies) {
    }

    @Override public List<Cookie> loadForRequest(HttpUrl url) {
      return Collections.emptyList();
    }
  };

再之后就是下一个拦截器的连接了。

Response networkResponse = chain.proceed(requestBuilder.build());

在获取到response后,如果response的header中有有cookie,那么就将它们存到cookieJar。

HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());
  public static void receiveHeaders(CookieJar cookieJar, HttpUrl url, Headers headers) {
    if (cookieJar == CookieJar.NO_COOKIES) return;

    List<Cookie> cookies = Cookie.parseAll(url, headers);
    if (cookies.isEmpty()) return;

    cookieJar.saveFromResponse(url, cookies);
  }

下面是对response的解压工作,将流转化成解压过直接能使用的response,然后对header进行了一些处理构建了一个response返回给上一个拦截器。

    Response.Builder responseBuilder = networkResponse.newBuilder()
        .request(userRequest);

    if (transparentGzip
        && "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
        && HttpHeaders.hasBody(networkResponse)) {
      GzipSource responseBody = new GzipSource(networkResponse.body().source());
      Headers strippedHeaders = networkResponse.headers().newBuilder()
          .removeAll("Content-Encoding")
          .removeAll("Content-Length")
          .build();
      responseBuilder.headers(strippedHeaders);
      String contentType = networkResponse.header("Content-Type");
      responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody)));
    }
    return responseBuilder.build();

CacheInterceptor

名字来看就是处理缓存的拦截器了。可以看到CacheInterceptor唯一一个成员:

final InternalCache cache;

InternalCache是一个接口,注释有这样一段话:

/**
 * OkHttp's internal cache interface. Applications shouldn't implement this: instead use {@link
 * okhttp3.Cache}.
 */

点进Cache类发现里面有InternalCache的实现,而实现都是直接调用外部类的方法的。

  final InternalCache internalCache = new InternalCache() {
    @Override public Response get(Request request) throws IOException {
      return Cache.this.get(request);
    }

    @Override public CacheRequest put(Response response) throws IOException {
      return Cache.this.put(response);
    }
    ...
  };

所以我们先看看Cache类是什么。

Cache类

put()

看到这一个判断,OkHttp不缓存非GET方法的响应。

if (!requestMethod.equals("GET")) {
      // Don't cache non-GET responses. We're technically allowed to cache
      // HEAD requests and some POST requests, but the complexity of doing
      // so is high and the benefit is low.
      return null;
}

创建了一个Entry对象,看到Entry成员就差不多明白其实OkHttp就是把所有属性封装成了Entry类,方便进行缓存。

Entry entry = new Entry(response);
private static final class Entry {
    /** Synthetic response header: the local time when the request was sent. */
    private static final String SENT_MILLIS = Platform.get().getPrefix() + "-Sent-Millis";
    /** Synthetic response header: the local time when the response was received. */
    private static final String RECEIVED_MILLIS = Platform.get().getPrefix() + "-Received-Millis";
    private final String url;
    private final Headers varyHeaders;
    private final String requestMethod;
    private final Protocol protocol;
    private final int code;
    private final String message;
    private final Headers responseHeaders;
    private final @Nullable Handshake handshake;
    ...

创建了DiskLruCache的Editor对象,猜测OkHttp的缓存写入工作都是交给它来实现的。

DiskLruCache.Editor editor = null;

通过DiskLruCache的edit()方法拿到editor。调用entry的writeTo()缓存了请求和响应等一些信息,到这里,我们发现还没有缓存body的信息,那么看最后返回的CacheRequestImpl的实现,发现内部有一个editor和一个body,body的缓存和它有关系,它主要用于返回给CacheInterceptor用来更新和写入缓存的。

try {
      editor = cache.edit(key(response.request().url()));
      if (editor == null) {
        return null;
      }
      entry.writeTo(editor);
      return new CacheRequestImpl(editor);
    } catch (IOException e) {
      abortQuietly(editor);
      return null;
    }

key()的实现,实际就是将请求的url做md5加密处理再得到其十六进制的表示形式。

public static String key(HttpUrl url) {
    return ByteString.encodeUtf8(url.toString()).md5().hex();
}

writeTo()的实现,可以看到缓存的内容。
缓存的不仅仅是相应的头部信息,还包括请求的头部信息,如果是https的请求还会缓一些握手的信息。

public void writeTo(DiskLruCache.Editor editor) throws IOException {
      BufferedSink sink = Okio.buffer(editor.newSink(ENTRY_METADATA));

      sink.writeUtf8(url)
          .writeByte('\n');
      sink.writeUtf8(requestMethod)
          .writeByte('\n');
      ...
      sink.writeUtf8(new StatusLine(protocol, code, message).toString())
          .writeByte('\n');
      sink.writeDecimalLong(responseHeaders.size() + 2)
          .writeByte('\n');
      ...
      if (isHttps()) {
        sink.writeByte('\n');
        sink.writeUtf8(handshake.cipherSuite().javaName())
            .writeByte('\n');
        ...
      }
      sink.close();
    }
get()

用来从缓存中读取响应体response的。
拿到url的key值。

String key = key(request.url());

声明缓存快照对象和Entry对象。

DiskLruCache.Snapshot snapshot;
Entry entry;

通过DiskLruCache的get()拿到快照。

try {
  snapshot = cache.get(key);
  if (snapshot == null) {
    return null;
  }
} catch (IOException e) {
  // Give up because the cache cannot be read.
  return null;
}

创建Entry对象,如果创建不成功就关闭资源。

try {
  entry = new Entry(snapshot.getSource(ENTRY_METADATA));
} catch (IOException e) {
  Util.closeQuietly(snapshot);
  return null;
}

调用entry的response()构建了response对象。

Response response = entry.response(snapshot);

看看response()的实现,大部分是从Entry里面拿值,都是在构造Entry的时候就拿到缓存了。这里拿到了body,builder构建了response并返回。

public Response response(DiskLruCache.Snapshot snapshot) {
  String contentType = responseHeaders.get("Content-Type");
  String contentLength = responseHeaders.get("Content-Length");
  Request cacheRequest = new Request.Builder()
      .url(url)
      .method(requestMethod, null)
      .headers(varyHeaders)
      .build();
  return new Response.Builder()
      .request(cacheRequest)
      .protocol(protocol)
      .code(code)
      .message(message)
      .headers(responseHeaders)
      .body(new CacheResponseBody(snapshot, contentType, contentLength))
      .handshake(handshake)
      .sentRequestAtMillis(sentRequestMillis)
      .receivedResponseAtMillis(receivedResponseMillis)
      .build();
}

最后做了一个request和response匹配判断的工作,并返回了Response。

if (!entry.matches(request, response)) {
  Util.closeQuietly(response.body());
  return null;
}
return response;
public boolean matches(Request request, Response response) {
  return url.equals(request.url().toString())
      && requestMethod.equals(request.method())
      && HttpHeaders.varyMatches(response, varyHeaders, request);
}

CacheInterceptor

还是看intercept()。

先尝试获取缓存。

Response cacheCandidate = cache != null
    ? cache.get(chain.request())
    : null;

CacheStrategy缓存策略类,再看get方法。

CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();

逻辑其实在getCandidate()方法中。

public CacheStrategy get() {
  CacheStrategy candidate = getCandidate();
  if (candidate.networkRequest != null && request.cacheControl().onlyIfCached()) {
    // We're forbidden from using the network and the cache is insufficient.
    return new CacheStrategy(null, null);
  }
  return candidate;
}

getCandidate()真的是很长,分了很多种情况来判断是使用缓存网络请求还是直接使用缓存好的response,大概看一下实现。

如果没有response的缓存,那就使用请求。

if (cacheResponse == null) {
    return new CacheStrategy(request, null);
}

如果请求是https的并且没有握手,那么重新请求。

if (request.isHttps() && cacheResponse.handshake() == null) {
    return new CacheStrategy(request, null);
}

如果response是不该被缓存的,就请求,isCacheable()内部是根据状态码判断的。

// If this response shouldn't have been stored, it should never be used
// as a response source. This check should be redundant as long as the
// persistence store is well-behaved and the rules are constant.
if (!isCacheable(cacheResponse, request)) {
return new CacheStrategy(request, null);
}

如果请求指定不使用缓存响应,或者是可选择的,就重新请求。

CacheControl requestCaching = request.cacheControl();
  if (requestCaching.noCache() || hasConditions(request)) {
    return new CacheStrategy(request, null);
}

是否是不容易影响的,传入request=null,直接从缓存的response拿数据。

CacheControl responseCaching = cacheResponse.cacheControl();
  if (responseCaching.immutable()) {
    return new CacheStrategy(null, cacheResponse);
}

如果response有缓存,并且时间比较近,添加一些头部信息后返回request=null的策略。

if (!responseCaching.noCache() && ageMillis + minFreshMillis < freshMillis + maxStaleMillis) {
    Response.Builder builder = cacheResponse.newBuilder();
    if (ageMillis + minFreshMillis >= freshMillis) {
      builder.addHeader("Warning", "110 HttpURLConnection \"Response is stale\"");
    }
    long oneDayMillis = 24 * 60 * 60 * 1000L;
    if (ageMillis > oneDayMillis && isFreshnessLifetimeHeuristic()) {
      builder.addHeader("Warning", "113 HttpURLConnection \"Heuristic expiration\"");
    }
    return new CacheStrategy(null, builder.build());
}

回到intercept(),分情况得到strategy之后分别拿到request和response。

Request networkRequest = strategy.networkRequest;
Response cacheResponse = strategy.cacheResponse;

判断了如果缓存不为空,调用了trackResponse(),和前面分析的get() put()一样,是在Cache中的。

if (cache != null) {
  cache.trackResponse(strategy);
}

每次调用这个方法requestCount都会自增,之后判断request是否为空,如果不空,requestCount自增,如果response不空,hitCount命中率加一。

  synchronized void trackResponse(CacheStrategy cacheStrategy) {
    requestCount++;

    if (cacheStrategy.networkRequest != null) {
      // If this is a conditional request, we'll increment hitCount if/when it hits.
      networkCount++;
    } else if (cacheStrategy.cacheResponse != null) {
      // This response uses the cache and not the network. That's a cache hit.
      hitCount++;
    }
  }

如果需要使用缓存,但是缓存Response为空,就关掉。

if (cacheCandidate != null && cacheResponse == null) {
  closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.}

如果策略不是从网络获取,并且也没缓存,就会构建一个504的response。

// If we're forbidden from using the network and the cache is insufficient, fail.
    if (networkRequest == null && cacheResponse == null) {
      return new Response.Builder()
          .request(chain.request())
          .protocol(Protocol.HTTP_1_1)
          .code(504)
          .message("Unsatisfiable Request (only-if-cached)")
          .body(Util.EMPTY_RESPONSE)
          .sentRequestAtMillis(-1L)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build();
    }

如果有缓存,策略也不是从网络获取,那么就直接返回缓存结果。

    // If we don't need the network, we're done.
    if (networkRequest == null) {
      return cacheResponse.newBuilder()
          .cacheResponse(stripBody(cacheResponse))
          .build();
    }

调用拦截器进行网络获取,调用下一个拦截器ConnectInterceptor。

Response networkResponse = null;
    try {
      networkResponse = chain.proceed(networkRequest);
    } finally {
      // If we're crashing on I/O or otherwise, don't leak the cache body.
      if (networkResponse == null && cacheCandidate != null) {
        closeQuietly(cacheCandidate.body());
      }
    }

拦截器回来拿到网络请求的response之后,我们判断了如果响应码304的话,就直接从缓存中读取数据。

    // If we have a cache response too, then we're doing a conditional get.
    if (cacheResponse != null) {
      if (networkResponse.code() == HTTP_NOT_MODIFIED) {
        Response response = cacheResponse.newBuilder()
            .headers(combine(cacheResponse.headers(), networkResponse.headers()))
            .sentRequestAtMillis(networkResponse.sentRequestAtMillis())
            .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
            .cacheResponse(stripBody(cacheResponse))
            .networkResponse(stripBody(networkResponse))
            .build();
        networkResponse.body().close();

        // Update the cache after combining headers but before stripping the
        // Content-Encoding header (as performed by initContentStream()).
        cache.trackConditionalCacheHit();
        cache.update(cacheResponse, response);
        return response;
      } else {
        closeQuietly(cacheResponse.body());
      }
    }

如果有响应体并且可缓存,那么就将响应写入缓存。

if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
        // Offer this request to the cache.
        CacheRequest cacheRequest = cache.put(response);
        return cacheWritingResponse(cacheRequest, response);
      }

如果request是无效的,就要从缓存中删除。

if (HttpMethod.invalidatesCache(networkRequest.method())) {
    try {
      cache.remove(networkRequest);
    } catch (IOException ignored) {
      // The cache cannot be written.
    }
  }

ConnectInterceptor

作用就是打开与服务器之间的连接,正式开启OkHttp的网络请求。

还是从ConnectInterceptor的intercept()来看,代码非常短。

首先从realChain中拿到了streamAllocation对象,这个对象在RetryAndFollowInterceptor中就已经初始化过了,只不过一直没有使用,到了ConnectTnterceptor才使用。

StreamAllocation streamAllocation = realChain.streamAllocation();

判断是否是GET请求,之后传入newStream()生成了一个HttpCodec对象。这个对象是用于编码request和解码response的一个封装好的对象。

// We need the network to satisfy this request. Possibly for validating a conditional GET.
boolean doExtensiveHealthChecks = !request.method().equals("GET");
HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);

接着获取了一个RalConnection对象,这个对象就是用于实际的网络传输的,写入request、读取response。

RealConnection connection = streamAllocation.connection();

将创建好的httpCodec和connection对象传递给下一个拦截器。

return realChain.proceed(request, streamAllocation, httpCodec, connection);

ConnectInterceptor就是这些,我们再来看一看StreamAllocation的newStream()方法。核心代码在下面,首先使用findHealthyConnection获取到了RealConnection,接着调用了realConnection的newCodec()获取到了HttpCodec对象,最后在同步代码块中返回了httpCodec对象。

try {
      RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
          writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
      HttpCodec resultCodec = resultConnection.newCodec(client, chain, this);

      synchronized (connectionPool) {
        codec = resultCodec;
        return resultCodec;
      }
    } catch (IOException e) {
      throw new RouteException(e);
    }

我们再来看findHealthyConnection()。看内部的循环,可以看到又封装了一层,调用findConnect()获得RealConnection。接着在同步代码块判断了realConnetion的successCount是否为零,如果是零就直接返回这个realConnection。如果不是,那就判断一下是否为healthy的,也就是是否是可用的链接,如果不是就进行回收工作并继续循环找下一个,如果是就直接返回。

while (true) {
      RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
          pingIntervalMillis, connectionRetryEnabled);

      // If this is a brand new connection, we can skip the extensive health checks.
      synchronized (connectionPool) {
        if (candidate.successCount == 0) {
          return candidate;
        }
      }

      // Do a (potentially slow) check to confirm that the pooled connection is still good. If it
      // isn't, take it out of the pool and start again.
      if (!candidate.isHealthy(doExtensiveHealthChecks)) {
        noNewStreams();
        continue;
      }

      return candidate;
    }

来看最终的findConnectiion(),代码还是比较长的,但是看注释就差不多知道逻辑了,如果连接存在,就直接使用,如果不存在就去连接池中获取,再获取不到的话就去新建一个连接。

  /**
   * Returns a connection to host a new stream. This prefers the existing connection if it exists,
   * then the pool, finally building a new connection.
   */

如果连接存在,就直接给result赋值。

if (this.connection != null) {
    // We had an already-allocated connection and it's good.
    result = this.connection;
    releasedConnection = null;
}

如果result为空,也就是没有存在的连接,就去连接池获取,如果获取到了,就给result。

if (result == null) {
    // Attempt to get a connection from the pool.        
    Internal.instance.get(connectionPool, address, this, null);
    if (connection != null) {
      foundPooledConnection = true;
      result = connection;
    } else {
      selectedRoute = route;
    }
}

如果获取到了result,那么就进行网络连接。

// Do TCP + TLS handshakes. This is a blocking operation.
result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis, connectionRetryEnabled, call, eventListener);

连接完之后,会把这个connection放到连接池中。

Internal.instance.put(connectionPool, result);

来看一看connect()的实现,太多了,来看重点几个地方。

检查连接是否已经建立了,如果建立了就抛出异常,protocol是连接中所可能用到的协议。

if (protocol != null) throw new IllegalStateException("already connected");

ConnectionSpec是一个指定连接的socket的配置。ConnectionSpecSelector用于选择连接的,OkHttp中有两种,一种是隧道连接,一种是socket连接。

List<ConnectionSpec> connectionSpecs = route.address().connectionSpecs();
ConnectionSpecSelector connectionSpecSelector = new ConnectionSpecSelector(connectionSpecs);

在这里while循环判断是否要建立隧道连接,是就建立。

while (true) {
  try {
    if (route.requiresTunnel()) {
      connectTunnel(connectTimeout, readTimeout, writeTimeout, call, eventListener);
    ...
}

requiresTunnel()的实现:

  /**
   * Returns true if this route tunnels HTTPS through an HTTP proxy. See <a
   * href="http://www.ietf.org/rfc/rfc2817.txt">RFC 2817, Section 5.2</a>.
   */
  public boolean requiresTunnel() {
    return address.sslSocketFactory != null && proxy.type() == Proxy.Type.HTTP;
  }

连接池

keep-alive和多路复用的实现都会引入一个连接池的概念,来维护整个OkHttp的网络连接。OkHttp会把客户端与服务端之间的连接抽先给Connection,RealConnection就是其实现类,为了管理这些连接,就提供了ConnectionPool这个类,为了复用连接而设计,当可以共用相同的地址,在时间范围内就可以复用连接。还实现了哪些连接保持打开状态以备后面来使用的策略。还有有有效的清理回收工作。

get()

遍历连接集合总的所有连接,如果可用,就调用streamAllocation的aquire()。

  /**
   * Returns a recycled connection to {@code address}, or null if no such connection exists. The
   * route is null if the address has not yet been routed.
   */
  @Nullable RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
    assert (Thread.holdsLock(this));
    for (RealConnection connection : connections) {
      if (connection.isEligible(address, route)) {
        streamAllocation.acquire(connection, true);
        return connection;
      }
    }
    return null;
  }

来看aquire(),它将connection赋值给StreamAllocation的connection,最后将这个streamAllocation的弱应引用添加到这个connection的allocations这个集合当中去,这个集合是用来通过当前连接对象所持有的弱引用数量判断这个网络连接的负载量是否超过了它的最大值。

  /**
   * Use this allocation to hold {@code connection}. Each call to this must be paired with a call to
   * {@link #release} on the same connection.
   */
  public void acquire(RealConnection connection, boolean reportedAcquired) {
    assert (Thread.holdsLock(connectionPool));
    if (this.connection != null) throw new IllegalStateException();

    this.connection = connection;
    this.reportedAcquired = reportedAcquired;
    connection.allocations.add(new StreamAllocationReference(this, callStackTrace));
  }
put()

首先执行了一个为清理工工作,之后将connection加入到连接集合当中。这个clenupRunnable就是属于连接池清理回收的实现。

  void put(RealConnection connection) {
    assert (Thread.holdsLock(this));
    if (!cleanupRunning) {
      cleanupRunning = true;
      executor.execute(cleanupRunnable);
    }
    connections.add(connection);
  }
  • 每次http请求都会产生一个StreamAllocation对象
  • 每次都会将StreamAllocation的弱引用添加到RealConnection对象的allocations集合中。(通过这个集合的大小来判断每一个connection是否超过了其最大连接数)
  • 从连接池中复用连接

Connection 清理回收

  • Gc回收算法
  • streamAllocaton的数量渐渐变为0,被线程池检测到,回收。
  • 独立的线程cleanupRunnable清理连接池

来看cleanupRunnable,它是一段死循环

  private final Runnable cleanupRunnable = new Runnable() {
    @Override public void run() {
      while (true) {
        long waitNanos = cleanup(System.nanoTime());
        if (waitNanos == -1) return;
        if (waitNanos > 0) {
          long waitMillis = waitNanos / 1000000L;
          waitNanos -= (waitMillis * 1000000L);
          synchronized (ConnectionPool.this) {
            try {
              ConnectionPool.this.wait(waitMillis, (int) waitNanos);
            } catch (InterruptedException ignored) {
            }
          }
        }
      }
    }
  };

来一步一步分析,第一次清理的时候会返回下一次清理的时间间隔。

long waitNanos = cleanup(System.nanoTime());

try catch中会等待,时间过了之后会继续循环清理。

ConnectionPool.this.wait(waitMillis, (int) waitNanos);

来看cleanup(),这个就是具体的gc算法,类似于标记清除算法。

在同步代码块中遍历连接池所有的连接。

for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); )

判断此连接是否有使用,如果有就继续循环判断下一个连接。

if (pruneAndGetAllocationCount(connection, now) > 0) {
  inUseConnectionCount++;
  continue;
}

看pruneAndGetAllocationCount()的实现

拿到当前connection的allocations这个allocation弱应用集合,对其每个弱引用循环遍历。

List<Reference<StreamAllocation>> references = connection.allocations;
    for (int i = 0; i < references.size(); )

查看这个streamallocation是否为空,如果不为空就继续遍历下一个弱应用。

Reference<StreamAllocation> reference = references.get(i);
if (reference.get() != null) {
    i++;
    continue;
}

如果为空就应该删除这个引用。

references.remove(i);

如果都删空了,就意味着这个connection没有任何引用,就返回零,其它情况返回引用的数值。

if (references.isEmpty()) {
    connection.idleAtNanos = now - keepAliveDurationNs;
    return 0;
}

回到cleanup方法,如果pruneAndGetAllocationCount()返回0,那么进行下面的回收工作。

如果控线连接超过数量,那么就回收这个连接。

if (longestIdleDurationNs >= this.keepAliveDurationNs
      || idleConnectionCount > this.maxIdleConnections) {
    // We've found a connection to evict. Remove it from the list, then close it below (outside
    // of the synchronized block).
    connections.remove(longestIdleConnection);
}

如果都是活跃连接,那么返回时间。

else if (idleConnectionCount > 0) {
    // A connection will be ready to evict soon.
    return keepAliveDurationNs - longestIdleDurationNs;
}

如果还可以塞下连接,但是有可能有空闲连接,直接返回keepAlive时间。

else if (inUseConnectionCount > 0) {
    // All connections are in use. It'll be at least the keep alive duration 'til we run again.
    return keepAliveDurationNs;
}

如果没有任何连接,那就返回-1跳出cleanupRunnable的清理循环。

else {
    // No connections, idle or in use.
    cleanupRunning = false;
    return -1;
}
  • OkHttp使用了gc回收算法
  • 判断streamAllocation数量
  • 就可以保持多个健康的keep-alive连接。

CallServerInterceptor

如果没有添加networkInterceptor的话,ConnectInterceptor的下一步就到了CallServerInterceptor。负责向流中写入请求并读取响应的工作。
首先拿到上一个拦截器传来的httpCodec对象,这个对象是整个拦截器的核心,因为写入请求和读取响应都是使用这个httpCodec的。

HttpCodec httpCodec = realChain.httpStream();

这些对象都是之前拦截器已经初始化好的,connection是上一步已经完成连接工作的连接。

StreamAllocation streamAllocation = realChain.streamAllocation();
RealConnection connection = (RealConnection) realChain.connection();
Request request = realChain.request();

使用httpCodec将请求头写入

httpCodec.writeRequestHeaders(request);

看看writeRequestHeaders()的实现。HttpCodec是一个接口,看Http1Codec中的实现。拿到请求行和请求头,传给writeRequest()。

@Override public void writeRequestHeaders(Request request) throws IOException {
  String requestLine = RequestLine.get(
      request, streamAllocation.connection().route().proxy().type());
  writeRequest(request.headers(), requestLine);
}

sink是封装了socket的输出流,涉及到Okio的知识,按照Http消息格式写入请求行和请求头。

public void writeRequest(Headers headers, String requestLine) throws IOException {
  if (state != STATE_IDLE) throw new IllegalStateException("state: " + state);
  sink.writeUtf8(requestLine).writeUtf8("\r\n");
  for (int i = 0, size = headers.size(); i < size; i++) {
    sink.writeUtf8(headers.name(i))
        .writeUtf8(": ")
        .writeUtf8(headers.value(i))
        .writeUtf8("\r\n");
  }
  sink.writeUtf8("\r\n");
  state = STATE_OPEN_REQUEST_BODY;
}

写入请求头之后,会有一个请求头的判断,如果有Expect:100-continue,就不去写入请求body了,直接开始读取响应头。100-continue用于客户端在发送POST数据给服务器前,征询服务器情况,看服务器是否处理POST的数据,如果不处理,客户端则不上传POST的body数据,如果处理,则POST上传数据。在现实应用中,当在POST大数据时,才会使用100-continue协议。

客户端在发送请求数据之前去判断服务器是否愿意接收该数据,如果服务器愿意接收,客户端才会真正发送数据,这么做的原因是如果客户端直接发送请求数据,但是服务器又将该请求拒绝的话,这种行为将带来很大的资源开销。所以为了避免这种情况,并不是所有的server都会正确处理并且应答”100-continue“。

if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
  httpCodec.flushRequest();
  realChain.eventListener().responseHeadersStart(realChain.call());
  responseBuilder = httpCodec.readResponseHeaders(true);
}

接着判断100-continue是否读取到响应头,如果没有读取到响应,说明服务端是想接受request body的,或者是正常没有100-continue的情况,就继续正常的步骤写入request body。

if (responseBuilder == null) {
  realChain.eventListener().requestBodyStart(realChain.call());
  long contentLength = request.body().contentLength();
  CountingSink requestBodyOut =
      new CountingSink(httpCodec.createRequestBody(request, contentLength));
  BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
  request.body().writeTo(bufferedRequestBody);
  bufferedRequestBody.close();
  realChain.eventListener()
      .requestBodyEnd(realChain.call(), requestBodyOut.successfulCount);
} 

else if中如果已有响应,那么再去判断一下这个连接是否为http2.0的,因为http2.0可以多路复用,这个连接可以复用。如果不是,那么这个流就已经不需要了,调用streamAllocation的noNewStreams()来关闭这个连接,防止之后在这个连接上创建新的流。

else if (!connection.isMultiplexed()) {
  streamAllocation.noNewStreams();
}
/**
 * Returns true if this is an HTTP/2 connection. Such connections can be used in multiple HTTP
 * requests simultaneously.
 */
public boolean isMultiplexed() {
  return http2Connection != null;
}

到这里,写入工作就完成了,下面就开始了读取工作。

httpCodec.finishRequest();

如果之前还没有读取到响应头,就读,是上面不经历100-continue的情况。到这一步,其实不管是经历了100-continue还是没经历,都已经入去了一次响应头,只不过readResponseHeaders的参数不同,100-continue是true,表示之后还有可能再读取一次,因为最初读到的可能不是真正响应的响应头。

if (responseBuilder == null) {
  realChain.eventListener().responseHeadersStart(realChain.call());
  responseBuilder = httpCodec.readResponseHeaders(false);
}

获取响应码,如果是100,则再读取一次响应头,完善response对象,code赋值真正响应码。

int code = response.code();
if (code == 100) {
  // server sent a 100-continue even though we did not request one.
  // try again to read the actual response
  responseBuilder = httpCodec.readResponseHeaders(false);
  response = responseBuilder
          .request(request)
          .handshake(streamAllocation.connection().handshake())
          .sentRequestAtMillis(sentRequestMillis)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build();
  code = response.code();
}

这一步判断是否为websocket,如果不是就读取响应体,继续构建response。

if (forWebSocket && code == 101) { 
  response = response.newBuilder()
      .body(Util.EMPTY_RESPONSE)
      .build();
} else {
  response = response.newBuilder()
      .body(httpCodec.openResponseBody(response))
      .build();
}

从请求头和响应头判断其中是否有表明需要保持连接打开,如果不需要就释放连接和流。

if ("close".equalsIgnoreCase(response.request().header("Connection"))
    || "close".equalsIgnoreCase(response.header("Connection"))) {
  streamAllocation.noNewStreams();
}

判断是否无内容,抛异常。

204:空内容,服务器成功执行请求,但是没有返回信息。205:重置内容,服务器成功执行了请求,但是但是没有返回内容,与204不同,他需要请求者重置文档视图(比如,清除表单内容,重新输入)。

if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
  throw new ProtocolException(
      "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
}

最后把response返回给之前的拦截器。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,189评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,577评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,857评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,703评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,705评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,620评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,995评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,656评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,898评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,639评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,720评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,395评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,982评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,953评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,195评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,907评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,472评论 2 342