1.okHttp使用流程分析
// 1.创建OkHttpClient对象
OkHttpClient client = new OkHttpClient.Builder().build();
// 2.创建Request对象
Request request = new Request.Builder().build();
// 3.创建请求对象
Call call = client.newCall(request);
// 4.同步请求
try {
// 同步返回结果
Response execute = call.execute();
} catch (IOException e) {
e.printStackTrace();
}
// 4.异步使用 异步返回结果
call.enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
}
@Override
public void onResponse(Call call, Response response) throws IOException {
}
});
- okhttp请求发起流程分析
1.同步请求
执行call.execute()方法,实际上会执行到RealCall的execute方法,方法所示:
@Override public Response execute() throws IOException {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);
try {
//此处的代码是将请求加入到okhttp的同步请求队列中
client.dispatcher().executed(this);
//此处代码是真正的发出去同步请求并返回结果,这里涉及到okhttp的拦截器,下面会细讲
Response result = getResponseWithInterceptorChain();
if (result == null) throw new IOException("Canceled");
return result;
} catch (IOException e) {
eventListener.callFailed(this, e);
throw e;
} finally {
client.dispatcher().finished(this);
}
}
2.异步请求
执行 call.enqueue()方法实际上会执行到RealCall的enqueue方法
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);
// 获取okhttp请求调度器调用异步请求,AsyncCall方法是一个实现Runnable的类
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
AsyncCall继承NamedRunnable,NamedRunnable实现了Runnable,rRunnable的run方法会执行到AsyncCall的execute方法
final class AsyncCall extends NamedRunnable {
private final Callback responseCallback;
AsyncCall(Callback responseCallback) {
super("OkHttp %s", redactedUrl());
this.responseCallback = responseCallback;
}
String host() {
return originalRequest.url().host();
}
Request request() {
return originalRequest;
}
RealCall get() {
return RealCall.this;
}
// 实际是Runnable的run方法
@Override protected void execute() {
boolean signalledCallback = false;
try {
// 进入拦截器,发起请求获取请求结果
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
// 请求失败回调
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
// 请求成功回调
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
eventListener.callFailed(RealCall.this, e);
responseCallback.onFailure(RealCall.this, e);
}
} finally {
client.dispatcher().finished(this);
}
}
}
2.okHttp使用的设计模式分析
- Builder设计模式
okHttpClient的Builder类,,提供默认配置参数
public Builder() {
dispatcher = new Dispatcher(); // 默认请求调度器
protocols = DEFAULT_PROTOCOLS; // 默认请求协议 http/1.1 和http/2
connectionSpecs = DEFAULT_CONNECTION_SPECS; // 提供默认TLS 连接
eventListenerFactory = EventListener.factory(EventListener.NONE); //指标事件监听器,可以监控HTTP 呼叫的数量、大小和持续时间
proxySelector = ProxySelector.getDefault(); // 默认代理服务器
cookieJar = CookieJar.NO_COOKIES; // 默认cookie实现
socketFactory = SocketFactory.getDefault(); // 默认socket创建工厂
hostnameVerifier = OkHostnameVerifier.INSTANCE; // 默认主机名验证器 可以进行证书校验
certificatePinner = CertificatePinner.DEFAULT; // 加密证书之类的 俺也不太懂
proxyAuthenticator = Authenticator.NONE; // 代理请求认证 空实现
authenticator = Authenticator.NONE; //请求认证 空实现
connectionPool = new ConnectionPool(); // 默认连接池 应该socket连接的连接池
dns = Dns.SYSTEM; // 默认dns解析
followSslRedirects = true; // ssl重定向
followRedirects = true; // 默认开启重定向
retryOnConnectionFailure = true; // 失败重试
connectTimeout = 10_000; // 连接超时
readTimeout = 10_000; // 读取超时
writeTimeout = 10_000; // 写入超时
pingInterval = 0; // 心跳间隔
}
Request类的Builder类,提供默认请求方式
public Builder() {
this.method = "GET"; //默认get请求
this.headers = new Headers.Builder();
}
- 责任链设计模式
前边已经介绍过,这里是okhttp发起请求的地方,从这里开始通过责任链模式走okHttp的所有拦截器
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
interceptors.addAll(client.interceptors()); // 加载用户自定义拦截器
interceptors.add(retryAndFollowUpInterceptor); // 重试拦截器
interceptors.add(new BridgeInterceptor(client.cookieJar())); // 桥接拦截器
interceptors.add(new CacheInterceptor(client.internalCache())); // 缓存拦截器
interceptors.add(new ConnectInterceptor(client)); // 连接拦截器
if (!forWebSocket) { //如果不是webSocket协议 添加
interceptors.addAll(client.networkInterceptors());
}
interceptors.add(new CallServerInterceptor(forWebSocket)); // 请求拦截器 最后的一个拦截器
// 构建责任链对象
Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
originalRequest, this, eventListener, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
// 进入责任链
return chain.proceed(originalRequest);
}
}
通过上述代码,最终会进入到RealInterceptorChain的proceed方法
public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
RealConnection connection) throws IOException {
if (index >= interceptors.size()) throw new AssertionError();
calls++;
// If we already have a stream, confirm that the incoming request will use it.
if (this.httpCodec != null && !this.connection.supportsUrl(request.url())) {
throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
+ " must retain the same host and port");
}
// If we already have a stream, confirm that this is the only call to chain.proceed().
if (this.httpCodec != null && calls > 1) {
throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
+ " must call proceed() exactly once");
}
// Call the next interceptor in the chain.
// 构建拦截器责任链的下一个节点
RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
writeTimeout);
// 获取当前拦截器
Interceptor interceptor = interceptors.get(index);
// 执行当前拦截器 注意这里的参数是next,也就是说是下一个责任链上的节点
Response response = interceptor.intercept(next);
// Confirm that the next interceptor made its required call to chain.proceed().
if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1) {
throw new IllegalStateException("network interceptor " + interceptor
+ " must call proceed() exactly once");
}
// Confirm that the intercepted response isn't null.
if (response == null) {
throw new NullPointerException("interceptor " + interceptor + " returned null");
}
if (response.body() == null) {
throw new IllegalStateException(
"interceptor " + interceptor + " returned a response with no body");
}
return response;
}
此处Response response = interceptor.intercept(next);
会执行拦截器的intercept方法,以第一个拦截器RetryAndFollowUpInterceptor为例,查看此拦截器的intercept方法
@Override public Response intercept(Chain chain) throws IOException {
Request request = chain.request();
RealInterceptorChain realChain = (RealInterceptorChain) chain;
//... 此处省略部分代码
try {
// 执行责任链节点的处理方法 记得上边的拦截器中传入的下一个节点的对象,所以这里就调到下一个节点的proceed方法
response = realChain.proceed(request, streamAllocation, null, null);
releaseConnection = false;
} catch (RouteException e) {
// The attempt to connect via a route failed. The request will not have been sent.
if (!recover(e.getLastConnectException(), streamAllocation, false, request)) {
throw e.getFirstConnectException();
}
//... 此处省略部分代码
priorResponse = response;
}
}
这样责任链的每一个节点都会先执行拦截的intercept方法,然后执行下一个节点的proceed的方法,直到执行到最后一个拦截器
CallServerInterceptor的intercept方法,会发起真正的请求,拿到响应体,依次返回结果。(具体请求过程待补充)
@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
HttpCodec httpCodec = realChain.httpStream();
// ...省略部分代码
Request request = realChain.request();
if (responseBuilder == null) {
realChain.eventListener().requestBodyStart(realChain.call());
long contentLength = request.body().contentLength();
CountingSink requestBodyOut =
new CountingSink(httpCodec.createRequestBody(request, contentLength));
BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
// ...省略部分代码
request.body().writeTo(bufferedRequestBody);
bufferedRequestBody.close();
realChain.eventListener()
.requestBodyEnd(realChain.call(), requestBodyOut.successfulCount);
} else if (!connection.isMultiplexed()) {
streamAllocation.noNewStreams();
}
}
// ...省略部分代码
httpCodec.finishRequest();
// 构建返回体对象 真正返回,这里采用okIo进行网络请求,后期有时间补充对应的请求过程
Response response = responseBuilder
.request(request)
.handshake(streamAllocation.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
int code = response.code();
if (code == 100) {
responseBuilder = httpCodec.readResponseHeaders(false);
response = responseBuilder
.request(request)
.handshake(streamAllocation.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
code = response.code();
}
realChain.eventListener()
.responseHeadersEnd(realChain.call(), response);
if (forWebSocket && code == 101) {
response = response.newBuilder()
.body(Util.EMPTY_RESPONSE)
.build();
} else {
response = response.newBuilder()
.body(httpCodec.openResponseBody(response))
.build();
}
return response;
}
3.okHttp的线程调度器(Dispatcher)分析
从上边的请求过程可以看到,不管同步请求或者异步请求,都会调用okHttpClient的dispatcher()来操作,
//同步请求
client.dispatcher().executed(this);
// 异步请求
client.dispatcher().enqueue(new AsyncCall(responseCallback));
Dispatcher调度器的所有成员变量
private int maxRequests = 64; //同时最大请求数量
private int maxRequestsPerHost = 5; // 同时最大同一域名的请求数量
private @Nullable Runnable idleCallback; // 调度器空闲状态的回调
/** Executes calls. Created lazily. */
private @Nullable ExecutorService executorService; // 请求线程池
/** Ready async calls in the order they'll be run. */
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>(); // 异步请求等待队列
/** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>(); // 异步请求运行队列
/** Running synchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>(); // 同步请求运行队列
同步请求队列分析,从RealCall的同步请求方法开始
@Override public Response execute() throws IOException {
synchronized (this) {
// ...省略部分代码
try {
client.dispatcher().executed(this); // 加载同步请求队列
Response result = getResponseWithInterceptorChain(); //获取请求结果
if (result == null) throw new IOException("Canceled");
return result;
} catch (IOException e) {
eventListener.callFailed(this, e);
throw e;
} finally {
client.dispatcher().finished(this); //完成同步请求
}
}
异步请求队列分析,从RealCall的异步请求enqueue()方法开始分析
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
// ...省略部分代码
client.dispatcher().enqueue(new AsyncCall(responseCallback)); // 加入异步请求队列
}
具体实现,看Dispatcher的enqueue方法
synchronized void enqueue(AsyncCall call) {
// 如果正在运行异步请求数量小于最大请求数量限制
//并且 同一域名的请求数量小于maxRequestsPerHost,将请求加入运行中队列,
// 并直接加入的线程池执行,否则添加到等待队列
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call); //加入异步请求运行中队列
executorService().execute(call); // 执行异步请求
} else {
readyAsyncCalls.add(call); // 加入异步请求等待队列
}
}
异步请求就会在线程中执行Runnable的run方法,也就会执行到AsyncCall 的execute方法,这样就可以正在发起请求获取响应结果。
final class AsyncCall extends NamedRunnable {
// ...省略部分代码
@Override protected void execute() {
boolean signalledCallback = false;
try {
Response response = getResponseWithInterceptorChain(); //获取请求结果
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
// ...省略部分代码
} finally {
client.dispatcher().finished(this); //请求结束
}
}
}
不管是同步请求还是异步请求,最后都会执行到 client.dispatcher().finished(this); 这句代码,我们看一下具体的实现
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
//执行结束,将请求从队列中移除
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
// promoteCalls 同步请求为false,异步请求为true,调用promoteCalls来判断是否可以将等待队列的请求放到运行队列
if (promoteCalls) promoteCalls();
runningCallsCount = runningCallsCount(); //获取请求的数量
idleCallback = this.idleCallback;
}
if (runningCallsCount == 0 && idleCallback != null) { //如果请求队列中没有数据,也就是说Dispatcher处于空闲状态,调用idleCallback的run方法
idleCallback.run();
}
}
promoteCalls()方法的实现
private void promoteCalls() {
//运行队列的数量是否大于最大请求数量
if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
// 等到队列是否为空
if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
// 迭代等待队列
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();
// 判定同一域名的请求是否消息最大数量
if (runningCallsForHost(call) < maxRequestsPerHost) {
// 移除等待队列数据
i.remove();
//加入运行中队列
runningAsyncCalls.add(call);
// 加入线程池
executorService().execute(call);
}
// 这句代码个人感觉冗余了
if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
}
}
- Dispatcher线程池的配置
public synchronized ExecutorService executorService() {
if (executorService == null) {
// 第一个参数 核心线程数
// 第二个参数 最大线程数
// 第三个参数 线程保活等到时间
// 第四个参数 单位秒
// 第五个参数 线程阻塞队列
// 第六个参数 线程构造工厂
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
1.线程池的核心线程数为什么设置成0,最小线程数设置成最大值
由于okHttp已经自己利用readyAsyncCalls
和runningAsyncCalls
管理请求队列和等待队列,没必要再通过线程池来控制了
2.采用SynchronousQueue来实现等待队列
SynchronousQueue是无界的,是一种无缓冲的等待队列,但是由于该Queue本身的特性,在某次添加元素后必须等待其他线程取走后才能继续添加;可以认为SynchronousQueue是一个缓存值为1的阻塞队列,但是 isEmpty()方法永远返回是true,remainingCapacity() 方法永远返回是0,remove()和removeAll() 方法永远返回是false,iterator()方法永远返回空,peek()方法永远返回null。