StreamAllocation主要功能:
- 屏蔽协议细节处理不同Connection的复用功能,主要是协议流的可复用性和ConnectionPool的处理;
- 屏蔽协议(这些协议主要指HTTP1和HTTP2)细节给请求提供HttpCodec用于处理输入输出;
- StreamAllocation关联了一个Call请求的所有周期,StreamAllocation可以用来判断一个Call周期内Connection是否还在被占用,其它的Call能不能使用该Connection;
- StreamAllocation、Connection和ConnectionPool一起还提供了简单的StreamAllocation泄露检测功能。
StreamAllocation生命周期
StreamAllocation的创建
每次调用RealCall#execute时会新建一组interceptor用于处理请求,其中有一个是用于处理重试和重定向的拦截器RetryAndFollowUpInterceptor,RetryAndFollowUpInterceptor是在Recall的构造函数中创建。
// RetryAndFollowUpInterceptor
private RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
this.client = client;
this.originalRequest = originalRequest;
this.forWebSocket = forWebSocket;
this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client, forWebSocket);
}
RetryAndFollowUpInterceptor#intercept会新建一个StreamAllocation对象,可以看到一个Realcall#execute(在OkHttp中一个Realcall只能执行一次)会新建一个StreamAllocation对象。有两个地方会存这个对象的引用:
- 以成员变量streamAllocation保存在RetryAndFollowUpInterceptor中;
- 请求的执行过程中在ConnectInterceptor#intercept这个方法中会调用streamAllocation#newStream,这个方法新建或者在ConnectionPool中找到一个可用的Connection,然后将streamAllocation以弱引用的形式保存在 Connection#allocations中。
StreamAllocation的释放
请求相应完毕,用户读取数据,就会释放StreamAllocation,主要是解除StreamAllocation和其对应的Connection的关系,真正解除StreamAllocation与其对应Connection关系的地方StreamAllocation#release(RealConnection)这个私有方法中。主要功能是将StreamAllocation对应的弱引用从Connection#allocations移除。
// StreamAllocation#release
private void release(RealConnection connection) {
for (int i = 0, size = connection.allocations.size(); i < size; i++) {
Reference<StreamAllocation> reference = connection.allocations.get(i);
if (reference.get() == this) {
connection.allocations.remove(i);
return;
}
}
throw new IllegalStateException();
}
为什么移除了就能解除关系呢?
StreamAllocation从ConnectionPool中获取连接池中的Connection的第一个条件就是allocations.size() >= allocationLimit而对于Http1来说一个Connection#allocationLimit = 1,只有allocations.size() = 0,该连接才符合其它请求使用的第一个条件。所以,当请求结束时将Reference<StreamAllocation>从allocations中移除之后表示该请求能被复用了。
// RealConnection#isEligible
public boolean isEligible(Address address, @Nullable Route route) {
// If this connection is not accepting new streams, we're done.
if (allocations.size() >= allocationLimit || noNewStreams) return false;
}
什么情况下会调用StreamAllocation#release(RealConnection)这个私有方法呢?
StreamAllocation#release(RealConnection)只在StreamAllocation#deallocate这个方法中被调用。
// StreamAllocation#deallocate
private Socket deallocate(boolean noNewStreams, boolean released, boolean streamFinished) {
assert (Thread.holdsLock(connectionPool));
if (streamFinished) {
this.codec = null;
}
if (released) {
this.released = true;
}
Socket socket = null;
if (connection != null) {
if (noNewStreams) {
connection.noNewStreams = true;
}
if (this.codec == null && (this.released || connection.noNewStreams)) {
release(connection);
if (connection.allocations.isEmpty()) {
connection.idleAtNanos = System.nanoTime();
if (Internal.instance.connectionBecameIdle(connectionPool, connection)) {
socket = connection.socket();
}
}
connection = null;
}
}
return socket;
}
- 第一个参数
streamFinished
表示当前的请求的响应流数据读取完毕或者被关闭; - 第二个参数
released
表示当前的请求处理完毕或出现异常,可以释放AllocationStream和其对应的Connection; - 第三个参数
noNewStreams
表示当前的连接I/O出现异常,需要强制关闭连接底层的socket。
需要同时满足以下两个条件才发触发StreamAllocation#release(RealConnection)
- 第一个参数
streamFinished=true
或者StreamAllocation#codec=null
- 第二个参数
released=true
或者第三个参数noNewStreams= true
streamFinished=true
的情况:
- 对于http1正常关闭流或者流里面的数据读取完毕会调用StreamAllocation#streamFinished此时触发StreamAllocation#deallocate(false, false, true);
- 对于http1读取流里的数据出现异常会调用StreamAllocation#streamFinished此时触发StreamAllocation#deallocate(true, false, true);
- 对于http2会始终调用StreamAllocation#deallocate(false, false, true);
- RetryAndFollowUpInterceptor中会调用StreamAllocation#streamFailed最终会调用StreamAllocation#deallocate(noNewStreams, false, true) (异常可以先不考虑);
上述几种情况传入的released
的值均为false,是否强制关闭连接并解除StreamAllocation与connection的关系取决于noNewStreams的值。
StreamAllocation#codec=null
的情况
有一种情况会出现这种了(有body,body中的数据被读取完毕也会出现下面的这种情况了吧)。。。
某个请求服务端的http响应没有body,此时构建ResponseBody的时候直接新建一个FixedLengthSource
且FixedLengthSource#bytesRemaining = 0,此时本来就没有数据可读,不需要用户调用response#close了,构造方法中会直接Http1Codec#AbstractSource#endOfInput(此时触发逻辑和streamFinished的逻辑一样)触发StreamAllocation#deallocate(false, false, true);很明显此时deallocate不会回收connection,仅仅
将StreamAllocation#codec标记为null。
// FixedLengthSource的构造函数,
FixedLengthSource(long length) throws IOException {
bytesRemaining = length;
if (bytesRemaining == 0) {
endOfInput(true, null);
}
}
这个null什么时候起作用呢? 调用StreamAllocation#release(这个release是个public且无参的方法)。StreamAllocation#release直接触发deallocate(false, true, false),即此时第二个变量released = true
。streamAllocation#release经典用法之一是结合上面的StreamAllocation#codec = null
,这种情况出现在RetryAndFollowUpInterceptor#intercept中,此次请求正常结束且服务端没有返回重定向此时结合上面的调用接口没有返回值即StreamAllocation#codec = null
就可以正常回收连接。
RetryAndFollowUpInterceptor#intercept
public Response intercept(Chain chain) throws IOException {
...
Request followUp = followUpRequest(response, streamAllocation.route());
if (followUp == null) {
if (!forWebSocket) {
streamAllocation.release();
}
return response;
}
...
}
StreamAllocation#release其它调用的地方都是在RetryAndFollowUpInterceptor中出现的都是出现异常的一些逻辑StreamAllocation#release会触发StreamAllocation#deallocate(false, true, false);可以看到除了released = true
之外,其它两个变量都为false,可以看做是一种尝试吧,此时只有配合StreamAllocation#codec = null
才能真正释放StreamAllocation和其对应的Connection之间的关系了。
StreamAllocation#deallocate最后一种触发的逻辑是在StreamAllocation#releaseIfNoNewStreams,原理类似。一个场景是一个HTTP1请求出现服务端返回重定向,重复利用StreamAllocation的Connection,重新利用肯定要检查一下Connection的状态,如果不能建立新的流,强制回收该connection。触发逻辑是在下面的方法中:
StreamAllocation#findConnection
private RealConnection findConnection(){
.....
// Attempt to use an already-allocated connection. We need to be careful here because our
// already-allocated connection may have been restricted from creating new streams.
// 在哪种情况下会出现`this.connection != null`呢?用户的第一次请求发送个服务端并成功收
// 到服务端的相应,但是发现是重定向需要重新发送请求,这时发送的请求就会复用RetryAndFollowUpInterceptor中
// 已有的StreamAllocation。出现这种情况this.connection就不会为null
releasedConnection = this.connection;
toClose = releaseIfNoNewStreams();
...
}