一 前言
Retrofit + Okhttp + RxJava,可以说是现在最火的网络请求组合了,而它们背后的设计模式或设计思想,是它们成功的重大原因之一。
分析源码,也是为了学习它们的设计思想,提升自己的编码能力。在上篇分析Retrofit的文章中,外观模式,代理模式,装饰器模式,工厂模式,策略模式等,都是在源码解析中加强了认识。
总体分四步:
- 创建OkHttpClient客户端对象
- 创建请求消息Request对象。
- 创建网络请求的调用封装对象RealCall
- 执行网络请求,会获取响应消息Response对象给回调方法
二 OkHttpClient
用建造者模式创建对象。和Retrofit一样,主要功能生成默认配置,且属性很多,这时用建造者模式选择需要配置的属性创建对象就方便了许多。
public OkHttpClient() {
this(new Builder());
}
public Builder() {
dispatcher = new Dispatcher(); //执行网络请求的任务调度器
protocols = DEFAULT_PROTOCOLS; //默认的协议 http2 http1.1
connectionSpecs = DEFAULT_CONNECTION_SPECS; // 设置连接时支持的tls层协议以及不进行数据加密
eventListenerFactory = EventListener.factory(EventListener.NONE);
proxySelector = ProxySelector.getDefault();
cookieJar = CookieJar.NO_COOKIES;
socketFactory = SocketFactory.getDefault(); // socket生产工厂
hostnameVerifier = OkHostnameVerifier.INSTANCE;
certificatePinner = CertificatePinner.DEFAULT;
proxyAuthenticator = Authenticator.NONE;
authenticator = Authenticator.NONE;
connectionPool = new ConnectionPool(); //连接池 支持多路复用
dns = Dns.SYSTEM;
followSslRedirects = true;
followRedirects = true;
retryOnConnectionFailure = true; //是否重定向
connectTimeout = 10_000; //连接超时时间
readTimeout = 10_000;
writeTimeout = 10_000;
pingInterval = 0;
}
无参构造器是用Builder为参数创建的,相当于还是用Builder实现对象创建。参数是Builder构造器默认创建的。
它是在配置全局发送请求中所需要的各种定制化的参数,并且持有各个参数引用对象。
Dispatcher主要用来管理网络请求的线程池。
可以看出每个OkHttpClient对象对线程池和连接池都有管理,所有OkHttpClient最好做成用单例模式创建。创建多个OkHttpClient对象会占用更多内存。
2.1 Dispatcher
我看网上有说法是使用享元模式,享元共厂内部用线程池实现享元池。用于减少创建对象的数量,以减少内存占用和提高性能。但这里只有一种类型,没有分组对象。
public final class Dispatcher {
private int maxRequests = 64;
private int maxRequestsPerHost = 5;
private @Nullable Runnable idleCallback;
//请求网络的线程池,懒汉式单例
private @Nullable ExecutorService executorService;
//准备异步队列,当运行队列满时存储任务
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
//运行的异步队列
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
//运行的同步队列
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
//单例获取线程池
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
//异步进行网络请求,AsyncCall是一个Runnable
synchronized void enqueue(AsyncCall call) {
//运行时异步队列未超过maxRequests且相同host的请求数量不超过maxRequestsPerHost,加入运行时队列并执行它
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);
executorService().execute(call);
} else {
//加入等待队列
readyAsyncCalls.add(call);
}
}
//同步网络请求,直接加入同步队列
synchronized void executed(RealCall call) {
runningSyncCalls.add(call);
}
//网络请求结束后,会将请求结束的Call从运行队列中移除
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
//如果是异步队列,还要再根据运行时异步队列和同host的请求数量操控运行时异步队列和等待队列
if (promoteCalls) promoteCalls();
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
}
if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();
}
}
//从准备队列把Call已到运行队列来
private void promoteCalls() {
//运行队列已满,则结束
if (runningAsyncCalls.size() >= maxRequests) return; // 准备队列为空,怎结束
if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();
//如果这个Call的同host请求在运行队列中不超过maxRequestsPerHost,则加入运行队列
if (runningCallsForHost(call) < maxRequestsPerHost) {
i.remove();
runningAsyncCalls.add(call);
executorService().execute(call);
}
//如果运行队列已满,则退出
if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
}
}
}
对于异步请求,调用enqueue方法,将要执行的AsyncCall放入运行或等待队列并交予线程池执行,执行完成后,再调用finished来移除请求。
对于同步请求,调用executed方法,将RealCall放入运行同步队列,在执行完成后调用finished将Call移除队列。
当为异步任务时,会调用promoteCalls方法。遍历准备队列,如果运行队列未满,且运行队列中同一主机请求不超过5个,则将其从准备队列移入运行队列;否则不移动。
三 Request
这个对象是为了配置http请求的请求消息Request,而请求消息分为四部分:请求行(request line)、请求头部(header)、空行和请求数据。
从请求头的组成就可知Request有哪些配置。
Request request = new Request.Builder()
.url(baseUrl + "top250")
.build();
public static class Builder {
HttpUrl url; //请求的url
String method; //请求方法
Headers.Builder headers; //请求头
RequestBody body; //请求数据
Object tag;
public Builder() {
this.method = "GET";
this.headers = new Headers.Builder();
}
}
很明显,就是HTTP请求消息Request的组成
四 RealCall
okhttp3.Call call = client.newCall(request);
@Override public Call newCall(Request request) {
return new RealCall(this, request, false /* for web socket */);
}
RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
final EventListener.Factory eventListenerFactory = client.eventListenerFactory();
this.client = client;
this.originalRequest = originalRequest;
this.forWebSocket = forWebSocket;
this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client, forWebSocket);
// TODO(jwilson): this is unsafe publication and not threadsafe.
this.eventListener = eventListenerFactory.create(this);
}
创建的Call对象实际是RealCall实现的,RealCall创建就是初始化了几个属性,OkHttpClient,Request,retryAndFollowUpInterceptor。
retryAndFollowUpInterceptor是重定向拦截器,和先前的任务调度器Dispatcher一样,是实现网络请求的关键之一。
五 call.enqueue
前面都是初始化和配置设置工作,终于到实现网络请求的步骤了,enqueue是上面介绍的RealCall实现的。
@Override public void enqueue(Callback responseCallback) {
//只能执行一次
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
//使用dispatcher用线程池执行网络请求
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
final class AsyncCall extends NamedRunnable {
private final Callback responseCallback;
AsyncCall(Callback responseCallback) {
super("OkHttp %s", redactedUrl());
this.responseCallback = responseCallback;
}
@Override protected void execute() {
boolean signalledCallback = false;
try {
//获取返回值
Response response = getResponseWithInterceptorChain();
//对返回值进行回调
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
responseCallback.onFailure(RealCall.this, e);
}
} finally {
client.dispatcher().finished(this);
}
}
}
call.enqueue中,调用了OkHttpClient的Dispatcher的enqueue,将AsyncCall将于线程池执行,线程真正执行的内容在AsyncCall的execute中。
通过getResponseWithInterceptorChain获取到网络请求的返回值,那实现网络请求的重点就在getResponseWithInterceptorChain中了。
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
interceptors.addAll(client.interceptors());
interceptors.add(retryAndFollowUpInterceptor);//在网络请求失败后进行重试
interceptors.add(new BridgeInterceptor(client.cookieJar()));//桥接拦截器,对头部信息进行一系列设置
interceptors.add(new CacheInterceptor(client.internalCache()));//缓存拦截器
interceptors.add(new ConnectInterceptor(client));//连接拦截器
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());
}
interceptors.add(new CallServerInterceptor(forWebSocket));
Interceptor.Chain chain = new RealInterceptorChain(
interceptors, null, null, null, 0, originalRequest);
return chain.proceed(originalRequest);
}
getResponseWithInterceptorChain里添加了一系列的拦截器,再创建RealInterceptorChain开始通过进行责任链模式实现网络请求。
而责任链的顺序:
- OkHttpClient的addInterceptor的拦截器
- BridgeInterceptor
- CacheInterceptor
- ConnectInterceptor
- OkHttpClient的addNetworkInterceptor的拦截器
- CallServerInterceptor
5.1 Interceptor
5.1.1 RealInterceptorChain
那从责任链模式模式的角度先来分析它的是如何实现的。这种模式为请求创建了一个接收者对象的链,通常每个接收者都包含对另一个接收者的引用。如果一个对象不能处理该请求,那么它会把相同的请求传给下一个接收者,依此类推。
public final class RealInterceptorChain implements Interceptor.Chain {
@Override public Response proceed(Request request) throws IOException {
return proceed(request, streamAllocation, httpCodec, connection);
}
public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
RealConnection connection) throws IOException {
// 1.创建下一阶段的链对象
RealInterceptorChain next = new RealInterceptorChain(
interceptors, streamAllocation, httpCodec, connection, index + 1, request);
//2.获取这一阶段链的拦截者
Interceptor interceptor = interceptors.get(index);
//3.让这一阶段的拦截者处理请求
Response response = interceptor.intercept(next);
// 确保下一阶段的拦截者next,执行了chain.proceed()
if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1) {
throw new IllegalStateException("network interceptor " + interceptor
+ " must call proceed() exactly once");
}
// 确保拦截器请求结果不为空
if (response == null) {
throw new NullPointerException("interceptor " + interceptor + " returned null");
}
return response;
}
}
在第一次调用chain.proceed(originalRequest)开启责任链处理请求时,index还是0,
- 在注释中第一步,创建下一阶段的拦截者,index增加了1;
- 第二步,根据index从拦截器队列中获取下一阶段的拦截者;
- 第三步,用这一阶段的拦截者,执行拦截获得请求结果
RealInterceptorChain与Interceptor关系就很明了了:RealInterceptorChain的proceed方法中,会创建链的下一阶段RealInterceptorChain对象,做为Interceptor的intercept方法参数,在intercept中会执行chain.proceed()进而到链的下一环。
简单来讲,proceed方法就是RealInterceptorChain根据index获取响应登记的Interceptor,调用intercept方法时又会调用传入的chain.proceed。
在proceed方法中,有四个参数:Request,StreamAllocation,HttpCodec,RealConnection。但在RealCall的getResponseWithInterceptorChain中
Interceptor.Chain chain = new RealInterceptorChain(
interceptors, null, null, null, 0, originalRequest);
只穿入了Request对象,其它对象则是在后续的拦截者中创建的。下面就根据这几个参数来解析这些的拦截者。
5.1.2 RetryAndFollowUpInterceptor
顾名思义,重试与重定向拦截器。也就是在网络请求失败的情况下,会自动进行重连,内部通过while(true)死循环来进行重试获取Response(有重试上限,超过会抛出异常)。
影响process方法中的StreamAllocation的创建。
@Override public Response intercept(Chain chain) throws IOException {
Request request = chain.request();
//创建streamAllocation对象
streamAllocation = new StreamAllocation(
client.connectionPool(), createAddress(request.url()), callStackTrace);
int followUpCount = 0;
Response priorResponse = null;
//1.循环获取请求结果
while (true) {
Response response = null;
boolean releaseConnection = true;
try {
//2.责任链下一环去处理请求,获取请求结果
response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null);
releaseConnection = false;
} catch (RouteException e) {
// 3.通过路由连接的尝试失败,这时请求没有发送出去。判断满足可恢复条件,满足则继续循环重试。
if (!recover(e.getLastConnectException(), false, request)) {
throw e.getLastConnectException();
}
releaseConnection = false;
continue;
} catch (IOException e) {
// 尝试与服务器通信失败。请求可能已经发送。
boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
if (!recover(e, requestSendStarted, request)) throw e;
releaseConnection = false;
continue;
} finally {
// 抛出异常如果没有被catch,或没有异常发生,释放streamAllocation资源
if (releaseConnection) {
streamAllocation.streamFailed(null);
streamAllocation.release();
}
}
//4.分析返回码,303的话需要重定向
Request followUp = followUpRequest(response);
//不需要重定向就返回请求结果
if (followUp == null) {
if (!forWebSocket) {
streamAllocation.release();
}
return response;
}
//5.重定向次数不能超过MAX_FOLLOW_UPS
if (++followUpCount > MAX_FOLLOW_UPS) {
streamAllocation.release();
throw new ProtocolException("Too many follow-up requests: " + followUpCount);
}
}
}
- 通过循环来获取请求返回值
- 责任链叫给下一级获取请求返回值
- 请求过程中发生RouteException,IOException异常,则进行重连请求。
- 重定向判断,如需要就重定向,不需要就返回请求值给上一级责任链。
- 重定向次数不能超过MAX_FOLLOW_UPS,20次。
5.1.3 BridgeInterceptor
适配器模式(Adapter Pattern)是作为两个不兼容的接口之间的桥梁。而BridgeInterceptor也是起着这样的作用,它是实现应用层和网络层直接的数据格式编码的桥,用于完善请求头。影响process方法中的Request参数。
public final class BridgeInterceptor implements Interceptor {
@Override public Response intercept(Chain chain) throws IOException {
Request userRequest = chain.request();
Request.Builder requestBuilder = userRequest.newBuilder();
//1.完善请求头
RequestBody body = userRequest.body();
if (body != null) {
MediaType contentType = body.contentType();
if (contentType != null) {
requestBuilder.header("Content-Type", contentType.toString());
}
...
}
...
//2.责任链下一环,获取请求返回值
Response networkResponse = chain.proceed(requestBuilder.build());
//3.对返回值的消息报头进行转化
HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());
Response.Builder responseBuilder = networkResponse.newBuilder()
.request(userRequest);
...
}
}
如注释所写的,分为三步
- 完善请求头
- 责任链下一环,获取请求返回值
- 对返回值的消息报头进行转化
就是对请求头和消息报头处理。
5.1.4 CacheInterceptor
要了解这里缓存如何实现,还是要先了解http缓存机制。但也要注意一点,有些缓存字符意义并不完全一样,这个解析CacheInterceptor代码时介绍。
5.1.4.1 http缓存机制
这里就不大篇幅描述,只做简要概述。
缓存其实可以分成两类,一种是不请求网络读取本地缓存的,一种是请求网络进行对比的缓存。Header中的Expires/Cache-Control属于第一种,Last-Modified / If-Modified-Since,Etag / If-None-Match属于第二种。
- 第一种缓存的流程:客户端请求数据,直接从本地缓存返回结果;本地缓存失效则请求网络,再将缓存请求结果。
- 第二种缓存流程:客户端请求数据,从缓存获取缓存里header的缓存标示,再请求服务器验证本地缓存是否失效,失效则返回结果并缓存;有效通知客户端缓存有效,客户端读取本地缓存。
5.1.4.1.1 Expires
Expires的值为服务端返回的到期时间,即下一次请求时,请求时间小于服务端返回的到期时间,直接使用缓存数据。
Expires 是HTTP 1.0的东西,现在默认浏览器均默认使用HTTP 1.1,所以它的作用基本忽略。所以HTTP 1.1 的版本,使用Cache-Control替代。
5.1.4.1.2 Cache-Control
请求头
指令 | 说明 |
---|---|
no-cache | 需要使用网络对比缓存来验证缓存数据 |
no-store | 所有内容都不会缓存 |
max-age=[秒] | 缓存的内容将在 xxx 秒后失效 |
max-stale=[秒] | 可接受的最大过期时间 |
min-fresh=[秒] | 询问再过[秒]时间后资源是否过期,若过期则不返回 |
only-if-cached | 只获取缓存的资源而不联网获取,如果缓存没有命中,返回504 |
响应头还有public,private等,这些在特别情况下如响应状态码为302,307等情况作用缓存响应消息。
5.1.4.1.3 Last-Modified / If-Modified-Since
- Last-Modified在响应请求时,告诉浏览器资源的最后修改时间;
- If-Modified-Since在请求服务时,通过此字段通知服务器上次请求时间(上次响应头的Last-Modified值),服务器比较其 与被请求资源的最后修改时间进行比对,若修改时间大于其,则返回所有资源和状态码200;修改时间小于其,说明资源没有修改,则响应HTTP304,告诉客户端使用缓存。
5.1.4.1.4 Etag / If-None-Match(优先级高于Last-Modified / If-Modified-Since)
- Etag:在响应请求头中,存储当前资源在服务器的唯一标识
- If-None-Match:在请求头中,存储服务器客户段缓存数据的唯一标识。服务器会比对其和被请求资源的唯一标识,如果不同,则返回被请求资源和状态码200;如果相同,说明资源没有修改,则响应HTTP 304,告诉客户端使用缓存。
5.1.4.2 CacheInterceptor
OKHttp的缓存策略,就是实现Http的缓存策略。
public final class CacheInterceptor implements Interceptor {
@Override public Response intercept(Chain chain) throws IOException {
//1.如果配置了缓存,获取同一请求的缓存Response
Response cacheCandidate = cache != null
? cache.get(chain.request())
: null;
long now = System.currentTimeMillis();
//2.根据请求头和缓存的响应,生成缓存策略
//也就是对请求头和响应头中缓存相关的标示生成策略,看用哪种HTTP缓存方式
CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
//网络请求,为空则不请求网络
Request networkRequest = strategy.networkRequest;
//缓存,为空则不获取缓存
Response cacheResponse = strategy.cacheResponse;
//记录网络请求或缓存结果获取的次数
if (cache != null) {
cache.trackResponse(strategy);
}
//如果有缓存这次请求却不用缓存,则释放缓存
if (cacheCandidate != null && cacheResponse == null) {
closeQuietly(cacheCandidate.body());
}
// 3.如果网络没有且无法从缓存中获取结果,返回504错误
if (networkRequest == null && cacheResponse == null) {
return new Response.Builder()
.request(chain.request())
.protocol(Protocol.HTTP_1_1)
.code(504)
.message("Unsatisfiable Request (only-if-cached)")
.body(Util.EMPTY_RESPONSE)
.sentRequestAtMillis(-1L)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
}
// 4.如果不需要网络请求,缓存结果有,则使用缓存获取结果
if (networkRequest == null) {
return cacheResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.build();
}
//5.走到这步,则说明有网络请求。通过责任链下一环获取响应值。
Response networkResponse = null;
try {
networkResponse = chain.proceed(networkRequest);
} finally {
// 如果网络请求失败且缓存结果不为空,则释放缓存
if (networkResponse == null && cacheCandidate != null) {
closeQuietly(cacheCandidate.body());
}
}
// 6.,缓存结果不为空,且网络请求返回304,则读取本地缓存
if (cacheResponse != null) {
if (networkResponse.code() == HTTP_NOT_MODIFIED) {
//合并网络请求返回结果和缓存结果,主要是合并网络请求结果的header
Response response = cacheResponse.newBuilder()
.headers(combine(cacheResponse.headers(), networkResponse.headers()))
.sentRequestAtMillis(networkResponse.sentRequestAtMillis())
.receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();
networkResponse.body().close();
// 6.1 将结果集更新到缓存中
cache.trackConditionalCacheHit();
cache.update(cacheResponse, response);
return response;
} else {
closeQuietly(cacheResponse.body());
}
}
//7.走到这,说明不需要使用缓存,则直接使用网络结果
Response response = networkResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();
//8.如果有缓存,经过服务器校验缓存过期了
if (cache != null) {
//如果网络请求结果有响应正文,且根据请求和响应的Header判断需要保存缓存,就保存
if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
// 将网络结果保存到缓存中
CacheRequest cacheRequest = cache.put(response);
return cacheWritingResponse(cacheRequest, response);
}
//不需要保存缓存,且是POST,PATCH,PUT,DELETE,MOVE中的请求方法,则删除缓存
if (HttpMethod.invalidatesCache(networkRequest.method())) {
try {
cache.remove(networkRequest);
} catch (IOException ignored) {
// The cache cannot be written.
}
}
}
//9.走到这里,说明网络结果过程都没用到缓存,是无缓存操作
return response;
}
}
这个方法,其实就是实现Http缓存,大部分可以按HTTP缓存来理解它。Header里缓存字段如何配置,决定了缓存要如何处理。这些上面介绍Http缓存时已经说过,就不再赘述。
说几个要注意的地方:
- 要把网络请求结果缓存下来,要响应正文不为空,且请求头和响应头的缓存字段Cache-Control的值都不为no-cache。
- Cache对象需要我们在OKHttpClient中配置好,Cache主要有put方法和get方法,通过DiskLruCache算法写入,读取缓存,用Entry类来存储所有的缓存信息。
但缓存实现也有与Http缓存介绍不同的地方,这需要到CacheStrategy中分析。
5.1.4.3 CacheStrategy
缓存策略的生成,它是通过工厂模式创建,和Retrofit的字符转换器网络适配器生成几乎一样的行式:创建工厂在调用get创建。
CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
先分析Factory
public static class Factory {
public Factory(long nowMillis, Request request, Response cacheResponse) {
this.nowMillis = nowMillis;
this.request = request;
this.cacheResponse = cacheResponse;
//将缓存结果的header分析,将缓存相关字符分离出来
if (cacheResponse != null) {
this.sentRequestMillis = cacheResponse.sentRequestAtMillis();
this.receivedResponseMillis = cacheResponse.receivedResponseAtMillis();
Headers headers = cacheResponse.headers();
for (int i = 0, size = headers.size(); i < size; i++) {
String fieldName = headers.name(i);
String value = headers.value(i);
if ("Date".equalsIgnoreCase(fieldName)) {
servedDate = HttpDate.parse(value);
servedDateString = value;
} else if ("Expires".equalsIgnoreCase(fieldName)) {
expires = HttpDate.parse(value);
} else if ("Last-Modified".equalsIgnoreCase(fieldName)) {
lastModified = HttpDate.parse(value);
lastModifiedString = value;
} else if ("ETag".equalsIgnoreCase(fieldName)) {
etag = value;
} else if ("Age".equalsIgnoreCase(fieldName)) {
ageSeconds = HttpHeaders.parseSeconds(value, -1);
}
}
}
}
}
主要作用就是将缓存结果头中的缓存字符分离出来,为后面的缓存策略生成做准备。
public CacheStrategy get() {
CacheStrategy candidate = getCandidate();
//如果请求头有only-if-cached,说明只用缓存,但缓存又失效了,网络缓存都不能用
if (candidate.networkRequest != null && request.cacheControl().onlyIfCached()) {
return new CacheStrategy(null, null);
}
return candidate;
}
对only-if-cached字符加了道验证,策略实现的主要方法是getCandidate
private CacheStrategy getCandidate() {
// 1.没有缓存结果,创建的策略只用网络请求
if (cacheResponse == null) {
return new CacheStrategy(request, null);
}
// 2.当请求的协议是https的时候,如果cache没有TLS握手就丢弃缓存,只用网络请求
if (request.isHttps() && cacheResponse.handshake() == null) {
return new CacheStrategy(request, null);
}
// 3.cacheResponse和request的header的Cache-Control都要不是no-store,否则只用网络请求
if (!isCacheable(cacheResponse, request)) {
return new CacheStrategy(request, null);
}
//4.如果请求头里的Cache-Control值是no-cache或有If-Modified-Since,If-None-Match,则只用网络请求
CacheControl requestCaching = request.cacheControl();
if (requestCaching.noCache() || hasConditions(request)) {
return new CacheStrategy(request, null);
}
//5.缓存有效性校验,根据缓存的缓存时间,缓存可接受最大过期时间等等HTTP协议上的规范来判断缓存是否可用。
//这里是第一种不请求网络只用本地缓存的策略
long ageMillis = cacheResponseAge();
long freshMillis = computeFreshnessLifetime();
if (requestCaching.maxAgeSeconds() != -1) {
freshMillis = Math.min(freshMillis, SECONDS.toMillis(requestCaching.maxAgeSeconds()));
}
long minFreshMillis = 0;
if (requestCaching.minFreshSeconds() != -1) {
minFreshMillis = SECONDS.toMillis(requestCaching.minFreshSeconds());
}
long maxStaleMillis = 0;
CacheControl responseCaching = cacheResponse.cacheControl();
if (!responseCaching.mustRevalidate() && requestCaching.maxStaleSeconds() != -1) {
maxStaleMillis = SECONDS.toMillis(requestCaching.maxStaleSeconds());
}
if (!responseCaching.noCache() && ageMillis + minFreshMillis < freshMillis + maxStaleMillis) {
Response.Builder builder = cacheResponse.newBuilder();
if (ageMillis + minFreshMillis >= freshMillis) {
builder.addHeader("Warning", "110 HttpURLConnection \"Response is stale\"");
}
long oneDayMillis = 24 * 60 * 60 * 1000L;
if (ageMillis > oneDayMillis && isFreshnessLifetimeHeuristic()) {
builder.addHeader("Warning", "113 HttpURLConnection \"Heuristic expiration\"");
}
return new CacheStrategy(null, builder.build());
}
// 6.这里是看请求是否为缓存网络比对。
String conditionName;
String conditionValue;
if (etag != null) {
conditionName = "If-None-Match";
conditionValue = etag;
} else if (lastModified != null) {
conditionName = "If-Modified-Since";
conditionValue = lastModifiedString;
} else if (servedDate != null) {
conditionName = "If-Modified-Since";
conditionValue = servedDateString;
} else {
//7.如果比对缓存也不是,那只有常规的网络请求了
return new CacheStrategy(request, null);
}
Headers.Builder conditionalRequestHeaders = request.headers().newBuilder();
Internal.instance.addLenient(conditionalRequestHeaders, conditionName, conditionValue);
Request conditionalRequest = request.newBuilder()
.headers(conditionalRequestHeaders.build())
.build();
//7.返回第二种网络比对的缓存策略
return new CacheStrategy(conditionalRequest, cacheResponse);
}
CacheControl是request和Response中为辅助缓存处理的类,通过分解header的缓存字符来创建。
其实创建的策略就三种,
- 常规网络请求;只有networkRequest参数
- 只获取本地缓存结果,不用网络请求;只有cacheResponse参数
- 网络比对缓存获取结果的。networkRequest和cacheResponse都有
only-if-cached不能单独使用,它只是限制网络请求,缓存有效的判断还是要另外设置。
缓存策略与Http缓存不同的点:
- CacheStrategy缓存策略,就是根据缓存字段来实现缓存算法。但有一点,request的header里包含If-Modified-Since或If-None-Match或no-cache,会直接走网络请求而不走缓存。这两个字符是要在缓存结果里有才会走缓存,这与上面介绍的HTTP缓存字段不同。具体看下面分析。
- 当只用本地缓存时,Http缓存会在本地缓存失效后再请求网络,而OKhttp只会请求本地缓存,不会再请求网络。
5.1.5 ConnectInterceptor
解析完缓存,责任链下一环就是ConnectInterceptor了,它通过StreamAllocation获取到HttpCodec和RealConnection。
@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();
//这是RetryAndFollowUpInterceptor创建并传入的
StreamAllocation streamAllocation = realChain.streamAllocation();
// We need the network to satisfy this request. Possibly for validating a conditional GET.
boolean doExtensiveHealthChecks = !request.method().equals("GET");
// HTTP/1.1生成Http1xStream;HTTP/2 生成Http2xStream ,实际上是通过RealConnection生成的
HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
//从连接池中获取可用连接
RealConnection connection = streamAllocation.connection();
return realChain.proceed(request, streamAllocation, httpCodec, connection);
}
public HttpCodec newStream(OkHttpClient client, boolean doExtensiveHealthChecks) {
try {
//1.获取连接
RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
writeTimeout, connectionRetryEnabled, doExtensiveHealthChecks);
//通过连接获取HttpCodec
HttpCodec resultCodec = resultConnection.newCodec(client, this);
synchronized (connectionPool) {
codec = resultCodec;
return resultCodec;
}
} catch (IOException e) {
throw new RouteException(e);
}
}
HttpCodec里实际是利用Okio来实现读写。
先来看如何获取RealConnection吧
5.1.5.1 RealConnection的获取
findHealthyConnection -> findConnection
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
boolean connectionRetryEnabled) throws IOException {
Route selectedRoute;
//connectionPool 连接池
synchronized (connectionPool) {
// 1.复用已有连接
RealConnection allocatedConnection = this.connection;
//已有连接是否可用
if (allocatedConnection != null && !allocatedConnection.noNewStreams) {
return allocatedConnection;
}
// 2.从连接池中获取连接,赋值给了connection
Internal.instance.get(connectionPool, address, this, null);
//连接池获到了connection,就使用
if (connection != null) {
return connection;
}
selectedRoute = route;
}
// 如果需要路由,就获取一个,里面有代理信息
if (selectedRoute == null) {
selectedRoute = routeSelector.next();
}
RealConnection result;
synchronized (connectionPool) {
if (canceled) throw new IOException("Canceled");
// 3.加上路由再从连接池获取连接一次
// This could match due to connection coalescing.
Internal.instance.get(connectionPool, address, this, selectedRoute);
if (connection != null) return connection;
// 4.自己创建连接
// 用一个异步cancel()来中断我们将要进行的握手。
route = selectedRoute;
refusedStreamCount = 0;
result = new RealConnection(connectionPool, selectedRoute);
acquire(result);
}
// 这里初始化初始化了BufferedSource和BufferedSink,与创建HttpCodec有关。
result.connect(connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled);
routeDatabase().connected(result.route());
Socket socket = null;
synchronized (connectionPool) {
// 5.将新建的连接放入连接池
Internal.instance.put(connectionPool, result);
// If another multiplexed connection to the same address was created concurrently, then
// release this connection and acquire that one.
if (result.isMultiplexed()) {
socket = Internal.instance.deduplicate(connectionPool, address, this);
result = connection;
}
}
closeQuietly(socket);
return result;
}
RealConnection的获取,总得来说有四个优先级:
- StreamAllocation中已有的connection。
- 无需路由参数从连接池获取的connection。
- 有路由参数从连接池获取的connection。
- 自己创建的connection
RealConnection的connect方法里,调用connectSocket将实现网络请求的Socket和用Okio实现了读写,也就是说请求消息和响应消息都通过Okio来实现读写了,实现读写的关键BufferedSource和BufferedSink在这里初始化,也实现HttpCodec功能的重要参数。
对于从连接池获取connection,分析下连接池ConnectionPool就能明白怎么实现的了。
5.1.5.2 ConnectionPool
既然是连接池,那我们首要关心的是如果存储连接,和如何管理连接。
private final Deque<RealConnection> connections = new ArrayDeque<>();
在ConnectionPool中,是用双向队列Deque来存储连接。
再看放入连接方法
void put(RealConnection connection) {
assert (Thread.holdsLock(this));
//进行连接池清理
if (!cleanupRunning) {
cleanupRunning = true;
executor.execute(cleanupRunnable);
}
//放入队列
connections.add(connection);
}
在每次放入新的连接时,先清理存放的连接,再存放连接。
//连接存放的最大数量,默认为5
private final int maxIdleConnections;
//空闲连接存活时间,最多五分钟
private final long keepAliveDurationNs;
//用线程池来进行连接清理
private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp ConnectionPool", true));
private final Runnable cleanupRunnable = new Runnable() {
@Override public void run() {
while (true) {
//cleanup就是根据设置的存活条件,进行连接清理
long waitNanos = cleanup(System.nanoTime());
if (waitNanos == -1) return;
if (waitNanos > 0) {
long waitMillis = waitNanos / 1000000L;
waitNanos -= (waitMillis * 1000000L);
synchronized (ConnectionPool.this) {
try {
ConnectionPool.this.wait(waitMillis, (int) waitNanos);
} catch (InterruptedException ignored) {
}
}
}
}
}
};
在OKHttp3 的默认实现中,这些连接中最多只能存在 5 个空闲连接,空闲连接最多只能存活 5 分钟。
@Nullable RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
assert (Thread.holdsLock(this));
for (RealConnection connection : connections) {
//根据传入的地址和路由,判断连接是否合格
if (connection.isEligible(address, route)) {
//acquire方法赋值连接给streamAllocation的connection
streamAllocation.acquire(connection);
return connection;
}
}
return null;
}
//streamAllocation的acquire方法
public void acquire(RealConnection connection) {
assert (Thread.holdsLock(connectionPool));
if (this.connection != null) throw new IllegalStateException();
//进行赋值
this.connection = connection;
//用allocations来记录每个使用了connection的streamAllocation对象
connection.allocations.add(new StreamAllocationReference(this, callStackTrace));
}
5.1.6 CallServerInterceptor
这里就是执行网络请求的拦截器了。
@Override public Response intercept(Chain chain) throws IOException {
//责任链
RealInterceptorChain realChain = (RealInterceptorChain) chain;
//从ConnectInterceptor中获取的httpCodec
HttpCodec httpCodec = realChain.httpStream();
//从retryAndFollowUpInterceptor获取的streamAllocation
StreamAllocation streamAllocation = realChain.streamAllocation();
//从ConnectInterceptor中获取的connection
RealConnection connection = (RealConnection) realChain.connection();
//在BridgeInterceptor进行组装Request的header
Request request = realChain.request();
long sentRequestMillis = System.currentTimeMillis();
//将请求头放入
httpCodec.writeRequestHeaders(request);
Response.Builder responseBuilder = null;
//对请求消息的请求数据不为空的类型进行操作
//操作是对头包含“Expect: 100-continue”的请求,特别处理
if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
//对头包含“Expect: 100-continue”的请求,先询问服务器是否愿意接受数据
if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
//发送请求询问
httpCodec.flushRequest();
//获取返回值,如果code为100,则responseBuilder为空,表示服务器原因接收
responseBuilder = httpCodec.readResponseHeaders(true);
}
if (responseBuilder == null) {
// 通过请求消息,获得http的主体,httpCodec的内部类
Sink requestBodyOut = httpCodec.createRequestBody(request, request.body().contentLength());
//获取RealBufferedSink对象,和RealConnection中创建的BufferedSink是同一个类
//相当于迭代执行请求消息写入
BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
//将请求消息通过BufferedSink迭代写入,bufferedRequestBody->requestBodyOut->httpCodec.sink->Okio.sink->AsyncTimeout.sink->sink
//最后一个sink是与Socket关联的类,是Okio的静态内部类
request.body().writeTo(bufferedRequestBody);
//迭代关闭BufferedSink
bufferedRequestBody.close();
} else if (!connection.isMultiplexed()) {
// "Expect: 100-continue"询问结果为不接受body,所以关闭请求
streamAllocation.noNewStreams();
}
}
//将请求消息写入socket,socket发送请求消息;没有发送请求就会正常发送,发送过的就什么都不执行
httpCodec.finishRequest();
if (responseBuilder == null) {
//读取响应头
responseBuilder = httpCodec.readResponseHeaders(false);
}
//根据响应头先创建Response
Response response = responseBuilder
.request(request)
.handshake(streamAllocation.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
int code = response.code();
if (forWebSocket && code == 101) {
// Connection is upgrading, but we need to ensure interceptors see a non-null response body.
response = response.newBuilder()
.body(Util.EMPTY_RESPONSE)
.build();
} else {
//将响应正文加入response,openResponseBody方法里会用Okio读取正文。
response = response.newBuilder()
.body(httpCodec.openResponseBody(response))
.build();
}
if ("close".equalsIgnoreCase(response.request().header("Connection"))
|| "close".equalsIgnoreCase(response.header("Connection"))) {
streamAllocation.noNewStreams();
}
if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
throw new ProtocolException(
"HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
}
return response;
}
对于网络请求真正的实现,还是利用Okio对Socket的读写操作进行封装,例如有请求数据时的Okio写入流程:
bufferedRequestBody->requestBodyOut->httpCodec.sink->Okio.sink->AsyncTimeout.sink->sink
最后一个sink是与Socket关联的类,是Okio的静态内部类
6.总结
总体流程:- OkHttpClient对象对线程池和连接池都有管理,所有OkHttpClient最好做成用单例模式创建。创建多个OkHttpClient对象会占用更多内存。
- Dispatcher管理线程池;ConnectionPool是连接池管理,这些连接中最多只能存在 5 个空闲连接,空闲连接最多只能存活 5 分钟。。
它们都用双向队列来存储管理数据。Dispatcher中有三个队列:Deque<AsyncCall> readyAsyncCalls,Deque<AsyncCall> runningAsyncCalls,Deque<RealCall> runningSyncCalls。ConnectionPool中有Deque<RealConnection> connections。
- Request用来配置请求消息。
- okhttp3.Call是一个接口规定了需要执行的几个行为,具体的实现类有RealCall和AyncCall;RealCall中初始化了一些参数,主要是拦截器,异步请求由dispatcher执行,同步异步执行内容都是用责任链来实现网络请求。RealInterceptorChain与Interceptor是实现的关键,相互迭代调用。
- RetryAndFollowUpInterceptor,重试与重定向拦截器,创建了streamAllocation对象
- BridgeInterceptor,用适配器模式思想实现应用层和网络层直接的数据格式编码的桥,用于完善请求头。
- CacheInterceptor,根据缓存结果和请求数据来策略实现缓存Http缓存功能。
缓存策略与Http缓存不同的点:
CacheStrategy缓存策略,就是根据缓存字段来实现缓存算法。但有一点,request的header里包含If-Modified-Since或If-None-Match或no-cache,会直接走网络请求而不走缓存。这两个字符是要在缓存结果里有才会走缓存,这与上面介绍的HTTP缓存字段不同。具体看下面分析。
当只用本地缓存时,Http缓存会在本地缓存失效后再请求网络,而OKhttp只会请求本地缓存,不会再请求网络。
- ConnectInterceptor,它通过StreamAllocation获取到HttpCodec和RealConnection。在创建RealConnection时,也初始化初始化了BufferedSource和BufferedSink,与创建HttpCodec有关。
BufferedSource和BufferedSink是Okio连接网络的Socket实现其读写,RealConnection创建HttpCodec来统一管理网络请求的输入输出。例如有请求数据时的Okio写入流程:
bufferedRequestBody->requestBodyOut->httpCodec.sink->Okio.sink->AsyncTimeout.sink->sink->socket.getOutputStream()
- CallServerInterceptor,网络请求的拦截器,责任链最后一层。利用httpCodec将请求数据发送网络,实际上是利用Okio将请求数据写入Socket,请求网络。