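The previous article, OkHttp源码分析(二)整体流程 (OkHttp Source Analysis, Part 2: Overall Flow), walked through the overall flow of an OkHttp request. This article analyzes OkHttp's five built-in interceptors in detail.
[Figure: mind map of the five built-in interceptors]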
RetryAndFollowUpInterceptor
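It does three main things:
- Creates the StreamAllocation, which manages the connection (socket) for the call
- Handles redirects
- Retries after failures
Start with the source: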
@Override
public Response intercept(Chain chain) throws IOException {
Request request = chain.request();
// Create the StreamAllocation that will manage the connection for this call
streamAllocation = new StreamAllocation(
client.connectionPool(), createAddress(request.url()), callStackTrace);
int followUpCount = 0;
Response priorResponse = null;
while (true) {
if (canceled) {
streamAllocation.release();
throw new IOException("Canceled");
}
Response response = null;
boolean releaseConnection = true;
try {
response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null);
releaseConnection = false;
} catch (RouteException e) {
// The attempt to connect via a route failed. The request will not have been sent.
if (!recover(e.getLastConnectException(), false, request)) {
throw e.getLastConnectException();
}
releaseConnection = false;
continue;
} catch (IOException e) {
// An attempt to communicate with a server failed. The request may have been sent.
boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
if (!recover(e, requestSendStarted, request)) throw e;
releaseConnection = false;
continue;
} finally {
// We're throwing an unchecked exception. Release any resources.
if (releaseConnection) {
streamAllocation.streamFailed(null);
streamAllocation.release();
}
}
// Attach the prior response if it exists. Such responses never have a body.
if (priorResponse != null) {
response = response.newBuilder()
.priorResponse(priorResponse.newBuilder()
.body(null)
.build())
.build();
}
Request followUp = followUpRequest(response);
if (followUp == null) {
if (!forWebSocket) {
streamAllocation.release();
}
return response;
}
closeQuietly(response.body());
if (++followUpCount > MAX_FOLLOW_UPS) {
streamAllocation.release();
throw new ProtocolException("Too many follow-up requests: " + followUpCount);
}
if (followUp.body() instanceof UnrepeatableRequestBody) {
streamAllocation.release();
throw new HttpRetryException("Cannot retry streamed HTTP body", response.code());
}
if (!sameConnection(response, followUp.url())) {
streamAllocation.release();
streamAllocation = new StreamAllocation(
client.connectionPool(), createAddress(followUp.url()), callStackTrace);
} else if (streamAllocation.codec() != null) {
throw new IllegalStateException("Closing the body of " + response
+ " didn't close its backing stream. Bad interceptor?");
}
request = followUp;
priorResponse = response;
}
}
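1) Initializing the StreamAllocation
A StreamAllocation allocates a stream to a specific server address. The stream itself is an HttpCodec, which has two implementations, Http1Codec and Http2Codec, for HTTP/1.1 and HTTP/2 respectively. The underlying connection may be one that is still held in the ConnectionPool, or a newly created one, so this involves connection-pool reuse as well as setting up and releasing TCP connections.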
streamAllocation = new StreamAllocation(
client.connectionPool(), createAddress(request.url()), callStackTrace);
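- First, the ConnectionPool is taken from the OkHttpClient (it is created when the OkHttpClient is built)
- An Address is created from the request URL (an Address describes one specific server address)
The StreamAllocation is not used here once it has been created. It is passed down the chain: ConnectInterceptor uses it to obtain a connection and a stream, and CallServerInterceptor then uses those to talk to the server.
The Request is then handed to the next interceptor, which returns the response.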
2) Retrying after failures
Pseudocode for the retry and redirect handling:
while (true) {
try {
response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null);
} catch (Exception e) {
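// Decide whether the failure is recoverable; if so, retry
continue;
}
// Decide whether a follow-up (e.g. a redirect) is needed; if not, return the response
request = followUp; // reassign the request for the next iteration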
}
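If an exception is thrown while the response is being fetched, it is caught and a retry decision is made according to the exception type. That decision is made mainly in recover: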
private boolean recover(IOException e, boolean requestSendStarted, Request userRequest) {
streamAllocation.streamFailed(e);
// The application layer has forbidden retries.
if (!client.retryOnConnectionFailure()) return false;
// We can't send the request body again.
if (requestSendStarted && userRequest.body() instanceof UnrepeatableRequestBody) return false;
// This exception is fatal.
if (!isRecoverable(e, requestSendStarted)) return false;
// No more routes to attempt.
if (!streamAllocation.hasMoreRoutes()) return false;
// For failure recovery, use the same route selector with a new connection.
return true;
}
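- If the user set retryOnConnectionFailure = false on the OkHttpClient (the default is true), failed requests are never retried;
- If the request has already started sending and its body is an UnrepeatableRequestBody, it cannot be sent again, so there is no retry;
- isRecoverable treats four kinds of exception as fatal:
  - Protocol errors (ProtocolException)
  - Interruptions (InterruptedIOException)
  - SSL handshake errors caused by a bad certificate (an SSLHandshakeException whose cause is a CertificateException)
  - Certificate pinning errors (SSLPeerUnverifiedException)
- There are no more routes to try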
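If recover decides the failure is recoverable, the loop continues (skipping the rest of the current iteration) and executes again:
response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null);
which completes the retry.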
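The four fatal cases listed above can be sketched with JDK exception types only. This is a paraphrase of the rules, not the library's code: the class name RetryDecisionSketch is hypothetical, and the carve-out for a SocketTimeoutException that occurs before the request was sent is an assumption based on the OkHttp 3.x source rather than on the list above.

```java
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.ProtocolException;
import java.net.SocketTimeoutException;
import java.security.cert.CertificateException;
import javax.net.ssl.SSLHandshakeException;
import javax.net.ssl.SSLPeerUnverifiedException;

final class RetryDecisionSketch {
  /** Returns true if the failure looks transient enough to try again. */
  static boolean isRecoverable(IOException e, boolean requestSendStarted) {
    // A protocol error will not go away on a second attempt.
    if (e instanceof ProtocolException) return false;

    // Interruptions are fatal, except (assumption) a timeout before anything was sent.
    if (e instanceof InterruptedIOException) {
      return e instanceof SocketTimeoutException && !requestSendStarted;
    }

    // A handshake that failed because of a bad certificate is not retried.
    if (e instanceof SSLHandshakeException && e.getCause() instanceof CertificateException) {
      return false;
    }

    // Certificate pinning failure.
    if (e instanceof SSLPeerUnverifiedException) return false;

    // Anything else (for example a refused connection) may succeed on another route.
    return true;
  }
}
```

Note that this check is only one of the four conditions in recover; the retryOnConnectionFailure flag, the unrepeatable body and the remaining routes are checked separately.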
3) Handling redirects
RetryAndFollowUpInterceptor uses followUpRequest() to extract the redirect information from the response and build a new Request:
/**
* Figures out the HTTP request to make in response to receiving {@code userResponse}. This will
* either add authentication headers, follow redirects or handle a client request timeout. If a
* follow-up is either unnecessary or not applicable, this returns null.
*/
private Request followUpRequest(Response userResponse) throws IOException {
if (userResponse == null) throw new IllegalStateException();
Connection connection = streamAllocation.connection();
Route route = connection != null
? connection.route()
: null;
int responseCode = userResponse.code();
final String method = userResponse.request().method();
switch (responseCode) {
case HTTP_PROXY_AUTH:
Proxy selectedProxy = route != null
? route.proxy()
: client.proxy();
if (selectedProxy.type() != Proxy.Type.HTTP) {
throw new ProtocolException("Received HTTP_PROXY_AUTH (407) code while not using proxy");
}
return client.proxyAuthenticator().authenticate(route, userResponse);
case HTTP_UNAUTHORIZED:
return client.authenticator().authenticate(route, userResponse);
case HTTP_PERM_REDIRECT:
case HTTP_TEMP_REDIRECT:
// "If the 307 or 308 status code is received in response to a request other than GET
// or HEAD, the user agent MUST NOT automatically redirect the request"
if (!method.equals("GET") && !method.equals("HEAD")) {
return null;
}
// fall-through
case HTTP_MULT_CHOICE:
case HTTP_MOVED_PERM:
case HTTP_MOVED_TEMP:
case HTTP_SEE_OTHER:
// Does the client allow redirects?
if (!client.followRedirects()) return null;
String location = userResponse.header("Location");
if (location == null) return null;
HttpUrl url = userResponse.request().url().resolve(location);
// Don't follow redirects to unsupported protocols.
if (url == null) return null;
// If configured, don't follow redirects between SSL and non-SSL.
boolean sameScheme = url.scheme().equals(userResponse.request().url().scheme());
if (!sameScheme && !client.followSslRedirects()) return null;
// Most redirects don't include a request body.
Request.Builder requestBuilder = userResponse.request().newBuilder();
if (HttpMethod.permitsRequestBody(method)) {
final boolean maintainBody = HttpMethod.redirectsWithBody(method);
if (HttpMethod.redirectsToGet(method)) {
requestBuilder.method("GET", null);
} else {
RequestBody requestBody = maintainBody ? userResponse.request().body() : null;
requestBuilder.method(method, requestBody);
}
if (!maintainBody) {
requestBuilder.removeHeader("Transfer-Encoding");
requestBuilder.removeHeader("Content-Length");
requestBuilder.removeHeader("Content-Type");
}
}
// When redirecting across hosts, drop all authentication headers. This
// is potentially annoying to the application layer since they have no
// way to retain them.
if (!sameConnection(userResponse, url)) {
requestBuilder.removeHeader("Authorization");
}
return requestBuilder.url(url).build();
case HTTP_CLIENT_TIMEOUT:
// 408's are rare in practice, but some servers like HAProxy use this response code. The
// spec says that we may repeat the request without modifications. Modern browsers also
// repeat the request (even non-idempotent ones.)
if (userResponse.request().body() instanceof UnrepeatableRequestBody) {
return null;
}
return userResponse.request();
default:
return null;
}
}
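- If followUpRequest returns null, no follow-up is needed and the response is returned as is
- At most 20 follow-ups are allowed; beyond that an exception is thrown
if (++followUpCount > MAX_FOLLOW_UPS) { // capped at 20 follow-ups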
streamAllocation.release();
throw new ProtocolException("Too many follow-up requests: " + followUpCount);
}
request = followUp; // assign the follow-up to request so the next loop iteration executes it
priorResponse = response;
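The shape of this loop, with its cap of 20 follow-ups, can be tried out in isolation. The sketch below is a stand-in, not OkHttp code: FollowUpLoopSketch and its redirect table (a map from URL to Location) are hypothetical, and the table lookup takes the place of followUpRequest(response).

```java
import java.net.ProtocolException;
import java.util.Map;

final class FollowUpLoopSketch {
  static final int MAX_FOLLOW_UPS = 20;

  /**
   * Follows entries in {@code redirects} (URL to Location) until a URL has no entry,
   * and returns that final URL.
   */
  static String follow(String url, Map<String, String> redirects) throws ProtocolException {
    int followUpCount = 0;
    while (true) {
      // Stands in for followUpRequest(response): null means "no follow-up needed".
      String location = redirects.get(url);
      if (location == null) return url;

      if (++followUpCount > MAX_FOLLOW_UPS) {
        throw new ProtocolException("Too many follow-up requests: " + followUpCount);
      }

      // Reassign the request and go around the loop again.
      url = location;
    }
  }
}
```

A redirect cycle therefore fails on the 21st follow-up instead of looping forever.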
BridgeInterceptor
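BridgeInterceptor comes right after RetryAndFollowUpInterceptor. Its main responsibilities are:
- Completing the HTTP headers on the request;
- Saving cookies from the response;
- Transparently decompressing gzip response bodies;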
@Override
public Response intercept(Chain chain) throws IOException {
Request userRequest = chain.request();
Request.Builder requestBuilder = userRequest.newBuilder();
RequestBody body = userRequest.body();
if (body != null) {
MediaType contentType = body.contentType();
if (contentType != null) {
requestBuilder.header("Content-Type", contentType.toString());
}
long contentLength = body.contentLength();
if (contentLength != -1) {
requestBuilder.header("Content-Length", Long.toString(contentLength));
requestBuilder.removeHeader("Transfer-Encoding");
} else {
requestBuilder.header("Transfer-Encoding", "chunked");
requestBuilder.removeHeader("Content-Length");
}
}
if (userRequest.header("Host") == null) {
requestBuilder.header("Host", hostHeader(userRequest.url(), false));
}
if (userRequest.header("Connection") == null) {
requestBuilder.header("Connection", "Keep-Alive");
}
// If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
// the transfer stream.
boolean transparentGzip = false;
if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
transparentGzip = true;
requestBuilder.header("Accept-Encoding", "gzip");
}
List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
if (!cookies.isEmpty()) {
requestBuilder.header("Cookie", cookieHeader(cookies));
}
if (userRequest.header("User-Agent") == null) {
requestBuilder.header("User-Agent", Version.userAgent());
}
Response networkResponse = chain.proceed(requestBuilder.build());
HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());
Response.Builder responseBuilder = networkResponse.newBuilder()
.request(userRequest);
if (transparentGzip
&& "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
&& HttpHeaders.hasBody(networkResponse)) {
GzipSource responseBody = new GzipSource(networkResponse.body().source());
Headers strippedHeaders = networkResponse.headers().newBuilder()
.removeAll("Content-Encoding")
.removeAll("Content-Length")
.build();
responseBuilder.headers(strippedHeaders);
responseBuilder.body(new RealResponseBody(strippedHeaders, Okio.buffer(responseBody)));
}
return responseBuilder.build();
}
1) Completing the HTTP headers
These include Content-Type, Content-Length, Transfer-Encoding, Host, Connection, Accept-Encoding, User-Agent and Cookie.
Cookies are loaded from the CookieJar, which can be set when the OkHttpClient is built:
OkHttpClient okHttpClient = new OkHttpClient.Builder()
.cookieJar(new CookieJar() {
@Override
public void saveFromResponse(HttpUrl url, List<Cookie> cookies) {
// e.g. persist the cookies to SharedPreferences
}
@Override
public List<Cookie> loadForRequest(HttpUrl url) {
// Read from wherever the cookies were saved. Never return null here, or a NullPointerException will follow
return new ArrayList<>();
}
})
.build();
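The header completion in step 1 follows one rule: fill in a default only when the caller has not set the header. A minimal sketch of that rule on a plain Map, assuming case-sensitive header names for brevity (real HTTP header names are case-insensitive); BridgeHeadersSketch and the User-Agent value are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class BridgeHeadersSketch {
  /**
   * Returns a copy of {@code userHeaders} with defaults filled in.
   * {@code contentLength} is -1 when the body length is unknown;
   * {@code hasBody} is false for requests without a body.
   */
  static Map<String, String> complete(Map<String, String> userHeaders, String host,
      boolean hasBody, long contentLength) {
    Map<String, String> headers = new LinkedHashMap<>(userHeaders);

    if (hasBody) {
      if (contentLength != -1) {
        // Known length: send Content-Length, never chunked.
        headers.put("Content-Length", Long.toString(contentLength));
        headers.remove("Transfer-Encoding");
      } else {
        // Unknown length: fall back to chunked transfer encoding.
        headers.put("Transfer-Encoding", "chunked");
        headers.remove("Content-Length");
      }
    }

    headers.putIfAbsent("Host", host);
    headers.putIfAbsent("Connection", "Keep-Alive");

    // Only ask for gzip when the caller chose neither an encoding nor a byte range.
    if (!headers.containsKey("Accept-Encoding") && !headers.containsKey("Range")) {
      headers.put("Accept-Encoding", "gzip");
    }

    headers.putIfAbsent("User-Agent", "sketch-agent/1.0"); // hypothetical value
    return headers;
  }
}
```

The Range check matters because a byte range of a gzip stream cannot be decompressed on its own, so gzip is not requested in that case.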
2) Saving cookies
HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());
public static void receiveHeaders(CookieJar cookieJar, HttpUrl url, Headers headers) {
if (cookieJar == CookieJar.NO_COOKIES) return;
List<Cookie> cookies = Cookie.parseAll(url, headers);
if (cookies.isEmpty()) return;
// Hand the cookies to the CookieJar (e.g. to be saved to SharedPreferences)
cookieJar.saveFromResponse(url, cookies);
}
3) Handling gzip
if (transparentGzip
&& "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
&& HttpHeaders.hasBody(networkResponse)) {
GzipSource responseBody = new GzipSource(networkResponse.body().source());
Headers strippedHeaders = networkResponse.headers().newBuilder()
.removeAll("Content-Encoding")
.removeAll("Content-Length")
.build();
responseBuilder.headers(strippedHeaders);
responseBuilder.body(new RealResponseBody(strippedHeaders, Okio.buffer(responseBody)));
}
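Decompression is done by Okio's GzipSource. Content-Encoding and Content-Length are then removed from the headers, since they describe the compressed body rather than the decoded one.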
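The same idea can be tried with the JDK alone. The sketch below swaps Okio's GzipSource for java.util.zip.GZIPInputStream and uses a plain Map for the headers; TransparentGzipSketch and its method names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Map;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

final class TransparentGzipSketch {
  /** Compresses {@code data}; only used here to produce a sample response body. */
  static byte[] gzip(byte[] data) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
      gz.write(data);
    }
    return out.toByteArray();
  }

  /**
   * If we asked for gzip ourselves and the response is gzip-encoded, returns the decoded
   * body and strips Content-Encoding and Content-Length from {@code headers} in place.
   * Otherwise returns {@code body} untouched.
   */
  static byte[] decodeBody(boolean transparentGzip, Map<String, String> headers, byte[] body)
      throws IOException {
    if (!transparentGzip || !"gzip".equalsIgnoreCase(headers.get("Content-Encoding"))) {
      return body;
    }
    headers.remove("Content-Encoding");
    headers.remove("Content-Length"); // the compressed length no longer applies

    try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(body))) {
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      byte[] buffer = new byte[4096];
      int read;
      while ((read = in.read(buffer)) != -1) {
        out.write(buffer, 0, read);
      }
      return out.toByteArray();
    }
  }
}
```

When the caller set Accept-Encoding itself, transparentGzip is false and the body is passed through still compressed, which matches the comment in the interceptor: whoever adds the header is responsible for decompressing.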
CacheInterceptor
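OkHttp's caching follows the HTTP caching rules; for background see HTTP 协议缓存机制详解 (A Detailed Look at HTTP Caching).
To use the cache, a Cache must be supplied when the OkHttpClient is built: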
OkHttpClient httpClient = new OkHttpClient.Builder()
.cache(new Cache(this.getCacheDir(), 10240 * 1024))
.build();
The interceptor as a whole:
@Override
public Response intercept(Chain chain) throws IOException {
// Look up a cached candidate
Response cacheCandidate = cache != null
? cache.get(chain.request())
: null;
long now = System.currentTimeMillis();
// CacheStrategy works like a mapping: given the request and cacheCandidate, it yields networkRequest and cacheResponse
CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
Request networkRequest = strategy.networkRequest;
Response cacheResponse = strategy.cacheResponse;
// If a cache is configured, update its statistics (request, network and hit counts)
if (cache != null) {
cache.trackResponse(strategy);
}
// A candidate existed but CacheStrategy rejected it: close the candidate's body
if (cacheCandidate != null && cacheResponse == null) {
closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
}
// only-if-cached: the network is forbidden and there is no usable cached response, so return 504
// If we're forbidden from using the network and the cache is insufficient, fail.
if (networkRequest == null && cacheResponse == null) {
return new Response.Builder()
.request(chain.request())
.protocol(Protocol.HTTP_1_1)
.code(504)
.message("Unsatisfiable Request (only-if-cached)")
.body(Util.EMPTY_RESPONSE)
.sentRequestAtMillis(-1L)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
}
// If we don't need the network, we're done.
// No network request is needed: return the cached response
if (networkRequest == null) {
return cacheResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.build();
}
Response networkResponse = null;
try {
// Go to the network
networkResponse = chain.proceed(networkRequest);
} finally {
// If we're crashing on I/O or otherwise, don't leak the cache body.
if (networkResponse == null && cacheCandidate != null) {
closeQuietly(cacheCandidate.body());
}
}
// If we have a cache response too, then we're doing a conditional get.
if (cacheResponse != null) {
// 304: the server reports the resource unchanged, so reuse the cached body and merge the headers
if (networkResponse.code() == HTTP_NOT_MODIFIED) {
Response response = cacheResponse.newBuilder()
.headers(combine(cacheResponse.headers(), networkResponse.headers()))
.sentRequestAtMillis(networkResponse.sentRequestAtMillis())
.receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();
networkResponse.body().close();
// Update the cache after combining headers but before stripping the
// Content-Encoding header (as performed by initContentStream()).
cache.trackConditionalCacheHit();
cache.update(cacheResponse, response);
return response;
} else {
closeQuietly(cacheResponse.body());
}
}
// Use the network response
Response response = networkResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();
// Update the cache
if (cache != null) {
if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
// Offer this request to the cache.
CacheRequest cacheRequest = cache.put(response);
return cacheWritingResponse(cacheRequest, response);
}
if (HttpMethod.invalidatesCache(networkRequest.method())) {
try {
cache.remove(networkRequest);
} catch (IOException ignored) {
// The cache cannot be written.
}
}
}
return response;
}
1) Reading from the cache
Response get(Request request) {
String key = key(request.url());
DiskLruCache.Snapshot snapshot;
Entry entry;
try {
snapshot = cache.get(key);
if (snapshot == null) {
return null;
}
} catch (IOException e) {
// Give up because the cache cannot be read.
return null;
}
try {
entry = new Entry(snapshot.getSource(ENTRY_METADATA));
} catch (IOException e) {
Util.closeQuietly(snapshot);
return null;
}
Response response = entry.response(snapshot);
if (!entry.matches(request, response)) {
Util.closeQuietly(response.body());
return null;
}
return response;
}
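- The URL is the input; its MD5 hash, hex-encoded, is the key (this is hashing, not encryption);
- The key is used to get a Snapshot, which ties the lookup to the cache files on disk;
- An Entry is built from the snapshot, and the Response is built from the Entry and returned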
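Deriving the key can be sketched with java.security.MessageDigest, assuming (as described above) that the key is simply the hex-encoded MD5 of the URL string. CacheKeySketch is a hypothetical name; OkHttp itself does this through Okio.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

final class CacheKeySketch {
  /** Returns the lowercase hex MD5 of {@code url}, always 32 characters long. */
  static String key(String url) {
    try {
      byte[] digest = MessageDigest.getInstance("MD5")
          .digest(url.getBytes(StandardCharsets.UTF_8));
      StringBuilder hex = new StringBuilder(digest.length * 2);
      for (byte b : digest) {
        hex.append(String.format("%02x", b));
      }
      return hex.toString();
    } catch (NoSuchAlgorithmException e) {
      // Every Java platform is required to provide MD5.
      throw new AssertionError("MD5 unavailable", e);
    }
  }
}
```

A fixed-length hex string is convenient here because the key ends up as part of a file name in the disk cache.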
2) Cache strategy
The cache strategy is implemented by CacheStrategy, which is built in two steps.
① The Factory parses the headers
public Factory(long nowMillis, Request request, Response cacheResponse) {
this.nowMillis = nowMillis;
this.request = request;
this.cacheResponse = cacheResponse;
if (cacheResponse != null) {
this.sentRequestMillis = cacheResponse.sentRequestAtMillis();
this.receivedResponseMillis = cacheResponse.receivedResponseAtMillis();
Headers headers = cacheResponse.headers();
for (int i = 0, size = headers.size(); i < size; i++) {
String fieldName = headers.name(i);
String value = headers.value(i);
if ("Date".equalsIgnoreCase(fieldName)) {
servedDate = HttpDate.parse(value);
servedDateString = value;
} else if ("Expires".equalsIgnoreCase(fieldName)) {
expires = HttpDate.parse(value);
} else if ("Last-Modified".equalsIgnoreCase(fieldName)) {
lastModified = HttpDate.parse(value);
lastModifiedString = value;
} else if ("ETag".equalsIgnoreCase(fieldName)) {
etag = value;
} else if ("Age".equalsIgnoreCase(fieldName)) {
ageSeconds = HttpHeaders.parseSeconds(value, -1);
}
}
}
}
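- The Factory mainly parses the caching-related headers of the cached response: Date, Expires, Last-Modified, ETag and Age;
- Note that Headers is not a Map but a flat array: counting from 0, names sit at even indices and each value at the odd index that follows;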
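The flat layout is easy to picture with a small stand-in. FlatHeadersSketch is hypothetical and only mirrors the name(i) / value(i) accessors used in the Factory loop above.

```java
final class FlatHeadersSketch {
  // Layout: [name0, value0, name1, value1, ...]
  private final String[] namesAndValues;

  FlatHeadersSketch(String... namesAndValues) {
    if (namesAndValues.length % 2 != 0) {
      throw new IllegalArgumentException("Expected alternating names and values");
    }
    this.namesAndValues = namesAndValues.clone();
  }

  /** Number of header lines, not array slots. */
  int size() {
    return namesAndValues.length / 2;
  }

  String name(int index) {
    return namesAndValues[index * 2];
  }

  String value(int index) {
    return namesAndValues[index * 2 + 1];
  }

  /** Returns the last value for {@code name}, compared case-insensitively, or null. */
  String get(String name) {
    for (int i = namesAndValues.length - 2; i >= 0; i -= 2) {
      if (name.equalsIgnoreCase(namesAndValues[i])) return namesAndValues[i + 1];
    }
    return null;
  }
}
```

Unlike a Map, this layout keeps header order and allows the same name to appear more than once, which HTTP permits (Set-Cookie, for example).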
② get returns the CacheStrategy instance
public CacheStrategy get() {
CacheStrategy candidate = getCandidate();
if (candidate.networkRequest != null && request.cacheControl().onlyIfCached()) {
// We're forbidden from using the network and the cache is insufficient.
return new CacheStrategy(null, null);
}
return candidate;
}
get delegates most of the work to getCandidate:
private CacheStrategy getCandidate() {
// Cache miss (null): the network request needs no conditional headers either
if (cacheResponse == null) {
// No cached response: plain network request
return new CacheStrategy(request, null);
}
// The cached response has lost its TLS handshake: go straight to the network
if (request.isHttps() && cacheResponse.handshake() == null) {
// Plain network request
return new CacheStrategy(request, null);
}
// Check whether this response may be cached at all (status code, expiry information, no-store)
if (!isCacheable(cacheResponse, request)) {
// Plain network request
return new CacheStrategy(request, null);
}
CacheControl requestCaching = request.cacheControl();
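// If the request carries no-cache (which only happens when the developer adds it deliberately)
// or already has If-Modified-Since / If-None-Match (i.e. it is a conditional GET)
if (requestCaching.noCache() || hasConditions(request)) {
// Go to the network and let the server decide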
return new CacheStrategy(request, null);
}
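// Computed according to the HTTP caching RFC
// Current age of the cached response,
// roughly now - sent + age (in milliseconds)
long ageMillis = cacheResponseAge();
// Freshness lifetime; in most cases this is the server's max-age
long freshMillis = computeFreshnessLifetime();
if (requestCaching.maxAgeSeconds() != -1) {
// Take the smaller of the two; in most cases that is the response's max-age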
freshMillis = Math.min(freshMillis, SECONDS.toMillis(requestCaching.maxAgeSeconds()));
}
long minFreshMillis = 0;
if (requestCaching.minFreshSeconds() != -1) {
// In most cases this is 0
minFreshMillis = SECONDS.toMillis(requestCaching.minFreshSeconds());
}
long maxStaleMillis = 0;
// Cache-Control directives parsed from the cached response's headers
CacheControl responseCaching = cacheResponse.cacheControl();
if (!responseCaching.mustRevalidate() && requestCaching.maxStaleSeconds() != -1) {
// Allowed staleness (max-stale); usually 0
maxStaleMillis = SECONDS.toMillis(requestCaching.maxStaleSeconds());
}
// The cached response is still fresh and can be used
// In most cases the check reduces to
// now - sent + age + 0 < max-age + 0
if (!responseCaching.noCache() && ageMillis + minFreshMillis < freshMillis + maxStaleMillis) {
// Return the cached response
Response.Builder builder = cacheResponse.newBuilder();
return new CacheStrategy(null, builder.build());
}
// The cached response is stale: if it has an ETag or another validator,
// send a conditional request and let the server decide
Request.Builder conditionalRequestBuilder = request.newBuilder();
if (etag != null) {
conditionalRequestBuilder.header("If-None-Match", etag);
} else if (lastModified != null) {
conditionalRequestBuilder.header("If-Modified-Since", lastModifiedString);
} else if (servedDate != null) {
conditionalRequestBuilder.header("If-Modified-Since", servedDateString);
}
// Either way, the request below still goes to the network
Request conditionalRequest = conditionalRequestBuilder.build();
return hasConditions(conditionalRequest) ? new CacheStrategy(conditionalRequest,
cacheResponse) : new CacheStrategy(conditionalRequest, null);
}
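The freshness test at the heart of getCandidate is a single inequality, which is easier to reason about with concrete numbers. The sketch below isolates it; FreshnessSketch and its method names are hypothetical, all durations are in milliseconds, and the response-side no-cache check from the original condition is deliberately left out.

```java
import java.util.concurrent.TimeUnit;

final class FreshnessSketch {
  /**
   * Returns true when a cached response may be served without contacting the server.
   * Pass 0 for {@code minFreshMillis} and {@code maxStaleMillis} when the request
   * does not set min-fresh or max-stale.
   */
  static boolean isFresh(long ageMillis, long freshMillis,
      long minFreshMillis, long maxStaleMillis) {
    return ageMillis + minFreshMillis < freshMillis + maxStaleMillis;
  }

  /**
   * Applies a request-side max-age (in seconds, -1 if absent) to the response's
   * freshness lifetime. The request can only shorten the lifetime, never extend it.
   */
  static long effectiveLifetime(long responseFreshMillis, int requestMaxAgeSeconds) {
    if (requestMaxAgeSeconds == -1) return responseFreshMillis;
    return Math.min(responseFreshMillis, TimeUnit.SECONDS.toMillis(requestMaxAgeSeconds));
  }
}
```

For example, a response that is 70 seconds old with a 60 second lifetime is stale, but a request sent with max-stale=20 would still accept it, since 70 < 60 + 20.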
3) Cache statistics
if (cache != null) {
cache.trackResponse(strategy);
}
synchronized void trackResponse(CacheStrategy cacheStrategy) {
requestCount++;
if (cacheStrategy.networkRequest != null) {
// If this is a conditional request, we'll increment hitCount if/when it hits.
networkCount++;
} else if (cacheStrategy.cacheResponse != null) {
// This response uses the cache and not the network. That's a cache hit.
hitCount++;
}
}
So cache tracking mainly counts requests, broken down into network requests and cache hits.
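A stand-alone version of these counters shows how the three numbers relate. CacheStatsSketch is hypothetical; the two booleans stand in for cacheStrategy.networkRequest != null and cacheStrategy.cacheResponse != null.

```java
final class CacheStatsSketch {
  private int requestCount;
  private int networkCount;
  private int hitCount;

  /** Records one request, classified by what the strategy decided to use. */
  synchronized void track(boolean usesNetwork, boolean usesCache) {
    requestCount++;
    if (usesNetwork) {
      // Conditional requests land here too; they only count as hits once a 304 arrives.
      networkCount++;
    } else if (usesCache) {
      hitCount++;
    }
  }

  /** Called when a conditional request is answered with 304 Not Modified. */
  synchronized void trackConditionalHit() {
    hitCount++;
  }

  synchronized int requestCount() { return requestCount; }
  synchronized int networkCount() { return networkCount; }
  synchronized int hitCount() { return hitCount; }
}
```

A request that uses neither the network nor the cache (the 504 only-if-cached case) increases only requestCount.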
ConnectInterceptor
ConnectInterceptor establishes the connection to the server:
@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();
StreamAllocation streamAllocation = realChain.streamAllocation();
// We need the network to satisfy this request. Possibly for validating a conditional GET.
boolean doExtensiveHealthChecks = !request.method().equals("GET");
HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
RealConnection connection = streamAllocation.connection();
return realChain.proceed(request, streamAllocation, httpCodec, connection);
}
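The code of ConnectInterceptor is short; the real work is delegated to other classes.
It does the following:
- Gets the StreamAllocation;
- Uses the StreamAllocation to create a RealConnection;
- Uses the StreamAllocation to create an HttpCodec;
Once ConnectInterceptor has run, all four key fields of RealInterceptorChain have been created: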
- Request
- StreamAllocation
- HttpCodec
- Connection
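The Request exists from the start and the StreamAllocation is created in RetryAndFollowUpInterceptor, so the analysis here focuses on how the Connection and the HttpCodec are created.
Creating the Connection and the HttpCodec
An HttpCodec encodes HTTP requests and decodes HTTP responses. A single call to streamAllocation.newStream yields both an HttpCodec and a RealConnection: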
//StreamAllocation
public HttpCodec newStream(OkHttpClient client, boolean doExtensiveHealthChecks) {
int connectTimeout = client.connectTimeoutMillis();
int readTimeout = client.readTimeoutMillis();
int writeTimeout = client.writeTimeoutMillis();
boolean connectionRetryEnabled = client.retryOnConnectionFailure();
try {
RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
writeTimeout, connectionRetryEnabled, doExtensiveHealthChecks);
HttpCodec resultCodec = resultConnection.newCodec(client, this);
synchronized (connectionPool) {
codec = resultCodec;
return resultCodec;
}
} catch (IOException e) {
throw new RouteException(e);
}
}
Creating an HttpCodec takes two steps:
- Obtain a RealConnection (reuse one if possible, otherwise create a new one)
- Create the HttpCodec from the RealConnection
① Obtaining the RealConnection
private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
int writeTimeout, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks)
throws IOException {
while (true) {
RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
connectionRetryEnabled);
// If this is a brand new connection, we can skip the extensive health checks.
synchronized (connectionPool) {
if (candidate.successCount == 0) {
return candidate;
}
}
// Do a (potentially slow) check to confirm that the pooled connection is still good. If it
// isn't, take it out of the pool and start again.
if (!candidate.isHealthy(doExtensiveHealthChecks)) {
noNewStreams();
continue;
}
return candidate;
}
}
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
boolean connectionRetryEnabled) throws IOException {
Route selectedRoute;
synchronized (connectionPool) {
if (released) throw new IllegalStateException("released");
if (codec != null) throw new IllegalStateException("codec != null");
if (canceled) throw new IOException("Canceled");
// Reuse the connection already held by this allocation
// Attempt to use an already-allocated connection.
RealConnection allocatedConnection = this.connection;
if (allocatedConnection != null && !allocatedConnection.noNewStreams) {
return allocatedConnection;
}
// Otherwise look in the connection pool
// Attempt to get a connection from the pool.
Internal.instance.get(connectionPool, address, this);
if (connection != null) {
return connection;
}
selectedRoute = route;
}
// Route selection, which is what supports hosts with multiple IPs
// If we need a route, make one. This is a blocking operation.
if (selectedRoute == null) {
selectedRoute = routeSelector.next();
}
// Create a connection and assign it to this allocation immediately. This makes it possible for
// an asynchronous cancel() to interrupt the handshake we're about to do.
// None of the above applied: create a new RealConnection
RealConnection result;
synchronized (connectionPool) {
route = selectedRoute;
refusedStreamCount = 0;
result = new RealConnection(connectionPool, selectedRoute);
acquire(result);
if (canceled) throw new IOException("Canceled");
}
// Connect the socket
// Do TCP + TLS handshakes. This is a blocking operation.
result.connect(connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled);
routeDatabase().connected(result.route());
Socket socket = null;
// Add the new connection to the pool
synchronized (connectionPool) {
// Pool the connection.
Internal.instance.put(connectionPool, result);
// If another multiplexed connection to the same address was created concurrently, then
// release this connection and acquire that one.
if (result.isMultiplexed()) {
socket = Internal.instance.deduplicate(connectionPool, address, this);
result = connection;
}
}
closeQuietly(socket);
return result;
}
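Reading from the connection pool
Internal.instance.get(connectionPool, address, this, null);
Internal.instance is assigned in a static initializer block of OkHttpClient. Note that this listing takes an extra Route argument, unlike the three-argument call in findConnection above, so the two snippets appear to come from different OkHttp versions: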
@Override
public RealConnection get(ConnectionPool pool, Address address,
StreamAllocation streamAllocation, Route route) {
return pool.get(address, streamAllocation, route);
}
//ConnectionPool
RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
assert (Thread.holdsLock(this));
for (RealConnection connection : connections) {
if (connection.isEligible(address, route)) {
streamAllocation.acquire(connection);
return connection;
}
}
return null;
}
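get iterates over all pooled connections and returns the first one that isEligible for the given Address (or Route), after acquiring it for the StreamAllocation.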
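The lookup amounts to "first eligible connection wins". A minimal sketch under simplifying assumptions: PoolSketch and Conn are hypothetical, an address is reduced to a host:port string, and eligibility is reduced to an address match plus the noNewStreams flag.

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class PoolSketch {
  static final class Conn {
    final String address;   // stand-in for Address (host, port, TLS settings, ...)
    boolean noNewStreams;   // set once the connection must not carry further streams

    Conn(String address) {
      this.address = address;
    }

    boolean isEligible(String wanted) {
      return !noNewStreams && address.equals(wanted);
    }
  }

  private final Deque<Conn> connections = new ArrayDeque<>();

  synchronized void put(Conn connection) {
    connections.add(connection);
  }

  /** Returns the first pooled connection that can serve {@code address}, or null. */
  synchronized Conn get(String address) {
    for (Conn connection : connections) {
      if (connection.isEligible(address)) return connection;
    }
    return null;
  }
}
```

A null result is what sends findConnection on to route selection and a fresh RealConnection.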
Establishing the socket connection
public void connect(
int connectTimeout, int readTimeout, int writeTimeout, boolean connectionRetryEnabled) {
if (protocol != null) throw new IllegalStateException("already connected");
RouteException routeException = null;
List<ConnectionSpec> connectionSpecs = route.address().connectionSpecs();
ConnectionSpecSelector connectionSpecSelector = new ConnectionSpecSelector(connectionSpecs);
if (route.address().sslSocketFactory() == null) {
if (!connectionSpecs.contains(ConnectionSpec.CLEARTEXT)) {
throw new RouteException(new UnknownServiceException(
"CLEARTEXT communication not enabled for client"));
}
String host = route.address().url().host();
if (!Platform.get().isCleartextTrafficPermitted(host)) {
throw new RouteException(new UnknownServiceException(
"CLEARTEXT communication to " + host + " not permitted by network security policy"));
}
}
// Connect
while (true) {
try {
if (route.requiresTunnel()) {
connectTunnel(connectTimeout, readTimeout, writeTimeout);
} else {
// The usual path: no tunnel required
connectSocket(connectTimeout, readTimeout);
}
establishProtocol(connectionSpecSelector);
break;
} catch (IOException e) {
// Exception handling omitted
}
}
//......
}
private void connectSocket(int connectTimeout, int readTimeout) throws IOException {
// Get the proxy
Proxy proxy = route.proxy();
Address address = route.address();
// Create the socket according to the proxy type
rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
? address.socketFactory().createSocket()
: new Socket(proxy);
// Set the read timeout
rawSocket.setSoTimeout(readTimeout);
try {
// Open the socket connection
Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
} catch (ConnectException e) {
ConnectException ce = new ConnectException("Failed to connect to " + route.socketAddress());
ce.initCause(e);
throw ce;
}
// Wrap the socket's input and output streams with Okio
source = Okio.buffer(Okio.source(rawSocket));
sink = Okio.buffer(Okio.sink(rawSocket));
}
- A Sink can be thought of as an OutputStream, and a Source as an InputStream;
Updating the connection pool
Internal.instance.put(connectionPool, result);
//ConnectionPool
void put(RealConnection connection) {
assert (Thread.holdsLock(this));
if (!cleanupRunning) {
cleanupRunning = true;
executor.execute(cleanupRunnable);
}
connections.add(connection);
}
The cleanup logic is analyzed later.
② Creating the HttpCodec from the RealConnection
public HttpCodec newCodec(
OkHttpClient client, StreamAllocation streamAllocation) throws SocketException {
if (http2Connection != null) {
return new Http2Codec(client, streamAllocation, http2Connection);
} else {
// The usual path (HTTP/1.x)
socket.setSoTimeout(client.readTimeoutMillis());
source.timeout().timeout(client.readTimeoutMillis(), MILLISECONDS);
sink.timeout().timeout(client.writeTimeoutMillis(), MILLISECONDS);
// source and sink were created when the socket was connected
return new Http1Codec(client, streamAllocation, source, sink);
}
}
CallServerInterceptor
@Override
public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
// Take the four objects created by the earlier interceptors; they are held by RealInterceptorChain
HttpCodec httpCodec = realChain.httpStream();
StreamAllocation streamAllocation = realChain.streamAllocation();
RealConnection connection = (RealConnection) realChain.connection();
Request request = realChain.request();
long sentRequestMillis = System.currentTimeMillis();
// Write the request headers
httpCodec.writeRequestHeaders(request);
Response.Builder responseBuilder = null;
if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
// If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
// Continue" response before transmitting the request body. If we don't get that, return what
// we did get (such as a 4xx response) without ever transmitting the request body.
if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
httpCodec.flushRequest();
responseBuilder = httpCodec.readResponseHeaders(true);
}
if (responseBuilder == null) {
// Write the request body
// Write the request body if the "Expect: 100-continue" expectation was met.
Sink requestBodyOut = httpCodec.createRequestBody(request, request.body().contentLength());
BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
request.body().writeTo(bufferedRequestBody);
bufferedRequestBody.close();
} else if (!connection.isMultiplexed()) {
// If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection from
// being reused. Otherwise we're still obligated to transmit the request body to leave the
// connection in a consistent state.
streamAllocation.noNewStreams();
}
}
httpCodec.finishRequest();
if (responseBuilder == null) {
// Read the response headers
responseBuilder = httpCodec.readResponseHeaders(false);
}
// Build the response
Response response = responseBuilder
.request(request)
.handshake(streamAllocation.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
int code = response.code();
if (forWebSocket && code == 101) {
// Connection is upgrading, but we need to ensure interceptors see a non-null response body.
response = response.newBuilder()
.body(Util.EMPTY_RESPONSE)
.build();
} else {
// Open the response body
response = response.newBuilder()
.body(httpCodec.openResponseBody(response))
.build();
}
if ("close".equalsIgnoreCase(response.request().header("Connection"))
|| "close".equalsIgnoreCase(response.header("Connection"))) {
streamAllocation.noNewStreams();
}
if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
throw new ProtocolException(
"HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
}
return response;
}
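Once ConnectInterceptor has established the socket connection, Okio wraps the socket's input and output streams as source and sink. At that point only the connection exists; no data has been exchanged yet. CallServerInterceptor's job is to send the Request and parse the Response according to the HTTP protocol.
CallServerInterceptor first takes the four objects created by the earlier interceptors from RealInterceptorChain: HttpCodec, StreamAllocation, RealConnection and Request.
The process is as follows:
1) Sending the HTTP request (headers and body)
First, the request headers are written to the sink: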
httpCodec.writeRequestHeaders(request);
//Http1Codec
@Override
public void writeRequestHeaders(Request request) throws IOException {
String requestLine = RequestLine.get(
request, streamAllocation.connection().route().proxy().type());
writeRequest(request.headers(), requestLine);
}
/** Returns bytes of a request header for sending on an HTTP transport. */
public void writeRequest(Headers headers, String requestLine) throws IOException {
if (state != STATE_IDLE) throw new IllegalStateException("state: " + state);
sink.writeUtf8(requestLine).writeUtf8("\r\n");
for (int i = 0, size = headers.size(); i < size; i++) {
sink.writeUtf8(headers.name(i))
.writeUtf8(": ")
.writeUtf8(headers.value(i))
.writeUtf8("\r\n");
}
sink.writeUtf8("\r\n");
state = STATE_OPEN_REQUEST_BODY;
}
- Build the request line, for example GET /api/data/Android/10/1 HTTP/1.1
- Take the headers from the Request and write them to the sink one by one
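The wire format produced by writeRequest can be reproduced with a StringBuilder in place of the Okio sink. RequestWriterSketch is a hypothetical name; the layout (request line, one "Name: value" line per header, then an empty line, all terminated by CRLF) follows the listing above.

```java
import java.util.Map;

final class RequestWriterSketch {
  /** Returns the header section of an HTTP/1.1 request exactly as it goes on the wire. */
  static String write(String requestLine, Map<String, String> headers) {
    StringBuilder sink = new StringBuilder();
    sink.append(requestLine).append("\r\n");
    for (Map.Entry<String, String> header : headers.entrySet()) {
      sink.append(header.getKey())
          .append(": ")
          .append(header.getValue())
          .append("\r\n");
    }
    // The empty line marks the end of the headers; the body (if any) follows it.
    sink.append("\r\n");
    return sink.toString();
  }
}
```

Passing a LinkedHashMap keeps the headers in insertion order, which makes the output deterministic.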
Next, if the request has a body (for example a POST), the body is written to the sink and sent to the server:
if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
// If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
// Continue" response before transmitting the request body. If we don't get that, return what
// we did get (such as a 4xx response) without ever transmitting the request body.
if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
httpCodec.flushRequest();
responseBuilder = httpCodec.readResponseHeaders(true);
}
if (responseBuilder == null) {
// Write the request body if the "Expect: 100-continue" expectation was met.
Sink requestBodyOut = httpCodec.createRequestBody(request, request.body().contentLength());
BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
request.body().writeTo(bufferedRequestBody);
bufferedRequestBody.close();
} else if (!connection.isMultiplexed()) {
// If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection from
// being reused. Otherwise we're still obligated to transmit the request body to leave the
// connection in a consistent state.
streamAllocation.noNewStreams();
}
}
Finally, the data buffered in the sink is flushed out:
httpCodec.finishRequest();
2) Reading the response
This takes two steps.
① Reading the response headers
//CallServerInterceptor
if (responseBuilder == null) {
responseBuilder = httpCodec.readResponseHeaders(false);
}
Response response = responseBuilder
.request(request)
.handshake(streamAllocation.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
//Http1Codec
@Override public Response.Builder readResponseHeaders(boolean expectContinue) throws IOException {
if (state != STATE_OPEN_REQUEST_BODY && state != STATE_READ_RESPONSE_HEADERS) {
throw new IllegalStateException("state: " + state);
}
try {
StatusLine statusLine = StatusLine.parse(source.readUtf8LineStrict());
Response.Builder responseBuilder = new Response.Builder()
.protocol(statusLine.protocol)
.code(statusLine.code)
.message(statusLine.message)
.headers(readHeaders());
if (expectContinue && statusLine.code == HTTP_CONTINUE) {
return null;
}
state = STATE_OPEN_RESPONSE_BODY;
return responseBuilder;
} catch (EOFException e) {
// Provide more context if the server ends the stream before sending a response.
IOException exception = new IOException("unexpected end of stream on " + streamAllocation);
exception.initCause(e);
throw exception;
}
}
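- First the status line is parsed (protocol, status code, message), followed by the response headers
- The Response is then built with the responseBuilder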
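Parsing the status line relies on its fixed layout: "HTTP/1.x", a space, three digits, and an optional reason phrase. The sketch below handles only that HTTP/1.x shape and throws an unchecked exception for brevity; StatusLineSketch is hypothetical and is not the library's StatusLine class.

```java
final class StatusLineSketch {
  final String protocol;
  final int code;
  final String message;

  private StatusLineSketch(String protocol, int code, String message) {
    this.protocol = protocol;
    this.code = code;
    this.message = message;
  }

  /** Parses a line such as "HTTP/1.1 200 OK". The reason phrase may be empty. */
  static StatusLineSketch parse(String line) {
    // "HTTP/1.x" occupies indices 0..7, index 8 is a space, the code is at 9..11.
    if (line.length() < 12 || !line.startsWith("HTTP/1.") || line.charAt(8) != ' ') {
      throw new IllegalArgumentException("Unexpected status line: " + line);
    }
    if (line.length() > 12 && line.charAt(12) != ' ') {
      throw new IllegalArgumentException("Unexpected status line: " + line);
    }
    int code;
    try {
      code = Integer.parseInt(line.substring(9, 12));
    } catch (NumberFormatException e) {
      throw new IllegalArgumentException("Unexpected status line: " + line);
    }
    String message = line.length() > 13 ? line.substring(13) : "";
    return new StatusLineSketch(line.substring(0, 8), code, message);
  }
}
```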
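② Reading the response body
The response body is read in every case except a WebSocket call that receives status code 101 (Switching Protocols: the server agrees to switch to the protocol the client requested), which gets an empty body instead.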
//CallServerInterceptor
int code = response.code();
if (forWebSocket && code == 101) {
// Connection is upgrading, but we need to ensure interceptors see a non-null response body.
response = response.newBuilder()
.body(Util.EMPTY_RESPONSE)
.build();
} else {
response = response.newBuilder()
.body(httpCodec.openResponseBody(response))
.build();
}
//Http1Codec
@Override
public ResponseBody openResponseBody(Response response) throws IOException {
Source source = getTransferStream(response);
return new RealResponseBody(response.headers(), Okio.buffer(source));
}
private Source getTransferStream(Response response) throws IOException {
if (!HttpHeaders.hasBody(response)) {
return newFixedLengthSource(0);
}
if ("chunked".equalsIgnoreCase(response.header("Transfer-Encoding"))) {
return newChunkedSource(response.request().url());
}
long contentLength = HttpHeaders.contentLength(response);
if (contentLength != -1) {
return newFixedLengthSource(contentLength);
}
// Wrap the input stream from the connection (rather than just returning
// "socketIn" directly here), so that we can control its use after the
// reference escapes.
return newUnknownLengthSource();
}
The ResponseBody is then set as the body of the Response.
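The decision made in getTransferStream is a short chain of checks on how the body is framed. It can be isolated as a pure function; BodyFramingSketch and its Framing enum are hypothetical names, and the three inputs stand in for HttpHeaders.hasBody(response), the Transfer-Encoding header and HttpHeaders.contentLength(response).

```java
final class BodyFramingSketch {
  enum Framing { EMPTY, CHUNKED, FIXED_LENGTH, UNTIL_CLOSE }

  /**
   * Picks how the response body is delimited on the wire.
   * {@code contentLength} is -1 when the header is absent.
   */
  static Framing choose(boolean hasBody, String transferEncoding, long contentLength) {
    // Responses such as 204 or 304 carry no body at all.
    if (!hasBody) return Framing.EMPTY;

    // Chunked encoding takes priority over any Content-Length.
    if ("chunked".equalsIgnoreCase(transferEncoding)) return Framing.CHUNKED;

    if (contentLength != -1) return Framing.FIXED_LENGTH;

    // No length information: the body ends when the server closes the connection.
    return Framing.UNTIL_CLOSE;
  }
}
```

The last case is why such a connection cannot be reused: the only end-of-body marker is the close itself.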
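At this point the whole request is complete.
Summary
Each interceptor has its own job, and they link together neatly to carry out the whole network request. To close, here is a diagram borrowed from piasy, which should give readers a clearer picture of OkHttp.
References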
https://blog.piasy.com/2016/07/11/Understand-OkHttp/
http://lowett.com/2017/02/24/okhttp-4/
http://lowett.com/2017/03/02/okhttp-5/
http://lowett.com/2017/03/09/okhttp-6/
http://lowett.com/2017/03/21/okhttp-7/
http://lowett.com/2017/03/30/okhttp-8/