前几天在在一本书上看到这样一句话
如果仅从微观的视角关注每一个单独的点,可能会因为看不到整体而迷失方向。
所以不会每一行代码都去很细致地分析。因为抓住了整体再去看细节会比较轻松,而最初就一味追求细致可能会在不知不觉中放弃探索吧。
这篇的分析过程中忽略了InterceptorChain拦截器链的部分,在介绍完大概流程后单独拿出来分析,下一篇再讲。
OkHttp基本使用
来看源码的话,OkHttp用起来应该已经很熟练了,我们从最基础开始看它是如何实现的,直接上基本用法的代码。
public class OkHttpTest {
private OkHttpClient mOkHttpClient;
public OkHttpTest() {
mOkHttpClient = new OkHttpClient();
mOkHttpClient = new OkHttpClient.Builder().readTimeout(5, TimeUnit.MINUTES).build();
}
public void synRequest() {
Request newRequest = new Request.Builder()
.url("http://www.baidu.com")
.get()
.build();
Call call = mOkHttpClient.newCall(newRequest);
try {
Response response = call.execute();
System.out.println(response.body().string());
} catch (IOException e) {
e.printStackTrace();
}
}
public void asynRequest() {
Request newRequest = new Request.Builder()
.url("www.baidu.com")
.get()
.build();
Call call = mOkHttpClient.newCall(newRequest);
call.enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
System.out.println("onFailure");
}
@Override
public void onResponse(Call call, Response response) throws IOException {
System.out.println("onSuccess");
System.out.println(response.body().string());
}
});
}
}
主要分为以下五个步骤:
- 构建OkHttpClient
- 构建Request
- 构建Call
- 执行call的execute()发送同步请求或enqueue()发送异步请求
- 对execute()返回的Response处理或enqueue()回调中的Response处理
以上为OkHttp基本使用方法,同步发送请求和异步发送请求的过程。
OkHttpClient的创建流程
OkHttpClient构造器
可以通过以下两种方式创建OkHttpClient
mOkHttpClient = new OkHttpClient();
mOkHttpClient = new OkHttpClient.Builder().readTimeout(5, TimeUnit.MINUTES).build();
进入OkHttpClient()中会发现无参数构造方法调用了一个参数的构造方法,并传入了一个Builder对象。
public OkHttpClient() {
this(new Builder());
}
这里传入一个Builder对象给OkHttpClient一个参数的构造器。
OkHttpClient(Builder builder) {
this.dispatcher = builder.dispatcher;
this.proxy = builder.proxy;
this.protocols = builder.protocols;
this.connectionSpecs = builder.connectionSpecs;
this.interceptors = Util.immutableList(builder.interceptors);
//...
boolean isTLS = false;
for (ConnectionSpec spec : connectionSpecs) {
isTLS = isTLS || spec.isTls();
}
this.hostnameVerifier = builder.hostnameVerifier;
this.certificatePinner = builder.certificatePinner.withCertificateChainCleaner(
certificateChainCleaner);
//...
if (interceptors.contains(null)) {
throw new IllegalStateException("Null interceptor: " + interceptors);
}
if (networkInterceptors.contains(null)) {
throw new IllegalStateException("Null network interceptor: " + networkInterceptors);
}
}
一个参数的构造器主要利用传入的Builder对象初始了OkHttpClient的成员,并做了其他的一些初始工作。
那么我们先来看一下Builder到底是什么。
OkHttpClient的内部类Builder
这里的Builder是OkHttp的一个静态内部类。
public static final class Builder {
//成员变量
Dispatcher dispatcher;
@Nullable Proxy proxy;
List<Protocol> protocols;
List<ConnectionSpec> connectionSpecs;
final List<Interceptor> interceptors = new ArrayList<>();
final List<Interceptor> networkInterceptors = new ArrayList<>();
EventListener.Factory eventListenerFactory;
//...
//对成员变量赋初始值
public Builder() {
dispatcher = new Dispatcher();
protocols = DEFAULT_PROTOCOLS;
connectionSpecs = DEFAULT_CONNECTION_SPECS;
eventListenerFactory = EventListener.factory(EventListener.NONE);
proxySelector = ProxySelector.getDefault();
cookieJar = CookieJar.NO_COOKIES;
socketFactory = SocketFactory.getDefault();
//...
}
//使用参数okHttpClient来对成员变量赋值
Builder(OkHttpClient okHttpClient) {
this.dispatcher = okHttpClient.dispatcher;
this.proxy = okHttpClient.proxy;
this.protocols = okHttpClient.protocols;
this.connectionSpecs = okHttpClient.connectionSpecs;
this.interceptors.addAll(okHttpClient.interceptors);
this.networkInterceptors.addAll(okHttpClient.networkInterceptors);
//...
}
//下面是对成员变量参数的设置的方法
public Builder connectTimeout(long timeout, TimeUnit unit) {
connectTimeout = checkDuration("timeout", timeout, unit);
return this;
}
public Builder readTimeout(long timeout, TimeUnit unit) {
readTimeout = checkDuration("timeout", timeout, unit);
return this;
}
//...很多方法
//将此Builder对象传入OkHttpClient构造器
public OkHttpClient build() {
return new OkHttpClient(this);
}
}
可以看出来Builder对象实际存储了很多参数变量,构造器对其初始化,内部提供改变参数的方法,方法返回本身使其可以链式调用
mOkHttpClient = new OkHttpClient.Builder()
.readTimeout(5, TimeUnit.MINUTES)
.writeTimeout(10, TimeUnit.MINUTES)
.build()
最后调用build()将此包含处理过的参数的Builder对象传给OkHttpClient的构造器,构造器中将此Builder对象给OkHttpClient成员变量赋值。
上述Builder内部类其实用到了一种设计模式,构建者模式。通过分析其代码不难发现Builder的主要工作就是封装了外部类需要的参数,并提供了一种比较方便的链式调用的方法去初始化一个类的成员变量,最后传给外部类完成初始化得到外部对象。
创建Request对象
Request newRequest = new Request.Builder()
.url("http://www.baidu.com")
.get()
.build();
Request也是使用了Builder模式。
public Builder() {
this.method = "GET";
this.headers = new Headers.Builder();
}
Builder(Request request) {
this.url = request.url;
this.method = request.method;
this.body = request.body;
this.tag = request.tag;
this.headers = request.headers.newBuilder();
}
可以看到默认的Request为Get请求。和OkHttpClient的方式很类似,就不多说了。
Call的创建
Call call = mOkHttpClient.newCall(newRequest);
将创建好的Request传入client的newCall方法返回一个Call对象。
@Override public Call newCall(Request request) {
return RealCall.newRealCall(this, request, false /* for web socket */);
}
OkHttpClient实现了Call接口,newCall()是Call接口内部的Factory接口的抽象方法。
newCall调用的是Call实现类RealCall的newRealCall(),点进去看看,是个静态方法。
static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
// Safely publish the Call instance to the EventListener.
RealCall call = new RealCall(client, originalRequest, forWebSocket);
call.eventListener = client.eventListenerFactory().create(call);
return call;
}
构造器私有了,三个参数分别是传入的client,传入的原始request和一个是否为WebSocket的flag。
private RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
this.client = client;
this.originalRequest = originalRequest;
this.forWebSocket = forWebSocket;
this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client, forWebSocket);
}
注意这里的retryAndFollowUpInterceptor,这个是之后拦截器链的第一个拦截器,之后会介绍到。
RealCall的execute()
Response response = call.execute();
真正的实现仍然是在RealCall中,来看RealCall的execute()。
@Override public Response execute() throws IOException {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);
try {
client.dispatcher().executed(this);
Response result = getResponseWithInterceptorChain();
if (result == null) throw new IOException("Canceled");
return result;
} catch (IOException e) {
eventListener.callFailed(this, e);
throw e;
} finally {
client.dispatcher().finished(this);
}
}
我们来一点一点分析
首先判断了此call对象是否被执行过,如果执行过就抛出异常,否则设置executed=true,继续下面操作。这里synchronized包裹,表明call对象一定只能执行一次。
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
dispatcher()只是返回了client在Builder中初始的dispatcher成员变量,所以来看dispatcher的executed()。这里只做了一项工作,就是将call加入runningSyncCalls队列中,这里留一个印象,只需要知道是将call加入了同步请求队列中就可以了,在后面会细细来讲Dispatcher类的,因为它还是比较重要的。
client.dispatcher().executed(this);
synchronized void executed(RealCall call) {
runningSyncCalls.add(call);
}
之前都一直是和request,到这里终于出现了reponse,也可以才想到,整个请求流程到这里应该是真正拿到了响应,是拦截器链的开端。
Response result = getResponseWithInterceptorChain();
来看getResponseWithInterceptorChain()的实现,将client中的拦截器和默认的拦截器加入集合中,将其传入创建了一个RealInterceptorChain拦截器链chain。最后返回了chain的proceed()的返回值,暂时只需要知道它传入了原始的request,返回了response,具体实现是在RealInterceptorChain这个类中的,我们在专门讲拦截器时再去研究。
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
interceptors.addAll(client.interceptors());
interceptors.add(retryAndFollowUpInterceptor);
interceptors.add(new BridgeInterceptor(client.cookieJar()));
interceptors.add(new CacheInterceptor(client.internalCache()));
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());
}
interceptors.add(new CallServerInterceptor(forWebSocket));
Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
originalRequest, this, eventListener, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
return chain.proceed(originalRequest);
}
回到RealCall的execute()中来,在最后的finally中有这样一行代码,必须执行dispatcher的finished(),又调用了三个参数的finshed(),再一次看到runningSyncCalls这个队列。记得在开始时我们将call加入到了这个队列中,现在已经请求完毕,猜想应该要从队列中删除这个call对象。
client.dispatcher().finished(this);
void finished(RealCall call) {
finished(runningSyncCalls, call, false);
}
果然在同步代码块中就有移除call的指令。其他操作是关于dispatcher的,我们在后面再去讲。
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
if (promoteCalls) promoteCalls();
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
}
if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();
}
}
到一个GET请求就完成了。
RealCall的equeue()
发送一个异步请求其他步骤都是一样的,唯独这一步不一样,我们从RealCall的equeue()开始分析。
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
首先也是判断是否执行过,之后调用了dispatcher的equeue(),不同的是这里有一个参数,将传入的responseCallback这个回调接口封装成了AsyncCall对象。
AsyncCall是RealCall的一个静态内部类,继承了NamedRunnable,NamedRunnable实现了Runnable。来看NamedRunnable的run(),调用了execute(),但是并没有实现,它在RealCall中的AsyncCall中实现了,看类声明final class AsyncCall extends NamedRunnable
就确定了。
public abstract class NamedRunnable implements Runnable {
protected final String name;
public NamedRunnable(String format, Object... args) {
this.name = Util.format(format, args);
}
@Override public final void run() {
String oldName = Thread.currentThread().getName();
Thread.currentThread().setName(name);
try {
execute();
} finally {
Thread.currentThread().setName(oldName);
}
}
protected abstract void execute();
}
来看AsyncCall的execute(),有没有一点似曾相识的感觉。和RealCall的executed()一样也调用了getResponseWithInterceptorChain()获得response,之后进行一些回调操作。最后同样调用finished(),将其移除队列。那么问题来了,Call是在哪里加入队列的?又是如何实现异步的呢?
@Override protected void execute() {
boolean signalledCallback = false;
try {
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
eventListener.callFailed(RealCall.this, e);
responseCallback.onFailure(RealCall.this, e);
}
} finally {
client.dispatcher().finished(this);
}
}
我们回到client.dispatcher().enqueue(new AsyncCall(responseCallback));
这句话来看看dispatcher的equeue()都做了哪些工作,又为何能够实现异步。
synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);
executorService().execute(call);
} else {
readyAsyncCalls.add(call);
}
}
做了一个判断将call加入runningAsyncCalls还是readyAsyncCalls。同步请求中只有的一个队列,而这里有两个,一个为等待队列一个为执行队列,正是这两个队列实现了异步。在满足条件后加入到执行队列的下一行代码可以理解为线程池直接执行了这个call请求,不满足条件则加入等待队列等待调度。
还记得这个call请求吗,它是AsyncCalld对象,最终会调用上面分析过的实现了NamedRunnable的executed()真正地发送请求。
Dispatcher类
通过上面对同步异步发送请求的分析,可以感觉到实现这两者区别的重点是由Dispatcher来实现的,Dispatcher的代码不多,主要是这样的流程:
我们来分析代码。
变量
private int maxRequests = 64;
private int maxRequestsPerHost = 5;
private @Nullable Runnable idleCallback;
/** Executes calls. Created lazily. */
private @Nullable ExecutorService executorService;
/** Ready async calls in the order they'll be run. */
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
/** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
/** Running synchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
Dspatcher中有这些成员,maxRequests是正在执行的异步请求个数的最大值,maxRequestsPerHost是每个Host正在请求的请求个数最大值。executorService是执行请求的线程池,之后三个队列前两个用于异步请求的准备队列和执行队列,最后一个是同步请求的执行队列。
enqueue()
先从我们熟悉的开始。
synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);
executorService().execute(call);
} else {
readyAsyncCalls.add(call);
}
}
enqueue()在上面我们已经简单的分析过一次了,是从realCalld的enqueue()调用的。这里的判断条件就涉及到maxRequests和maxRequestsPerHost了。
runningAsyncCalls.size() < maxRequests
,控制了执行队列中请求个数。
runningCallsForHost(call) < maxRequestsPerHost
控制Host的值,来看runningCallsForHost()代码
/** Returns the number of running calls that share a host with {@code call}. */
private int runningCallsForHost(AsyncCall call) {
int result = 0;
for (AsyncCall c : runningAsyncCalls) {
if (c.get().forWebSocket) continue;
if (c.host().equals(call.host())) result++;
}
return result;
}
遍历了执行队列中所有的call,计算出除了webSocket以外所有与传入参数call同一个Host的call的个数,和我们上面分析的一致。
executorService().execute(call);
满足条件加入队列后的执行操作。来看executorService()
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
单例模式返回了一个ThreadPoolExecutor对象,是一个线程池。
public ThreadPoolExecutor(
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory
)
参数非常多:
- corePoolSize: 核心线程数,默认情况下核心线程会一直存活。
- maximumPoolSize: 线程池所能容纳的最大线程数。超过这个数的线程将被阻塞。
- keepAliveTime: 非核心线程的闲置超时时间,超过这个时间就会被回收。
- unit: keepAliveTime的单位。
- workQueue: 线程池中的任务队列。
- threadFactory: 线程工厂,提供创建新线程的功能。
corePoolSize设置为0表示一旦有闲置的线程就可以回收。容纳最大线程数设置的非常大,但是由于受到maxRequests的影响,并不会创建特别多的线程。60秒的闲置时间。
finished()
三个重载,前面两个都是调用第三个private,记不记得前面两个都是在什么地方调用的?看参数和注释应该也能记起来,第一个是异步最后一步调用,第二个是在同步最后一步调用。它们调用第三个finished()区别就在于第三个参数异步为true,同步为false。
/** Used by {@code AsyncCall#run} to signal completion. */
void finished(AsyncCall call) {
finished(runningAsyncCalls, call, true);
}
/** Used by {@code Call#execute} to signal completion. */
void finished(RealCall call) {
finished(runningSyncCalls, call, false);
}
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
if (promoteCalls) promoteCalls();
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
}
if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();
}
}
来分析三个参数的finished(),同步代码块从队列中移除了此时已经完成的这个call,接着判断如果第三个参数为true就执行promoteCalls(),来看promoteCalls()。
private void promoteCalls() {
if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();
if (runningCallsForHost(call) < maxRequestsPerHost) {
i.remove();
runningAsyncCalls.add(call);
executorService().execute(call);
}
if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
}
}
promoteCalls()起到一个调控的作用,来控制readyAsyncCalls中的call进入runningAsyncCalls的。首先如果runningAsyncCalls满了或者readyAsyncCalls中没有call了就不做任何操作直接返回。之后遍历readyAsyncCalls中的call,如果call的host满足maxRequestsPerHost限制,就将其从maxRequestsPerHost中移除,加入runningAsyncCalls并立即执行,继续循环遍历直到将runningAsyncCalls加满。异步比同步会多一个调控的步骤promoteCalls()。
回到finished()代码中来,下一步是计算runningCallsCount,是同步异步正在请求的call的总数。
public synchronized int runningCallsCount() {
return runningAsyncCalls.size() + runningSyncCalls.size();
}
之后将成员遍历idleCallback赋值给局部变量,idleCallback是空闲回调,当正在请求总数为0并且idleCallback不为空就调用其run()。
至此,Dispatcher类中的大部分核心就都涉及到了,还会有小部分代码在后面的分析中再提。
总结
最后来总结一下以上分析到的大致流程。
- 首先使用Builder模式创建并初始化了OkHttpClient和Request对象。
- 传入request调用client的newCall()创建了一个RealCall对象,实际操作都是由它完成的。
- 如果是同步请求则调用call的executed():
- 判断此call对象是否执行过,未执行过再继续。
- 调用dispatcher的executed(),将call加入同步执行队列。
- 调用getResponseWithInterceptorChain()初始化拦截器集合并生成拦截器链。
- 调用拦截器链的proceed()依次处理request并依次处理response最终返回。
- 调用dispatcher的finished()将call移除队列。
- 如果是异步请求则调用call的enqueue():
- 判断此call对象是否执行过,未执行过再继续。
- 将传入的callBack封装进AsyncCall对象,它实际为一个Runnable,实现了run方法的一部分。
- 调用dispatcher的enqueue并将asyncCall对象传入。
- 判断asyncCall加入执行队列还是等待队列。
- 如果满足条件加入了等待序列则从线程池中分配线程立即执行asyncCall。
- 调用asyncCall的executed()。
- 调用getResponseWithInterceptorChain()初始化拦截器集合并生成拦截器链。
- 调用拦截器链的proceed()依次处理request并依次处理response最终返回。
- 根据结果,调用asyncCall中callBack的回调方法。
- 调用dispatcher的finished()将此call移除running队列,并对队列重新调整分配。
- 对响应结果进行处理。