大工程搞完了,,咱们接着来抠细节,聊一聊OkHttp的连接池管理和任务队列管理
连接池
OkHttp的链接迟相关的类是
- ConnectionPool
- StreamAllocation
如果这边眼生的朋友请看之前的文章; StreamAllocation里面有个ConnectionPool的引用,SteamAllocation是协调connection,strams,calls 三者之间的关系的,我们按照之前的顺序来看StreamAllocation具体和ConnectionPool之间有着什么不可描述的事情。
SteamAllocation在ConnectionInterceptor里面的调用的方法如下:
//获取HttpCodec
HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
//开始连接
RealConnection connection = streamAllocation.connection();
然后对比到SteamAllocation里面 ,他调用 findHealthyConnection —>findConnection—>Internal.instance.get(connectionPool, address, this, null);
Internal 是在okHttpClient里面的静态域里面初始化的,他的get的具体就是调用connectionPool的get方法,我们直接找ConnectionPool.get(address, streamAllocation, route),这个方法是找在池里面背回来的链接,如果没有的话返回null,再到下面进行初始化,直接new RealConnection(connectionPool, selectedRoute);创建了一个连接,然后添加计数,添加计数就是在RealConnection里面的
public final List<Reference<StreamAllocation>> allocations = new ArrayList<>();
新增一个WeakReference <StreamAllocation>
上述系列操作完之后就是一个RealConnection诞生了, 然连接上Socket,把当前的链接放到ConnectionPool里,调用的也是Internal.instance.put—>ConnectionPool.put>,把RealConnection传递过去,他的put操作:
void put(RealConnection connection) {
assert (Thread.holdsLock(this));
//如果清除空闲链接的线程没启动的话启动清除空闲链接
if (!cleanupRunning) {
cleanupRunning = true;
executor.execute(cleanupRunnable);
}
//添加链接到connections
connections.add(connection);
}
他的清理线程的工作就是一个while(true)的循环:
private final Runnable cleanupRunnable = new Runnable() {
@Override public void run() {
while (true) {
long waitNanos = cleanup(System.nanoTime());
if (waitNanos == -1) return;
if (waitNanos > 0) {
long waitMillis = waitNanos / 1000000L;
waitNanos -= (waitMillis * 1000000L);// waitMillis = 300 000
synchronized (ConnectionPool.this) {
try {
ConnectionPool.this.wait(waitMillis, (int) waitNanos);
} catch (InterruptedException ignored) {
}
}
}
}
}
};
可以看出是不是要等待全靠cleanup (为啥我想到了4396.。。)
在此池上执行维护,如果超过了保持活动限制或空闲连接限制,则会逐渐消除空闲时间最长的连接 返回以纳秒为单位的睡眠持续时间,直到下一次调用此方法。 如果不需要进一步的清理,则返回-1。
可以看到,okhttp的辣鸡回收就在这个方法里,方法就在ConnectionPool里有兴趣的同学可以去看一下,这里我直接说个比较关键的地方:
他会判断链接是否在使用中,判断的依据就是WeakReference是否为空(mmp,没想到吧)如果为空的话就remove掉,把RealConnection的noNewStreams设置为true,这种方式依赖虚拟机的GC
整个流程如图所示
任务队列
说完连接池,,我们接着说他的任务队列,这个任务队列存在的场景是在调用enqueue里面的,相关的类是:
Dispatcher
没图我说个java8.。。
然后再根据图看下面的流程
每个 OkHttpClient
只有一个任务队列,是在OkHttpClient里面初始化的,在RealCall里面代用的时候会把这个AsyncCall添加到Dispatcher里面
@Override
public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
我们跑进去看看enqueue的源码
public final class Dispatcher {
synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);
executorService().execute(call);
} else {
readyAsyncCalls.add(call);
}
}
}
看这个方法 判断的条件是:
- 当前运行的集合大小小宇最大请求数量,
- 当前运行的这个Host请求的数量小于最大Host的请求量
这边都是利用Deque来实现的,之前的文章有讲到过:
deque 即双端队列。是一种具有队列和栈的性质的数据结构。双端队列中的元素可以从两端弹出,其限定插入和删除操作在表的两端进行。
如果符合条件,进去添加到runningAsyncCalls里面,去执行,如果不符合条件就放到readyAsyncCalls里面
在RealCall里面的enqueue里当前的执行完之后,在finally里面就调用finished方法 ,会去readyAsyncCalls里面寻找下一个要执行的AsnyCall
这里的executorService()是一个线程池,可能关于线程池的一些东西大家不是特别清楚 这里稍微解释一下,首先是他的构造函数
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)
corePoolSize: 线程池维护线程的最少数量
maximumPoolSize:线程池维护线程的最大数量
keepAliveTime: 线程池维护线程所允许的空闲时间
unit: 线程池维护线程所允许的空闲时间的单位
workQueue: 线程池所使用的缓冲队列
handler: 线程池对拒绝任务的处理策略
上面的SynchronousQueue可能一般同学看的不是特别熟悉这里解释一下:
SynchronousQueue是一个没有数据缓冲的BlockingQueue,生产者线程对其的插入操作put必须等待消费者的移除操作take,反过来也一样。
SynchronousQueue的一个使用场景的典型就是在线程池里。Executors.newCachedThreadPool()就使用了SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收。执行是调用execute方法。
所以
整个dispatcher类负责分发了所有的请求,完成了所有请求的调度,避免了拥塞
现在上面的两个流程已经分析完。 我们来画张图,把上面的流程串起来,整个也是OkHttp的请求调度的核心流程