上一篇的同步流程分析(https://www.jianshu.com/p/343118d0bc19),这次来研究下异步的流程。
在这之前,先补充下,Dispatcher(调度器),是保存同步和异步Call的地方,负责执行异步AsyncCall(就是一个子线程)。
先来看看这段实现异步请求的最简洁代码:
//异步
OkHttpClient asynClient = new OkHttpClient();
Request asynRequest = new Request.Builder().url("http://......").build();
asynClient.newCall(asynRequest).enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
}
@Override
public void onResponse(Call call, Response response) throws IOException {
}
});
这段代码,前奏准备都跟同步的一样,我们来看看关键代码asynClient.newCall(asynRequest).enqueue(callback);
还记得吗,上一篇里面分析过newCall,会创建一个call接口的实现类RealCall对象,enqueue()字面上是加入队列的意思,还是再看看call的定义吧:
public interface Call extends Cloneable {
......
Response execute() throws IOException;
void enqueue(Callback responseCallback);
......
}
不熟悉realCall的可以先去看看okhttp同步流程源码分析(https://www.jianshu.com/p/343118d0bc19),既然realCall是call的实现类,很好,我们去realCall看看enqueue的实现:
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
之前,同步流程中,realCall直接调用execute(),在里面通过各种拦截器之后获取到最终数据,然而,在异步流程中,看上面最后一行代码,入队列的是一个AsyncCall,成功,失败回调的接口对象作为AsyncCall的参数,在 研究client.dispatcher().enqueue():之前,我们先看看AsyncCall到底是啥:
//RealCall.java
......
final class AsyncCall extends NamedRunnable {
.....
AsyncCall(Callback responseCallback) {
super("OkHttp %s", redactedUrl());
this.responseCallback = responseCallback;
}
......
@Override protected void execute() {
boolean signalledCallback = false;
try {
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
eventListener.callFailed(RealCall.this, e);
responseCallback.onFailure(RealCall.this, e);
}
} finally {
client.dispatcher().finished(this);
}
}
}
AsyncCall是RealCall的内部类,并且AsyncCall继承了NamedRunnable类,这个类里面有Runable字眼,execute()也是重写父类的方法,里面的实现跟同步里面获取数据是一致的,都是通过重重拦截器,最终获取到数据,只是处理起来,异步里面通过接口来回调而已。咱再看看NamedRunnable这个类:
/**
* Runnable implementation which always sets its thread name.
*/
public abstract class NamedRunnable implements Runnable {
protected final String name;
public NamedRunnable(String format, Object... args) {
this.name = Util.format(format, args);
}
@Override public final void run() {
String oldName = Thread.currentThread().getName();
Thread.currentThread().setName(name);
try {
execute();
} finally {
Thread.currentThread().setName(oldName);
}
}
protected abstract void execute();
}
NamedRunnable竟然是个抽象类,execute()抽象方法在AsyncCall实现,在run()里面调用。我们知道,多线程的实现方式:实现Runnable接口、继承Thread类,从这里,我们知道AsyncCall其实就是一个Runnable的子类,可以理解为AsyncCall就是一个子线程。我们再回头来看client.dispatcher().enqueue(AsyncCall(...)):
//Dispatcher.java
public final class Dispatcher {
private int maxRequests = 64;
private int maxRequestsPerHost = 5;
......
/** Ready async calls in the order they'll be run. */
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
/** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
......
synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);
executorService().execute(call);
} else {
readyAsyncCalls.add(call);
}
}
......
}
上面已经把关键代码贴出,有两个队列,一个是正在运行的子线程队列runningAsyncCalls,一个是待执行的请求队列readyAsyncCalls,
enqueue里面,这个判断: 正在运行的子线程 < 64 &&单个host最大同时执行的子线程 < 5,就把call添加到runningAsyncCalls队列中,并且通过线程池来执行这个call,否则就添加到readyAsyncCalls队列。
可以,这很异步,线程池都出来了,看看这个线程池:
//Dispatcher.java
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
上面我们知道正在执行的AsyncCall会添加到runningAsyncCalls队列,那子线程任务执行完了后总要移除吧,我们回到上面的AsyncCall内部的execute()方法,看最后面那几行,finally里面的client.dispatcher().finished(this),this就是当前的AsyncCall对象:
Dispatcher.java
/** Used by {@code Call#execute} to signal completion. */
void finished(RealCall call) {
finished(runningSyncCalls, call, false);
}
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
if (promoteCalls) promoteCalls();
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
}
if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();
}
}
看到了吧,calls.remove(call),这里面移除,这个判断,如果是同步的话,就直接抛出异常,同步流程中,并没有加入队列的操作,只有异步的时候才会执行后续的操作。
看来要好好分析下拦截器了,主要的获取数据的操作全特么在这里面。