本次学习Okhttp源码是对以下这个版本,从开始请求到请求结束回调请求结果的一这个主要流程
implementation 'com.squareup.okhttp3:okhttp:3.10.0'
所涉及的核心类为
- OkhttpClient
- Request
- Response
- Call
- Callback
Okhttp简单使用如下:
OkHttpClient okHttpClient = new OkHttpClient.Builder().build();
Request request = new Request.Builder()
.url("http://baidu.com")
.get()
.build();
Call call = okHttpClient.newCall(request);
call.enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
Log.e("test", "fail ---");
}
@Override
public void onResponse(Call call, Response response) throws IOException {
Log.e("test", "success-->" + response.body());
}
});
发送请求的主线流程
call.enqueue(Callback responseCallback),call是一个接口它的实现类是RealCall
final class RealCall implements Call {
...
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
...
}
以上这段代码表示不能同时执行相同的请求,否则会抛出"Already Executed"的异常,然后执行到client.dispatcher().enqueue(new AsyncCall(responseCallback)),跟进Dispatcher中
public final class Dispatcher {
//等待队列的容量
private int maxRequests = 64;
//运行队列的容量
private int maxRequestsPerHost = 5;
//请求的缓存队列
private @Nullable ExecutorService executorService;
//等待队列
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
//异步请求的运行队列
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
//同步请求的运行队列
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
//初始化缓存队列
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
//异步请求
synchronized void enqueue(AsyncCall call) {
//如果对同一个host地址请求的的运行队列此时长度小于5,则把请求添加进运行队列,立即通过线程池执行,如果不满足条线就先添加到等待队列里
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);
executorService().execute(call);
} else {
readyAsyncCalls.add(call);
}
}
}
Dispatcher类中定义了等待队列和同步请求和异步请求的队列,以及等待列表容量64,运行队列长度5,也就是说如果添加一个请求此时运行队列里面不足5个就直接把这个请求放进运行队列直接执行,如果此时运行队列里面有五个则先放在等待队列中,等运行队列里面执行完少于5个的时候再从等待队列里面取去放入运行队列执行请求
其中线程池创建过程如下
executorService = new ThreadPoolExecutor(0,
Integer.MAX_VALUE,
60,
TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
Util.threadFactory("OkHttp Dispatcher", false));
public class Util{
public static ThreadFactory threadFactory(final String name, final boolean daemon) {
return new ThreadFactory() {
@Override public Thread newThread(Runnable runnable) {
Thread result = new Thread(runnable, name);
result.setDaemon(daemon);
return result;
}
};
}
创建了一个核心线程为0,无限容量的子线程,存活时间60s的线程池,并且为线程指定了名字,thread.setDaemon(daemon)设置线程为守护线程,SynchronousQueue是一个没有容量的队列,这里配合核心线程为0,线程容量Int.Max实现了一个接收到请求就直接分配线程(复用或者新创建线程)执行的过程。
再分析线程池执行executorService().execute(call),其中的call是new AsyncCall(responseCallback)创建的,AsyncCalls继承自NamedRunnable,是RealCall的内部类。NamedRunnable实现了Runnable接口
public abstract class NamedRunnable implements Runnable {
protected final String name;
public NamedRunnable(String format, Object... args) {
this.name = Util.format(format, args);
}
@Override public final void run() {
String oldName = Thread.currentThread().getName();
Thread.currentThread().setName(name);
try {
execute();
} finally {
Thread.currentThread().setName(oldName);
}
}
protected abstract void execute();
AsyncCall继承NamedRunnable,并且实现了抽象方法execute
final class RealCall implements Call {
final class AsyncCall extends NamedRunnable {
private final Callback responseCallback;
AsyncCall(Callback responseCallback) {
super("OkHttp %s", redactedUrl());
this.responseCallback = responseCallback;
}
String host() {
return originalRequest.url().host();
}
Request request() {
return originalRequest;
}
RealCall get() {
return RealCall.this;
}
//实现父类定义的抽象类,当父类被线程池执行的时候,这个类也会执行
@Override protected void execute() {
boolean signalledCallback = false;
try {
//责任链模式实现的各类拦截器
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
eventListener.callFailed(RealCall.this, e);
responseCallback.onFailure(RealCall.this, e);
}
} finally {
client.dispatcher().finished(this);
}
}
}
}
通过以上过程可以知道,当Dispatcher.enqueue()执行线程池executorService().execute(call)会触发RealCall的内部类AsyncCall执行复写父类抽象的execute方法
@Override protected void execute() {
boolean signalledCallback = false;
try {
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
eventListener.callFailed(RealCall.this, e);
responseCallback.onFailure(RealCall.this, e);
}
} finally {
client.dispatcher().finished(this);
}
}
通过策略模式逐层让拦截器处理response,如果请求失败则回调responseCallback.onFailure(RealCall.this, new IOException("Canceled")),如果请求成功则回调responseCallback.onResponse(RealCall.this, response)。
如上,Http请求的主线流程就完成了。
Okhttp中用到的设计模式
构建者模式
OkHttpClient和Request的创建是构造者模式创建的,构造者模式比较常见不多解释。
OkHttpClient okHttpClient = new OkHttpClient.Builder()
.build();
Request request = new Request.Builder()
.url("http://baidu.com")
.get()
.build();
责任链模式
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
interceptors.addAll(client.interceptors());
interceptors.add(retryAndFollowUpInterceptor);
interceptors.add(new BridgeInterceptor(client.cookieJar()));
interceptors.add(new CacheInterceptor(client.internalCache()));
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());
}
interceptors.add(new CallServerInterceptor(forWebSocket));
Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
originalRequest, this, eventListener, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
return chain.proceed(originalRequest);
}
责任链模式:为请求创建了一个接收者对象的链。这种模式给予请求的类型,对请求的发送者和接收者进行解耦。这种类型的设计模式属于行为型模式。
在这种模式中,通常每个接收者都包含对另一个接收者的引用。如果一个对象不能处理该请求,那么它会把相同的请求传给下一个接收者,依此类推。
举个栗子:
员工李华需要请假,向所在部门主管(Manager1)申请,部门主管(Manager1)同意转给上一级主管(Manager2)审批,Manager2同意审批再转给人事主管(Manager3)审批
step 1:创建执行事务的接口IBaseTask.java
interface IBaseTask {
/**
* @param task 事务内容
* @param iBaseTask 下一个任务节点
*/
public void doAction(String content, IBaseTask iBaseTask);
}
事特批:创建三个责任链成员,也就是三个经理Manager
class Manager1 implements IBaseTask {
@Override
public void doAction(String content, IBaseTask iBaseTask) {
if (content.equals("请假")) {
System.out.println("Manager1 同意请假 并转交给下一个Manager审批...");
iBaseTask.doAction(content, iBaseTask);
} else {
}
}
}
class Manager2 implements IBaseTask{
@Override
public void doAction(String content, IBaseTask iBaseTask) {
if (content.equals("请假")) {
System.out.println("Manager2 同意请假 并转交给下一个Manager审批...");
iBaseTask.doAction(content, iBaseTask);
}
}
}
class Manager3 implements IBaseTask {
@Override
public void doAction(String content, IBaseTask iBaseTask) {
if (content.equals("请假")) {
System.out.println("Manager3 同意请假 流程结束");
return;
}
}
}
step3:通过TaskManager实现责任链实现
class TaskManager implements IBaseTask {
private List<IBaseTask> taskList;
private int index = 0;
public TaskManager() {
this.taskList = new ArrayList<>();
}
public void addTask(IBaseTask task) {
if (taskList == null || taskList.isEmpty()) {
this.taskList = new ArrayList<>();
}
taskList.add(task);
}
@Override
public void doAction(String content, IBaseTask iBaseTask) {
if (taskList.isEmpty()) {
System.out.println("taskList is empty!");
return;
}
if (index < 0 || index >= taskList.size()) {
System.out.println("index out of bundle!");
return;
}
IBaseTask task = taskList.get(index);
index++;
task.doAction(content, iBaseTask);
}
}
测试结果
public static void main(String[] args) {
TaskManager taskManager = new TaskManager();
taskManager.addTask(new Manager1());
taskManager.addTask(new Manager2());
taskManager.addTask(new Manager3());
taskManager.doAction("请假", taskManager);
}
执行结果如下
Manager1 同意请假 并转交给下一个Manager审批...
Manager2 同意请假 并转交给下一个Manager审批...
Manager3 同意请假 流程结束
Class transformation time: 0.008817664s for 99 classes or 8.817664E-5s per class
Okhttp的拦截器也是类似如上的这种责任链模式实现。