简介
OkHttp是当下Android使用最频繁的网络请求框架,由Square公司开源。Google在Android4.4以后开始将源码中的HttpURLConnection底层实现替换为OKHttp,同时现在流行的Retrofit框架底层同样是使用OKHttp的。
优点:
- 支持Spdy、Http1.X、Http2、Quic以及WebSocket
- 连接池复用底层TCP(Socket),减少请求延时
- 无缝的支持GZIP减少数据流量
- 缓存响应数据减少重复的网络请求
- 请求失败自动重试主机的其他ip,自动重定向
…….
tip:本文版本为OkHttp 3.10.0,最新版本为:4.0.1,逻辑与3版本并没有太大变化,但是改为kotlin实现。
OkHttpClient client = new OkHttpClient();
void syncGet(String url) throws IOException {
Request request = new Request.Builder()
.url(url)
.build();
// 执行同步请求
Call call = client.newCall(request);
Response response = call.execute();
// 获得响应
ResponseBody body = response.body();
System.out.println(body.string());
}
void AsyncGet(String url) {
Request request = new Request.Builder()
.url(url)
.build();
// 执行异步请求
Call call = client.newCall(request);
call.enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {}
@Override
public void onResponse(Call call, Response response) throws IOException {
// 获得响应
ResponseBody body = response.body();
System.out.println(body.string());
}
});
}
使用起来很简单,我们用一张流程图描述一下
在使用Okhttp发送一次请求时,对于使用者最少要用到OkHttpClient、Request和Call这3个对象;
OkHttpClient 中是一些配置信息,比如代理的配置,ssl证书配置等等;
Request 是封装请求参数信息,比如请求地址、请求方法、请求头、请求体等等;
Call 本身是一个接口,实现类为RealCall,execute()是负责同步请求,enqueue()是负责异步请求,两者唯一区别在于一个会直接发起网络请求,而另一个使用OkHttp内置的线程池来进行。
请求任务是如何分配的呢?这就涉及到OkHttp的任务分发器。
Dispatcher 分发器
分发器就是来调配请求任务的,内部会包含一个线程池以及任务队列。在创建OkHttpClient时,我们也可以传递自己定义的线程池来创建分发器。我看一下Dispatcher 的成员属性:
public final class Dispatcher {
// 异步请求最大同时请求数量
private int maxRequests = 64;
// 异步请求同一域名同时存在的最大请求数量
private int maxRequestsPerHost = 5;
// 闲置任务(没有请求时可执行一些任务,由使用者设置)
private @Nullable
Runnable idleCallback;
//异步请求使用的线程池
private @Nullable
ExecutorService executorService;
//异步请求等待执行队列
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
//异步请求正在执行队列
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
//同步请求正在执行队列
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
// 上面说的可配置线程池的构造函数
public Dispatcher(ExecutorService executorService) {
this.executorService = executorService;
}
public Dispatcher() {}
// 创建线程池(懒加载)
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher",
false));
}
return executorService;
}
}
execute() 同步请求
这是我们来看一下我们使用的时候调用Call 对象的同步方法execute()做了什么(实际上调用的是Call的实现类RealCall的execute方法):
@Override
public Response execute() throws IOException {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);
try {
// 我们先看这里,调用了dispatcher的executed方法
client.dispatcher().executed(this);
Response result = getResponseWithInterceptorChain();
if (result == null) throw new IOException("Canceled");
return result;
} catch (IOException e) {
eventListener.callFailed(this, e);
throw e;
} finally {
// 任务结束或者出现异常,将任务标记为结束
client.dispatcher().finished(this);
}
}
很明显调用了dispatcher的executed()方法,ok再跟到dispatcher中:
synchronized void executed(RealCall call) {
runningSyncCalls.add(call);
}
就一句话,将call添加到正在运行的同步队列中,表示当前的请求任务已经开始执行了,等到请求完成再把它从队列中移除 (dispatcher中的finished方法)。
enqueue() 异步请求
再看一下RealCall的enqueue方法
@Override
public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);
// 调用Dispatcher中的enqueue方法
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
同样的调用了Dispatcher中的enqueue方法,看一眼:
synchronized void enqueue(AsyncCall call) {
// 如果同时执行的请求数量不超过64,同时同一域名主机的请求数量不超过5个
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
// 满足条件将请求加入正在运行的队列中,并且开始执行
runningAsyncCalls.add(call);
executorService().execute(call);
} else {
// 不满足条件将请求加入等待队列
readyAsyncCalls.add(call);
}
}
上面的描述已经很详细了,那么等待队列的请求啥时候才能被临幸呢?有没有注意到
Dispatcher的enqueue方法的参数是AsyncCall,其实他就是一个Runnable。
final class AsyncCall extends NamedRunnable {
private final Callback responseCallback;
AsyncCall(Callback responseCallback) {
super("OkHttp %s", redactedUrl());
this.responseCallback = responseCallback;
}
.....
@Override
protected void execute() {
boolean signalledCallback = false;
try {
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
eventListener.callFailed(RealCall.this, e);
responseCallback.onFailure(RealCall.this, e);
}
} finally {
// 当请求执行完成调用了Dispatcher的finished方法
client.dispatcher().finished(this);
}
}
}
和同步请求一样,当一个异步请求结束也调用了Dispatcher的finished方法,看一眼:
void finished(AsyncCall call) {
finished(runningAsyncCalls, call, true);
}
调用重载的finished:
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
// 先把当前的请求移出正在执行队列
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
// promoteCalls这里是true, 执行promoteCalls()
if (promoteCalls) promoteCalls();
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
}
if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();
}
}
先把当前的请求移出正在执行队列,然后执行了promoteCalls(),八成我们想要的就在这里了:
private void promoteCalls() {
// 如果同时执行的请求数量不超过64个,直接return
if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
// 如果等待队列是空的,return!
if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
// 遍历等待队列
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();
// 检查一下正在执行的同一host的请求数量是不是不满5个
if (runningCallsForHost(call) < maxRequestsPerHost) {
// 满足条件,移出等待队列,加入正在执行队列,直接执行请求任务!
i.remove();
runningAsyncCalls.add(call);
executorService().execute(call);
}
// 正在执行队列满了,return 吧!
if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
}
}
这里就是我们要找的,当一个异步任务执行完成时,检查正在执行队列有没有空位置,遍历等待队列,把其中满足条件的请求任务拿出来,放到正在执行队列中去执行,执行完了再次检查,如此循环。
Dispatcher中的线程池
Dispatcher中的线程池很有意思,我们来看一下
public synchronized ExecutorService executorService() {
if (executorService == null) {
// 核心线程数0,最大线程数Integer.MAX_VALUE,空闲线程闲置时间 60,
// 闲置时间单位 秒,线程等待队列 SynchronousQueue, 以及线程创建工厂
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher",
false));
}
return executorService;
}
这是Dispatcher中默认创建的线程池,注意里面几个关键的参数,第一个参数核心线程数量0,最大线程数 Integer.MAX_VALUE 和队列 SynchronousQueue。
为啥要这么配置呢?
首先核心线程为0,表示线程池就不用一直为我们缓存线程,线程池中所有线程都是在60s内没有工作就会被回收,在一些网络请求不频繁的app上可以节约一些内存。而最大线程Integer.MAX_VALUE与等待队列SynchronousQueue的组合能够得到最大的吞吐量,即当需要线程池执行任务时,如果不存在空闲线程不需要等待,马上新建线程执行任务!来分析一下:
SynchronousQueue 是一种无容量的队列,当向线程池中加入新请求时,只会有以下几种情况:
- 由于没有核心线程,第一次进来的请求会直接创建新线程执行。
- 此时有未被回收的空闲线程,复用该线程执行请求。
- 没有空闲线程了,新请求需要添加到队列,但是队列的容量为0,相当于满了,此时要判断当前正在执行的线程数是否大于最大线程数,由于最大线程数为Integer.MAX_VALUE,肯定不会超过,所以要新建一个线程执行请求。
可以得出结论,我们的请求任务是可以立即执行的,其实还有另外2种阻塞队列:ArrayBlockingQueue 和 LinkedBlockingQueue,但是它们一般都有容量,当请求任务进入等待队列后,可能会出现以下几种情况:
- 没有空闲线程了,新请求添加到队列,队列中的请求一直在等待空闲线程,出现阻塞。
- 没有空闲线程了,等待队列也满了,正在执行的线程数小于最大线程数,新建线程执行请求,出现后提交的任务先执行,而先提交的任务一直在等待的情况,同样阻塞了。
- 没有空闲线程了,等待队列也满了,正在执行的线程数大于等于最大线程数,请求被拒绝了,尴尬。
总结一句话,Dispatcher的默认线程池参数配置保证了新建的请求都可以被立即执行,避免阻塞。
当然需要注意的时,进程的内存是存在限制的,而每一个线程都需要分配一定的内存。所以线程并不能无限个数。那么当设置最大线程数为Integer.MAX_VALUE时,OkHttp同时还有最大请求任务执行个数: 64的限制,这样既解决了这个问题同时也能获得最大吞吐。