涉及到的设计模式
OkHttp同步请求
发送请求后,UI线程之后就会进入阻塞状态,直到请求结束。
1.创建 OkHttpClient 和 Request 对象
2.将 Request 封装成 Call 对象
3.调用 Call 的 execute() 方法执行同步请求
OkHttp异步请求
发送请求后,不阻塞UI线程
1.创建 OkHttpClient 和 Request 对象
2.将 Request 封装成 Call 对象
3.调用 Call 的 enqueue() 方法执行异步请求( CallBack 对象的 onResponse 和 onFailure 都是在子线程中的)
我们先分析请求的第一步和第二步,因为同步和异步的前两步是相同的
第一步
OkHttpClient.Builder的构造方法(建造者设计模式)
- dispatcher 分发器,由它决定异步请求是进行直接处理还是等待。同步请求则是直接处理。
- connectionPool 连接池,客户端和服务器的连接可以理解为一个连接对象,由 connectionPool 进行管理,所以如果当 Url 的请求相同的时候,我们可以选择连接对象进行复用;实现了策略规定的,哪些网络连接可以保持打开状态,哪些网络连接可以复用.
- ...
Request.Builder的构造方法(建造者设计模式)
- method 默认 "Get"
- headers 保存头部信息
- Request.Builder.build() 方法直接创建 Request 对象
- ...
插入知识点
Dispatcher类为重点,后面有很多地方涉及到,所以我们先详细分析。
Dispatcher.java部分代码(队列)
/** Ready async calls in the order they'll be run. */
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
/** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
/** Running synchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
- 定义了三个队列,异步就绪队列、异步执行队列、同步执行队列
Dispatcher.java部分代码(线程池)
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
- Dispatcher维护了一个单例的线程池用于执行请求
Dispatcher.java部分代码(executed,enqueue)
synchronized void executed(RealCall call) {
runningSyncCalls.add(call);
}
synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);
executorService().execute(call);
} else {
readyAsyncCalls.add(call);
}
}
- excuted() 方法将请求加到同步执行队列中(但是什么时候执行呢?继续往下会有进一步了解)
- enqueue() 方法当 异步执行队列 的请求数未达到最大值且请求的host也未达到最大值时,将请求加入 异步执行队列 并开始执行。否则加入 异步就绪队列
Dispatcher.java 部分代码(finished())
/** Used by {@code AsyncCall#run} to signal completion. */
void finished(AsyncCall call) {
finished(runningAsyncCalls, call, true);
}
/** Used by {@code Call#execute} to signal completion. */
void finished(RealCall call) {
finished(runningSyncCalls, call, false);
}
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
if (promoteCalls) promoteCalls();
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
}
if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();
}
}
public synchronized int runningCallsCount() {
return runningAsyncCalls.size() + runningSyncCalls.size();
}
private void promoteCalls() {
if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();
if (runningCallsForHost(call) < maxRequestsPerHost) {
i.remove();
runningAsyncCalls.add(call);
executorService().execute(call);
}
if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
}
}
- 从最开始的两个方法可以看出finished() 方法的参数 promoteCalls 用于区别是同步还是异步
- 在三个参数的finished()中,移除传入队列中的Call对象。(执行完毕的Call,后面会知道是哪里调用的finished方法,了解Call的状态)
- 对于异步队列,则执行 promoteCalls() 方法,调整异步请求的队列;如果异步执行队列中的请求数达到最大值或者异步就绪队列中的数量为0则无操作。
- promoteCalls() 方法中,将异步就绪队列中的任务删除,并将删除的请求添加到异步执行队列中,并开始执行请求。(重点重点重点重点)
- runningCallsCount() 方法计算正在运行的请求数量
- if (runningCallsCount == 0 && idleCallback != null) 如果正在请求的数量为0(表示 dispatch 分发器中没有可运行的请求) 并且 idleCallback 不为空的时候就执行 idleCallback.run() 方法
- idleCallback 当每次 dispathch 的请求数量为0时进行回调
第二步
1. OkHttpClient.newCall()方法
@Override public Call newCall(Request request) {
return RealCall.newRealCall(this, request, false /* for web socket */);
}
- Call是一个接口,所以需要关注与它的实现类 RealCall
2. RealCall.newRealCall方法
static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
RealCall call = new RealCall(client, originalRequest, forWebSocket);
call.eventListener = client.eventListenerFactory().create(call);
return call;
}
- 这里实例化了 RealCall 对象,并赋值 listener 对象
3. RealCall的构造方法
private RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
this.client = client;
this.originalRequest = originalRequest;
this.forWebSocket = forWebSocket;
this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client, forWebSocket);
}
- 可以看出 RealCall 持有之前初始化完毕的 OkHttpClient 对象和 request 对象
- RetryAndFollowUpInterceptor 重定向和重试拦截器。。下面着重分析
同步请求 call.execute()方法
因为 Call 为接口,我们需要研究的是实现类 RealCall 的 execute() 方法
RealCall.java部分代码
@Override public Response execute() throws IOException {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);
try {
client.dispatcher().executed(this);
Response result = getResponseWithInterceptorChain();
if (result == null) throw new IOException("Canceled");
return result;
} catch (IOException e) {
eventListener.callFailed(this, e);
throw e;
} finally {
client.dispatcher().finished(this);
}
}
- executed 通过标志位避免请求重复执行
- eventListener.callStart(this); 当执行异步或同步请求的时候开启监听
- 重点 client.dispatcher().executed(this); 将 call 加入同步执行队列
- getResponseWithInterceptorChain(); 依次调用定义的拦截器,这里就是开始执行请求的地方(关于拦截器,下面会进行分析)
- client.dispatcher().finished(this); 将请求从同步执行队列移除
异步请求 call.enqueue()方法
因为 Call 为接口,我们需要研究的是实现类 RealCall 的 enqueue() 方法
RealCall.java部分代码
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
- executed 通过标志位避免请求重复执行
- 将 responseCallback(我们传入的回调) 封装成 AsyncCall
- AsynCall 继承于 NamedRunnable , NamedRunnable实现了Runnable 接口
- 重点 client.dispatcher().enqueue(new AsyncCall(responseCallback));
- dispatcher.enqueue() 当异步执行队列的请求数未达到最大值且请求的 host 也未达到最大值时,将请求加入异步执行队列并开始执行。否则加入异步就绪队列
AsynCall分析
AsyncCall继承于NamedRunnable,NamedRunnable实现了Runnable接口
NamedRunnable.java部分代码
@Override public final void run() {
String oldName = Thread.currentThread().getName();
Thread.currentThread().setName(name);
try {
execute();
} finally {
Thread.currentThread().setName(oldName);
}
}
protected abstract void execute();
- 这里除了设置线程的信息之外就是调用抽象方法,所以我们将重心移到 execute() 抽象方法的实现
- 所以在 dispatcher.enqueue() 中的 executorService().execute(call); 就是执行该方法
AsynCall.java部分代码
@Override protected void execute() {
boolean signalledCallback = false;
try {
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
eventListener.callFailed(RealCall.this, e);
responseCallback.onFailure(RealCall.this, e);
}
} finally {
client.dispatcher().finished(this);
}
}
- getResponseWithInterceptorChain(); 拦截器链下面重点分析,这个方法在 同步请求 call.execute() 也出现了,这就是请求开始关键。
- responseCallback 我们定义的回调
- retryAndFollowUpInterceptor 重定向和重试拦截器如果取消的话就回调responseCallback.onFailure()方法,如果没有被取消则回调responseCallback.onResponse() 方法。这里体现了 onFailure、onResponse方法都是在子线程当中,如果我们需要再回调中对UI进行操作,需要切换线程
- catch 对网络请求失败的操作,存在 responseCallback.onFailure 的回调
- finally 最后执行 client.dispatcher().finished(this); 将请求从异步执行队列移除,满足条件则将异步就绪队列的请求加入到异步执行队列
到目前为止,同步、异步请求的基本流程和Dispatcher都有所了解了。接下来我们分析与拦截器。
来源 OkHttp_Wiki
Interceptors are a powerful mechanism that can monitor, rewrite, and retry calls.
拦截器是 OkHttp 提供的对 Http 请求和响应进行统一处理的强大机制,它可以实现网络监听、请求以及响应重写、请求失败重试等功能。
OkHttp uses lists to track interceptors, and interceptors are called in order.
OkHttp 使用列表来跟踪拦截器,并按顺序调用拦截器。
Interceptors are registered as either application or network interceptors.
拦截器可以注册为应用拦截器或网络拦截器;
应用拦截器与网络拦截器的区别
应用拦截器:
Don't need to worry about intermediate responses like redirects and retries.
应用拦截器不能操作中间的响应结果,比如重定向和重试
Are always invoked once, even if the HTTP response is served from the cache.
应用拦截器始终调用一次,即使Http响应是从缓存中提供的。
Observe the application's original intent. Unconcerned with OkHttp-injected headers like If-None-Match.
关注原始的intent,不关心注入的headers,比如If-None-Match。
Permitted to short-circuit and not call Chain.proceed().
应用拦截器允许短路,并且不调用 Chain.proceed(),即可以决定要调用的拦截器
Permitted to retry and make multiple calls to Chain.proceed().
应用拦截器允许请求失败重试,并多次调用其他拦截器
网络拦截器:
Able to operate on intermediate responses like redirects and retries.
网络拦截器可以操作,如重定向和重试等中间操作的结果
Not invoked for cached responses that short-circuit the network.
网络拦截器不允许调用缓存来短路执行中的请求
Observe the data just as it will be transmitted over the network.
网络拦截器可以观察网络传输中的数据
Access to the Connection that carries the request.
网络拦截器可以获取 Connection 中携带的请求信息
以上来自OkHttp官方文档
注意力移到 RealCall 的getResponseWithInterceptorChain() 方法。
RealCall.java部分代码
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
interceptors.addAll(client.interceptors());
interceptors.add(retryAndFollowUpInterceptor);
interceptors.add(new BridgeInterceptor(client.cookieJar()));
interceptors.add(new CacheInterceptor(client.internalCache()));
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());
}
interceptors.add(new CallServerInterceptor(forWebSocket));
Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
originalRequest, this, eventListener, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
return chain.proceed(originalRequest);
}
- interceptors.addAll(client.interceptors()); 添加了用户自定义的应用拦截器
- RetryAndFollowUpInterceptor 重试和失败重定向拦截器
- BridgeInterceptor补充用户在请求当中缺少的Http请求头和把返回的响应转化为用户可用的 Response
- CacheInterceptor 处理缓存
- ConnectInterceptor 建立与服务器可用的连接
- interceptors.addAll(client.networkInterceptors()); 添加用户自定义的网络拦截器
- CallServerInterceptor 将Http请求写入网络的IO流当中,并从网络IO流中读取服务端返回的数据
- 最后创建 RealInterceptorChain 对象,并将拦截器list传入
- RealInterceptorChain 调用 proceed 方法执行 originalRequest 请求并返回 Response
RealInterceptorChain.java部分代码
@Override
public Response proceed(Request request) throws IOException {
return proceed(request, streamAllocation, httpCodec, connection);
}
public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
RealConnection connection) throws IOException {
if (index >= interceptors.size()) throw new AssertionError();
calls++;
...
// Call the next interceptor in the chain.
RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
writeTimeout);
Interceptor interceptor = interceptors.get(index);
Response response = interceptor.intercept(next);
...
return response;
}
- 代码通过 index 来记录执行到拦截器链的位置, interceptors.get(index); 得到当前拦截器对象
- 再次实例化 RealInterceptorChain 并且传入index+1表示从下一个拦截器开始进行访问,这样就把我们的拦截器list构成了一个有序的链条 (拦截器链)
- interceptor.intercept(next); 返回 Response 对象
- 可以看出 proceed() 方法的核心就在于创建下一个拦截器链 RealInterceptorChain 对象,依次调用Interceptor的 intercept() 方法,并返回给上一个拦截器Response对象,可以看成是一个递归的过程,如下图所示。
重点就落在了每个拦截器的 intercept() 方法上,接下来我们分析系统给我们5个拦截器,忽略用户自定义的拦截器
1. RetryAndFollowUpInterceptor
RetryAndFollowUpInterceptor.java部分代码
@Override
public Response intercept(Chain chain) throws IOException {
...
StreamAllocation streamAllocation = new StreamAllocation(client.connectionPool(),
createAddress(request.url()), call, eventListener, callStackTrace);
int followUpCount = 0;
Response priorResponse = null;
while (true) {
...
Response response;
boolean releaseConnection = true;
try {
response = realChain.proceed(request, streamAllocation, null, null);
releaseConnection = false;
} catch (RouteException e) {
if (!recover(e.getLastConnectException(), streamAllocation, false, request)) {
throw e.getFirstConnectException();
}
releaseConnection = false;
continue;
} catch (IOException e) {
boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
if (!recover(e, streamAllocation, requestSendStarted, request)) throw e;
releaseConnection = false;
continue;
} finally {
if (releaseConnection) {
streamAllocation.streamFailed(null);
streamAllocation.release();
}
}
...
Request followUp;
try {
followUp = followUpRequest(response, streamAllocation.route());
} catch (IOException e) {
streamAllocation.release();
throw e;
}
if (followUp == null) {
if (!forWebSocket) {
streamAllocation.release();
}
return response;
}
closeQuietly(response.body());
if (++followUpCount > MAX_FOLLOW_UPS) {
streamAllocation.release();
throw new ProtocolException("Too many follow-up requests: " + followUpCount);
}
...
request = followUp;
priorResponse = response;
}
}
- 创建StreamAllocation对象,用来建立执行 Http 请求的 Socket 连接对象(之后再深入分析)
- 调用 RealInterceptorChain.proceed() 进行网络请求
- 根据异常结果或响应结果判断是否要进行重新请求
- 当后续的拦截器发生RouteException或者IOException时,判断连接是否可恢复,如果不可重试会再将异常抛出,并释放streamAllocation资源;如果可以重试,则继续调用 RealInterceptorChain.proceed() 进行网络请求。
- 通过 if (++followUpCount > MAX_FOLLOW_UPS) 来防止陷入死循环
- 对response进行处理,通过 followUpRequest() 方法判断是否需要重定向。如果需要重定向的话,从头部的 Location 中获取新的 url 生成新的 request 进行新的请求。无需重定向等操作,则将 response 返回上一级拦截器
2. BridgeInterceptor
BridgeInterceptor.java部分代码
@Override
public Response intercept(Chain chain) throws IOException {
Request userRequest = chain.request();
Request.Builder requestBuilder = userRequest.newBuilder();
RequestBody body = userRequest.body();
//主要是对请求头添加字段,省略
...
Response networkResponse = chain.proceed(requestBuilder.build());
HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());
Response.Builder responseBuilder = networkResponse.newBuilder()
.request(userRequest);
if (transparentGzip
&& "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
&& HttpHeaders.hasBody(networkResponse)) {
GzipSource responseBody = new GzipSource(networkResponse.body().source());
Headers strippedHeaders = networkResponse.headers().newBuilder()
.removeAll("Content-Encoding")
.removeAll("Content-Length")
.build();
responseBuilder.headers(strippedHeaders);
String contentType = networkResponse.header("Content-Type");
responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody)));
}
return responseBuilder.build();
}
- 将传入的 Request 的请求头添加字段,转化为能够进行网络访问的请求
- 将这个符合网络请求的 Request 使用 chain.proceed(requestBuilder.build()); 方法传入下一个拦截器
- 将网络请求回来的响应 Response 转化为用户可用的 Response (gzip解压操作等)
3. CacheInterceptor
在看CacheInterceptor之前我们要先看一下Cache.java
示例:
OkHttpClient okHttpClient = new OkHttpClient().newBuilder()
.cache(new Cache(new File("cache"), 8 * 1024 * 1024))
.build();
Cache.put()方法
Cache.java部分代码
@Nullable
CacheRequest put(Response response) {
String requestMethod = response.request().method();
if (HttpMethod.invalidatesCache(response.request().method())) {
try {
remove(response.request());
} catch (IOException ignored) {
}
return null;
}
if (!requestMethod.equals("GET")) {
return null;
}
...
Entry entry = new Entry(response);
DiskLruCache.Editor editor = null;
try {
editor = cache.edit(key(response.request().url()));
if (editor == null) {
return null;
}
entry.writeTo(editor);
return new CacheRequestImpl(editor);
} catch (IOException e) {
abortQuietly(editor);
return null;
}
Entry(Response response) {
this.url = response.request().url().toString();
this.varyHeaders = HttpHeaders.varyHeaders(response);
this.requestMethod = response.request().method();
this.protocol = response.protocol();
this.code = response.code();
this.message = response.message();
this.responseHeaders = response.headers();
this.handshake = response.handshake();
this.sentRequestMillis = response.sentRequestAtMillis();
this.receivedResponseMillis = response.receivedResponseAtMillis();
}
}
HttpMethod.java部分代码
public static boolean invalidatesCache(String method) {
return method.equals("POST")
|| method.equals("PATCH")
|| method.equals("PUT")
|| method.equals("DELETE")
|| method.equals("MOVE"); // WebDAV
}
- 通过response.request().method()方法获取requestMethod如果是非"GET"方法,则对缓存进行删除操作,并且返回null,不进行缓存
- Entry实例就是要写入缓存部分,当中保存了url,headers,code,message等信息
- DiskLruCache是OkHttp缓存的核心,key通过 url md5加密后得到的16进制形式,value是包装好的Entry
- 到目前为止我们还尚未看到响应的body缓存在何处?
- return new CacheRequestImpl(editor); 在返回时,实例化了 CacheRequestImpl 对象,所以我们需要看看这个类做了什么。
Cache.java部分代码
CacheRequestImpl(final DiskLruCache.Editor editor) {
this.editor = editor;
this.cacheOut = editor.newSink(ENTRY_BODY);
this.body = new ForwardingSink(cacheOut) {
@Override public void close() throws IOException {
synchronized (Cache.this) {
if (done) {
return;
}
done = true;
writeSuccessCount++;
}
super.close();
editor.commit();
}
};
}
- 从上面的代码中我们看到了重点的 body 而且使用了书写 Entry 对象的DiskLruCache.Editor 用来书写 body
Cache.get()方法
Cache.java部分代码
@Nullable
Response get(Request request) {
String key = key(request.url());
DiskLruCache.Snapshot snapshot;
Entry entry;
try {
snapshot = cache.get(key);
if (snapshot == null) {
return null;
}
} catch (IOException e) {
// Give up because the cache cannot be read.
return null;
}
try {
entry = new Entry(snapshot.getSource(ENTRY_METADATA));
} catch (IOException e) {
Util.closeQuietly(snapshot);
return null;
}
Response response = entry.response(snapshot);
if (!entry.matches(request, response)) {
Util.closeQuietly(response.body());
return null;
}
return response;
}
public Response response(DiskLruCache.Snapshot snapshot) {
String contentType = responseHeaders.get("Content-Type");
String contentLength = responseHeaders.get("Content-Length");
Request cacheRequest = new Request.Builder()
.url(url)
.method(requestMethod, null)
.headers(varyHeaders)
.build();
return new Response.Builder()
.request(cacheRequest)
.protocol(protocol)
.code(code)
.message(message)
.headers(responseHeaders)
.body(new CacheResponseBody(snapshot, contentType, contentLength))
.handshake(handshake)
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(receivedResponseMillis)
.build();
}
CacheResponseBody(final DiskLruCache.Snapshot snapshot,
String contentType, String contentLength) {
this.snapshot = snapshot;
this.contentType = contentType;
this.contentLength = contentLength;
Source source = snapshot.getSource(ENTRY_BODY);
bodySource = Okio.buffer(new ForwardingSource(source) {
@Override public void close() throws IOException {
snapshot.close();
super.close();
}
});
}
- key 通过 url md5加密后的16进制形式得到
- DiskLruCache.Snapshot 用来记录了缓存在某个特定时刻所包含的内容 (缓存的快照)
- cache.get(key) 通过 key 得到快照,如果为空,说明没有缓存则返回null
- 得到 Snapshot 之后,就可以根据它来创建出 Entry 对象
- entry.response(snapshot); 从中得到 response 对象
- 用 request 和 response 进行成对匹配操作,匹配则返回 response 对象
4. ConnectInterceptor
ConnectInterceptor.java部分代码
public final class ConnectInterceptor implements Interceptor {
public final OkHttpClient client;
public ConnectInterceptor(OkHttpClient client) {
this.client = client;
}
@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();
StreamAllocation streamAllocation = realChain.streamAllocation();
// We need the network to satisfy this request. Possibly for validating a conditional GET.
boolean doExtensiveHealthChecks = !request.method().equals("GET");
HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
RealConnection connection = streamAllocation.connection();
return realChain.proceed(request, streamAllocation, httpCodec, connection);
}
}
- ConnectInterceptor 打开与服务器的连接,开始 OkHttp 的网络请求
- 核心 streamAllocation,我们在 RetryAndFollowUpInterceptor 中进行了初始化,但是并没有使用。
- 通过 streamAllocation 的 newStream() 方法创建了 HttpCodec 对象
- HttpCodec 用来编码 Request 和解码 Response (OkHttp封装好的对象)
- RealConnection 顾名思义是用来实际的网络 IO 连接传输的
- 以上,将创建好的用于网络 IO 的 RealConnection 对象,以及对于与服务器交互最为关键的 HttpCodec 等对象传递给后面的拦截器
所以我们要将目光放在最为重要的 streamAllocation 对象
StreamAllocation.java部分代码
public HttpCodec newStream(
OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
int connectTimeout = chain.connectTimeoutMillis();
int readTimeout = chain.readTimeoutMillis();
int writeTimeout = chain.writeTimeoutMillis();
int pingIntervalMillis = client.pingIntervalMillis();
boolean connectionRetryEnabled = client.retryOnConnectionFailure();
try {
RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
HttpCodec resultCodec = resultConnection.newCodec(client, chain, this);
synchronized (connectionPool) {
codec = resultCodec;
return resultCodec;
}
} catch (IOException e) {
throw new RouteException(e);
}
}
private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled,
boolean doExtensiveHealthChecks) throws IOException {
while (true) {
RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
pingIntervalMillis, connectionRetryEnabled);
// If this is a brand new connection, we can skip the extensive health checks.
synchronized (connectionPool) {
if (candidate.successCount == 0) {
return candidate;
}
}
// Do a (potentially slow) check to confirm that the pooled connection is still good. If it
// isn't, take it out of the pool and start again.
if (!candidate.isHealthy(doExtensiveHealthChecks)) {
noNewStreams();
continue;
}
return candidate;
}
}
- 通过 findHealthyConnection 方法获取 RealConnection 对象
- 在 findHealthyConnection 方法中用了 while 循环调用 findConnection 方法
- 在同步代码块中,candidate.successCount 为 0 的时候,意味着整个网络连接结束了(即一个新的连接,可以跳过健康检查),可以跳出循环
- candidate.successCount 不为 0 的时候, 则判断连接是否健康,如果为不健康的状态,调用 noNewStreams() 方法销毁资源,并继续循环
- 得到 RealConnection 对象后就能创建 HttpCodec 对象
那什么是不健康的 RealConnection?
1.Socket连接未关闭;
2.Socket输入输出流未关闭;
3.Http2Connection未关闭等等
为了理解 RealConnection 对象的创建过程,进一步分析 findConnection 方法
StreamAllocation.java部分代码
/**
* Returns a connection to host a new stream. This prefers the existing connection if it exists,
* then the pool, finally building a new connection.
*/
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
...
synchronized (connectionPool) {
...
// Attempt to use an already-allocated connection. We need to be careful here because our
// already-allocated connection may have been restricted from creating new streams.
releasedConnection = this.connection;
if (this.connection != null) {
// We had an already-allocated connection and it's good.
result = this.connection;
releasedConnection = null;
}
...
if (result == null) {
// Attempt to get a connection from the pool.
Internal.instance.acquire(connectionPool, address, this, null);
if (connection != null) {
foundPooledConnection = true;
result = connection;
} else {
selectedRoute = route;
}
}
}
...
if (result != null) {
// If we found an already-allocated or pooled connection, we're done.
return result;
}
...
// Do TCP + TLS handshakes. This is a blocking operation.
result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
connectionRetryEnabled, call, eventListener);
...
synchronized (connectionPool) {
reportedAcquired = true;
// Pool the connection.
Internal.instance.put(connectionPool, result);
...
}
...
return result;
}
- releasedConnection = this.connection; 从注释中可以看出,目的是为了尝试复用当前连接
- if (result == null) 表示如果不能复用当前连接,从连接池(下面进行分析)中获取一个RealConnection
- if (connection != null) 连接池中是否有此连接,有则结束 findConnection() 方法,否则开始新建连接
- 新建连接时,调用result.connect() 方法进行实际的网络连接,连接成功后将新的RealConnection放入连接池中
- result.connect() 里进行了 Http 协议连接方式的选择: 隧道连接 和 Socket连接(能力有限暂未深入分析)
ConnectionPool连接池分析
OkHttp将客户端与服务端的连接抽象成了Connection的一个类。而RealConnection就是Connection的实现类。为了在一定时间范围内更好地管理Connection类以及连接复用,OkHttp提供了ConnectionPool连接池。
ConnectionPool.acquire/get方法分析
ConnectionPool.java部分代码
void acquire(Address address, StreamAllocation streamAllocation, @Nullable Route route) {
assert (Thread.holdsLock(this));
for (RealConnection connection : connections) {
if (connection.isEligible(address, route)) {
streamAllocation.acquire(connection, true);
return;
}
}
}
StreamAllocation.java部分代码
public void acquire(RealConnection connection, boolean reportedAcquired) {
assert (Thread.holdsLock(connectionPool));
if (this.connection != null) throw new IllegalStateException();
this.connection = connection;
this.reportedAcquired = reportedAcquired;
connection.allocations.add(new StreamAllocationReference(this, callStackTrace));
}
RealConnection.java部分代码
/** Current streams carried by this connection. */
public final List<Reference<StreamAllocation>> allocations = new ArrayList<>();
- ConnectionPool.acquire() 方法中,使用 for 循环遍历找到可用的连接 (address, route)
- 将可用的 connection 传入 StreamAllocation.acquire() 方法
- StreamAllocation中,先将ConnectionPool 的 connection 赋值给 StreamAllocation的成员变量
- connection.allocations.add(new StreamAllocationReference(this, callStackTrace)); 将 StreamAllocation 的弱引用加入到 RealConnection.allocations中去
- RealConnection.allocations 用于统计和保持当前连接对象所持有的 StreamAllocation 数目,后续通过集合的大小来判断一个网络连接的负载量是否已经超过了最大值 (往下看)
ConnectionPool.put方法 分析
ConnectionPool.java部分代码
private final Deque<RealConnection> connections = new ArrayDeque<>();
void put(RealConnection connection) {
assert (Thread.holdsLock(this));
if (!cleanupRunning) {
cleanupRunning = true;
executor.execute(cleanupRunnable);
}
connections.add(connection);
}
- ConnectionPool.put()方法中,在做add连接队列前,先开启了一个异步的清理任务
- cleanupRunnable 回收连接
Connection回收
ConnectionPool.java部分代码
private final Runnable cleanupRunnable = () -> {
while (true) {
long waitNanos = cleanup(System.nanoTime());
if (waitNanos == -1) return;
if (waitNanos > 0) {
long waitMillis = waitNanos / 1000000L;
waitNanos -= (waitMillis * 1000000L);
synchronized (ConnectionPool.this) {
try {
ConnectionPool.this.wait(waitMillis, (int) waitNanos);
} catch (InterruptedException ignored) {
}
}
}
}
};
- while 死循环
- waitNanos 首次清理回收的时候,返回下一次开始清理回收的时间
- ConnectionPool.this.wait(waitMillis, (int) waitNanos); 等待下一次清理
cleanup() 方法类似于 GC 标记清除算法不做深究,我们重点看一下如何找到最不活跃的连接
ConnectionPool.java部分代码(cleanup调用)
private int pruneAndGetAllocationCount(RealConnection connection, long now) {
List<Reference<StreamAllocation>> references = connection.allocations;
for (int i = 0; i < references.size(); ) {
Reference<StreamAllocation> reference = references.get(i);
if (reference.get() != null) {
i++;
continue;
}
....
references.remove(i);
connection.noNewStreams = true;
// If this was the last allocation, the connection is eligible for immediate eviction.
if (references.isEmpty()) {
connection.idleAtNanos = now - keepAliveDurationNs;
return 0;
}
}
return references.size();
}
- 主要就是通过判断 connection.allocations 集合中的 StreamAllocation 弱引用是否为 null
- StreamAllocation 弱引用为 null 时,说明已经没有代码引用了就应该调用 references.remove() 移除
- if (references.isEmpty()) 当集合为空的时候,表示目前这个连接也没有代码引用,就可以将连接进行回收了
5. CallServerInterceptor
CallServerInterceptor.java部分代码
public final class CallServerInterceptor implements Interceptor {
...
@Override public Response intercept(Chain chain) throws IOException {
final RealInterceptorChain realChain = (RealInterceptorChain) chain;
Call call = realChain.call();
final HttpCodec httpCodec = realChain.httpStream();
StreamAllocation streamAllocation = realChain.streamAllocation();
RealConnection connection = (RealConnection) realChain.connection();
Request request = realChain.request();
...
httpCodec.writeRequestHeaders(request);
Response.Builder responseBuilder = null;
if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
...
}
if (responseBuilder == null) {
if (request.body() instanceof DuplexRequestBody) {
...
request.body().writeTo(bufferedRequestBody);
} else {
...
}
} else if (!connection.isMultiplexed()) {
...
}
}
...
httpCodec.finishRequest();
...
responseBuilder = httpCodec.readResponseHeaders(false);
...
if (forWebSocket && code == 101) {
...
} else {
response = response.newBuilder()
.body(httpCodec.openResponseBody(response))
.build();
}
if ("close".equalsIgnoreCase(response.request().header("Connection"))
|| "close".equalsIgnoreCase(response.header("Connection"))) {
streamAllocation.noNewStreams();
}
...
return response;
}
}
- RealInterceptorChain 拦截器链,所有的网络请求由这个连接在一起来完成相应的网络请求
- HttpCodec 用来编码 Request 和解码 Response
- StreamAllocation 用来建立Http请求需要的网络设施的组件,分配 Stream
- RealConnection OkHttp 将客户端与服务端的连接抽象成了 Connection, RealConnection 是 Connection 的具体实现
- Request request = realChain.request(); 获取请求体
- httpCodec.writeRequestHeaders(request); 向 Socket 当中写入请求头信息
- if ("100-continue".equalsIgnoreCase(request.header("Expect"))) 特殊情况,当Expect为100-continue时,只发送请求头,不写入请求体
- request.body().writeTo(bufferedRequestBody); 非特殊情况下,向Socket 写入请求体
- httpCodec.finishRequest(); 完成了Http请求,接下来我们要处理响应
- responseBuilder = httpCodec.readResponseHeaders(false); 读取响应的头部信息
-
response = response.newBuilder()
.body(httpCodec.openResponseBody(response))
.build(); 通过构建者模式创建响应体 - streamAllocation.noNewStreams(); 当我们建立好连接的时候,禁止新的流创建
学习渠道
- OkHttp源码
- 慕课
- 网络博客