Okhttp同步流程源码分析

Okhttp3 同步基本流程

流程分析

OkhttpClient

使用Okhttp,首先需要实例化一个OkhttpClient对象。支持2种构造方式

  1. 默认方式
val client = OkhttpClient()

2.使用建造者方式

val client = OkhttpClient.Builder()
                .build()

他们的具体实现为

public OkhttpClient() {
    this(new Builder());
}

可以看到这种方式,不需要配置任何参数,也就是说基本参数都是默认的,调用的是下面的构造函数。

OkHttpClient(Builder builder) {
    。。。。
}

建造者返回一个OkhttpClient实例

public OkHttpClient build() { 
    return new OkHttpClient(this);
 }

我们查看源码发现默认方式其实是使用了2种设计模式:
1.外观模式
2.建造者模式

Request

OkhttpClient完成后就需要构建一个Request对象,我们查看Requests对象找不到过多的构造函数,唯一的构造函数是这样的

Request(Builder builder) {  
    this.url = builder.url;
    this.method = builder.method; 
    this.headers = builder.headers.build();  
    this.body = builder.body;  
    this.tags = Util.immutableMap(builder.tags);
    }

这就是在告诉我们,创建Requests实例需要使用Builder模式进行构建


public Builder newBuilder() {  
    return new Builder(this);
    }


public static class Builder {  
    @Nullable HttpUrl url;  
    String method;  
    Headers.Builder headers;  
    @Nullable RequestBody body;    
    
    Map<Class<?>, Object> tags = Collections.emptyMap();  
    
    public Builder() {    
    this.method = "GET";    
    this.headers = new Headers.Builder(); 
    }  
    Builder(Request request) {    
    this.url = request.url;    
    this.method = request.method;    
    this.body = request.body;    
    this.tags = request.tags.isEmpty() ? Collections.<Class<?>, Object>emptyMap() : new LinkedHashMap<>(request.tags);    
        this.headers = request.headers.newBuilder(); 
   。。。
   }

查看其他的对象发现也是基本使用Builder模式进行构建,所以此处就不在继续查看

同步请求

我们需要构建一个call,一般使用 val call = okhttpClient.newCall(request)构建,去查看OkHttpClient源码

@Override public Call newCall(Request request) {  return RealCall.newRealCall(this, request, 
false /* for web socket */);
}

@Override 看到这个注解我们就要意识到,这个方法不是继承就是实现接口
我们继续往源码翻,发现他是实现了Call.Factory接口

interface Factory {  
    Call newCall(Request request);
}

这里又使用了新的设计模式,工厂设计模式,具体的实例由其子类产生。
将构建的细节交给具体实现,顶层只需要拿到Call对象即可。
我们继续查看RealCall中的newRealCall方法

static RealCall newRealCall(OkHttpClient client, Request originalRequest,   boolean forWebSocket) { 
    // Safely publish the Call instance to the EventListener.  
    RealCall call = new RealCall(client, originalRequest, forWebSocket);  
    call.eventListener = client.eventListenerFactory().create(call);  
    return call;
    }

这里是一个静态方法, 创建了一个EventListener对象,名字可以看出,这是一个监听事件的对象从构造方法看出,这里使用了工厂设计创建的eventListener
继续查看RealCall的构造函数

private RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {  
    this.client = client;  
    this.originalRequest = originalRequest;  
    this.forWebSocket = forWebSocket;  
    //添加了RetryAndFollowUpInterceptor拦截器
    this.retryAndFollowUpInterceptor = new  RetryAndFollowUpInterceptor(client, forWebSocket);  
    this.timeout = new AsyncTimeout() {    
    @Override protected void timedOut() {
            cancel();   
          }  
    };  
    this.timeout.timeout(client.callTimeoutMillis(), MILLISECONDS);
}

核心来了,执行请求
我们首先看的是同步请求

Response execute() throws IOException;

/*** Schedules the request to be executed at some 
point in the future. * * <p>The {@link OkHttpClient#dispatcher 
dispatcher} defines when the request will run: 
usually * immediately unless there are several other 
requests currently being executed. * * <p>This client will later call back {@code responseCallback} with either an HTTP response 
or a * failure exception. * * @throws IllegalStateException when the call 
has already been executed. */
//异步请求方法
void enqueue(Callback responseCallback);

我们查看具体的实现,在RealCall中实现了这个方法

@Override public Response execute() throws IOException {  
    synchronized (this) {   
    if (executed) throw new IllegalStateException("Already Executed");              executed = true;  
    }  
    captureCallStackTrace();  
    timeout.enter();  
    eventListener.callStart(this);
    try {    
        client.dispatcher().executed(this);    
        Response result = getResponseWithInterceptorChain();    
        if (result == null) throw new IOException("Canceled");    
        return result; 
       } catch (IOException e) {   
       e = timeoutExit(e);    
       eventListener.callFailed(this, e);    
       throw e; 
       } finally {   
       client.dispatcher().finished(this); 
   }
}

synchronized加入了对象锁,防止多线程同时调用,这里先判断一下executed是否为true判断当前call是否被执行了,如果为ture,则抛出异常,没有则设置为true
captureCallStackTrace()

private void captureCallStackTrace() {  
    Object callStackTrace = Platform.get().getStackTraceForCloseable("response.body().close()");  
    retryAndFollowUpInterceptor.setCallStackTrace(callStackTrace);
}

源码来看是为了前文添加的拦截器添加一系列的堆栈信息
timeout.enter();
这里应该是进行超时之类的校验,不深究继续往下看
eventListener.callStart(this);
可以看到前面构建的eventListener起到作用了,这里先回调callStart方法。
client.dispatcher().executed(this);
这里出现一个新的东东: dispatcher,所以我们回到OkhttpClient代码中去查看这个到底是什么东西

public Dispatcher dispatcher() {  return dispatcher;}

看到是返回了一个Dispathcer对象,没有其他任何东西,我们继续点进去看看Dispatcher是什么
先对Dispatcher的成员变量有个认识

/**
 * Policy on when async requests are executed.
 *
 * <p>Each dispatcher uses an {@link ExecutorService} to run calls internally. If you supply your
 * own executor, it should be able to run {@linkplain #getMaxRequests the configured maximum} number
 * of calls concurrently. */

public final class Dispatcher {
  private int maxRequests = 64;
  private int maxRequestsPerHost = 5;
  private @Nullable Runnable idleCallback;

  private @Nullable ExecutorService executorService;

  private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();

  private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();

  private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
 }

从注释中我们可以知道这是一个关于何时执行异步请求的策略
可以看到,最大请求数64,同一个host请求5,这里用三个队列ArrayDeque用于保存Call对象,分为三种状态异步等待,同步running,异步running。
前方使用的client.dispatcher().executed(this);
executed在Dispatcher中为

synchronized void executed(RealCall call) {  runningSyncCalls.add(call);}

将当前call塞入同步running队列中
Response result = getResponseWithInterceptorChain();
getResponseWithInterceptorChain() 这个方法就是OkHtpp中的核心
我们点开这个方法

Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors.
    List<Interceptor> interceptors = new ArrayList<>();
    //自定义的拦截器
    interceptors.addAll(client.interceptors());
    //失败和重定向拦截器
    interceptors.add(retryAndFollowUpInterceptor);
    //封装request和response拦截器
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    //缓存相关的拦截器,负责读取缓存直接返回、更新缓存
    interceptors.add(new CacheInterceptor(client.internalCache()));
    //负责和服务器建立连接,连接池等
    interceptors.add(new ConnectInterceptor(client));
    if (!forWebSocket) {
      //配置 OkHttpClient 时设置的 networkInterceptors
      interceptors.addAll(client.networkInterceptors());
    }
    //负责向服务器发送请求数据、从服务器读取响应数据(实际网络请求)
    interceptors.add(new CallServerInterceptor(forWebSocket));

    Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
        originalRequest, this, eventListener, client.connectTimeoutMillis(),
        client.readTimeoutMillis(), client.writeTimeoutMillis());

    return chain.proceed(originalRequest);
  }

注意Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
originalRequest, this, eventListener, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());中的0
我们看到这个方法是添加拦截器,然后调用chian中的proceed方法,我们继续查看proceed方法

public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
      RealConnection connection) throws IOException {
    if (index >= interceptors.size()) throw new AssertionError();

    calls++;

    // If we already have a stream, confirm that the incoming request will use it.
    if (this.httpCodec != null && !this.connection.supportsUrl(request.url())) {
      throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
          + " must retain the same host and port");
    }

    // If we already have a stream, confirm that this is the only call to chain.proceed().
    if (this.httpCodec != null && calls > 1) {
      throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
          + " must call proceed() exactly once");
    }

    // Call the next interceptor in the chain.
    RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
        connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
        writeTimeout);
    Interceptor interceptor = interceptors.get(index);
    Response response = interceptor.intercept(next);

    // Confirm that the next interceptor made its required call to chain.proceed().
    if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1) {
      throw new IllegalStateException("network interceptor " + interceptor
          + " must call proceed() exactly once");
    }

    // Confirm that the intercepted response isn't null.
    if (response == null) {
      throw new NullPointerException("interceptor " + interceptor + " returned null");
    }

    if (response.body() == null) {
      throw new IllegalStateException(
          "interceptor " + interceptor + " returned a response with no body");
    }

    return response;
  }

初看很乱但核心在这几行代码

RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
        connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
        writeTimeout);
Interceptor interceptor = interceptors.get(index);
Response response = interceptor.intercept(next);

这样就很清晰了,这里index就是我们刚才的0,也就是从0开始,如果index超过了过滤器的个数抛出异常,后面会再new一个RealInterceptorChain,而且会将参数传递,并且index+1了,接着获取index的interceptor,并调用intercept方法,传入新new的next对象,这里用了递归的思想来完成遍历。这里也就是okhttp最经典的责任链模式。在这里找不到退出的地方,我们往前getResponseWithInterceptorChain()方法interceptors.add(new CallServerInterceptor(forWebSocket));这是最后加载的一个拦截器,我们进去查看

@Override public Response intercept(Chain chain) throws IOException {
    ...
    return response;
  }

篇幅太长,我们省略一下 看见这个拦截器并没有继续调用链中intercept方法,而是直接返回的response对象
同步的我们就分析到这里

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。