OkHttp源码阅读(二)—— Dispatcher任务调度器

  上边OkHttp源码阅读(OkHttp源码阅读(一)-——初识OkHttp)中,初步的了解了一个简单http请求的执行流程,其中的一个关键角色就是Dispatcher,它主要负责请求的分发工作,下面具体分析一下它

同步任务分发

还记得上篇博客留下的坑嘛?那个标记异常醒目的重要代码块,如图:

1.jpeg

我们已经知道同步请求实际上调用的是RealCall类中的execute方法,如上图中所示的代码是execute方法执行的主要部分,标记①中调用Dispatcher的execute方法将构建出来的Realcall对象传入进去从而触发同步执行,Dispatcher的execute方法调用其实很简单,如下:

 /** Used by {@code Call#execute} to signal it is in-flight. */
  synchronized void executed(RealCall call) {
    runningSyncCalls.add(call);
  }

Dispatcher的execute方法只是将请求call添加到了runningSyncCalls中,这里有了一个新的角色runningSyncCalls,从字面上理解就是正在运行的同步请求组,实际上,它是一个队列,正在执行的同步请求队列,我们看一下它的初始化,也就是Dispatcher中runningSyncCalls的定义。

/** Ready async calls in the order they'll be run. */
  private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();

  /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
  /**注意上边的注释,包括取消的但是还没结束的请求call*/
  private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();

  /** Running synchronous calls. Includes canceled calls that haven't finished yet. */
  /**注意上边的注释,包括取消的但是还没结束的请求call*/
  private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();

在Dispatcher中定义了三个队列,分别是

  1. readyAsyncCalls 准备执行的异步请求队列
  2. runningAsyncCalls 正在执行的异步请求队列
  3. runningSyncCalls 正在执行的同步请求队列

他们的定义都是使用Deque的方式,Deque(双端队列)的特性是集成了Queue(队列)和Stack(栈)的特性,具体Deque的详细介绍可以另一篇博客Java容器之双端队列Deque, 接下来执行Response result = getResponseWithInterceptorChain();获取响应response,从源码中可以看到从Request请求发出到获取Response,所有的执行过程都是在主线程执行,所以同步请求会阻塞主线程,我们之前说过Call是链接Request和Response的一个中间桥梁,具体请求建立链接、参数传递等等包括获取response的过程交给了连接器链InterceptorChain,暂且不赘述,后边详细说明,获取到Response后,还有一个最最最重要的操作步骤 就是finally代码块,也就是标记③,调用Dispatcher的finished方法,具体源码实现如下:

/** Used by {@code Call#execute} to signal completion. */
  void finished(RealCall call) {
    finished(runningSyncCalls, call, false);
  }

  private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
    int runningCallsCount;
    Runnable idleCallback;
    synchronized (this) {
      /**首先从队列中移除这个请求*/
      if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
      /**promoteCalls是个标记位,同步请求不会调用,异步请求会调用promoteCalls方法进行队列重新整理*/
      if (promoteCalls) promoteCalls();
      /**获取正在运行任务数*/
      runningCallsCount = runningCallsCount();
      idleCallback = this.idleCallback;
    }
    /**一下是闲置回调*/
    if (runningCallsCount == 0 && idleCallback != null) {
      idleCallback.run();
    }
  }

  同步请求过程中的finished方法中,首先将runningSyncCalls正在执行的同步队列中的元素移除,然后计算正在运行的任务数runningCallsCount,只有当正在执行的任务数为0的时候才会回调idleCallback,否则Dispachter还会继续执行runningSyncCalls同步请求队列中的其他任务或者是其他等待运行或者正在运行的异步任务, runningCallsCount()方法很简单,就是判断下当前正在执行的异步队列和同步队列的size之和。

public synchronized int runningCallsCount() {
    return runningAsyncCalls.size() + runningSyncCalls.size();
  }

异步任务分发

  在实际开发过程中,大部分的网络请求都是异步请求,在OKHttp中,异步任务的分发流程也比同步的要复杂些,首先还是直接从RealCall的enqueue()方法开始:

2.jpeg

同样调用的是Dispatcher的enqueue方法,与execute方法不同的是,enqueue方法传入的是AsyncCall的对象,跟踪源码可以看到AsyncCall实际上是一个Runnable对象,

final class AsyncCall extends NamedRunnable 
**
**
***
**

public abstract class NamedRunnable implements Runnable

接下来看一下Dispatcher的enqueue()方法具体执行流程:

synchronized void enqueue(AsyncCall call) {
    /**当前运行的异步任务数小于最大请求数,并且每个主机的请求数小于每个主机最大请求数*/
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      /**将改请求任务放入到正在执行的任务队列中*/
      runningAsyncCalls.add(call);
      /**线程池执行该任务*/
      executorService().execute(call);
    } else {
      /**如果不满足上边条件,则该任务会被加入到缓存队列,也就是准备执行的任务队列*/
      readyAsyncCalls.add(call);
    }
  }

enqueue方法对runningAsyncCalls和readyAsyncCalls队列进行的操作,由于两个队列都非线程安全队列,所以enqueue方法被加上了synchronized,首先判断条件当前运行的异步任务数小于最大请求数,并且每个主机的请求数小于每个主机最大请求数时,该任务会直接加入到正在执行的异步任务队列runningAsyncCalls中,否则该任务会暂时被加入到缓存队列readyAsyncCalls中,Dispatcher中默认最大请求数 maxRequests是64个,每个主机最大请求数 maxRequestsPerHost是5个 ,另外Dispatcher还维护了一个线程池ExecutorService,并通过上面executorService()进行初始化:

  private int maxRequests = 64;
  private int maxRequestsPerHost = 5;
  private @Nullable Runnable idleCallback;

  *
  *
  *
  public synchronized ExecutorService executorService() {
    if (executorService == null) {
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
  }

其中在ThreadPoolExecutor在初始化时,将线程池的最大线程个数设置为Integer.MAX_VALUE,理论上说正在运行的请求线程是Integer.MAX_VALUE个,但是Dispatcher中做了默认最大请求数maxRequests的限制,所以在用户没有特殊指定最大请求数时,Okhttp同时运行的请求个数不会超过64个。

AsyncCall

  之前提到AsyncCall是个Runnable,上述异步任务执行过程中,当ExecutorService执行execute()方法时就是调用AsyncCall的run方法,由于AsyncCall继承NamedRunnable关系,实际上调用的是AsyncCall的execute方法,如下:

final class AsyncCall extends NamedRunnable {
    private final Callback responseCallback;

    *
    *
    *
    
    @Override protected void execute() {
      boolean signalledCallback = false;
      try {
        /**通过拦截器链获取Response*/
        Response response = getResponseWithInterceptorChain();
        /**重试和重定向拦截器是否被取消*/
        if (retryAndFollowUpInterceptor.isCanceled()) {
          signalledCallback = true;
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
          signalledCallback = true;
          responseCallback.onResponse(RealCall.this, response);
        }
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          eventListener.callFailed(RealCall.this, e);
          responseCallback.onFailure(RealCall.this, e);
        }
      } finally {
        /**以下代码块才是重点*/
        client.dispatcher().finished(this);
      }
    }
  }

上边AsyncCall源码可以看出还是通过拦截器链获取Response,然后通过一些判断返回回调,这里拦截器链后面会详细介绍,重点是还是finally代码块,同步请求中Dispatcher的finished方法和异步的有所差别。

/** Used by {@code AsyncCall#run} to signal completion. */
/**异步请求调用此方法*/
  void finished(AsyncCall call) {
    finished(runningAsyncCalls, call, true);
  }

  /** Used by {@code Call#execute} to signal completion. */
  void finished(RealCall call) {
    finished(runningSyncCalls, call, false);
  }

  private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
    int runningCallsCount;
    Runnable idleCallback;
    synchronized (this) {
      /**首先从队列中移除这个请求*/
      if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
      /**promoteCalls是个标记位,同步请求不会调用,异步请求会调用promoteCalls方法进行队列重新整理*/
      if (promoteCalls) promoteCalls();
      /**获取正在运行任务数*/
      runningCallsCount = runningCallsCount();
      idleCallback = this.idleCallback;
    }
    /**一下是闲置回调*/
    if (runningCallsCount == 0 && idleCallback != null) {
      idleCallback.run();
    }
  }

异步请求调用了promoteCalls标记位为true的方法,前面的队列remove方法不变,重点在promoteCalls()方法,这个方法Dispatcher会重新整理缓存队列和正在执行的任务队列,如下:

private void promoteCalls() {
    /**正在运行的任务数越界*/
    if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
    /**缓存队列中没有待执行任务了*/
    if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.

    for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
      AsyncCall call = i.next();
      /**遍历缓存队列中等待执行的任务,判断条件是每台主机最大请求数小于最大限制*/
      if (runningCallsForHost(call) < maxRequestsPerHost) {
        /**满足上边条件的任务会从缓存队列中取出,并放入到正在执行的任务队列中,然后线程池执行该任务*/
        i.remove();
        runningAsyncCalls.add(call);
        executorService().execute(call);
      }
      /**每次正在执行任务队列在添加时都要判断是否已经超过指定最大请求任务数*/
      if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
    }
  }

可以看出,异步任务在调用Dispachter的finished()方法时,会循环的获取缓存队列中的任务,只要有任务并满足限制条件就会被执行(AsyncCall.execute()方法),然而AsyncCall.execute()方法执行完后finally又会执行Dispachter的finished()方法,如此循环调用直到正在执行的任务队列runningAsyncCalls没有任务为止,最后抛出idleCallback回调,处于闲置状态! 缓存队列和正在执行的任务队列二者之间的关系有点像生产者和消费者模式, Dispachter负责生产 ExecutorService负责消费

总结

  同步和异步请求区别除了Call调用的方法不同以外,Dispatcher的处理机制也不相同,同步请求时Dispatcher会直接将这个任务丢进正在执行的同步任务队列中,直到获取到Response后,再从队列中移除该任务,所以同步请求会阻塞当前线程。异步任务请求时Dispatcher会判断当前的执行任务条件,选择放入缓存任务队列还是正在执行的异步任务队列,通过自己维护的线程池执行异步任务,得到Response,执行完成以后循环判断缓存队列和执行队列任务数,最终将所有任务执行完成,由于执行任务和获取Response都是在子线程中完成,所以异步任务不会阻塞当前线程

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,240评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,328评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,182评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,121评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,135评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,093评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,013评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,854评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,295评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,513评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,678评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,398评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,989评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,636评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,801评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,657评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,558评论 2 352