一.简介
OkHttp是当下Android使用最频繁的网络请求框架,由Square公司开源。Google在Android4.4以后开始将源码中的HttpURLConnection底层实现替换为OKHttp,现在流行的Retrofit框架底层同样是使用的OKHttp。
OkHttp优点:
支持Http1、Http2、Quic以及WebSocket
连接池复用底层TCP(Socket),减少请求延时
无缝的支持GZIP减少数据流量
缓存响应数据减少重复的网络请求
请求失败自动重试主机的其他ip,自动重定向
......
二.调用流程
本文对Retrofit2使用的okhttp的相关源码进行分析,先上张Retrofit2在使用Okhttp的流程图:
从上面流程图可以看到,入口为执行OkhttpCall的enqueue方法,在OkhttpCall中会通过OkHttpClient来创建RealCall,该OkHttpClient是在通过Retrofit.build()来构造Retrofit时创建的,相关逻辑请参考Retrofit2原理分析。
在使用OkHttp发起一次请求时,对于使用者最少存在 OkHttpClient 、 Request 与 Call 三个角色。其中OkHttpClient 和 Request 的创建可以使用它为我们提供的 Builder (建造者模式)。而Call则是把Request交给OkHttpClient之后执行newCall()返回的一个已准备好执行的请求。
OkHttp在设计时采用的门面模式(外观模式),将整个系统的复杂性给隐藏起来,将子系统接口通过一个客户端OkHttpClient统一暴露出来,OkHttpClient 中全是一些配置,比如代理的配置、Dns的配置等。而Call本身是一个接口,我们获得的实现为RealCall。
Call执行分为两种方式,execute和enqueue,execute代表了同步请求,enqueue则代表异步请求。两者唯一区别在于一个会直接发起网络请求,而另一个使用OkHttp内置的线程池来进行,涉及到OkHttp的任务分发器。
三.源码分析
接下来对涉及到的核心类进行分析,一起看一下每个类的主要功能及调用关系:
a.OkHttpClient.java
OkHttpClient是OkHttp的调用入口类,实现了Call.Factory接口,是在Retrofit.build()时进行创建的,看一下创建时的逻辑:
//Retrofit.java
public Retrofit build() {
......
okhttp3.Call.Factory callFactory = this.callFactory;
if (callFactory == null) {
callFactory = new OkHttpClient();
}
......
......
}
看一下在实例化OkHttpClient时执行了哪些操作:
public class OkHttpClient implements Cloneable, Call.Factory {
......
......
final Dispatcher dispatcher;
final Proxy proxy;
......
//许多变量
......
public OkHttpClient() {
this(new Builder());
}
private OkHttpClient(Builder builder) {
this.dispatcher = builder.dispatcher;
this.proxy = builder.proxy;
......
//变量赋值,将Builder的变量值赋值给OkHttpClient的变量
......
}
public static final class Builder {
Dispatcher dispatcher;
Proxy proxy;
......
//跟OkHttpClient对应的变量
......
public Builder() {
dispatcher = new Dispatcher();
......
//初始化变量
......
}
//通过该方法获取到RealCall,通过RealCall来进行请求
@Override public Call newCall(Request request) {
return new RealCall(this, request);
}
从代码可以看到,OkHttpClient主要是初始化一些变量,包括创建Dispatcher,即分发器,后续执行请求的时候会通过该分发器来进行处理,提供newCall()方法返回请求的真实实例RealCall。
b.RealCall.java
Call是一个接口,而RealCall是真实的实现,通过该实例来进行请求,看一下RealCall的内部逻辑:
①.RealCall内部的成员变量:
//持有OkHttpClient对象的引用
private final OkHttpClient client;
//app端的request请求
Request originalRequest;
//最终的请求执行
HttpEngine engine;
//构造方法中传入OkHttpClient对象的引用及app端的request
protected RealCall(OkHttpClient client, Request originalRequest) {
this.client = client;
this.originalRequest = originalRequest;
}
②.同步请求:
@Override public Response execute() throws IOException {
synchronized (this) {
......
try {
client.dispatcher().executed(this);
Response result = getResponseWithInterceptorChain(false);
if (result == null) throw new IOException("Canceled");
return result;
} finally {
client.dispatcher().finished(this);
}
}
同步请求会先调用client.dispatcher的executed()方法,然后执行getResponseWithInterceptorChain()等待返回Response,执行完后会调用client.dispatcher的finished()方法,关于dispatcher相关的方法调用后面会讲到。
③.异步请求:
void enqueue(Callback responseCallback, boolean forWebSocket) {
synchronized (this) {
......
client.dispatcher().enqueue(new AsyncCall(responseCallback, forWebSocket));
}
异步请求会先调用client.dispatcher的enqueue()方法,创建一个AsyncCall()作为参数传入,responseCallback是app在调用时传入的回调方法,通过该方法将Response回调给app,执行client.dispatcher().enqueue()后,会在线程池中执行AsyncCall,继而会执行到Runnable的run()方法,先看一下AsyncCall这个类:
final class AsyncCall extends NamedRunnable {
private final Callback responseCallback;
private final boolean forWebSocket;
private AsyncCall(Callback responseCallback, boolean forWebSocket) {
super("OkHttp %s", redactedUrl().toString());
this.responseCallback = responseCallback;
this.forWebSocket = forWebSocket;
}
......
......
@Override protected void execute() {
boolean signalledCallback = false;
try {
Response response = getResponseWithInterceptorChain(forWebSocket);
if (canceled) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
......
} finally {
client.dispatcher().finished(this);
}
}
}
NamedRunnable是一个抽象类,实现了Runnable,在执行run()方法时,会调用抽象方法execute(),该方法由AsyncCall来实现,即通过线程池中执行线程后,最终会调用AsyncCall的execute()方法。
public abstract class NamedRunnable implements Runnable {
protected final String name;
public NamedRunnable(String format, Object... args) {
this.name = Util.format(format, args);
}
@Override public final void run() {
String oldName = Thread.currentThread().getName();
Thread.currentThread().setName(name);
try {
execute();
} finally {
Thread.currentThread().setName(oldName);
}
}
protected abstract void execute();
}
在AsyncCall中执行execute()后,会执行getResponseWithInterceptorChain()来返回Response,执行完后会调用client.dispatcher的finished()方法,关于dispatcher相关的方法调用后面会讲到,getResponseWithInterceptorChain()是核心方法,一起看一下:
private Response getResponseWithInterceptorChain(boolean forWebSocket) throws IOException {
Interceptor.Chain chain = new ApplicationInterceptorChain(0, originalRequest, forWebSocket);
return chain.proceed(originalRequest);
}
class ApplicationInterceptorChain implements Interceptor.Chain {
private final int index;
private final Request request;
private final boolean forWebSocket;
ApplicationInterceptorChain(int index, Request request, boolean forWebSocket) {
this.index = index;
this.request = request;
this.forWebSocket = forWebSocket;
}
......
@Override public Response proceed(Request request) throws IOException {
//初始化时没有创建拦截器,省略以下逻辑
......
//直接调用该方法
return getResponse(request, forWebSocket);
}
}
getResponseWithInterceptorChain()从字面意思来看是拦截器链,由于Retrofit在创建OkHttpClient时没有传入拦截器,即client.interceptors().size()的大小为0,因此不会触发拦截器的逻辑。
跟OkHttp的源码逻辑不一致,OkHttp的源码中包含了5个拦截器,每个拦截器实现了一部分功能,一个一个传输,各司其职,最终来实现网络请求。
由于Retrofit在使用时没有传入拦截器,则会直接执行到getResponse(),该方法实际上是替代了5个拦截器,将5个拦截器的功能都一一实现了,getResponse()是最核心的方法,一起看一下:
Response getResponse(Request request, boolean forWebSocket) throws IOException {
//-------------------------分析1-----------------------------------------
RequestBody body = request.body();
if (body != null) {
Request.Builder requestBuilder = request.newBuilder();
MediaType contentType = body.contentType();
if (contentType != null) {
requestBuilder.header("Content-Type", contentType.toString());
}
long contentLength = body.contentLength();
if (contentLength != -1) {
requestBuilder.header("Content-Length", Long.toString(contentLength));
requestBuilder.removeHeader("Transfer-Encoding");
} else {
requestBuilder.header("Transfer-Encoding", "chunked");
requestBuilder.removeHeader("Content-Length");
}
request = requestBuilder.build();
}
//-------------------------分析2-----------------------------------------
engine = new HttpEngine(client, request, false, false, forWebSocket, null, null, null);
int followUpCount = 0;
while (true) {
if (canceled) {
engine.releaseStreamAllocation();
throw new IOException("Canceled");
}
boolean releaseConnection = true;
try {
engine.sendRequest();
engine.readResponse();
releaseConnection = false;
}
.......
//-------------------------分析3-----------------------------------------
.......
.......
//-------------------------分析4-----------------------------------------
.......
}
结合以上代码,分析一下执行流程:
分析1:对发出的请求request进行预处理,包括加入"Content-Length"、"Content-Type",删除"Transfer-Encoding"等,后面在HttpEngine的networkRequest()中会再次进行处理,对app构建的Request进行添加或者删除相关头部信息,转化成能够真正进行网络请求的Request,功能类似BridgeInterceptor即:桥接拦截器。
剩下的逻辑涉及到HttpEngine,因此在接下来的HttpEngine.java内分析。
c.HttpEngine.java
①.HttpEngine的成员变量:
//最大重定向尝试次数
public static final int MAX_FOLLOW_UPS = 20;
//OkHttpClient实例
final OkHttpClient client;
//StreamAllocation实例,通过newStream创建HttpStream
public final StreamAllocation streamAllocation;
//httpStream实例
private HttpStream httpStream;
②.new HttpEngine()
public HttpEngine(OkHttpClient client, Request request, boolean bufferRequestBody,
boolean callerWritesRequestBody, boolean forWebSocket, StreamAllocation streamAllocation,
RetryableSink requestBodyOut, Response priorResponse) {
this.client = client;
this.userRequest = request;
this.bufferRequestBody = false;
this.callerWritesRequestBody = false;
this.forWebSocket = false;
this.streamAllocation = new StreamAllocation(client.connectionPool(), createAddress(client, request));
this.requestBodyOut = null;
this.priorResponse = null;
}
分析2:创建HttpEngine对象,主要是传入了OkHttpClient和经过处理后的request,然后创建了StreamAllocation,在创建StreamAllocation时传入了request,通过createAddress(client,request)来获取到request时访问的服务器地址,后续在connect()执行的流程中会用到,接下来执行sendRequest(),看一下该方法的实现:
③.sendRequest()
public void sendRequest() throws RequestException, RouteException, IOException {
......
Request request = networkRequest(userRequest);
InternalCache responseCache = Internal.instance.internalCache(client);
Response cacheCandidate = responseCache != null
? responseCache.get(request)
: null;
long now = System.currentTimeMillis();
cacheStrategy = new CacheStrategy.Factory(now, request, cacheCandidate).get();
networkRequest = cacheStrategy.networkRequest;
cacheResponse = cacheStrategy.cacheResponse;
......
//直接返回504错误码
if (networkRequest == null && cacheResponse == null) {
userResponse = new Response.Builder()
.request(userRequest)
.priorResponse(stripBody(priorResponse))
.protocol(Protocol.HTTP_1_1)
.code(504)
.message("Unsatisfiable Request (only-if-cached)")
.body(EMPTY_BODY)
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
return;
}
// If we don't need the network, we're done.
if (networkRequest == null) {
userResponse = cacheResponse.newBuilder()
.request(userRequest)
.priorResponse(stripBody(priorResponse))
.cacheResponse(stripBody(cacheResponse))
.build();
userResponse = unzip(userResponse);
return;
}
// 发起网络请求
boolean success = false;
try {
httpStream = connect();
httpStream.setHttpEngine(this);
if (writeRequestHeadersEagerly()) {
long contentLength = OkHeaders.contentLength(request);
......
......
httpStream.writeRequestHeaders(networkRequest);
requestBodyOut = httpStream.createRequestBody(networkRequest, contentLength);
}
}
success = true;
}
......
}
}
分析2:在sendRequest()内,首先如果从缓存中获得了本次请求URL对应的 Response ,首先会从响应中获得以上数据备用,通过get()去获取缓存,方法中调用 getCandidate() 方法来完成真正的缓存判断。
//CacheStrategy.java
public CacheStrategy get() {
CacheStrategy candidate = getCandidate();
//如果可以使用缓存,那networkRequest必定为null;指定了只使用缓存但是networkRequest又不为null,则会冲突。那就在接下来返回504
if (candidate.networkRequest != null && request.cacheControl().onlyIfCached()) {
return new CacheStrategy(null, null);
}
return candidate;
}
在getCandidate()中,cacheResponse是从缓存中找到的响应,如果为null,那就表示没有找到对应的缓存,创建的CacheStrategy实例对象只存在 networkRequest,这代表了需要发起网络请求。在CacheStrategy内有好多其他的判断就处理逻辑,就不一一分析了。
在发出请求前,判断是否命中缓存。如果命中则可以不请求,直接使用缓存的响应,此功能类似CacheInterceptor,即缓存拦截器。
分析2:在进行网络请求时,先执行connect(),在该方法内会通过StreamAllocation的newStream()来创建HttpStream对象,由于HttpStream是接口,所以会创建具体的实现类对象。
④.connect()
private HttpStream connect() throws RouteException, RequestException, IOException {
boolean doExtensiveHealthChecks = !networkRequest.method().equals("GET");
return streamAllocation.newStream(client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis(),
client.retryOnConnectionFailure(), doExtensiveHealthChecks);
}
在connect()内会执行到StreamAllocation.java里面的newStream()
public HttpStream newStream(int connectTimeout, int readTimeout, int writeTimeout,
boolean connectionRetryEnabled, boolean doExtensiveHealthChecks)
throws RouteException, IOException {
try {
RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
writeTimeout, connectionRetryEnabled, doExtensiveHealthChecks);
HttpStream resultStream;
if (resultConnection.framedConnection != null) {
resultStream = new Http2xStream(this, resultConnection.framedConnection);
} else {
resultConnection.socket().setSoTimeout(readTimeout);
resultConnection.source.timeout().timeout(readTimeout, MILLISECONDS);
resultConnection.sink.timeout().timeout(writeTimeout, MILLISECONDS);
resultStream = new Http1xStream(this, resultConnection.source, resultConnection.sink);
}
synchronized (connectionPool) {
stream = resultStream;
return resultStream;
}
} catch (IOException e) {
throw new RouteException(e);
}
}
从上述代码来看,会先执行findHealthyConnection(),会先从connectionPool里面去获取一个RealConnection,如果获取不到,则新创建一个RealConnection。
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
boolean connectionRetryEnabled) throws IOException, RouteException {
Route selectedRoute;
synchronized (connectionPool) {
// 先从connectionPool中获取
RealConnection pooledConnection = Internal.instance.get(connectionPool, address, this);
if (pooledConnection != null) {
this.connection = pooledConnection;
return pooledConnection;
}
selectedRoute = route;
}
......
}
//创建一个RealConnection
RealConnection newConnection = new RealConnection(selectedRoute);
acquire(newConnection);
//加入connectionPool,后续可以使用
synchronized (connectionPool) {
Internal.instance.put(connectionPool, newConnection);
this.connection = newConnection;
if (canceled) throw new IOException("Canceled");
}
newConnection.connect(connectTimeout, readTimeout, writeTimeout, address.connectionSpecs(),
connectionRetryEnabled);
routeDatabase().connected(newConnection.route());
return newConnection;
}
经过层层调用,会调用到connectSocket()方法,该方法会创建对应address的socket,然后获取到rawSocket对应的source和sink,分别对应InputStream和OutputStream来接收reponse和发送request。该功能类似ConnectInterceptor ,即连接拦截器,打开与目标服务器的连接。
private void connectSocket(int connectTimeout, int readTimeout, int writeTimeout,
ConnectionSpecSelector connectionSpecSelector) throws IOException {
Proxy proxy = route.proxy();
Address address = route.address();
rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
? address.socketFactory().createSocket()
: new Socket(proxy);
rawSocket.setSoTimeout(readTimeout);
try {
Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
} catch (ConnectException e) {
throw new ConnectException("Failed to connect to " + route.socketAddress());
}
source = Okio.buffer(Okio.source(rawSocket));
sink = Okio.buffer(Okio.sink(rawSocket));
}
sendRequest()方法中只是做了一些初始化和创建socket并与服务器建立连接,执行了writeRequestHeaders(),并未真正发送request,真正的发送操作是finishRequest()执行sink.flush(),该方法在调用readResponse()后一步一步执行的,看一下readResponse()的逻辑:
⑤.readResponse()
public void readResponse() throws IOException {
......
Response networkResponse;
......
else if (!callerWritesRequestBody) {
networkResponse = new NetworkInterceptorChain(0, networkRequest,
streamAllocation.connection()).proceed(networkRequest);
}
......
receiveHeaders(networkResponse.headers());
......
userResponse = networkResponse.newBuilder()
.request(userRequest)
.priorResponse(stripBody(priorResponse))
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();
if (hasBody(userResponse)) {
maybeCache();
userResponse = unzip(cacheWritingResponse(storeRequest, userResponse));
}
}
在readResponse()中经过判断后,会创建一个NetworkInterceptorChain实例,并执行proceed(networkRequest)方法:
class NetworkInterceptorChain implements Interceptor.Chain {
......
......
@Override public Response proceed(Request request) throws IOException {
calls++;
......
......
httpStream.writeRequestHeaders(request);
//Update the networkRequest with the possibly updated interceptor request.
networkRequest = request;
if (permitsRequestBody(request) && request.body() != null) {
Sink requestBodyOut = httpStream.createRequestBody(request, request.body().contentLength());
BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
request.body().writeTo(bufferedRequestBody);
bufferedRequestBody.close();
}
Response response = readNetworkResponse();
int code = response.code();
if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
throw new ProtocolException(
"HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
}
return response;
}
}
会执行readNetworkResponse()来返回Response:
private Response readNetworkResponse() throws IOException {
//发送请求
httpStream.finishRequest();
Response networkResponse = httpStream.readResponseHeaders()
.request(networkRequest)
.handshake(streamAllocation.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
if (!forWebSocket) {
networkResponse = networkResponse.newBuilder()
//读取到返回的Response body体
.body(httpStream.openResponseBody(networkResponse))
.build();
}
if ("close".equalsIgnoreCase(networkResponse.request().header("Connection"))
|| "close".equalsIgnoreCase(networkResponse.header("Connection"))) {
streamAllocation.noNewStreams();
}
return networkResponse;
}
真正的发送操作是finishRequest()内执行sink.flush()
@Override public void finishRequest() throws IOException {
sink.flush();
}
通过执行finishRequest()来执行sink.flush()来发送请求,然后就读取服务器返回的response并返回,该功能类似CallServerInterceptor,即请求服务器拦截器,发出请求到服务器并且解析生成 Response 。
梳理一下RealCall在执行getResponse()时,HttpEngine主要执行流程:
接着对前面进行分析,在通过HttpEngine执行sendRequest()和readResponse()时,遇到异常怎么办?
//-------------------------分析3-----------------------------------------
catch (RouteException e) {
HttpEngine retryEngine = engine.recover(e.getLastConnectException(), true, null);
if (retryEngine != null) {
releaseConnection = false;
engine = retryEngine;
continue;
}
throw e.getLastConnectException();
} catch (IOException e) {
HttpEngine retryEngine = engine.recover(e, false, null);
if (retryEngine != null) {
releaseConnection = false;
engine = retryEngine;
continue;
}
分析3:如果在sendRequest()和readResponse()时出现异常,会在catch()方法中,执行HttpEngine的recover(),在该方法内会根据不同的Exception通过isRecoverable()来判断是否需要重试。简单来说,比如 DNS 对域名解析后可能会返回多个 IP,在一个IP失败后,尝试另一个IP进行重试。
前面在readResponse()后,会发送请求然后获取到服务器端的response,如果请求结束后没有发生异常并不代表当前获得的响应就是最终需要交给用户的,是否还需要进一步操作呢?
//-------------------------分析4-----------------------------------------
Response response = engine.getResponse();
Request followUp = engine.followUpRequest();
if (followUp == null) {
if (!forWebSocket) {
engine.releaseStreamAllocation();
}
return response;
}
StreamAllocation streamAllocation = engine.close();
if (++followUpCount > MAX_FOLLOW_UPS) {
streamAllocation.release();
throw new ProtocolException("Too many follow-up requests: " + followUpCount);
}
if (!engine.sameConnection(followUp.url())) {
streamAllocation.release();
streamAllocation = null;
} else if (streamAllocation.stream() != null) {
throw new IllegalStateException("Closing the body of " + response
+ " didn't close its backing stream. Bad interceptor?");
}
request = followUp;
engine = new HttpEngine(client, request, false, false, forWebSocket, streamAllocation, null,
response);
}
分析4:需要进行重定向判断。重定向的判断位于followUpRequest()方法,整个是否需要重定向的判断内容很多,不一一列举了,如果此方法返回空,那就表示不需要再重定向了,直接返回响应;但是如果返回非空,那就要重新请求返回的 Request ,followup的最大次数为20次。
以上两个分析就是对应的重试和重定向,RetryAndFollowUpInterceptor,即重试及重定向拦截器。
d.Dispatcher.java
Dispatcher:分发器就是来调配请求任务的,内部会包含一个线程池,前面提到,在RealCall中执行execute()和enqueue()都是调用了Dispatcher对应的方法,看一下Dispatcher的实现逻辑:
①.Dispatcher中的成员变量:
//异步请求同时存在的最大请求
private int maxRequests = 64;
//异步请求同一域名同时存在的最大请求
private int maxRequestsPerHost = 5;
//异步请求使用的线程池
private ExecutorService executorService;
//异步请求等待执行队列
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
//异步请求正在执行队列
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
//同步请求正在执行队列
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
②.同步请求:
synchronized void executed(RealCall call) {
runningSyncCalls.add(call);
}
synchronized void finished(Call call) {
if (!runningSyncCalls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
}
同步请求不需要线程池,也不存在任何限制。分发器仅做一下记录,后面执行完会调用finished()移除。
③.异步请求:
synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);
executorService().execute(call);
} else {
readyAsyncCalls.add(call);
}
}
synchronized void finished(AsyncCall call) {
if (!runningAsyncCalls.remove(call)) throw new AssertionError("AsyncCall wasn't running!");
promoteCalls();
}
private void promoteCalls() {
//如果任务满了直接返回
if (runningAsyncCalls.size() >= maxRequests) return;
//没有等待执行的任务,返回
if (readyAsyncCalls.isEmpty()) return;
//遍历等待执行队列
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();
//还需要满足:这个等待任务请求的Host不能已经存在5个了
if (runningCallsForHost(call) < maxRequestsPerHost) {
i.remove();
runningAsyncCalls.add(call);
executorService().execute(call);
}
if (runningAsyncCalls.size() >= maxRequests) return;
}
}
当正在执行的任务未超过最大限制64,同时 runningCallsForHost(call) < maxRequestsPerHost,即同一Host的请求不超过5个,则会添加到正在执行队列,同时提交给线程池。否则先加入等待队列。
如果加入等待队列后,就需要等待有空闲名额才开始执行。因此每次执行完也会调用finished()来移除,在异步请求的finished()方法内会执行 promoteCalls()来从readyAsyncCalls等待队列中重新调配请求,满足条件时,会把等待队列中的任务移动到 runningAsyncCalls并交给线程池执行。
④.线程池
分发器就是来调配请求任务的,内部会包含一个线程池。当异步请求时,会将请求任务交给线程池来执行,看一下线程池是如何创建的已经参数:
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(
0, //核心线程
Integer.MAX_VALUE, //最大线程
60, //空闲线程闲置时间
TimeUnit.SECONDS, //闲置时间单位
new SynchronousQueue<Runnable>(), //线程等待队列
Util.threadFactory("OkHttp Dispatcher", false));//线程创建工厂
}
return executorService;
}
在OkHttp的分发器中的线程池定义如上,首先核心线程为0,表示线程池不会一直为我们缓存线程,线程池中所有线程都是在60s内没有工作就会被回收。而最大线程 Integer.MAX_VALUE与等待队列 SynchronousQueue的组合能够得到最大的吞吐量。即当需要线程池执行任务时,如果不存在空闲线程不需要等待,马上新建线程执行任务,等待队列的不同指定了线程池的不同排队机制。一般来说,等待队列 BlockingQueue 有: ArrayBlockingQueue 、 LinkedBlockingQueue 与 SynchronousQueue 。
假设向线程池提交任务时,核心线程都被占用的情况下:
ArrayBlockingQueue :基于数组的阻塞队列,初始化需要指定固定大小。
当使用此队列时,向线程池提交任务,会首先加入到等待队列中,当等待队列满了之后,再次提交任务,尝试加入队列就会失败,这时就会检查如果当前线程池中的线程数未达到最大线程,则会新建线程执行新提交的任务。所以最终可能出现后提交的任务先执行,而先提交的任务一直在等待。
LinkedBlockingQueue :基于链表实现的阻塞队列,初始化可以指定大小,也可以不指定。
当指定大小后,行为就和 ArrayBlockingQueue一致。而如果未指定大小,则会使用默认的 Integer.MAX_VALUE 作为队列大小。这时候就会出现线程池的最大线程数参数无用,因为无论如何,向线程池提交任务加入等待队列都会成功。最终意味着所有任务都是在核心线程执行。如果核心线程一直被占,那就一直等待。
SynchronousQueue : 无容量的队列。
使用此队列意味着希望获得最大并发量。因为无论如何,向线程池提交任务,往队列提交任务都会失败。而失败后如果没有空闲的非核心线程,就会检查如果当前线程池中的线程数未达到最大线程,则会新建线程执行新提交的任务。完全没有任何等待,唯一制约它的就是最大线程数的个数。因此一般配合Integer.MAX_VALUE就实现了真正的无等待。
但是需要注意的是,进程的内存是存在限制的,而每一个线程都需要分配一定的内存。所以线程并不能无限个。那么当设置最大线程数为 Integer.MAX_VALUE 时,OkHttp同时还有最大请求任务执行个数64的限制,这样即解决了这个问题同时也能获得最大吞吐量。
四.总结
Retrofit使用的Okhttp并没有创建5个拦截器,但是内部的执行逻辑都是按照拦截器的思想来实现的,总结一下执行流程:
当App通过Retrofit来调用方法会获取到Observable或Call,然后执行subscribe()或enqueue()来发起请求,一步一步会调用到OkHttp的RealCall,会由任务分发器Dispatcher将请求包装并交给HttpEngine进行处理,对请求request进行重新封装成标准的网络请求,然后进行缓存相关的处理,继而创建socket连接,发送networkRequest请求,如果有异常,会根据异常错误来判断是否进行重试,然后对获取到结果来进行判断是否进行重定向,最后返回服务器的响应数据。
OkHttp的请求流程设计到的类比较多,详细了解可以进一步阅读源码。