核心
Dispatcher
负责调度任务。
异步请求有两个队列,就绪和运行队列。
线程池执行执行队列的任务,队列满了就切换到就绪队列。
当执行队列里面有任务执行完后,会把任务从运行任务中删掉。然后把就绪队列里高优先级的任务放到执行队列中。(无论成功和失败都会执行finish方法进行队列调整)
缓存策略
Http的缓存策略目的用来让客户端下次请求网络数据时节省更多的时间,更快的展示数据。
Cache的internalCache实现
final InternalCache internalCache = new InternalCache() {
@Override public Response get(Request request) throws IOException {
return Cache.this.get(request);
}
@Override public CacheRequest put(Response response) throws IOException {
return Cache.this.put(response);
}
@Override public void remove(Request request) throws IOException {
Cache.this.remove(request);
}
@Override public void update(Response cached, Response network) {
Cache.this.update(cached, network);
}
@Override public void trackConditionalCacheHit() {
Cache.this.trackConditionalCacheHit();
}
@Override public void trackResponse(CacheStrategy cacheStrategy) {
Cache.this.trackResponse(cacheStrategy);
}
};
写入缓存
@Nullable CacheRequest put(Response response) {
String requestMethod = response.request().method();
// 检验有效性
if (HttpMethod.invalidatesCache(response.request().method())) {
...
}
// 只缓存get请求
if (!requestMethod.equals("GET")) {
return null;
}
...
// 写入缓存包装成的对象
Entry entry = new Entry(response);
// 缓存算法
// 内部维护清理的线程池,通过线程池来实现缓存文件的清理和管理
DiskLruCache.Editor editor = null;
try {
// 将网络请求url转化为对应的key
// 对url进行md5加密处理,获取到16进制表示形式
editor = cache.edit(key(response.request().url()));
if (editor == null) {
return null;
}
// 写入缓存,写入url,请求头部,响应头部
entry.writeTo(editor);
// 缓存响应body数据并返回
return new CacheRequestImpl(editor);
} catch (IOException e) {
abortQuietly(editor);
return null;
}
}
1.只缓存get请求
2.将缓存信息封装进Entry对象
3.通过DiskLruCache算法写入缓存
读取缓存
@Nullable Response get(Request request) {
String key = key(request.url());
DiskLruCache.Snapshot snapshot;
Entry entry;
try {
// 得到缓存
snapshot = cache.get(key);
if (snapshot == null) {
return null;
}
} catch (IOException e) {
// Give up because the cache cannot be read.
return null;
}
try {
// 通过缓存创建出Entry对象
entry = new Entry(snapshot.getSource(ENTRY_METADATA));
} catch (IOException e) {
Util.closeQuietly(snapshot);
return null;
}
// 通过缓存对象获取响应对象
Response response = entry.response(snapshot);
// 请求和响应有没有对应出现
if (!entry.matches(request, response)) {
Util.closeQuietly(response.body());
return null;
}
return response;
}
OkHttp拦截器
getResponseWithInterceptorChain()获取Response
将请求一层一层向下传,直到有一层能够得到Resposne就停止向下传递,然后将response向前面的拦截器传递,然后各个拦截器会对respone进行一些处理,最后会传到RealCall类中通过execute来得到response。
public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
RealConnection connection) throws IOException {
...
calls++;
...
// Call the next interceptor in the chain.
RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
writeTimeout);
Interceptor interceptor = interceptors.get(index);
Response response = interceptor.intercept(next);
...
return response;
}
retryAndFollowUpInterceptor拦截器
负责失败重试以及重定向的拦截器。
@Override public Response intercept(Chain chain) throws IOException {
Request request = chain.request();
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Call call = realChain.call();
EventListener eventListener = realChain.eventListener();
// 建立Http请求所需要的所有网络组件
// 这个在后面的ConnectInterceptor拦截器中才会使用
// 1.用于服务端连接的Connection 2.用于服务端进行数据传输的输入输出流
// 通过拦截器链一层一层传下去
StreamAllocation streamAllocation = new StreamAllocation(client.connectionPool(),
createAddress(request.url()), call, eventListener, callStackTrace);
...
while (true) {
...
try {
response = realChain.proceed(request, streamAllocation, null, null);
releaseConnection = false;
} catch (RouteException e) {
//检测该RouteException是否可以重连
// 1.OkHttpClient是否支持失败重连的机制,2.通过isRecoverable方法检测该异常是否是致命的,3.是否有更多的路线,可以重试
if (!recover(e.getLastConnectException(), streamAllocation, false, request)) {
throw e.getLastConnectException();
}
releaseConnection = false;
continue;
} catch (IOException e) {
//判断IOException是否可以重连
boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
if (!recover(e, streamAllocation, requestSendStarted, request)) throw e;
releaseConnection = false;
continue;
} finally {
// We're throwing an unchecked exception. Release any resources.
if (releaseConnection) {
streamAllocation.streamFailed(null);
streamAllocation.release();
}
}
...
// 请求是成功的,判断状态码
Request followUp = followUpRequest(response, streamAllocation.route());
...
// 对重新尝试次数进行限制,默认20次
if (++followUpCount > MAX_FOLLOW_UPS) {
streamAllocation.release();
throw new ProtocolException("Too many follow-up requests: " + followUpCount);
}
...
}
}
1.创建StreamAllocation对象,会通过拦截器链往下传到ConnectInterceptor拦截器使用。这个对象是建立Http请求所需要的所有网络组件。
2.调用RealInterceptorChain.proceed(...)进行网络请求。
3.根据异常结果或者响应结果判断是否要进行重新请求。
BridgeInterceptor拦截器
负责将用户构建的一个Request请求转化为能够进行网络访问的请求,将服务请求回来的响应Response转化为用户可用的Response。
@Override public Response intercept(Chain chain) throws IOException {
// 对Request添加很多头部信息,使之成为发送请求的Request
...
// Http 1.1
if (userRequest.header("Connection") == null) {
requestBuilder.header("Connection", "Keep-Alive");
}
...
// 向服务器发送请求,服务器收到请求返回Response给客户端
Response networkResponse = chain.proceed(requestBuilder.build());
// 将服务器返回回来的response,转换成用户可以使用的response
// 如果支持压缩,从服务端返回回来的数据是压缩数据,在这个拦截器需要对数据进行解压
HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());
Response.Builder responseBuilder = networkResponse.newBuilder()
.request(userRequest);
// 客户端是否支持gzip压缩,从服务端返回的响应体内容是否经过gzip压缩,是否有body体
if (transparentGzip
&& "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
&& HttpHeaders.hasBody(networkResponse)) {
// 将response的body体输入流,转换成GzipSource类型,即是解压的数据流
// 让用户以解压的方式读取流数据
GzipSource responseBody = new GzipSource(networkResponse.body().source());
...
}
return responseBuilder.build();
}
CacheInterceptor拦截器
负责读取缓存直接返回、更新缓存的拦截器。
@Override public Response intercept(Chain chain) throws IOException {
// 根据request来判断cache中是否有缓存的response,如果有,得到这个response,然后进行判断当前response是否有效,没有将cacheCandate赋值为空。
Response cacheCandidate = cache != null
? cache.get(chain.request())
: null;
long now = System.currentTimeMillis();
//生成一个缓存策略,用来判断是否使用网络,或者本地缓存,或者都用
CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
Request networkRequest = strategy.networkRequest;
Response cacheResponse = strategy.cacheResponse;
if (cache != null) {
// 记录次数,请求次数,命中响应次数
cache.trackResponse(strategy);
}
// 缓存不符合要求,将其关掉
if (cacheCandidate != null && cacheResponse == null) {
closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
}
//如果禁止使用网络并且本地缓存为空,则返回失败。
if (networkRequest == null && cacheResponse == null) {
return new Response.Builder()
.request(chain.request())
.protocol(Protocol.HTTP_1_1)
.code(504)
.message("Unsatisfiable Request (only-if-cached)")
.body(Util.EMPTY_RESPONSE)
.sentRequestAtMillis(-1L)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
}
// 如果不使用网络请求,则返回本地缓存
if (networkRequest == null) {
return cacheResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.build();
}
Response networkResponse = null;
try {
// 网络请求
networkResponse = chain.proceed(networkRequest);
} finally {
// If we're crashing on I/O or otherwise, don't leak the cache body.
if (networkResponse == null && cacheCandidate != null) {
closeQuietly(cacheCandidate.body());
}
}
// 从缓存中读取数据,code是304,有缓存更新缓存
if (cacheResponse != null) {
if (networkResponse.code() == HTTP_NOT_MODIFIED) {
Response response = cacheResponse.newBuilder()
.headers(combine(cacheResponse.headers(), networkResponse.headers()))
.sentRequestAtMillis(networkResponse.sentRequestAtMillis())
.receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();
networkResponse.body().close();
cache.trackConditionalCacheHit();
// 更新缓存
cache.update(cacheResponse, response);
return response;
} else {
closeQuietly(cacheResponse.body());
}
}
//如果本地缓存为空,则通过网络返回的响应构建一个response返回
Response response = networkResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();
if (cache != null) {
if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
// 缓存未经缓存过的response
CacheRequest cacheRequest = cache.put(response);
return cacheWritingResponse(cacheRequest, response);
}
// request是否是无效的缓存方法,无效进行移除
if (HttpMethod.invalidatesCache(networkRequest.method())) {
try {
cache.remove(networkRequest);
} catch (IOException ignored) {
// The cache cannot be written.
}
}
}
return response;
}
- 查看是否有本地缓存,注意本地缓存只缓存GET请求
- 查看网络策略,是使用网络缓存还是本地缓存。不使用网络缓存,直接返回本地缓存。如果都不用返回504失败。
- 上面策略两个都有的话,从缓存中读取Response,然后进行缓存更新。
- 无缓存,通过网络返回的响应构建一个response返回,缓存未经缓存过的response。
ConnectInterceptor拦截器
负责和服务器建立连接的拦截器。
@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();
// retryAndFollowUpInterceptor初始化的StreamAllocation对象,在这里用的
StreamAllocation streamAllocation = realChain.streamAllocation();
boolean doExtensiveHealthChecks = !request.method().equals("GET");
// HttpCodec用来编码request,解码response
HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
// 获取一个连接,用来实际的I/O传输的
RealConnection connection = streamAllocation.connection();
return realChain.proceed(request, streamAllocation, httpCodec, connection);
}
调用newStream()
public HttpCodec newStream(
OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
...
try {
// 1.尝试去获取Connection
// 2.能复用就去复用,不能复用就从连接池中获取可用的连接
// 3.如果从连接池中没有找到连接,则new一个新的连接,把获取到的新的RealConnection放置到新的连接池当中
RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
writeTimeout, connectionRetryEnabled, doExtensiveHealthChecks);
HttpCodec resultCodec = resultConnection.newCodec(client, chain, this);
synchronized (connectionPool) {
codec = resultCodec;
return resultCodec;
}
} catch (IOException e) {
throw new RouteException(e);
}
}
循环调用findHealthyConnection()里面的findConnection()
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
boolean connectionRetryEnabled) throws IOException {
RealConnection result = null;
...
synchronized (connectionPool) {
...
releasedConnection = this.connection;
// 可复用的连接不为空
if (this.connection != null) {
result = this.connection;
releasedConnection = null;
}
if (result == null) {
// 从connectionPool中获取可用的连接,获取成功就返回
Internal.instance.get(connectionPool, address, this, null);
if (connection != null) {
foundPooledConnection = true;
result = connection;
} else {
selectedRoute = route;
}
}
if (!foundPooledConnection) {
if (selectedRoute == null) {
selectedRoute = routeSelection.next();
}
//如果从连接池中没有找到连接,则new一个新的连接
route = selectedRoute;
refusedStreamCount = 0;
result = new RealConnection(connectionPool, selectedRoute);
acquire(result, false);
}
}
...
// 进行实际的网络连接
result.connect(
connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled, call, eventListener);
routeDatabase().connected(result.route());
Socket socket = null;
synchronized (connectionPool) {
reportedAcquired = true;
// 把获取成功的连接放进连接池当中
Internal.instance.put(connectionPool, result);
...
}
...
return result;
}
CallServerInterceptor拦截器
负责向服务器发送请求数据、从服务器读取响应数据的拦截器。
将HTTP请求写到I/O流当中,从I/O流中读取从客户端返回给服务端的数据。
@Override public Response intercept(Chain chain) throws IOException {
...
// 向socket写入请求头信息
httpCodec.writeRequestHeaders(request);
...
// 检查请求方法,用Httpcodec处理request
if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
if (responseBuilder == null) {
...
// 向socket写入请求body信息
request.body().writeTo(bufferedRequestBody);
}
}
// 完成写入工作
httpCodec.finishRequest();
if (responseBuilder == null) {
// 读取网络响应的头部信息
responseBuilder = httpCodec.readResponseHeaders(false);
}
// 进行网络请求得到response
Response response = responseBuilder
.request(request)
.handshake(streamAllocation.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
int code = response.code();
// 读取响应的body信息
if (forWebSocket && code == 101) {
response = response.newBuilder()
.body(Util.EMPTY_RESPONSE)
.build();
} else {
response = response.newBuilder()
.body(httpCodec.openResponseBody(response))
.build();
}
...
return response;
}
连接池
负责复用连接和销毁无用连接。
客户端和服务端连接抽象成一个Connection,实现类RealConnection,而每个Connection都会放到ConnectionPool这个连接池中,使用Deque进行保存。
时间范围内复用Connection,就是浏览器和服务端之间保持长连接,这个连接是可以复用的。在timeout时间内复用connection,并且有效的对其进行回收清理操作。默认最多存在5个空闲连接,存活5分钟。
获取连接
@Nullable RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
assert (Thread.holdsLock(this));
for (RealConnection connection : connections) {
// 判断当前获取的连接是否能被使用
if (connection.isEligible(address, route)) {
// 获取可用的连接
// acquire对RealConnection引用的streamAllocation进行计数List<Reference<StreamAllocation>>
streamAllocation.acquire(connection, true);
return connection;
}
}
return null;
}
每一个RealConnection中都有一个StreamAllocation弱引用集合,用于记录对于StreamAllocation的引用。
StreamAllocation里面封装HttpCodec对象,HttpCodec里面封装有Request和Response读写Socket的抽象,每一个请求Request通过Http来请求数据时都需要通过StreamAllocation来获取HttpCodec,从而读取响应结果,而每一个StreamAllocation都是和一个RealConnection绑定的,StreamAllocation可以说是RealConnection、HttpCodec和请求之间的桥梁。
添加连接
在新的connection放进列表之前执行清理闲置连接的线程。
void put(RealConnection connection) {
assert (Thread.holdsLock(this));
if (!cleanupRunning) {
cleanupRunning = true;
executor.execute(cleanupRunnable);
}
connections.add(connection);
}
如何对connection对象自动回收
通过RealConnection的StreamAllocation的引用计数是否为0来实现自动回收连接的。
private final Runnable cleanupRunnable = new Runnable() {
@Override public void run() {
while (true) {
// cleanup进行清理,返回下次需要清理的间隔时间
long waitNanos = cleanup(System.nanoTime());
if (waitNanos == -1) return;
if (waitNanos > 0) {
long waitMillis = waitNanos / 1000000L;
waitNanos -= (waitMillis * 1000000L);
synchronized (ConnectionPool.this) {
try {
ConnectionPool.this.wait(waitMillis, (int) waitNanos);
} catch (InterruptedException ignored) {
}
}
}
}
}
};
cleanup方法-清除任务方法
long cleanup(long now) {
//记录活跃的连接数
int inUseConnectionCount = 0;
//记录空闲的连接数
int idleConnectionCount = 0;
//空闲时间最长的连接
RealConnection longestIdleConnection = null;
long longestIdleDurationNs = Long.MIN_VALUE;
synchronized (this) {
for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
RealConnection connection = i.next();
//判断连接是否在使用,也就是通过StreamAllocation的引用计数来判断
//返回值大于0说明正在被使用
if (pruneAndGetAllocationCount(connection, now) > 0) {
inUseConnectionCount++;
continue;
}
idleConnectionCount++;
//找出了空闲时间最长的连接,准备移除
long idleDurationNs = now - connection.idleAtNanos;
if (idleDurationNs > longestIdleDurationNs) {
longestIdleDurationNs = idleDurationNs;
longestIdleConnection = connection;
}
}
if (longestIdleDurationNs >= this.keepAliveDurationNs
|| idleConnectionCount > this.maxIdleConnections) {
//如果空闲时间最长的连接的空闲时间超过了5分钟
//或是空闲的连接数超过了限制,就移除
connections.remove(longestIdleConnection);
} else if (idleConnectionCount > 0) {
//如果存在空闲连接但是还没有超过5分钟
//就返回剩下的时间,便于下次进行清理
return keepAliveDurationNs - longestIdleDurationNs;
} else if (inUseConnectionCount > 0) {
//如果没有空闲的连接,那就等5分钟后再尝试清理
return keepAliveDurationNs;
} else {
// 当前没有任何连接,就返回-1,跳出循环
cleanupRunning = false;
return -1;
}
}
closeQuietly(longestIdleConnection.socket());
// Cleanup again immediately.
return 0;
}
如何找到最不活跃的连接
private int pruneAndGetAllocationCount(RealConnection connection, long now) {
// 对象计数器,存放在RealConnection连接对象中用于记录Connection的活跃情况。
// StreamAllocation在列表中的数量就是物理socket被引用的次数
List<Reference<StreamAllocation>> references = connection.allocations;
for (int i = 0; i < references.size(); ) {
Reference<StreamAllocation> reference = references.get(i);
//如果存在引用,就说明是活跃连接,就继续看下一个StreamAllocation
if (reference.get() != null) {
i++;
continue;
}
...
//为空,就把连接从集合中删除
references.remove(i);
connection.noNewStreams = true;
//如果列表为空,就说明此连接上没有StreamAllocation引用了,就返回0,表示是空闲的连接
if (references.isEmpty()) {
connection.idleAtNanos = now - keepAliveDurationNs;
return 0;
}
}
return references.size();
}
总结:
每次网络请求都会产生一个StreamAllocation对象,通过每个连接的引用计数对象StreamAllocation的计数来回收空闲的连接,向连接池添加新的连接时会触发执行清理空闲连接的任务。
清理空闲连接的任务通过线程池来执行,找到不活跃的连接进行统计,如果空闲时长超过指定最长时长或者空闲数量大于最大指定数就进行清理。