Okhttp主线流程源码学习笔记

本次学习Okhttp源码是对以下这个版本,从开始请求到请求结束回调请求结果的一这个主要流程

 implementation 'com.squareup.okhttp3:okhttp:3.10.0'

所涉及的核心类为

  • OkhttpClient
  • Request
  • Response
  • Call
  • Callback
Okhttp简单使用如下:
OkHttpClient okHttpClient = new OkHttpClient.Builder().build();
        Request request = new Request.Builder()
                .url("http://baidu.com")
                .get()
                .build();
        Call call = okHttpClient.newCall(request);
        call.enqueue(new Callback() {
            @Override
            public void onFailure(Call call, IOException e) {
                Log.e("test", "fail ---");
            }

            @Override
            public void onResponse(Call call, Response response) throws IOException {
                Log.e("test", "success-->" + response.body());
            }
        });

发送请求的主线流程

call.enqueue(Callback responseCallback),call是一个接口它的实现类是RealCall

final class RealCall implements Call {

    ...
  @Override public void enqueue(Callback responseCallback) {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    eventListener.callStart(this);
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
  }
  ...

}

以上这段代码表示不能同时执行相同的请求,否则会抛出"Already Executed"的异常,然后执行到client.dispatcher().enqueue(new AsyncCall(responseCallback)),跟进Dispatcher中

public final class Dispatcher {
  //等待队列的容量
  private int maxRequests = 64;
  //运行队列的容量
  private int maxRequestsPerHost = 5;
  //请求的缓存队列
  private @Nullable ExecutorService executorService;
  //等待队列
  private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
  //异步请求的运行队列
  private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
  //同步请求的运行队列
  private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();

  //初始化缓存队列
  public synchronized ExecutorService executorService() {
    if (executorService == null) {
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
  }

    //异步请求
  synchronized void enqueue(AsyncCall call) {
    //如果对同一个host地址请求的的运行队列此时长度小于5,则把请求添加进运行队列,立即通过线程池执行,如果不满足条线就先添加到等待队列里
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      runningAsyncCalls.add(call);
      executorService().execute(call);
    } else {
      readyAsyncCalls.add(call);
    }
  }
}

Dispatcher类中定义了等待队列和同步请求和异步请求的队列,以及等待列表容量64,运行队列长度5,也就是说如果添加一个请求此时运行队列里面不足5个就直接把这个请求放进运行队列直接执行,如果此时运行队列里面有五个则先放在等待队列中,等运行队列里面执行完少于5个的时候再从等待队列里面取去放入运行队列执行请求
其中线程池创建过程如下

 executorService = new ThreadPoolExecutor(0, 
                    Integer.MAX_VALUE, 
                    60,
                    TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(), 
                    Util.threadFactory("OkHttp Dispatcher", false));
  
 public class Util{
     public static ThreadFactory threadFactory(final String name, final boolean daemon) {
    return new ThreadFactory() {
      @Override public Thread newThread(Runnable runnable) {
        Thread result = new Thread(runnable, name);
        result.setDaemon(daemon);
        return result;
      }
    };
 } 

创建了一个核心线程为0,无限容量的子线程,存活时间60s的线程池,并且为线程指定了名字,thread.setDaemon(daemon)设置线程为守护线程,SynchronousQueue是一个没有容量的队列,这里配合核心线程为0,线程容量Int.Max实现了一个接收到请求就直接分配线程(复用或者新创建线程)执行的过程。
再分析线程池执行executorService().execute(call),其中的call是new AsyncCall(responseCallback)创建的,AsyncCalls继承自NamedRunnable,是RealCall的内部类。NamedRunnable实现了Runnable接口

public abstract class NamedRunnable implements Runnable {
  protected final String name;

  public NamedRunnable(String format, Object... args) {
    this.name = Util.format(format, args);
  }

  @Override public final void run() {
    String oldName = Thread.currentThread().getName();
    Thread.currentThread().setName(name);
    try {
      execute();
    } finally {
      Thread.currentThread().setName(oldName);
    }
  }

  protected abstract void execute();

AsyncCall继承NamedRunnable,并且实现了抽象方法execute

final class RealCall implements Call {

    final class AsyncCall extends NamedRunnable {
    private final Callback responseCallback;

    AsyncCall(Callback responseCallback) {
      super("OkHttp %s", redactedUrl());
      this.responseCallback = responseCallback;
    }

    String host() {
      return originalRequest.url().host();
    }

    Request request() {
      return originalRequest;
    }

    RealCall get() {
      return RealCall.this;
    }

    //实现父类定义的抽象类,当父类被线程池执行的时候,这个类也会执行
    @Override protected void execute() {
      boolean signalledCallback = false;
      try {
        //责任链模式实现的各类拦截器
        Response response = getResponseWithInterceptorChain();
        if (retryAndFollowUpInterceptor.isCanceled()) {
          signalledCallback = true;
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
          signalledCallback = true;
          responseCallback.onResponse(RealCall.this, response);
        }
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          eventListener.callFailed(RealCall.this, e);
          responseCallback.onFailure(RealCall.this, e);
        }
      } finally {
        client.dispatcher().finished(this);
      }
    }
  }
}

通过以上过程可以知道,当Dispatcher.enqueue()执行线程池executorService().execute(call)会触发RealCall的内部类AsyncCall执行复写父类抽象的execute方法

@Override protected void execute() {
      boolean signalledCallback = false;
      try {
        Response response = getResponseWithInterceptorChain();
        if (retryAndFollowUpInterceptor.isCanceled()) {
          signalledCallback = true;
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
          signalledCallback = true;
          responseCallback.onResponse(RealCall.this, response);
        }
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          eventListener.callFailed(RealCall.this, e);
          responseCallback.onFailure(RealCall.this, e);
        }
      } finally {
        client.dispatcher().finished(this);
      }
    }

通过策略模式逐层让拦截器处理response,如果请求失败则回调responseCallback.onFailure(RealCall.this, new IOException("Canceled")),如果请求成功则回调responseCallback.onResponse(RealCall.this, response)。

如上,Http请求的主线流程就完成了。

Okhttp中用到的设计模式

构建者模式

OkHttpClient和Request的创建是构造者模式创建的,构造者模式比较常见不多解释。

OkHttpClient okHttpClient = new OkHttpClient.Builder()
                .build();
 Request request = new Request.Builder()
           .url("http://baidu.com")
           .get()
           .build();

责任链模式

Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors.
    List<Interceptor> interceptors = new ArrayList<>();
    interceptors.addAll(client.interceptors());
    interceptors.add(retryAndFollowUpInterceptor);
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    interceptors.add(new CacheInterceptor(client.internalCache()));
    interceptors.add(new ConnectInterceptor(client));
    if (!forWebSocket) {
      interceptors.addAll(client.networkInterceptors());
    }
    interceptors.add(new CallServerInterceptor(forWebSocket));

    Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
        originalRequest, this, eventListener, client.connectTimeoutMillis(),
        client.readTimeoutMillis(), client.writeTimeoutMillis());

    return chain.proceed(originalRequest);
  }

责任链模式:为请求创建了一个接收者对象的链。这种模式给予请求的类型,对请求的发送者和接收者进行解耦。这种类型的设计模式属于行为型模式。
在这种模式中,通常每个接收者都包含对另一个接收者的引用。如果一个对象不能处理该请求,那么它会把相同的请求传给下一个接收者,依此类推。

举个栗子:
员工李华需要请假,向所在部门主管(Manager1)申请,部门主管(Manager1)同意转给上一级主管(Manager2)审批,Manager2同意审批再转给人事主管(Manager3)审批
step 1:创建执行事务的接口IBaseTask.java

interface IBaseTask {
    /**
     * @param task      事务内容
     * @param iBaseTask 下一个任务节点
     */
    public void doAction(String content, IBaseTask iBaseTask);
}

事特批:创建三个责任链成员,也就是三个经理Manager

class Manager1 implements IBaseTask {

    @Override
    public void doAction(String content, IBaseTask iBaseTask) {
        if (content.equals("请假")) {
            System.out.println("Manager1 同意请假 并转交给下一个Manager审批...");
            iBaseTask.doAction(content, iBaseTask);
        } else {

        }
    }
}

class Manager2 implements IBaseTask{
    @Override
    public void doAction(String content, IBaseTask iBaseTask) {
        if (content.equals("请假")) {
            System.out.println("Manager2 同意请假 并转交给下一个Manager审批...");
            iBaseTask.doAction(content, iBaseTask);
        }
    }
}

class Manager3 implements IBaseTask {
    @Override
    public void doAction(String content, IBaseTask iBaseTask) {
        if (content.equals("请假")) {
            System.out.println("Manager3 同意请假 流程结束");
            return;
        }
    }
}

step3:通过TaskManager实现责任链实现

class TaskManager implements IBaseTask {
    private List<IBaseTask> taskList;
    private int index = 0;

    public TaskManager() {
        this.taskList = new ArrayList<>();
    }

    public void addTask(IBaseTask task) {
        if (taskList == null || taskList.isEmpty()) {
            this.taskList = new ArrayList<>();
        }
        taskList.add(task);
    }

    @Override
    public void doAction(String content, IBaseTask iBaseTask) {
        if (taskList.isEmpty()) {
            System.out.println("taskList is empty!");
            return;
        }
        if (index < 0 || index >= taskList.size()) {
            System.out.println("index out of bundle!");
            return;
        }
        IBaseTask task = taskList.get(index);
        index++;
        task.doAction(content, iBaseTask);
    }
}

测试结果

public static void main(String[] args) {
        TaskManager taskManager = new TaskManager();
        taskManager.addTask(new Manager1());
        taskManager.addTask(new Manager2());
        taskManager.addTask(new Manager3());
        taskManager.doAction("请假", taskManager);
    }

执行结果如下

Manager1 同意请假 并转交给下一个Manager审批...
Manager2 同意请假 并转交给下一个Manager审批...
Manager3 同意请假 流程结束
Class transformation time: 0.008817664s for 99 classes or 8.817664E-5s per class

Okhttp的拦截器也是类似如上的这种责任链模式实现。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,332评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,508评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,812评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,607评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,728评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,919评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,071评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,802评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,256评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,576评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,712评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,389评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,032评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,798评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,026评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,473评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,606评论 2 350

推荐阅读更多精彩内容