第一篇入口
本文基于上一篇的前提,着重介绍其中的默认拦截器的意义
这里稍微回顾一下拦截器的调用顺序
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
//尝试建立一个拦截器列表,之后会进行一次链式调用
//简单说就是从上到下执行获取response之前的过程,
//然后获取到response之后,再从下到上执行
List<Interceptor> interceptors = new ArrayList<>();
//首先执行自定义的拦截器
interceptors.addAll(client.interceptors());
//处理重试逻辑
interceptors.add(retryAndFollowUpInterceptor);
//添加一些预定义头部之类的数据
interceptors.add(new BridgeInterceptor(client.cookieJar()));
//处理http协议的缓存逻辑
interceptors.add(new CacheInterceptor(client.internalCache()));
//处理socket建立连接或者复用连接过程,总之这里会建立连接
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());
}
//在已经建立的链接上进行参数发送和获取响应封装等操作
interceptors.add(new CallServerInterceptor(forWebSocket));
Interceptor.Chain chain = new RealInterceptorChain(
interceptors, null, null, null, 0, originalRequest);
return chain.proceed(originalRequest);
}
接下来会按照拦截器的执行顺序进行介绍,稍微留意一下所有拦截器都是在子线程中执行,使用中不应该出现线程错误的问题
自定义拦截器
在子线程中,请求先执行的是OkHttpClient中的拦截器列表,这个实际上是自定义拦截器
public Builder addInterceptor(Interceptor interceptor) {
interceptors.add(interceptor);
return this;
}
实际上OkHttpClient通过Builder模式构建,然后可以在Builder中通过上述方法添加自定义拦截器,然后自定义拦截器会按照添加的顺序来执行。
作为优先执行的拦截器,实际上主要有两个好处,第一就是可以监听请求的开始,其次就是可以监听请求的结束,那么平时这里至少可以有两种常用的拦截器
1.打印日志的拦截器:用于在请求开始前打印请求参数等数据,请求结束时打印请求结果数据等
2.追踪请求的拦截器:这个一般可以用于测量一个请求从发起到接收的时间
当然上面只是我想到的两种,实际使用中根据自身需求添加就是了。
RetryAndFollowUpInterceptor
顾名思义,重试和追随拦截器,其实就是用于在请求完成之后,如果当前请求失败但是允许重试,那么会重新发起请求。如果当前请求成功,但是要求重定向,那么请求重定向地址的请求。
当然上面的描述比较粗浅,接下来看一下细节
@Override public Response intercept(Chain chain) throws IOException {
Request request = chain.request();
//流的分配者,这里重点是通过当前请求连接构建Address和RouteSelector
streamAllocation = new StreamAllocation(
client.connectionPool(), createAddress(request.url()), callStackTrace);
int followUpCount = 0;//重定向的次数,最大20
Response priorResponse = null;//用于记录重定向之前上一次的响应成果,这个响应是会清空响应体的
while (true) {//用于重连或者重定向
if (canceled) {//每一次请求之前先判断当前Call(Request)是否被要求取消
streamAllocation.release();//清理连接池的资源,并且关闭Socket
throw new IOException("Canceled");//直接在此处抛出IOException即可,会在AsyncCall中回调onFailure
}
Response response = null;
//用于标记每一次请求是否应该释放套接字连接
boolean releaseConnection = true;
try {
//进行请求
response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null);
//正常来说此时服务端已经返回结果,单次请求完成
releaseConnection = false;
} catch (RouteException e) {
// The attempt to connect via a route failed. The request will not have been sent.
// 尝试连接到指定的节点时候失败
// 1.在通过host去dns查找ip地址的时候出现异常
// 2.进行socket连接过程中出现异常,实际上socket在连接的时候会尝试一直进行连接
// 除非retryOnConnectionFailure要求不能重连,或者socket连接过程中出现不可重新连接相关的异常的时候会抛出
// 后续会进行异常回调
if (!recover(e.getLastConnectException(), false, request)) {
throw e.getLastConnectException();
}
releaseConnection = false;
continue;
} catch (IOException e) {
// An attempt to communicate with a server failed. The request may have been sent.
// 这个异常可能更多的出现在网络传输数据的时候
boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
if (!recover(e, requestSendStarted, request)) throw e;
releaseConnection = false;
continue;
} finally {
// We're throwing an unchecked exception. Release any resources.
if (releaseConnection) {// 当前连接中出现异常,并且不可尝试重新连接
streamAllocation.streamFailed(null);//从复用池中移除当前连接,并且关闭当前套接字连接
streamAllocation.release();//标记当前流分配者后续不再可用
}
}
// Attach the prior response if it exists. Such responses never have a body.
// 在重试之前记录上一次请求的结果
if (priorResponse != null) {
//这里记录了上一次请求返回的响应,不过上一次请求的正文体在这里被置空
//因为之前的响应的不应该被关心的,应该关心的是当前响应的正文体
response = response.newBuilder()
.priorResponse(priorResponse.newBuilder()
.body(null)
.build())
.build();
}
//判断是否存在代理、重定向和一些异常情况,可能需要尝试发出新的请求
//这里可能要进行新的请求的构建
Request followUp = followUpRequest(response);
if (followUp == null) {//不需要重定向
if (!forWebSocket) {//App连接非WebSocket
//标记当前流分配者后续不再可用,释放当前RealConnection所关联的流分配者
//通过连接池的空闲时间来判断当前连接是否回收,如果回收则会关闭socket
//否则会待在复用池中,等待复用或者在指定的时间后被清理
streamAllocation.release();
}
return response;//返回成功的响应结果
}
//如果到这里,说明要重定向
//先关闭之前服务端响应的输入流
closeQuietly(response.body());
if (++followUpCount > MAX_FOLLOW_UPS) {//最大重定向次数20
streamAllocation.release();//这里只是单纯的标记流分配者,连接本身会根据连接池状态觉得是否回收,同理socket也根据情况关闭
throw new ProtocolException("Too many follow-up requests: " + followUpCount);
}
if (followUp.body() instanceof UnrepeatableRequestBody) {
streamAllocation.release();//这里只是单纯的标记流分配者,连接本身会根据连接池状态觉得是否回收,同理socket也根据情况关闭
throw new HttpRetryException("Cannot retry streamed HTTP body", response.code());
}
//判断当前是否可以重用连接,主要是Route的问题,包括ip地址等数据
//如果不能则需要通过新的请求地址来新建连接
if (!sameConnection(response, followUp.url())) {
streamAllocation.release();//这里只是单纯的标记流分配者,连接本身会根据连接池状态觉得是否回收,同理socket也根据情况关闭
//通过新的Address构建新的流分配者
streamAllocation = new StreamAllocation(
client.connectionPool(), createAddress(followUp.url()), callStackTrace);
} else if (streamAllocation.codec() != null) {
throw new IllegalStateException("Closing the body of " + response
+ " didn't close its backing stream. Bad interceptor?");
}
//准备进行下一次重定向的请求
request = followUp;
priorResponse = response;
}
}
代码相对抽象,通过流程相对好阐述这个过程:
1.正常的一次请求成功:
try {
//进行请求
response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null);
//正常来说此时服务端已经返回结果,单次请求完成
releaseConnection = false;
}finally {
if (releaseConnection) {//正常请求成功不释放当前连接,主要是为了连接的复用
streamAllocation.streamFailed(null);
streamAllocation.release();
}
}
if (followUp == null) {//正常请求不会返回重定向
if (!forWebSocket) {//App连接非WebSocket
//标记当前流分配者后续不再可用,释放当前RealConnection所关联的流分配者
//通过连接池的空闲时间来判断当前连接是否回收,如果回收则会关闭socket
//否则会待在复用池中,等待复用或者在指定的时间后被清理
//总之就是当前流在后续一段时间内可以复用
streamAllocation.release();
}
return response;//返回成功的响应结果
}
非常简单,主要就是请求成功之后释放连接,让连接可以在连接池中被后续复用。
2.第一次连接失败:
while(true){
try {
//进行请求
response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null);
//抛出异常
}catch (IOException e) {
boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
//判断当前是否可以重试,如果不能,这里抛出异常会回调onFailed,此时releaseConnection = true,走finally
if (!recover(e, requestSendStarted, request)) throw e;
//可以重试,先不释放连接
releaseConnection = false;
continue;
}finally {
if (releaseConnection) {//当前不可尝试重新连接
streamAllocation.streamFailed(null);//从复用池中移除当前连接,并且关闭当前套接字连接
streamAllocation.release();//标记当前流分配者后续不再可用
}
}
//连接成功之后,走之前的流程
}
private boolean recover(IOException e, boolean requestSendStarted, Request userRequest) {
streamAllocation.streamFailed(e);//这里会关闭之前的socket
// 应用层是否允许在连接失败之后重新尝试连接
if (!client.retryOnConnectionFailure()) return false;
// We can't send the request body again.
// 这个是Http2协议中的情况,这里先不考虑
if (requestSendStarted && userRequest.body() instanceof UnrepeatableRequestBody) return false;
// This exception is fatal.
// 检查当前异常类型,因为有的异常是无法再次进行重试连接的
if (!isRecoverable(e, requestSendStarted)) return false;
// 当前没有连接节点可以去尝试
// 一般来说就是当前节点
if (!streamAllocation.hasMoreRoutes()) return false;
// For failure recovery, use the same route selector with a new connection.
return true;
}
private boolean isRecoverable(IOException e, boolean requestSendStarted) {
// If there was a protocol problem, don't recover.
// 协议异常,比方说协议规定的报文格式不符之类的情况
// 总之就是一些不满足当前协议的条件
if (e instanceof ProtocolException) {
return false;
}
// If there was an interruption don't recover, but if there was a timeout connecting to a route
// we should try the next route (if there is one).
// 这个一般是Okio的异常,会在流操作超时之后抛出
// SocketTimeoutException一般可以认为是socket连接超时或者读写超时
// 这种时候可以尝试重连
if (e instanceof InterruptedIOException) {
return e instanceof SocketTimeoutException && !requestSendStarted;
}
// Look for known client-side or negotiation errors that are unlikely to be fixed by trying
// again with a different route.
// 当前是Https握手失败异常
if (e instanceof SSLHandshakeException) {
// 如果是证书异常,这样没有必要重试,因为重试了也会失败
// If the problem was a CertificateException from the X509TrustManager,
// do not retry.
if (e.getCause() instanceof CertificateException) {
return false;
}
}
if (e instanceof SSLPeerUnverifiedException) {
// e.g. a certificate pinning error.
return false;
}
// An example of one we might want to retry with a different route is a problem connecting to a
// proxy and would manifest as a standard IOException. Unless it is one we know we should not
// retry, we return true and try a new route.
return true;
}
当连接失败之后,会进行重新连接的判断,首先先标记当前流失败
1.当前应用层是否允许重新连接
2.当前异常是否是重连无意义异常,比方说Http证书校验异常,这种就算重连也会失败
3.检查当前是否有节点可用
在判断条件之前,会先标记当前流失败,看一下细节实现,具体注释会对应当前场景
public void streamFailed(IOException e) {
Socket socket;
boolean noNewStreams = false;
synchronized (connectionPool) {
if (e instanceof StreamResetException) {//这个是Http2协议的,先忽略
//...
} else if (connection != null
&& (!connection.isMultiplexed() || e instanceof ConnectionShutdownException)) {//当前连接不为空,且不是http2协议
noNewStreams = true;//标记之后不会新建连接
// 当前连接没有成功,标记当前连接节点失败
// 当前连接被标记成功的条件是要求完成一次请求,并且从服务端成功读取响应体中的正文部分
if (connection.successCount == 0) {
if (route != null && e != null) {
routeSelector.connectFailed(route, e);
}
route = null;//会导致当前节点失效,那么如果要重新连接,要求必须有下一个备用的节点
}
}
//不新建流、当前流完成、不释放流分配者
//当前socket连接会被关闭
socket = deallocate(noNewStreams, false, true);
}
closeQuietly(socket);
}
private Socket deallocate(boolean noNewStreams, boolean released, boolean streamFinished) {
assert (Thread.holdsLock(connectionPool));
//连接完成之后,无论成功失败,都清理当前codec
if (streamFinished) {
this.codec = null;
}
//当前流分配者是否继续可用
if (released) {
this.released = true;
}
Socket socket = null;
if (connection != null) {
if (noNewStreams) {//当前连接是否可以重用
//一旦被标记,会导致连接池移除当前连接
//后续会关闭socket连接
connection.noNewStreams = true;
}
//在一次正常的请求完成之后,要进行连接的释放
//包括socket的关闭
if (this.codec == null && (this.released || connection.noNewStreams)) {
release(connection);
if (connection.allocations.isEmpty()) {
connection.idleAtNanos = System.nanoTime();
if (Internal.instance.connectionBecameIdle(connectionPool, connection)) {
socket = connection.socket();
}
}
connection = null;
}
}
return socket;
}
private void release(RealConnection connection) {
//这里实际上就是将连接和流分配者的关联解除
//如果允许连接复用,那么不应该关闭socket连接,会在一定时间内等待复用
//否则后面应该主动关闭socket连接
for (int i = 0, size = connection.allocations.size(); i < size; i++) {
Reference<StreamAllocation> reference = connection.allocations.get(i);
if (reference.get() == this) {
connection.allocations.remove(i);
return;
}
}
throw new IllegalStateException();
}
因为当前请求没有完全成功,那么是没有办法进行复用的,所以说这里会关闭当前的socket连接并且从复用池中移除。
除此之外,还进行了节点是否有效的判断,因为复用的时候必须要有指定的连接节点,当前连接出现异常的请求会有一个对应的节点,如果当前节点连接成功,那么当前节点是有效的,重连的时候可以连接这个节点,否则就应该标记当前节点无效,重连的时候必须找寻其他节点。
连接复用池
上面既然提到了连接复用池,那么这里就看一下ConnectionPool的实现
首先是为什么需要使用复用池?
在Http1.1的协议中,通过"Connection: Keep-Alive"这个头部报文,可以指定当前socket连接为长连接,这意味者如果服务端/客户端没有主动终止当前连接,那么这个连接会一直保持
(实际中可能会因为很多原因导致长连接中断,这个不是重点)
此外在一次socket连接的过程中,至少会有3次握手这个流程的开销,可能还会有SSL证书/域名校验这一块,那么如果每一次连接都重复上述操作,显然不是什么很好的选择。
那么复用之前已经建立的长连接就是最好的一个选择
public ConnectionPool() {
this(5, 5, TimeUnit.MINUTES);
}
OkHttp中默认允许最多5个可以复用的连接,连接的最大等待时间是5分钟。
上面在将重试拦截器的时候提到,如果一个连接完成之后会进行连接释放,其中
if (connection.allocations.isEmpty()) {
connection.idleAtNanos = System.nanoTime();
if (Internal.instance.connectionBecameIdle(connectionPool, connection)) {
socket = connection.socket();
}
}
boolean connectionBecameIdle(RealConnection connection) {
assert (Thread.holdsLock(this));
//noNewStreams实际上是标记当前流不可复用
if (connection.noNewStreams || maxIdleConnections == 0) {
connections.remove(connection);//直接从连接池中移除
return true;
} else {
//这里是唤醒cleaup线程
notifyAll(); // Awake the cleanup thread: we may have exceeded the idle connection limit.
return false;
}
}
private final Runnable cleanupRunnable = new Runnable() {
@Override public void run() {
while (true) {
long waitNanos = cleanup(System.nanoTime());
if (waitNanos == -1) return;
if (waitNanos > 0) {//计算下一次唤醒进行清理任务的间隔
long waitMillis = waitNanos / 1000000L;//毫秒
waitNanos -= (waitMillis * 1000000L);//纳秒
synchronized (ConnectionPool.this) {
try {//线程挂起指定时间
ConnectionPool.this.wait(waitMillis, (int) waitNanos);
} catch (InterruptedException ignored) {
}
}
}
}
}
};
long cleanup(long now) {
int inUseConnectionCount = 0;
int idleConnectionCount = 0;
RealConnection longestIdleConnection = null;
long longestIdleDurationNs = Long.MIN_VALUE;
// Find either a connection to evict, or the time that the next eviction is due.
synchronized (this) {
//遍历当前连接池中的所有连接
for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
RealConnection connection = i.next();
// If the connection is in use, keep searching.
// 如果当前连接正在使用中,不回收,继续查找下一个
if (pruneAndGetAllocationCount(connection, now) > 0) {
inUseConnectionCount++;
continue;
}
idleConnectionCount++;//空闲连接数+1
// If the connection is ready to be evicted, we're done.
// 当前连接空闲时间 = 当前时间 - 当前连接被释放允许参与复用的时间
long idleDurationNs = now - connection.idleAtNanos;
// 这里是记录当前连接池中所有空闲连接中空闲的最久的连接
if (idleDurationNs > longestIdleDurationNs) {
longestIdleDurationNs = idleDurationNs;//当前最大的空闲的连接到现在所经过的纳秒数
longestIdleConnection = connection;//当前空闲最久的可复用的连接
}
}
if (longestIdleDurationNs >= this.keepAliveDurationNs
|| idleConnectionCount > this.maxIdleConnections) {//当前连接超过最大的空闲时间或者数量大于最大的空闲连接数量
// We've found a connection to evict. Remove it from the list, then close it below (outside
// of the synchronized block).
// 从连接池中移除,不再复用
connections.remove(longestIdleConnection);
} else if (idleConnectionCount > 0) {//当前存在空闲连接,并且空闲连接没有超时
// A connection will be ready to evict soon.
// 当前清理线程挂起最大的空闲时间,下一次唤醒可能会清理当前最久的空闲连接
return keepAliveDurationNs - longestIdleDurationNs;
} else if (inUseConnectionCount > 0) {
// All connections are in use. It'll be at least the keep alive duration 'til we run again.
return keepAliveDurationNs;
} else {
// No connections, idle or in use.
cleanupRunning = false;
return -1;
}
}
//每一次清理的对象为当前空闲连接列表中最久的空闲连接
closeQuietly(longestIdleConnection.socket());
// Cleanup again immediately.
return 0;
}
void put(RealConnection connection) {
assert (Thread.holdsLock(this));
//往连接池中添加连接
if (!cleanupRunning) {//当前清理线程没有运行
cleanupRunning = true;//执行清理线程
executor.execute(cleanupRunnable);
}
connections.add(connection);
}
private final Deque<RealConnection> connections = new ArrayDeque<>();
连接池中核心就是一个连接队列和一个清理线程,在每一次添加连接到连接队列的时候尝试开启清理线程,清理线程会尝试清理当前在链接队列中空闲最久的连接。
这里只是讲了逻辑,实际上在OkHttp中Http1.1要想复用连接,那么必须读取完ResponseBody中的数据,这样才会释放连接
总结
这一篇主要是了解重连和复用策略,下一篇会着重讲解Http缓存相关