如果我们想请求数据,使用少量的代码就可以实现:
OkHttpClient client = new OkHttpClient();
String url = "https://www.baidu.com/";
Request request = new Request().Builder()
.url(url)
.get()
.build();
Call call = client.newCall(request);
request.enqueue(new CallBack(){
@Override
public void onResponse(Call call,Response response) throws IOException{
}
@Override
public void onFailure(Call call,IOException e){
}
})
OkHttpClient类
创建OkHttpClient类的两种方式:
- 直接创建对象 new OkHttpClient()
- new OkHttpClient.Builder().build()
OkHttpClient对象源码:
public OkHttpClient() {
this(new Builder());
}
OkHttpClient(Builder builder) {
//调度器,用于控制并发的请求。内部保存同步和异步请求的call,并使用线程池处理异步请求。
this.dispatcher = builder.dispatcher;
this.proxy = builder.proxy;//代理设置
this.protocols = builder.protocols;//默认支持http协议版本
this.connectionSpecs = builder.connectionSpecs;//okhttp连接 connection配置
this.interceptors = Util.immutableList(builder.interceptors);
this.networkInterceptors = Util.immutableList(builder.networkInterceptors);
this.eventListenerFactory = builder.eventListenerFactory;//一个Call的状态监听器
this.proxySelector = builder.proxySelector;//使用默认的代理选择器
this.cookieJar = builder.cookieJar;//默认是没有cookie的
this.cache = builder.cache;//缓存
this.internalCache = builder.internalCache;
this.socketFactory = builder.socketFactory;//使用默认的Scoket工厂产生Socket
boolean isTLS = false;
for (ConnectionSpec spec : connectionSpecs) {
isTLS = isTLS || spec.isTls();
}
if (builder.sslSocketFactory != null || !isTLS) {
this.sslSocketFactory = builder.sslSocketFactory;
this.certificateChainCleaner = builder.certificateChainCleaner;
} else {
X509TrustManager trustManager = Util.platformTrustManager();
this.sslSocketFactory = newSslSocketFactory(trustManager);
this.certificateChainCleaner = CertificateChainCleaner.get(trustManager);
}
if (sslSocketFactory != null) {
Platform.get().configureSslSocketFactory(sslSocketFactory);
}
this.hostnameVerifier = builder.hostnameVerifier;//安全相关的设置
this.certificatePinner = builder.certificatePinner.withCertificateChainCleaner(
certificateChainCleaner);
this.proxyAuthenticator = builder.proxyAuthenticator;
this.authenticator = builder.authenticator;
this.connectionPool = builder.connectionPool;//连接池
this.dns = builder.dns;//域名解析系统 domain name->ip address
this.followSslRedirects = builder.followSslRedirects;
this.followRedirects = builder.followRedirects;
this.retryOnConnectionFailure = builder.retryOnConnectionFailure;
this.callTimeout = builder.callTimeout;
this.connectTimeout = builder.connectTimeout;
this.readTimeout = builder.readTimeout;
this.writeTimeout = builder.writeTimeout;
this.pingInterval = builder.pingInterval;//这个和websocket相关,为了保持长连接,我们必须每间隔一段时间放松一个ping指令
if (interceptors.contains(null)) {
throw new IllegalStateException("Null interceptor: " + interceptors);
}
if (networkInterceptors.contains(null)) {
throw new IllegalStateException("Null network interceptor: " + networkInterceptors);
}
}
Call类
在定义了请求对象后,需要生成一个Call对象。该对象代表一个准备被执行的请求。Call是可以被取消的,Call表示单个请求/响应对流,不能执行两次。
public interface Call extends Cloneable {
Request request();
Response execute() throws IOException;
void enqueue(Callback responseCallback);
void cancel();
boolean isExecuted();
boolean isCanceled();
Timeout timeout();
Call clone();
interface Factory {
Call newCall(Request request);
}
}
- 进入OkHttpClient的newCall方法
public Call newCall(Request request) {
return RealCall.newRealCall(this, request, false /* for web socket */);
}
- newRealCall
static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
// Safely publish the Call instance to the EventListener.
RealCall call = new RealCall(client, originalRequest, forWebSocket);
call.transmitter = new Transmitter(client, call);
return call;
}
newCall方法获得Call实际是RealCall,RealCall就是准备执行的请求,是对接口Call的实现,其内部持有OkHttpClient实例,Request实例。并且这里还创建了Transmitter给RealCall的transmitter赋值。
Transmitter类
Transmitter意为发射器,是应用层和网络层的桥梁。在进行连接、真正发出请求和读取响应中起到很重要的作用。
public Transmitter(OkHttpClient client, Call call) {
this.client = client;
this.connectionPool = Internal.instance.realConnectionPool(client.connectionPool());
this.call = call;
this.eventListener = client.eventListenerFactory().create(call);
this.timeout.timeout(client.callTimeoutMillis(), MILLISECONDS);
}
Transmitter内部持有OkHttpClient、连接池、call、事件监听器。
Dispatcher类
Dispatcher类负责异步任务的请求策略。
public final class Dispatcher {
private int maxRequests = 64;
//每个主机的最大请求数,如果超过这个数,新的请求会被加到readyAsyncCalls队列中
private int maxRequestsPerHost = 5;
private @Nullable Runnable idleCallback;
//任务队列线程池
private @Nullable ExecutorService executorService;
//待执行异步任务队例
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
//运行中的异步任务队例
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
//运行中同步任务队列
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
public Dispatcher(ExecutorService executorService) {
this.executorService = executorService;
}
public Dispatcher() {
}
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
}
同步请求执行流程
client.newCall(request).execute(),execute方法在Call的实现类RealCall中。
public Response execute() throws IOException {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
transmitter.timeoutEnter();//超时计时开始
transmitter.callStart();//回调监听器的请求开始
try {
client.dispatcher().executed(this);//放入队列
return getResponseWithInterceptorChain();//执行请求获取结果
} finally {
client.dispatcher().finished(this);//请求结束
}
}
首先判断 如果已经执行,就会抛出异常。这就是一个请求只能执行一次的原因。然后回调请求监听器的请求开始。然后调用client的调度器Dispatcher的executed方法。
- dispatcher().executed()
synchronized void executed(RealCall call) {
runningSyncCalls.add(call);
}
请求放入一个双端队列runningSyncCalls中,表示正在执行的同步请求。
然后返回了getResponseWithInterceptorChain()的结果Response,同步请求真正的请求流程是在getResponseWithInterceptorChain方法中(详情见下节)。
最后请求结束,会走Dispatcher的finished(Deque calls, T call)方法。
- dispatcher.finished()
void finished(RealCall call) {
finished(runningSyncCalls, call);
}
private <T> void finished(Deque<T> calls, T call) {
Runnable idleCallback;
synchronized (this) {
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
idleCallback = this.idleCallback;
}
boolean isRunning = promoteAndExecute();
if (!isRunning && idleCallback != null) {
idleCallback.run();
}
}
这里将call从同步队列中移除,并且调用了promoteAndExecute()方法,这个方法在后面讲述。
异步请求执行流程
- 异步方法equeue()
@Override
public void enqueue(Callback responseCallback) {
synchronized (this) {
//设置exexuted参数为true,表示不可以执行两次
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
transmitter.callStart();//回调请求监听器的请求开始
//传入一个新的对象AsyncCall
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
- AsyncCall类
final class AsyncCall extends NamedRunnable {
private final Callback responseCallback;
AsyncCall(Callback responseCallback) {
super("OkHttp %s", redactedUrl());
this.responseCallback = responseCallback;
}
String host() {
return originalRequest.url().host();
}
Request request() {
return originalRequest;
}
RealCall get() {
return RealCall.this;
}
@Override protected void execute() {
boolean signalledCallback = false;
try {
//执行耗时的IO操作
//获取拦截器链,详见下篇文章
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
//回调,注意这里回调是在线程池中,而不是向当前的主线程回调
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
//回调,同上
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
eventListener.callFailed(RealCall.this, e);
//回调,同上
responseCallback.onFailure(RealCall.this, e);
}
} finally {
client.dispatcher().finished(this);
}
}
}
AsyncCall继承NamedRunnable,NamedRunnable实现自Runnable,即AsyncCall就是个Runnable,它是会在线程或线程池中执行run方法的。
- NamedRunnable类
public abstract class NamedRunnable implements Runnable {
protected final String name;
public NamedRunnable(String format, Object... args) {
this.name = Util.format(format, args);
}
@Override public final void run() {
String oldName = Thread.currentThread().getName();
Thread.currentThread().setName(name);
try {
//execute抽象方法,在AsyncCall中有具体实现
execute();
} finally {
Thread.currentThread().setName(oldName);
}
}
protected abstract void execute();
}
在分析同步的时候知道Dispatcher调度器负责异步请求策略,去看看equeue方法。
- Dispatcher.equeue()
void enqueue(AsyncCall call) {
synchronized (this) {
readyAsyncCalls.add(call);
// Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
// the same host.
//相同host请求,共用一个调用技术
if (!call.get().forWebSocket) {
AsyncCall existingCall = findExistingCallWithHost(call.host());
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall);
}
}
promoteAndExecute();
}
//从runningAsyncCalls和readyAsyncCalls找到相同的host请求
private AsyncCall findExistingCallWithHost(String host) {
for (AsyncCall existingCall : runningAsyncCalls) {
if (existingCall.host().equals(host)) return existingCall;
}
for (AsyncCall existingCall : readyAsyncCalls) {
if (existingCall.host().equals(host)) return existingCall;
}
return null;
}
- promoteAndExecute()
//调度的核心方法:在控制异步并发的策略基础上,使用线程池 执行异步请求
private boolean promoteAndExecute() {
assert (!Thread.holdsLock(this));
List<AsyncCall> executableCalls = new ArrayList<>();
boolean isRunning;
synchronized (this) {
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall asyncCall = i.next();
if (runningAsyncCalls.size() >= maxRequests) break; //最大并发数64.
if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue; // Host最大并发数.
i.remove();//从等待队列中移除
//host并发数+1
asyncCall.callsPerHost().incrementAndGet();
//加入可执行请求的集合
executableCalls.add(asyncCall);
//加入正在执行的异步请求队列
runningAsyncCalls.add(asyncCall);
}
isRunning = runningCallsCount() > 0;
}
for (int i = 0, size = executableCalls.size(); i < size; i++) {
AsyncCall asyncCall = executableCalls.get(i);
//可执行的请求
asyncCall.executeOn(executorService());
}
return isRunning;
}
public synchronized int runningCallsCount() {
return runningAsyncCalls.size() + runningSyncCalls.size();
}
遍历readyAsyncCalls,先进行两个检查:
- 正在执行异步请求runningAsyncCalls数量大于最大并发请求数64就break;
- 相同host请求的数量大于5,就continue。
如果检查都通过,就从等待队列中移除,callPerHost自增1,放入可执行的集合executableCalls,并添加到队列runningAsyncCalls中,表示正在执行的异步请求。
这里的异步请求等待队列,是为了控制最大并发数的缓冲,异步请求并发数达到64、相同host的异步请求达到5,都要放入等待队列。
- AsyncCall.executeOn()
void executeOn(ExecutorService executorService) {
assert (!Thread.holdsLock(client.dispatcher()));
boolean success = false;
try {
//在线程池中执行asyncCall
executorService.execute(this);
success = true;
} catch (RejectedExecutionException e) {
InterruptedIOException ioException = new InterruptedIOException("executor rejected");
ioException.initCause(e);
transmitter.noMoreExchanges(ioException);
responseCallback.onFailure(RealCall.this, ioException);//回调失败
} finally {
if (!success) {
client.dispatcher().finished(this); //执行发生异常 结束
}
}
}
AsyncCall的run方法会走到execute()方法,在上面有展示。
下面总结一下请求的流程:
- 同步请求
- 调用client.newCall(request).execute()方法,也就是RealCall的execute方法;
- execute方法内部调用client.dispatcher().executed()方法,将当前RealCall加入到runningSyncCalls队列;
- 使用getResponseWithInterceptorChain()获取结果;
- 最后调用Dispatcher的finish方法结束请求。
- 异步请求
- 调用client.newCall(request).equeue()方法,其内部调用client.dispatcher().enqueue(new AsyncCall(responseCallback))方法;
- 先将AsyncCall加入到当前readyAsyncCalls队列中,在找到执行当前主机的AsyncCall,一个主机用同一个AsyncCall;
- 使用promoteAndExecute()方法在控制异步并发的策略基础上使用线程池执行异步请求(并发控制有包括最大并发数64,host最大并发数5)。异步请求的执行也是使用getResponseWithInterceptorChain(),获得结果后回调出去。最后调用Dispatcher的finish方法结束请求。