OkHttp源码解析 -- 同步异步处理

前言:

使用OkHttp,执行网络请求时会有异步还是同步处理。先说下异步和同步的区别,同步并不是指在UI线程中执行,简单的一句话就可说明,同步是会阻塞当前线程,异步是指重新开了一个线程,并在开辟的线程中执行,执行完成后再回调到原始线程。
那么OkHttp是怎么完成异步和同步的处理的了?接下来的来一步一步的解析。

1、同步请求

// 调用代码
Response response = call.execute();

call的实现类是RealCall,这是一个准备进行网络请求的执行类,他不能被执行两次

// 同步的请求方法
@Override public Response execute() throws IOException {
    // 代码块 executed参数就是表示这个RealCall是否执行过了,验证了它不能执行两次
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
   // 设置一些接口回调,比如callStart,这个回调的注册在OkHttpClient的构建中
    eventListener.callStart(this);
    try {
      // 调用OkHttpClient的调度器插入到http请求队列中
      client.dispatcher().executed(this);
      // 开始执行网络请求的责任链来开始真正的网络请求,得到Response响应后就可以返回了
      Response result = getResponseWithInterceptorChain();
      if (result == null) throw new IOException("Canceled");
      return result;
    } catch (IOException e) {
      // 错误的回调
      eventListener.callFailed(this, e);
      throw e;
    } finally {
      // 因为是插入到队列中,所以执行完成后就进行善后处理了
      client.dispatcher().finished(this);
    }
  }

在执行execute方法过程中,遇到了三个方向:
1、client.dispatcher().executed(this);
2、 Response result = getResponseWithInterceptorChain()// 网路请求责任链后面一章分析
3、 client.dispatcher().finished(this);

client.dispatcher()是什么东西,什么时候遇到过?
构建OkHttpClient的时候就对dispatcher赋值了,对于一个OkHttpClient来说,Dispatcher也就是一个

  dispatcher = new Dispatcher();

看上述第一个方法
Dispatcher执行execute方法很简单,就是讲该RealCall对象入队,不会有其它的处理

  /** Used by {@code Call#execute} to signal it is in-flight. */
  synchronized void executed(RealCall call) {
    runningSyncCalls.add(call);
  }

/** Running synchronous calls. Includes canceled calls that haven't finished yet. */
  // 先进先出的队形,以及执行方式
  private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();

继续看第三个方法:

  // 从队列拿出来
  void finished(RealCall call) {
    finished(runningSyncCalls, call, false);
  }

// 参数依次是插入的同步队列,该RealCall,和时候需要重新整理队列的标志对于同步请求不需要
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
    int runningCallsCount;
    Runnable idleCallback;
    // 安全加锁
    synchronized (this) {
      // 移除
      if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
      // 整理队列,异步的时候才需要,同步不需要
      if (promoteCalls) promoteCalls();
      // 更新运行的RealCall的数量,包括同步和异步
      runningCallsCount = runningCallsCount();
      idleCallback = this.idleCallback;
    }
    // 最后一个Http请求完成后,并且设置了idleCallback 回调
    if (runningCallsCount == 0 && idleCallback != null) {
      idleCallback.run();
    }
  }

关于同步的请求很简单,简单来说就是插入队列,进行Http请求,请求回调后清除队列中的请求,返回响应体,就结束了。当然http请求并没分析,只解析了整体的调度过程。

2、异步请求

异步请求的调度方式,传入一个接口实例

call.enqueue(new Callback() {
                @Override
                public void onFailure(Call call, IOException e) {

                }

                @Override
                public void onResponse(Call call, Response response) throws IOException {
                    
                }
            });

同样的,我们来看下RealCall中的enqueue方法
what?比同步的还少?不存在的,这里看的少,只不过被封装起来了

  @Override public void enqueue(Callback responseCallback) {
    // 同样的同一个RealCall不能执行两次,可以通过clone()方法获取新的对象
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    eventListener.callStart(this);
    // 直接交给Dispatcher管理了,尽然撒手不管了
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
  }

首先看下Dispatcher中的enqueue方法

 //  这个call是个Runnable,我们将传入的Callback封装在AsyncCall里面
synchronized void enqueue(AsyncCall call) {
    // 是否可以执行?条件就是是否到了最大的运行数量。是否到了同一个主机的最大运行数量
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      // 加入到运行队列中
      runningAsyncCalls.add(call);
      // 通过线程池来执行Runnable
      executorService().execute(call);
    } else {
     // 条件不满足,那就只能等待了,放在异步等待队列中,同步的话就不存在,直接放行
      readyAsyncCalls.add(call);
    }
  }

为了解释清楚,需要了解Dispatcher的成员变量,不了解还玩个啥。

首先Dispatcher这个类是final,不可以继承重写的

  // http最大的请求
  private int maxRequests = 64;
  // 每个主机的最大http请求
  private int maxRequestsPerHost = 5;
  private @Nullable Runnable idleCallback;

  /** Executes calls. Created lazily. */
  // 懒加载创建线程池
  private @Nullable ExecutorService executorService;

  /** Ready async calls in the order they'll be run. */
  // 准备的异步队列
  private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();

  /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
  // 正在运行的异步队列(包含了已经取消但是还没执行完成的请求)
  private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();

  /** Running synchronous calls. Includes canceled calls that haven't finished yet. */
  // 正在运行的同步队列(提醒,这里面保存的RealCall实例,异步的保存的是Runnable)(包含了已经取消但是还没执行完成的请求
  private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();

// synchronized 同步
public synchronized ExecutorService executorService() {
    if (executorService == null) {
      /**
        * 核心线程为0,为了不消耗资源,因为核心线程会一直执行下去
        * 最大线程数Integer.MAX_VALUE,其实达不到,maxRequests 
        *   60秒之后没有利用就销毁了
        *  线程等待队列
        * 线程工厂
        **/
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
  }

回过头看看怎么对传入的Callback封装的,AsyncCall就是一个Runnable,只有新开辟一个线程才叫异步,不然怎么可能叫异步了
为了简化代码量,直截取了Runnable中的run方法中的核心代码 <该成功失败的回调都是在子线程中>

@Override protected void execute() {
      boolean signalledCallback = false;
      try {
        // 网络请求
        Response response = getResponseWithInterceptorChain();
       // 如果取消了 那就直接返回错误
        if (retryAndFollowUpInterceptor.isCanceled()) {
          signalledCallback = true;
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
          signalledCallback = true;
         // 回调成功
          responseCallback.onResponse(RealCall.this, response);
        }
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          eventListener.callFailed(RealCall.this, e);
          // 遇到了IO的异常也回调错误
          responseCallback.onFailure(RealCall.this, e);
        }
      } finally {
       // 最后清除
        client.dispatcher().finished(this);
      }
    }

废话不多说,直接看看 client.dispatcher().finished(this);这个会和同步的有什么区别了?这个this代表当前Runnable
贴出代码

  /** Used by {@code AsyncCall#run} to signal completion. */
  void finished(AsyncCall call) {
    // 运行的异步队列,Runnable,需要重新调整队列
    finished(runningAsyncCalls, call, true);
  }

  private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
    int runningCallsCount;
    Runnable idleCallback;
    synchronized (this) {
      if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
      // 重点分析这个方法,他解决了什么时候需要出发从准备队列放到执行队列中,并触发执行的契机。最好的契机就是有一个执行完了,才可能会需要处理,这就是promoteCalls的意义
      if (promoteCalls) promoteCalls();
      runningCallsCount = runningCallsCount();
      idleCallback = this.idleCallback;
    }

    if (runningCallsCount == 0 && idleCallback != null) {
      idleCallback.run();
    }
  }

  // 对队列的处理
  private void promoteCalls() {
    // 基本条件不满足 返回
    if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
    if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
    
    // 对准备的异步队列进行遍历
    for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
      AsyncCall call = i.next();
      
     // 满足的条件(该主机的http请求个数没有达到最大值maxRequestsPerHost = 5)
      if (runningCallsForHost(call) < maxRequestsPerHost) {
        // 从准备队列删除,并插入到运行队列中,并放在线程池中执行
        i.remove();
        runningAsyncCalls.add(call);
        executorService().execute(call);
      }

      if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
    }
  }

总结:到此我们就解析完了异步和同步的调度过程了。看下面盗用的调度过程


调度过程

其实Dispatcher这个类的处理过程就是生产者和消费者模式。保存了准备队列和执行队列。
同时对数据进行操作的时候都是加上了synchronized关键字。
内部通过维护线程池处理减少资源的浪费,更加高效。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,539评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,911评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,337评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,723评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,795评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,762评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,742评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,508评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,954评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,247评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,404评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,104评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,736评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,352评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,557评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,371评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,292评论 2 352

推荐阅读更多精彩内容