1. 项目结构
- 拦截器
- 连接池
- 线程池
okhttp3/
└── internal
├── cache
├── cache2
├── connection
├── http
├── http1
├── http2
├── io
├── platform
├── publicsuffix
├── tls
└── ws
2.执行流程图
2.1代码执行流程
2.2 正常流程
- 通道建立与安全
- dns
- 是否安全连接(https, http)
- 连接建立
- 数据传输与解析
- 发送请求
- 接收响应
- 释放连接
完整的请求结束后,连接通道不会被立即关闭,而是会存活一段时间,当相同的请求在短时间内再次被触发时, 连接会被复用,从而跳过连接过程
2.3 异常处理流程
- 连接为建立时,直接触发异常
- 连接建立后, 关闭连接,触发异常
2.4重试机制
-
连接复用机制
3.请求(Request)
3.1 参数列表
参数 | 必选 | 作用 |
---|---|---|
url | yes | 请求地址 |
method | no | 请求方法,默认GET |
header | no | 请求的头部信息,拥有默认值 |
body | no | 请求体, 可以为空 |
3.2 方法
header(String name, String value)
使用该方法,会导致如果已经有key相同的键值对,该键值对会被新值替换
addHeader(String name, String value)
添加头部键值对,不会覆盖
cacheControl(CacheControl cacheControl)
响应的缓存控制,本质也是设置头部信息
tag(Object tag)
设置该请求的标签,通过该标签可以使用okHttpClient取消本次请求
//工具方法
public class OkHttpUtils {
public static void cancelCallWithTag(OkHttpClient client, String tag) {
// A call may transition from queue -> running. Remove queued Calls first.
for(Call call : client.dispatcher().queuedCalls()) {
if(call.request().tag().equals(tag))
call.cancel();
}
for(Call call : client.dispatcher().runningCalls()) {
if(call.request().tag().equals(tag))
call.cancel();
}
}
}
4.响应
4.1 参数列表
参数 | 作用 |
---|---|
code | 响应状态码 |
message | 请求结果简单描述 |
header | 响应的头部 |
body | 响应体 |
4.2 方法
protocol()
返回协议版本,枚举类型
code()
响应的状态码
handshake()
请求的握手信息, 如果不是tls 协议,则返回空的
peekBody(long byteCount)
获取部分响应,如果byteCount 大于全部的响应字节数, 则返回全部响应, 反之,则返回对应字节数的响应体
networkResponse()
获取网络响应
cacheResponse()
获取缓存中的响应
priorResponse()
获取上一个请求, 重定、向验证、重连、这些操作
cacheControl()
控制缓存的
challenges()
当验证未通过时,获取验证的详细信息
sentRequestAtMillis()
获取发送请求时的时间戳, 如果是缓存中的响应,那么这个时间戳就是原始请求的发送时间
receivedResponseAtMillis()
获取接收响应的时间(接收头部),如果是缓存的响应,那么这个时间戳就是原始接收响应的时间戳
5.拦截器
5.1应用级别
应用层级感知不到响应是否是缓存,缓存和网络都可以拦截到
public class Test {
public static void main(String[] args) {
OkHttpClient client = new OkHttpClient.Builder()
.addInterceptor(new LoggerInterceptor())
.build();
Request request = new Request.Builder()
.url("http://www.baidu.com")
.build();
try (Response response = client.newCall(request).execute()) {
} catch (Exception ignore) { }
}
private static class LoggerInterceptor implements Interceptor {
@Override
public Response intercept(Chain chain) throws IOException {
//获取请求对象
Request request = chain.request();
System.out.println(String.format("request >>> timestamp:%d, method:%s, url:%s, headers:%s, body:%s",
System.currentTimeMillis(), request.method(), request.url(), request.headers(), request.body()));
//同步触发请求
Response response = chain.proceed(request);
System.out.println(String.format("response >> timestamp:%d, code:%d, headers:%s, body:%s",
System.currentTimeMillis(), response.code(), response.headers(), request.body()));
return response;
}
}
}
5.2网络级别
只能拦截到网络层级的,当强制使用缓存或者缓存未过期时,此时使用的是缓存,将拦截不到。
public class TestNetwork {
public static void main(String[] args) {
OkHttpClient client = new OkHttpClient.Builder()
.addNetworkInterceptor(new LoggerInterceptor())
.build();
Request request = new Request.Builder()
//强制使用缓存后,network层级无法捕获
// .cacheControl(CacheControl.FORCE_CACHE)
.url("http://www.baidu.com")
.build();
try (Response response = client.newCall(request).execute()) {
} catch (Exception ignore) { }
}
private static class LoggerInterceptor implements Interceptor {
@Override
public Response intercept(Chain chain) throws IOException {
//获取请求对象
Request request = chain.request();
System.out.println(String.format("request >>> timestamp:%d, method:%s, url:%s, headers:%s, body:%s",
System.currentTimeMillis(), request.method(), request.url(), request.headers(), request.body()));
//同步触发请求
Response response = chain.proceed(request);
System.out.println(String.format("response >> timestamp:%d, code:%d, headers:%s, body:%s",
System.currentTimeMillis(), response.code(), response.headers(), request.body()));
return response;
}
}
}
6.链式调用
6.1 源码分析
public final class RealInterceptorChain implements Interceptor.Chain {
public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
RealConnection connection) throws IOException {
....
//调用proceed时创建链条中的下一个节点,标记节点的方式是使用index脚标
RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,connection, index + 1, request, call,eventListener,connectTimeout, readTimeout,writeTimeout);
//获取当前节点
Interceptor interceptor = interceptors.get(index);
//执行该节点任务代码
Response response = interceptor.intercept(next);
....
return response;
}
}
public final class RetryAndFollowUpInterceptor implements Interceptor {
@Override public Response intercept(Chain chain) throws IOException {
//获取原始请求
Request request = chain.request();
//执行任务
...
//调用proceed,获取下一个节点,并执行下一个拦截器
response = realChain.proceed(request, streamAllocation, null, null);
...
//获取响应后,执行后续任务
}
}
6.2 模式分析
与传统责任链相比,具有边调用边创建的特性
对于拦截器而言,干自己的事情,获取原始的请求做自己的任务,然后传递出去。对于Chain而言,就是起到组织整个链子的目的,并标记自身位置
//基础接口
public interface Interceptor {
String interceptor(Chain chain);
interface Chain {
String request();
String proceed(String request);
}
}
//拦截器1
public class RetryAndFollowInterceptor implements Interceptor {
@Override
public String interceptor(Chain chain) {
System.out.println("执行 RetryAndFollowInterceptor 拦截器之前代码");
String proceed = chain.proceed(chain.request());
System.out.println("执行 RetryAndFollowInterceptor 拦截器之后代码 得到最终数据:" + proceed);
return proceed;
}
}
//拦截器2
public class BridgeInterceptor implements Interceptor {
@Override
public String interceptor(Chain chain) {
System.out.println("执行 BridgeInterceptor 拦截器之前代码");
String proceed = chain.proceed(chain.request());
System.out.println("执行 BridgeInterceptor 拦截器之后代码 得到最终数据:"+proceed);
return proceed;
}
}
//拦截器3
public class CacheInterceptor implements Interceptor {
@Override
public String interceptor(Chain chain) {
System.out.println("执行 CacheInterceptor 最后一个拦截器 返回最终数据");
return "success";
}
}
//责任链的节点
public class RealInterceptorChain implements Interceptor.Chain {
private List<Interceptor> mInterceptors;
private String mRequest;
private int mIndex;
public RealInterceptorChain(List<Interceptor> interceptors, int index, String request) {
this.mInterceptors = interceptors;
mRequest = request;
this.mIndex = index;
}
@Override
public String request() {
return mRequest;
}
@Override
public String proceed(String request) {
if (mIndex >= mInterceptors.size()) return null;
//使用index变量标记当前责任链执行到何处
RealInterceptorChain next = new RealInterceptorChain(mInterceptors, mIndex + 1, request);
Interceptor interceptor = mInterceptors.get(mIndex);
return interceptor.interceptor(next);
}
}
//程序入口
public class Client {
public static void main(String[] args) {
List<Interceptor> interceptors = new ArrayList<>();
interceptors.add(new BridgeInterceptor());
interceptors.add(new RetryAndFollowInterceptor());
interceptors.add(new CacheInterceptor());
RealInterceptorChain realInterceptorChain = new RealInterceptorChain(interceptors, 0, "request");
String ret = realInterceptorChain.proceed("request");
System.out.println("result: " + ret);
}
}
7.异步与同步请求
连接池
同步
final class RealCall implements Call {
@Override public Response execute() throws IOException {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);
try {
//我也不知道干嘛的, 直接添加到队列里面,结束后移除
client.dispatcher().executed(this);
//真正干活的在这里,调用拦截器,堵塞在这里等待请求
Response result = getResponseWithInterceptorChain();
if (result == null) throw new IOException("Canceled");
return result;
} catch (IOException e) {
eventListener.callFailed(this, e);
throw e;
} finally {
client.dispatcher().finished(this);
}
}
Response getResponseWithInterceptorChain() throws IOException {
//用户自己设置的责任链
List<Interceptor> interceptors = new ArrayList<>();
interceptors.addAll(client.interceptors());
//负责处理失败后的重试与重定向
interceptors.add(retryAndFollowUpInterceptor);
//将网络请求Request装换成网络请求的数据
interceptors.add(new BridgeInterceptor(client.cookieJar()));
//缓存拦截器
interceptors.add(new CacheInterceptor(client.internalCache()));
//连接服务器 负责和服务器建立连接,真正的网络请求(还没发送数据)
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
//添加用户配置的网络拦截器
interceptors.addAll(client.networkInterceptors());
}
//写入数据和获取数据并解析成响应
interceptors.add(new CallServerInterceptor(forWebSocket));
//创建责任链
Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
originalRequest, this, eventListener, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
return chain.proceed(originalRequest);
}
}
异步
final class RealCall implements Call {
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);
//关注一下这个回调,这里回调里面包含要执行的getResponseWithInterceptorChain函数
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
}
public final class Dispatcher {
//当前处于空闲状态时, 处于
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
//添加到运行队列,开始运行
runningAsyncCalls.add(call);
executorService().execute(call);
} else {
//放入到预备队列
readyAsyncCalls.add(call);
}
}
final class AsyncCall extends NamedRunnable {
@Override protected void execute() {
boolean signalledCallback = false;
try {
//任务执行在其他线程中,稍后介绍一下线程池
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
eventListener.callFailed(RealCall.this, e);
responseCallback.onFailure(RealCall.this, e);
}
} finally {
client.dispatcher().finished(this);
}
}
}
线程池与连接池
线程池
- 等待队列
- 运行队列
线程池代码分析
public final class Dispatcher {
private int maxRequests = 64;
private int maxRequestsPerHost = 5;
private @Nullable Runnable idleCallback;
//线程管理
private @Nullable ExecutorService executorService;
//异步等待队列
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
//异步执行队列
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
//同步执行队列,没干啥,就是记录
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
//判断实际的运行请求数是否小于允许的最大的请求数量(64) 并且共享主机的正在运行的调用的数量小于同时最大的相同Host的请求数(5)
synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
//满足,直接执行
runningAsyncCalls.add(call);
executorService().execute(call);
} else {
//存入等待执行的队列
readyAsyncCalls.add(call);
}
}
//当任务完成的时候,会检查等待队列,并将等待队列中的任务添加到执行队列中
private void promoteCalls() {
if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();
if (runningCallsForHost(call) < maxRequestsPerHost) {
i.remove();
runningAsyncCalls.add(call);
executorService().execute(call);
}
if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
}
}
//异步执行结束
void finished(AsyncCall call) {
finished(runningAsyncCalls, call, true);
}
//同步执行结束
void finished(RealCall call) {
finished(runningSyncCalls, call, false);
}
//任务执行完成后的回调
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
//移除已经被执行的任务
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
if (promoteCalls) promoteCalls();
//更新
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
}
if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();
}
}
}
连接池
就是通过 host来判断是否可复用连接,socket 不断
public final class ConnectionPool {
//一个定时清理的可执行对象, 清理掉过期的失效的连接
private final Runnable cleanupRunnable = new Runnable() {
@Override public void run() {
while (true) {
long waitNanos = cleanup(System.nanoTime());
if (waitNanos == -1) return;
if (waitNanos > 0) {
long waitMillis = waitNanos / 1000000L;
waitNanos -= (waitMillis * 1000000L);
synchronized (ConnectionPool.this) {
try {
ConnectionPool.this.wait(waitMillis, (int) waitNanos);
} catch (InterruptedException ignored) {
}
}
}
}
}
};
//存储队列
private final Deque<RealConnection> connections = new ArrayDeque<>();
//比较重要的方法,获取连接,实际上是遍历所有存在的连接判断host是否相同来复用连接,没有直接返回null
@Nullable RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
assert (Thread.holdsLock(this));
for (RealConnection connection : connections) {
if (connection.isEligible(address, route)) {
streamAllocation.acquire(connection, true);
return connection;
}
}
return null;
}
}