上一篇拦截器分析中,在ConnectInterceptor的intercept方法中,有这样一句代码来获得stream。
HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
点streamAllocation进去之后看到newStream方法中调用了findHealthyConnection方法,实现如下:
private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
int writeTimeout, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks)
throws IOException {
while (true) {
// 这一句代码是实际获得连接的
RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
connectionRetryEnabled);
// If this is a brand new connection, we can skip the extensive health checks.
synchronized (connectionPool) {
if (candidate.successCount == 0) {
return candidate;
}
}
// Do a (potentially slow) check to confirm that the pooled connection is still good. If it
// isn't, take it out of the pool and start again.
if (!candidate.isHealthy(doExtensiveHealthChecks)) {
noNewStreams();
continue;
}
return candidate;
}
}
如果是新的连接就跳过健康检查,如果不是就查一下是否已经断开啦,输入输出是否关闭啦。再进去看findConnection方法
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
boolean connectionRetryEnabled) throws IOException {
Route selectedRoute;
// 在pool中取连接的话,就需要拿这个pool做同步锁,如果是第一次发起请求应该是拿不到的,会走到下面
synchronized (connectionPool) {
if (released) throw new IllegalStateException("released");
if (codec != null) throw new IllegalStateException("codec != null");
if (canceled) throw new IOException("Canceled");
// Attempt to use an already-allocated connection.
RealConnection allocatedConnection = this.connection;
// 如果allocatedConnection 不为空并且连接池还没满,就直接使用这个连接
if (allocatedConnection != null && !allocatedConnection.noNewStreams) {
return allocatedConnection;
}
// Attempt to get a connection from the pool.
// 调用了okhttpclient的get方法,从connectionPool中根据address拿到连接。
Internal.instance.get(connectionPool, address, this, null);
if (connection != null) {
return connection;
}
// 第一次进来就是空
selectedRoute = route;
}
// If we need a route, make one. This is a blocking operation.
if (selectedRoute == null) {
// 这里面先是查找内存缓存,根据proxies的类型在routeSelector的集合inetSocketAddresses中查找,没有的话就重设一个
// 调用address.dns().lookup(socketHost)方法,通过DNS服务器查询返回一组ip地址(一个域名可能对应多个ip地址,可用于自动重连)
// 最后将得到的address 加入集合inetSocketAddresses中缓存起来。
selectedRoute = routeSelector.next();
}
RealConnection result;
synchronized (connectionPool) {
if (canceled) throw new IOException("Canceled");
// Now that we have an IP address, make another attempt at getting a connection from the pool.
// This could match due to connection coalescing.
// 再进行一次尝试,从连接池中拿连接
Internal.instance.get(connectionPool, address, this, selectedRoute);
if (connection != null) {
route = selectedRoute;
return connection;
}
// Create a connection and assign it to this allocation immediately. This makes it possible
// for an asynchronous cancel() to interrupt the handshake we're about to do.
route = selectedRoute;
refusedStreamCount = 0;
// 新搞一个连接
result = new RealConnection(connectionPool, selectedRoute);
// 将connection的引用交给streamAllocation,将streamAllocation的弱引用加入到connection的allocations集合中
acquire(result);
}
// Do TCP + TLS handshakes. This is a blocking operation.
// 这里调用RealConnection的connect方法,传入ip 端口号进行connect,RealConnection内部的source和sink就是在这个方法中赋值的。
result.connect(connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled);
// connectionPool中维护了一个键值对,里面存了所有连接失败的route,每个连接失败的route都加入进去。
// 而这句话是把连接成功的从route黑名单中去除掉。
routeDatabase().connected(result.route());
Socket socket = null;
synchronized (connectionPool) {
// Pool the connection.
// 用okhttpclient的代码把当前的连接放入连接池中,这种麻烦的写法估计是跟设计模式有关系
// 注意放进去的同时会触发清理
Internal.instance.put(connectionPool, result);
// If another multiplexed connection to the same address was created concurrently, then
// release this connection and acquire that one.
// 处理多线程产生的问题,如果产生了多个connection就release掉当前的,用另一个线程创建的connection
// 并且关闭掉多余的socket
if (result.isMultiplexed()) {
socket = Internal.instance.deduplicate(connectionPool, address, this);
result = connection;
}
}
closeQuietly(socket);
return result;
}
代码很长,我看到的是:
RouteSelector做准备;
ConnectionPool管理连接;
RealConnection做具体执行;
以StreamAllocation为中心协调各个类;
将RealConnection生成的Http1Codec和Http2Codec这种面向协议(设置请求头读取回复)进行sink和read的类 传递到拦截器中。
代码执行步骤大致分三部分:
- 获得route、ip、port那些鬼,由RouteSelector这个类完成,拿到address之后再尝试连接池中拿connection。
- 实在从连接池中拿不到了,就新建connection,用raw socket进行三握手那些鬼,由RealConnection这个类完成,拿到source和sink;
让connection和streamAllocation相互引用(一个强引用一个弱引用),连接池里面有一个ArrayDeque来记录所有的socket连接。将新的connection放入连接池,触发清理;
将route从黑名单移除。 - 检查是否有多线程导致的问题,如果有,就释放当前连接,用别的线程创建的连接。
上面说将连接加入连接池时会触发清理操作,下面贴上代码详细说明是如何清理的。
在connectionPool中,有个cleanup方法来执行清理操作
long cleanup(long now) {
int inUseConnectionCount = 0;
int idleConnectionCount = 0;
RealConnection longestIdleConnection = null;
long longestIdleDurationNs = Long.MIN_VALUE;
// Find either a connection to evict, or the time that the next eviction is due.
synchronized (this) {
for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
RealConnection connection = i.next();
// If the connection is in use, keep searching.
// 这里具体执行streamAllocation的清理,具体代码在下面
if (pruneAndGetAllocationCount(connection, now) > 0) {
inUseConnectionCount++;
continue;
}
idleConnectionCount++;
// If the connection is ready to be evicted, we're done.
// 如果说:闲置的时间超过了设定值,或者最大限制连接数超过设定值,就把connection从连接池中移除,并关掉connection。
long idleDurationNs = now - connection.idleAtNanos;
if (idleDurationNs > longestIdleDurationNs) {
longestIdleDurationNs = idleDurationNs;
longestIdleConnection = connection;
}
}
if (longestIdleDurationNs >= this.keepAliveDurationNs
|| idleConnectionCount > this.maxIdleConnections) {
// We've found a connection to evict. Remove it from the list, then close it below (outside
// of the synchronized block).
connections.remove(longestIdleConnection);
} else if (idleConnectionCount > 0) {
// A connection will be ready to evict soon.
return keepAliveDurationNs - longestIdleDurationNs;
} else if (inUseConnectionCount > 0) {
// All connections are in use. It'll be at least the keep alive duration 'til we run again.
return keepAliveDurationNs;
} else {
// No connections, idle or in use.
cleanupRunning = false;
return -1;
}
}
closeQuietly(longestIdleConnection.socket());
// Cleanup again immediately.
// 根据闲置时间和闲置连接数,还有可能立刻执行下一次清理
return 0;
}
private int pruneAndGetAllocationCount(RealConnection connection, long now) {
List<Reference<StreamAllocation>> references = connection.allocations;
for (int i = 0; i < references.size(); ) {
Reference<StreamAllocation> reference = references.get(i);
if (reference.get() != null) {
i++;
continue;
}
// We've discovered a leaked allocation. This is an application bug.
//遍历每一个connection的streamAllocation弱引用集合,发现弱引用已被回收,就将其在弱引用集合中移除
StreamAllocation.StreamAllocationReference streamAllocRef =
(StreamAllocation.StreamAllocationReference) reference;
String message = "A connection to " + connection.route().address().url()
+ " was leaked. Did you forget to close a response body?";
// 打印警告,告知程序员,你的使用有问题
Platform.get().logCloseableLeak(message, streamAllocRef.callStackTrace);
references.remove(i);
connection.noNewStreams = true;
// If this was the last allocation, the connection is eligible for immediate eviction.
if (references.isEmpty()) {
// 如果所有的弱引用都被移除掉了,说明这个connection是闲置的,记录闲置的时间。将闲置最久的connection记录下来。
connection.idleAtNanos = now - keepAliveDurationNs;
return 0;
}
}
return references.size();
}
总结一下,清理策略就是:
- connection自身记录streamAllocation的连接数,达到0的时候就标记自己为闲置连接,记录闲置时间等待清理
- 满足闲置时间太长或者闲置连接太多时,ConnectionPool就执行清理操作关掉连接(默认空闲的socket最大连接数为5个,socket的keepAlive时间为5秒)。
- 正常情况下ConnectionPool会每隔一段时间就尝试清理一次。看连接使用情况,忙的话就一直尝试清理,闲的时候加入任务也会触发清理。
okhttp对socket的直接管理还是通过ConnectionPool来实现的。
回顾一下前面的拦截器的知识,结合一下:
- 在RealInterceptorChain中有一个streamAllocation成员变量
- 在RetryAndFollowUpInterceptor中初始化streamAllocation传到RealInterceptorChain中,此时还是没有任何连接和这个streamAllocation绑定的
- 到了ConnectInterceptor中,调用streamAllocation的newStream方法,内部调用findConnection方法,获得连接
- 连接的获得是先尝试从连接池中取,取不到就初始化一个连接,将streamAllocation弱引用给connection(此时connection可能已经有很多streamAllocation在用了),同时在连接池中尝试清理。
- 拿到连接之后,返回给ConnectInterceptor一个HttpCodec,这是一个接口的实现类,根据http协议是 1.x 还是2 内部有不同的实现
- 回到CallServerInterceptor中,拿HttpCodec来执行写入请求头、读取返回信息、构造responseBody等。