okhttp源码分析(一)——OkHttp的工作流程分析

如果我们想请求数据,使用少量的代码就可以实现:

OkHttpClient client = new OkHttpClient();
String url = "https://www.baidu.com/";
Request request = new Request().Builder()
                    .url(url)
                    .get()
                    .build();
Call call = client.newCall(request);
request.enqueue(new CallBack(){
    @Override
    public void onResponse(Call call,Response response) throws IOException{
        
    }
    
    @Override
    public void onFailure(Call call,IOException e){
        
    }
})

OkHttpClient类

创建OkHttpClient类的两种方式:

  • 直接创建对象 new OkHttpClient()
  • new OkHttpClient.Builder().build()

OkHttpClient对象源码:

public OkHttpClient() {
this(new Builder());
}

OkHttpClient(Builder builder) {
//调度器,用于控制并发的请求。内部保存同步和异步请求的call,并使用线程池处理异步请求。
this.dispatcher = builder.dispatcher;
this.proxy = builder.proxy;//代理设置
this.protocols = builder.protocols;//默认支持http协议版本
this.connectionSpecs = builder.connectionSpecs;//okhttp连接 connection配置
this.interceptors = Util.immutableList(builder.interceptors);
this.networkInterceptors = Util.immutableList(builder.networkInterceptors);
this.eventListenerFactory = builder.eventListenerFactory;//一个Call的状态监听器
this.proxySelector = builder.proxySelector;//使用默认的代理选择器
this.cookieJar = builder.cookieJar;//默认是没有cookie的
this.cache = builder.cache;//缓存
this.internalCache = builder.internalCache;
this.socketFactory = builder.socketFactory;//使用默认的Scoket工厂产生Socket

boolean isTLS = false;
for (ConnectionSpec spec : connectionSpecs) {
  isTLS = isTLS || spec.isTls();
}

if (builder.sslSocketFactory != null || !isTLS) {
  this.sslSocketFactory = builder.sslSocketFactory;
  this.certificateChainCleaner = builder.certificateChainCleaner;
} else {
  X509TrustManager trustManager = Util.platformTrustManager();
  this.sslSocketFactory = newSslSocketFactory(trustManager);
  this.certificateChainCleaner = CertificateChainCleaner.get(trustManager);
}

if (sslSocketFactory != null) {
  Platform.get().configureSslSocketFactory(sslSocketFactory);
}

this.hostnameVerifier = builder.hostnameVerifier;//安全相关的设置
this.certificatePinner = builder.certificatePinner.withCertificateChainCleaner(
    certificateChainCleaner);
this.proxyAuthenticator = builder.proxyAuthenticator;
this.authenticator = builder.authenticator;
this.connectionPool = builder.connectionPool;//连接池
this.dns = builder.dns;//域名解析系统 domain name->ip address
this.followSslRedirects = builder.followSslRedirects;
this.followRedirects = builder.followRedirects;
this.retryOnConnectionFailure = builder.retryOnConnectionFailure;
this.callTimeout = builder.callTimeout;
this.connectTimeout = builder.connectTimeout;
this.readTimeout = builder.readTimeout;
this.writeTimeout = builder.writeTimeout;
this.pingInterval = builder.pingInterval;//这个和websocket相关,为了保持长连接,我们必须每间隔一段时间放松一个ping指令

if (interceptors.contains(null)) {
  throw new IllegalStateException("Null interceptor: " + interceptors);
}
if (networkInterceptors.contains(null)) {
  throw new IllegalStateException("Null network interceptor: " + networkInterceptors);
}
}

Call类

在定义了请求对象后,需要生成一个Call对象。该对象代表一个准备被执行的请求。Call是可以被取消的,Call表示单个请求/响应对流,不能执行两次。

public interface Call extends Cloneable {
    
    Request request();
    
    Response execute() throws IOException;
    
    void enqueue(Callback responseCallback);
    
    void cancel();
    
    boolean isExecuted();

    boolean isCanceled();
    
    Timeout timeout();
    
    Call clone();

    interface Factory {
        Call newCall(Request request);
    }
}
  • 进入OkHttpClient的newCall方法
public Call newCall(Request request) {
    return RealCall.newRealCall(this, request, false /* for web socket */);
}
  • newRealCall
static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
    // Safely publish the Call instance to the EventListener.
    RealCall call = new RealCall(client, originalRequest, forWebSocket);
    call.transmitter = new Transmitter(client, call);
    return call;
}

newCall方法获得Call实际是RealCall,RealCall就是准备执行的请求,是对接口Call的实现,其内部持有OkHttpClient实例,Request实例。并且这里还创建了Transmitter给RealCall的transmitter赋值。

Transmitter类

Transmitter意为发射器,是应用层和网络层的桥梁。在进行连接、真正发出请求和读取响应中起到很重要的作用。

public Transmitter(OkHttpClient client, Call call) {
    this.client = client;
    this.connectionPool = Internal.instance.realConnectionPool(client.connectionPool());
    this.call = call;
    this.eventListener = client.eventListenerFactory().create(call);
    this.timeout.timeout(client.callTimeoutMillis(), MILLISECONDS);
}

Transmitter内部持有OkHttpClient、连接池、call、事件监听器。

Dispatcher类

Dispatcher类负责异步任务的请求策略。

public final class Dispatcher {
  private int maxRequests = 64;
  //每个主机的最大请求数,如果超过这个数,新的请求会被加到readyAsyncCalls队列中
  private int maxRequestsPerHost = 5;
  private @Nullable Runnable idleCallback;
  
  //任务队列线程池
  private @Nullable ExecutorService executorService;
  //待执行异步任务队例
  private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
  //运行中的异步任务队例
  private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
  //运行中同步任务队列
  private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();

  public Dispatcher(ExecutorService executorService) {
    this.executorService = executorService;
  }

  public Dispatcher() {
  }

  public synchronized ExecutorService executorService() {
    if (executorService == null) {
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
  }
}

同步请求执行流程

client.newCall(request).execute(),execute方法在Call的实现类RealCall中。

public Response execute() throws IOException {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    transmitter.timeoutEnter();//超时计时开始
    transmitter.callStart();//回调监听器的请求开始
    try {
      client.dispatcher().executed(this);//放入队列
      return getResponseWithInterceptorChain();//执行请求获取结果
    } finally {
      client.dispatcher().finished(this);//请求结束
    }
}

首先判断 如果已经执行,就会抛出异常。这就是一个请求只能执行一次的原因。然后回调请求监听器的请求开始。然后调用client的调度器Dispatcher的executed方法。

  • dispatcher().executed()
synchronized void executed(RealCall call) {
    runningSyncCalls.add(call);
}

请求放入一个双端队列runningSyncCalls中,表示正在执行的同步请求。

然后返回了getResponseWithInterceptorChain()的结果Response,同步请求真正的请求流程是在getResponseWithInterceptorChain方法中(详情见下节)。
最后请求结束,会走Dispatcher的finished(Deque calls, T call)方法。

  • dispatcher.finished()
void finished(RealCall call) {
    finished(runningSyncCalls, call);
  }

private <T> void finished(Deque<T> calls, T call) {
Runnable idleCallback;
    synchronized (this) {
      if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
      idleCallback = this.idleCallback;
    }
    
    boolean isRunning = promoteAndExecute();
    
    if (!isRunning && idleCallback != null) {
      idleCallback.run();
    }
}

这里将call从同步队列中移除,并且调用了promoteAndExecute()方法,这个方法在后面讲述。

异步请求执行流程

  • 异步方法equeue()
@Override 
public void enqueue(Callback responseCallback) {
synchronized (this) {
  //设置exexuted参数为true,表示不可以执行两次
 if (executed) throw new IllegalStateException("Already Executed");
    executed = true;
}
transmitter.callStart();//回调请求监听器的请求开始
//传入一个新的对象AsyncCall
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
  • AsyncCall类
final class AsyncCall extends NamedRunnable {
private final Callback responseCallback;

AsyncCall(Callback responseCallback) {
  super("OkHttp %s", redactedUrl());
  this.responseCallback = responseCallback;
}

String host() {
  return originalRequest.url().host();
}

Request request() {
  return originalRequest;
}

RealCall get() {
  return RealCall.this;
}

@Override protected void execute() {
  boolean signalledCallback = false;
  try {
    //执行耗时的IO操作
    //获取拦截器链,详见下篇文章
    Response response = getResponseWithInterceptorChain();
    if (retryAndFollowUpInterceptor.isCanceled()) {
      signalledCallback = true;
          
    //回调,注意这里回调是在线程池中,而不是向当前的主线程回调
      responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
    } else {
      signalledCallback = true;
      //回调,同上
      responseCallback.onResponse(RealCall.this, response);
    }
  } catch (IOException e) {
    if (signalledCallback) {
      // Do not signal the callback twice!
      Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
    } else {
      eventListener.callFailed(RealCall.this, e);
      //回调,同上
      responseCallback.onFailure(RealCall.this, e);
    }
  } finally {
    
    client.dispatcher().finished(this);
  }
}
}

AsyncCall继承NamedRunnable,NamedRunnable实现自Runnable,即AsyncCall就是个Runnable,它是会在线程或线程池中执行run方法的。

  • NamedRunnable类
public abstract class NamedRunnable implements Runnable {
  protected final String name;

  public NamedRunnable(String format, Object... args) {
    this.name = Util.format(format, args);
  }

  @Override public final void run() {
    String oldName = Thread.currentThread().getName();
    Thread.currentThread().setName(name);
    try {
      //execute抽象方法,在AsyncCall中有具体实现
      execute();
    } finally {
      Thread.currentThread().setName(oldName);
    }
  }

  protected abstract void execute();
}

在分析同步的时候知道Dispatcher调度器负责异步请求策略,去看看equeue方法。

  • Dispatcher.equeue()
void enqueue(AsyncCall call) {
synchronized (this) {
      readyAsyncCalls.add(call);
    
      // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
      // the same host.
      //相同host请求,共用一个调用技术
      if (!call.get().forWebSocket) {
        AsyncCall existingCall = findExistingCallWithHost(call.host());
        if (existingCall != null) call.reuseCallsPerHostFrom(existingCall);
      }
    }
    promoteAndExecute();
}

//从runningAsyncCalls和readyAsyncCalls找到相同的host请求
private AsyncCall findExistingCallWithHost(String host) {
    for (AsyncCall existingCall : runningAsyncCalls) {
      if (existingCall.host().equals(host)) return existingCall;
    }
    for (AsyncCall existingCall : readyAsyncCalls) {
      if (existingCall.host().equals(host)) return existingCall;
    }
    return null;
}
  • promoteAndExecute()
//调度的核心方法:在控制异步并发的策略基础上,使用线程池 执行异步请求
private boolean promoteAndExecute() {
    assert (!Thread.holdsLock(this));

    List<AsyncCall> executableCalls = new ArrayList<>();
    boolean isRunning;
    synchronized (this) {
      for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
        AsyncCall asyncCall = i.next();
    
        if (runningAsyncCalls.size() >= maxRequests) break; //最大并发数64.
        if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue; // Host最大并发数.
    
        i.remove();//从等待队列中移除
        //host并发数+1
        asyncCall.callsPerHost().incrementAndGet();
        //加入可执行请求的集合
        executableCalls.add(asyncCall);
        //加入正在执行的异步请求队列
        runningAsyncCalls.add(asyncCall);
      }
      isRunning = runningCallsCount() > 0;
    }
    
    for (int i = 0, size = executableCalls.size(); i < size; i++) {
      AsyncCall asyncCall = executableCalls.get(i);
      //可执行的请求
      asyncCall.executeOn(executorService());
    }
    
    return isRunning;
}

public synchronized int runningCallsCount() {
    return runningAsyncCalls.size() + runningSyncCalls.size();
}

遍历readyAsyncCalls,先进行两个检查:

  1. 正在执行异步请求runningAsyncCalls数量大于最大并发请求数64就break;
  2. 相同host请求的数量大于5,就continue。

如果检查都通过,就从等待队列中移除,callPerHost自增1,放入可执行的集合executableCalls,并添加到队列runningAsyncCalls中,表示正在执行的异步请求。

这里的异步请求等待队列,是为了控制最大并发数的缓冲,异步请求并发数达到64、相同host的异步请求达到5,都要放入等待队列。

  • AsyncCall.executeOn()
void executeOn(ExecutorService executorService) {
  assert (!Thread.holdsLock(client.dispatcher()));
  boolean success = false;
  try {
    //在线程池中执行asyncCall
    executorService.execute(this);
    success = true;
  } catch (RejectedExecutionException e) {
    InterruptedIOException ioException = new InterruptedIOException("executor rejected");
    ioException.initCause(e);
    transmitter.noMoreExchanges(ioException);
    responseCallback.onFailure(RealCall.this, ioException);//回调失败
  } finally {
    if (!success) {
      client.dispatcher().finished(this); //执行发生异常 结束
    }
  }
}

AsyncCall的run方法会走到execute()方法,在上面有展示。

下面总结一下请求的流程:

  • 同步请求
  1. 调用client.newCall(request).execute()方法,也就是RealCall的execute方法;
  2. execute方法内部调用client.dispatcher().executed()方法,将当前RealCall加入到runningSyncCalls队列;
  3. 使用getResponseWithInterceptorChain()获取结果;
  4. 最后调用Dispatcher的finish方法结束请求。
  • 异步请求
  1. 调用client.newCall(request).equeue()方法,其内部调用client.dispatcher().enqueue(new AsyncCall(responseCallback))方法;
  2. 先将AsyncCall加入到当前readyAsyncCalls队列中,在找到执行当前主机的AsyncCall,一个主机用同一个AsyncCall;
  3. 使用promoteAndExecute()方法在控制异步并发的策略基础上使用线程池执行异步请求(并发控制有包括最大并发数64,host最大并发数5)。异步请求的执行也是使用getResponseWithInterceptorChain(),获得结果后回调出去。最后调用Dispatcher的finish方法结束请求。
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,193评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,306评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,130评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,110评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,118评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,085评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,007评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,844评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,283评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,508评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,667评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,395评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,985评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,630评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,797评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,653评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,553评论 2 352