分析Okhttp源码的一篇笔记
1.OkHttp的请求网络流程
使用就不说了 Okhttp官网:http://square.github.io/okhttp/
(1) 从请求处理开始分析
OkHttpClient.newCall(request) 进行 execute 或者 enqueue操作 当调用newCall方法时
@Override public Call newCall(Request request) {
return new RealCall(this, request);
}
看到返回的是一个RealCall类, 调用enqueue 异步请求网络实际调用的是RealCall的enqueue方法,接下来看一下RealCall的enqueue里面干了什么
void enqueue(Callback responseCallback, boolean forWebSocket) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
//最终的请求是dispatcher来完成
client.dispatcher().enqueue(new AsyncCall(responseCallback, forWebSocket));
}
So 接下来分析一下dispatcher
(2) Dispatcher任务调度
Dispatcher 主要是控制并发的请求,它主要维护一下变量:
//最大并发请求数
private int maxRequests = 64;
//每个主机的最大请求数
private int maxRequestsPerHost = 5;
/** 消费者线程 */
private ExecutorService executorService;
/** 将要运行的异步请求队列 */
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
/** 正在运行的异步请求队列*/
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
/** 正在运行的同步请求队列 */
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
Dispatcher构造方法
public Dispatcher(ExecutorService executorService) {
this.executorService = executorService;
}
public Dispatcher() {
}
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
两个构造方法,可以使用自己设定的线程池,如果没有设定,则会请求网络前自己创建默认线程池。这个线程池类似CachedThreadPool,比较适合执行大量的耗时比较少的任务,其实当调用RealCall的enqueue方法实际上调用的Dispatcher里面的enqueue方法
synchronized void enqueue(AsyncCall call) {
//如果正在运行的异步请求队列小于64且 正在运行的请求主机数小于5 则把请求加载到runningAsyncCalls
//中并在线程池中执行,否则就加入到readyAsyncCalls中进行缓存等待。
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);
executorService().execute(call);
} else {
readyAsyncCalls.add(call);
}
}
线程池中传进来的参数AsyncCall,它是RealCall的内部类,其内部也实现了execute方法
@Override protected void execute() {
boolean signalledCallback = false;
try {
...
} catch (IOException e) {
...
} finally {
//无论这个请求的结果如何,都会执行 client.dispatcher().finished(this);
client.dispatcher().finished(this);
}
}
}
finished方法如下
synchronized void finished(AsyncCall call) {
//将此次请求 runningAsyncCalls移除后还执行promoteCalls方法
if (!runningAsyncCalls.remove(call)) throw new AssertionError("AsyncCall wasn't running!");
promoteCalls();
}
promoteCalls方法如下
private void promoteCalls() {
//如果正在运行的异步请求队列数大于最大请求数 return
if (runningAsyncCalls.size() >= maxRequests) return;
//若果将要运行的异步请求队列为空 return
if (readyAsyncCalls.isEmpty()) return;
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();//取出下一个请求
if (runningCallsForHost(call) < maxRequestsPerHost) {
i.remove();
runningAsyncCalls.add(call);//加入runningAsyncCalls中并交由线程池处理
executorService().execute(call);
}
if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
}
}
在回AsyncCall的execute方法
@Override protected void execute() {
try {
//getResponseWithInterceptorChain() 返回Response 请求网络
Response response = getResponseWithInterceptorChain(forWebSocket);
...
} catch (IOException e) {
...
} finally {
...
}
}
}
(3) Interceptor拦截器
getResponseWithInterceptorChain方法如下
private Response getResponseWithInterceptorChain(boolean forWebSocket) throws IOException {
Interceptor.Chain chain = new ApplicationInterceptorChain(0, originalRequest, forWebSocket);
return chain.proceed(originalRequest);
}
getResponseWithInterceptorChain方法中创建了ApplicationInterceptorChain,他是一个拦截器链,这个类也是RealCall的内部类,接下来执行了它的proceed方法
@Override public Response proceed(Request request) throws IOException {
if (index < client.interceptors().size()) {
Interceptor.Chain chain = new ApplicationInterceptorChain(index + 1, request, forWebSocket);
//从拦截器列表中取出拦截器
Interceptor interceptor = client.interceptors().get(index);
Response interceptedResponse = interceptor.intercept(chain);//1
if (interceptedResponse == null) {
throw new NullPointerException("application interceptor " + interceptor
+ " returned null");
}
return interceptedResponse;
}
// 如果没有更多拦截器的话 执行网络请求
return getResponse(request, forWebSocket);
}
}
proceed方法每次从拦截器列表中去除拦截器,当存在多个拦截器时都会在上面注释1处阻塞,并等待下一个拦截器的调用返回,下面分别以拦截器中有一个、两个拦截器的场景加以模拟
拦截器是一种能够监控、重写、重试调用的机制。通常情况下用来添加、移除、转换请求和响应的头部信息,比如将域名替换为IP地址,在请求中添加host属性,也可以添加我们应用中的一下公共参数,比如设备id、版本号等,回到代码上来, 最后一行返回getResponse(request, forWebSocket)
来看getResponse做了什么
Response getResponse(Request request, boolean forWebSocket) throws IOException {
engine = new HttpEngine(client, request, false, false, forWebSocket, null, null, null);
int followUpCount = 0;
while (true) {
if (canceled) {
engine.releaseStreamAllocation();
throw new IOException("Canceled");
}
boolean releaseConnection = true;
try {
engine.sendRequest();
engine.readResponse();
releaseConnection = false;
} catch (RequestException e) {
throw e.getCause();
} catch (RouteException e) {
} catch (IOException e) {
...
}
}
}
创建了HttpEngine并且调用了HttpEngine的sendRequest方法和readResponse方法
(4) 缓存策略
查看一下sendRequest方法
public void sendRequest() throws RequestException, RouteException, IOException {
if (cacheStrategy != null) return; // Already sent.
if (httpStream != null) throw new IllegalStateException();
Request request = networkRequest(userRequest);
// 获取client中的Cache,同时Cache在初始化时会读取缓存目录中曾经请求过的所有信息
InternalCache responseCache = Internal.instance.internalCache(client);
Response cacheCandidate = responseCache != null
? responseCache.get(request)//1
: null;
long now = System.currentTimeMillis();
cacheStrategy = new CacheStrategy.Factory(now, request, cacheCandidate).get();
//网络请求
networkRequest = cacheStrategy.networkRequest;
//缓存的响应
cacheResponse = cacheStrategy.cacheResponse;
if (responseCache != null) {
//记录当前请求是网络发起还是缓存发起
responseCache.trackResponse(cacheStrategy);
}
// 不进行网络请求并且缓存不存在或者过期,则返回504
if (networkRequest == null && cacheResponse == null) {
userResponse = new Response.Builder()
.request(userRequest)
.priorResponse(stripBody(priorResponse))
.protocol(Protocol.HTTP_1_1)
.code(504)
.message("Unsatisfiable Request (only-if-cached)")
.body(EMPTY_BODY)
.build();
return;
}
// 不进行网络请求而且缓存可以使用,则直接返回缓存
if (networkRequest == null) {
userResponse = cacheResponse.newBuilder()
.request(userRequest)
.priorResponse(stripBody(priorResponse))
.cacheResponse(stripBody(cacheResponse))
.build();
userResponse = unzip(userResponse);
return;
}
// 需要访问网络时
boolean success = false;
try {
httpStream = connect();
httpStream.setHttpEngine(this);
...
}
}
显然是发送请求,但是最主要的是做了缓存的策略,上面注释1处cacheCandidate 是上次与服务器交互缓存的Response,这里缓存均基于Map,key是请求中的url的md5,value是在文件中查询到的缓存,页面置换基于LRU算法,现在只需知道cachCandidate是一个可以读取缓存Header的Response即可,根据cacheStrategy的处理得到了networkRequest和cacheResponse都是为nulld的情况下,也就是不进行网络请求并且缓存不存在或者过期,这个是返回504错误,当networkRequest为null时也就是不进行网络请求,如果缓存可以使用时则直接返回缓存,其他则请求网络。
接下来查看readResponse方法
public void readResponse() throws IOException {
...
Response networkResponse;
if (forWebSocket) {
//读取网络响应
networkResponse = readNetworkResponse();
}
receiveHeaders(networkResponse.headers());
//检查缓存是否可用,如果可用,就用当前缓存的Response,关闭网络连接,释放连接
if (cacheResponse != null) {
if (validate(cacheResponse, networkResponse)) {//1
userResponse = cacheResponse.newBuilder()
.request(userRequest)
.priorResponse(stripBody(priorResponse))
.headers(combine(cacheResponse.headers(), networkResponse.headers()))
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();
networkResponse.body().close();
releaseStreamAllocation();
// Update the cache after combining headers but before stripping the
// Content-Encoding header (as performed by initContentStream()).
InternalCache responseCache = Internal.instance.internalCache(client);
responseCache.trackConditionalCacheHit();
responseCache.update(cacheResponse, stripBody(userResponse));
userResponse = unzip(userResponse);
return;
} else {
closeQuietly(cacheResponse.body());
}
}
);
}
这个方法主要用来解析HTTP响应报文,如果有缓存并且可用,则用缓存的数据并更新缓存,否则就用网络 请求返回的数据,查看注释1处validate方法是如何判断缓存是否可用。
private static boolean validate(Response cached, Response network) {
//如果服务器返回304,则缓存有效
if (network.code() == HTTP_NOT_MODIFIED) {
return true;
}
//通过缓存和网络请求响应中的Last-Modified来计算是否是最新数据,如果是,则缓存有效
Date lastModified = cached.headers().getDate("Last-Modified");
if (lastModified != null) {
Date networkLastModified = network.headers().getDate("Last-Modified");
if (networkLastModified != null
&& networkLastModified.getTime() < lastModified.getTime()) {
return true;
}
}
return false;
}
如果缓存有效,则返回304 Not Modified,否者直接返回body。如果缓存过期或者强制放弃缓存,则缓存策略全部交给服务器判断,客服端只需要发送条件GET请求即可。条件GET请求有两种方式:一种是Last-Modified-Data,另一种ETag,这里采用Last-Modified-Data,通过缓存和网络请求响应中的Last-Modified来计算是否是最新数据 ,如果是 则缓存有效
(5) 失败重连
重回RealCall的getResponse方法
Response getResponse(Request request, boolean forWebSocket) throws IOException {
...
try {
}catch (RouteException e) {
HttpEngine retryEngine = engine.recover(e.getLastConnectException(), null);//1
} catch (IOException e) {
HttpEngine retryEngine = engine.recover(e, null);//2
}
}
}
当发生IOException或者RouteException时都会执行HttpEngine的recover方法 ,代码如下:
public HttpEngine recover(IOException e, Sink requestBodyOut) {
if (!streamAllocation.recover(e, requestBodyOut)) {
return null;
}
if (!client.retryOnConnectionFailure()) {
return null;
}
StreamAllocation streamAllocation = close();
//重新创建HttpEngine并返回,用来完成重连
return new HttpEngine(client, userRequest, bufferRequestBody, callerWritesRequestBody,
forWebSocket, streamAllocation, (RetryableSink) requestBodyOut, priorResponse);
}
Okhttp请求流程图