如何发起同步和异步请求
OkHttp 的请求实际上由 RealCall 对象发起,RealCall 提供了 execute 函数发起同步请求和 enqueue 函数发起异步请求
同步请求的执行
- 首先会判断当前这个 RealCall 对象是否已经在执行,如果是的话会抛出异常
- 交给 Dispatch 做统计
- 执行 OkHttp 的各个责任链发起真正的请求功能
- 等待责任链返回结果
@Override public Response execute() throws IOException {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
timeout.enter();
eventListener.callStart(this);
try {
client.dispatcher().executed(this);
Response result = getResponseWithInterceptorChain();
if (result == null) throw new IOException("Canceled");
return result;
} catch (IOException e) {
e = timeoutExit(e);
eventListener.callFailed(this, e);
throw e;
} finally {
client.dispatcher().finished(this);
}
}
发起异步请求
OkHttp 的异步请求包装成了 AsyncCall 对象,它是一个线程对象,作为内部类声明在 RealCall 中,由 Dispatch 类统计和执行
- 判断当前任务是否执行,如果已经在执行抛出一个异常
- 构建一个 AsyncCall 对象交给 Dispatch 类管理
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
Dispatcher 类中针对异步任务维护了两个队列,一个 readyAsyncCalls (等待执行队列),一个正在执行的任务队列 runningAsyncCalls
- 将该任务交给等待执行队列
- 调用 promoteAndExecute 执行请求
/** Ready async calls in the order they'll be run. */
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
/** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
void enqueue(AsyncCall call) {
synchronized (this) {
readyAsyncCalls.add(call);
}
promoteAndExecute();
}
promoteAndExecute
- 准备一个可执行的队列
- 轮训所有的等待执行队列,取出任务
- 当前正在执行的队列是否达到 64 的上限
- 是否同时发起了 5 个请求
- 如果满足要求,将消息加入可执行队列,和正在执行队列
- 从可执行队列中取出消息,通过线程池执行
private boolean promoteAndExecute() {
assert (!Thread.holdsLock(this));
List<AsyncCall> executableCalls = new ArrayList<>();
boolean isRunning;
synchronized (this) {
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall asyncCall = i.next();
if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
if (runningCallsForHost(asyncCall) >= maxRequestsPerHost) continue; // Host max capacity.
i.remove();
executableCalls.add(asyncCall);
runningAsyncCalls.add(asyncCall);
}
isRunning = runningCallsCount() > 0;
}
for (int i = 0, size = executableCalls.size(); i < size; i++) {
AsyncCall asyncCall = executableCalls.get(i);
asyncCall.executeOn(executorService());
}
return isRunning;
}
消息的执行是由 RealCall 中通过线程池发起的
- 发起任务执行
- 当 finish 时移除任务
void executeOn(ExecutorService executorService) {
assert (!Thread.holdsLock(client.dispatcher()));
boolean success = false;
try {
executorService.execute(this);
success = true;
} catch (RejectedExecutionException e) {
InterruptedIOException ioException = new InterruptedIOException("executor rejected");
ioException.initCause(e);
eventListener.callFailed(RealCall.this, ioException);
responseCallback.onFailure(RealCall.this, ioException);
} finally {
if (!success) {
client.dispatcher().finished(this); // This call is no longer running!
}
}
}