okHttp源码学习(一)

1.okHttp使用流程分析

      // 1.创建OkHttpClient对象
        OkHttpClient client = new OkHttpClient.Builder().build();
        // 2.创建Request对象
        Request request = new Request.Builder().build();
        // 3.创建请求对象
        Call call = client.newCall(request);
        // 4.同步请求
        try {
            // 同步返回结果
            Response execute = call.execute();
        } catch (IOException e) {
            e.printStackTrace();
        }
        // 4.异步使用 异步返回结果
        call.enqueue(new Callback() {
            @Override
            public void onFailure(Call call, IOException e) {
            }
            @Override
            public void onResponse(Call call, Response response) throws IOException {
            }
        });
  • okhttp请求发起流程分析
    1.同步请求
    执行call.execute()方法,实际上会执行到RealCall的execute方法,方法所示:
 @Override public Response execute() throws IOException {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    eventListener.callStart(this);
    try {
      //此处的代码是将请求加入到okhttp的同步请求队列中
      client.dispatcher().executed(this);
      //此处代码是真正的发出去同步请求并返回结果,这里涉及到okhttp的拦截器,下面会细讲
      Response result = getResponseWithInterceptorChain();
      if (result == null) throw new IOException("Canceled");
      return result;
    } catch (IOException e) {
      eventListener.callFailed(this, e);
      throw e;
    } finally {
      client.dispatcher().finished(this);
    }
  }

2.异步请求
执行 call.enqueue()方法实际上会执行到RealCall的enqueue方法

 @Override public void enqueue(Callback responseCallback) {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    eventListener.callStart(this);
    // 获取okhttp请求调度器调用异步请求,AsyncCall方法是一个实现Runnable的类
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
  }

AsyncCall继承NamedRunnable,NamedRunnable实现了Runnable,rRunnable的run方法会执行到AsyncCall的execute方法

 final class AsyncCall extends NamedRunnable {
    private final Callback responseCallback;

    AsyncCall(Callback responseCallback) {
      super("OkHttp %s", redactedUrl());
      this.responseCallback = responseCallback;
    }

    String host() {
      return originalRequest.url().host();
    }

    Request request() {
      return originalRequest;
    }

    RealCall get() {
      return RealCall.this;
    }
    // 实际是Runnable的run方法
    @Override protected void execute() {
      boolean signalledCallback = false;
      try {
         //  进入拦截器,发起请求获取请求结果
        Response response = getResponseWithInterceptorChain();
        if (retryAndFollowUpInterceptor.isCanceled()) {
          signalledCallback = true;
          // 请求失败回调
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
          signalledCallback = true;
          // 请求成功回调
          responseCallback.onResponse(RealCall.this, response);
        }
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          eventListener.callFailed(RealCall.this, e);
          responseCallback.onFailure(RealCall.this, e);
        }
      } finally {
        client.dispatcher().finished(this);
      }
    }
  }

2.okHttp使用的设计模式分析

  • Builder设计模式
    okHttpClient的Builder类,,提供默认配置参数
  public Builder() {
      dispatcher = new Dispatcher(); // 默认请求调度器
      protocols = DEFAULT_PROTOCOLS; // 默认请求协议 http/1.1 和http/2
      connectionSpecs = DEFAULT_CONNECTION_SPECS; // 提供默认TLS 连接
      eventListenerFactory = EventListener.factory(EventListener.NONE); //指标事件监听器,可以监控HTTP 呼叫的数量、大小和持续时间
      proxySelector = ProxySelector.getDefault(); // 默认代理服务器
      cookieJar = CookieJar.NO_COOKIES; // 默认cookie实现
      socketFactory = SocketFactory.getDefault(); // 默认socket创建工厂
      hostnameVerifier = OkHostnameVerifier.INSTANCE; // 默认主机名验证器  可以进行证书校验
      certificatePinner = CertificatePinner.DEFAULT; // 加密证书之类的 俺也不太懂
      proxyAuthenticator = Authenticator.NONE; // 代理请求认证 空实现
      authenticator = Authenticator.NONE; //请求认证 空实现
      connectionPool = new ConnectionPool(); // 默认连接池 应该socket连接的连接池
      dns = Dns.SYSTEM; // 默认dns解析
      followSslRedirects = true;  // ssl重定向
      followRedirects = true; // 默认开启重定向
      retryOnConnectionFailure = true; // 失败重试
      connectTimeout = 10_000;  // 连接超时
      readTimeout = 10_000;  // 读取超时
      writeTimeout = 10_000; // 写入超时
      pingInterval = 0; // 心跳间隔
    }

Request类的Builder类,提供默认请求方式

public Builder() {
      this.method = "GET"; //默认get请求
      this.headers = new Headers.Builder();
    }
  • 责任链设计模式
    前边已经介绍过,这里是okhttp发起请求的地方,从这里开始通过责任链模式走okHttp的所有拦截器
  Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors.
    List<Interceptor> interceptors = new ArrayList<>();
    interceptors.addAll(client.interceptors()); // 加载用户自定义拦截器
    interceptors.add(retryAndFollowUpInterceptor); // 重试拦截器
    interceptors.add(new BridgeInterceptor(client.cookieJar())); // 桥接拦截器
    interceptors.add(new CacheInterceptor(client.internalCache())); // 缓存拦截器
    interceptors.add(new ConnectInterceptor(client)); // 连接拦截器
    if (!forWebSocket) {  //如果不是webSocket协议 添加
      interceptors.addAll(client.networkInterceptors());
    }
    interceptors.add(new CallServerInterceptor(forWebSocket)); // 请求拦截器 最后的一个拦截器
    // 构建责任链对象
    Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
        originalRequest, this, eventListener, client.connectTimeoutMillis(),
        client.readTimeoutMillis(), client.writeTimeoutMillis());
    // 进入责任链
    return chain.proceed(originalRequest);
  }
}

通过上述代码,最终会进入到RealInterceptorChain的proceed方法

 public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
      RealConnection connection) throws IOException {
    if (index >= interceptors.size()) throw new AssertionError();

    calls++;

    // If we already have a stream, confirm that the incoming request will use it.
    if (this.httpCodec != null && !this.connection.supportsUrl(request.url())) {
      throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
          + " must retain the same host and port");
    }

    // If we already have a stream, confirm that this is the only call to chain.proceed().
    if (this.httpCodec != null && calls > 1) {
      throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
          + " must call proceed() exactly once");
    }

    // Call the next interceptor in the chain.
    // 构建拦截器责任链的下一个节点
    RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
        connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
        writeTimeout);
    // 获取当前拦截器
    Interceptor interceptor = interceptors.get(index);
    // 执行当前拦截器 注意这里的参数是next,也就是说是下一个责任链上的节点
    Response response = interceptor.intercept(next);

    // Confirm that the next interceptor made its required call to chain.proceed().
    if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1) {
      throw new IllegalStateException("network interceptor " + interceptor
          + " must call proceed() exactly once");
    }

    // Confirm that the intercepted response isn't null.
    if (response == null) {
      throw new NullPointerException("interceptor " + interceptor + " returned null");
    }

    if (response.body() == null) {
      throw new IllegalStateException(
          "interceptor " + interceptor + " returned a response with no body");
    }

    return response;
  }

此处Response response = interceptor.intercept(next);会执行拦截器的intercept方法,以第一个拦截器RetryAndFollowUpInterceptor为例,查看此拦截器的intercept方法

 @Override public Response intercept(Chain chain) throws IOException {
    Request request = chain.request();
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
        //... 此处省略部分代码
      try {
        // 执行责任链节点的处理方法  记得上边的拦截器中传入的下一个节点的对象,所以这里就调到下一个节点的proceed方法
        response = realChain.proceed(request, streamAllocation, null, null);
        releaseConnection = false;
      } catch (RouteException e) {
        // The attempt to connect via a route failed. The request will not have been sent.
        if (!recover(e.getLastConnectException(), streamAllocation, false, request)) {
          throw e.getFirstConnectException();
        }
         //... 此处省略部分代码
      priorResponse = response;
    }
  }

这样责任链的每一个节点都会先执行拦截的intercept方法,然后执行下一个节点的proceed的方法,直到执行到最后一个拦截器
CallServerInterceptor的intercept方法,会发起真正的请求,拿到响应体,依次返回结果。(具体请求过程待补充)

 @Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    HttpCodec httpCodec = realChain.httpStream();
    // ...省略部分代码
    Request request = realChain.request();
      if (responseBuilder == null) {
        realChain.eventListener().requestBodyStart(realChain.call());
        long contentLength = request.body().contentLength();
        CountingSink requestBodyOut =
            new CountingSink(httpCodec.createRequestBody(request, contentLength));
        BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
    // ...省略部分代码
        request.body().writeTo(bufferedRequestBody);
        bufferedRequestBody.close();
        realChain.eventListener()
            .requestBodyEnd(realChain.call(), requestBodyOut.successfulCount);
      } else if (!connection.isMultiplexed()) {
        streamAllocation.noNewStreams();
      }
    }
    // ...省略部分代码
    httpCodec.finishRequest();
    // 构建返回体对象  真正返回,这里采用okIo进行网络请求,后期有时间补充对应的请求过程
    Response response = responseBuilder
        .request(request)
        .handshake(streamAllocation.connection().handshake())
        .sentRequestAtMillis(sentRequestMillis)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build();

    int code = response.code();
    if (code == 100) {
      responseBuilder = httpCodec.readResponseHeaders(false);
      response = responseBuilder
              .request(request)
              .handshake(streamAllocation.connection().handshake())
              .sentRequestAtMillis(sentRequestMillis)
              .receivedResponseAtMillis(System.currentTimeMillis())
              .build();
      code = response.code();
    }
    realChain.eventListener()
            .responseHeadersEnd(realChain.call(), response);
    if (forWebSocket && code == 101) {
      response = response.newBuilder()
          .body(Util.EMPTY_RESPONSE)
          .build();
    } else {
      response = response.newBuilder()
          .body(httpCodec.openResponseBody(response))
          .build();
    }
    return response;
  }

3.okHttp的线程调度器(Dispatcher)分析

从上边的请求过程可以看到,不管同步请求或者异步请求,都会调用okHttpClient的dispatcher()来操作,

  //同步请求
 client.dispatcher().executed(this);
  // 异步请求
 client.dispatcher().enqueue(new AsyncCall(responseCallback));

Dispatcher调度器的所有成员变量

  private int maxRequests = 64;  //同时最大请求数量
  private int maxRequestsPerHost = 5; // 同时最大同一域名的请求数量
  private @Nullable Runnable idleCallback;  // 调度器空闲状态的回调

  /** Executes calls. Created lazily. */
  private @Nullable ExecutorService executorService; // 请求线程池

  /** Ready async calls in the order they'll be run. */
  private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();  // 异步请求等待队列

  /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
  private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();  // 异步请求运行队列

  /** Running synchronous calls. Includes canceled calls that haven't finished yet. */
  private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();  // 同步请求运行队列

同步请求队列分析,从RealCall的同步请求方法开始

 @Override public Response execute() throws IOException {
    synchronized (this) {
    // ...省略部分代码
    try {
      client.dispatcher().executed(this);  // 加载同步请求队列
      Response result = getResponseWithInterceptorChain(); //获取请求结果
      if (result == null) throw new IOException("Canceled");
      return result;
    } catch (IOException e) {
      eventListener.callFailed(this, e);
      throw e;
    } finally {
      client.dispatcher().finished(this); //完成同步请求
    }
  }

异步请求队列分析,从RealCall的异步请求enqueue()方法开始分析

@Override public void enqueue(Callback responseCallback) {
    synchronized (this) {
    // ...省略部分代码
    client.dispatcher().enqueue(new AsyncCall(responseCallback)); // 加入异步请求队列  
  }

具体实现,看Dispatcher的enqueue方法

synchronized void enqueue(AsyncCall call) {
      // 如果正在运行异步请求数量小于最大请求数量限制 
      //并且 同一域名的请求数量小于maxRequestsPerHost,将请求加入运行中队列,
     // 并直接加入的线程池执行,否则添加到等待队列
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      runningAsyncCalls.add(call); //加入异步请求运行中队列
      executorService().execute(call); // 执行异步请求
    } else {
      readyAsyncCalls.add(call); // 加入异步请求等待队列
    }
  }

异步请求就会在线程中执行Runnable的run方法,也就会执行到AsyncCall 的execute方法,这样就可以正在发起请求获取响应结果。

  final class AsyncCall extends NamedRunnable {
  // ...省略部分代码
    @Override protected void execute() {
      boolean signalledCallback = false;
      try {
        Response response = getResponseWithInterceptorChain(); //获取请求结果
        if (retryAndFollowUpInterceptor.isCanceled()) {
          signalledCallback = true;
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
          signalledCallback = true;
          responseCallback.onResponse(RealCall.this, response);
        }
      } catch (IOException e) {
        // ...省略部分代码
      } finally {
        client.dispatcher().finished(this); //请求结束
      }
    }
  }

不管是同步请求还是异步请求,最后都会执行到 client.dispatcher().finished(this); 这句代码,我们看一下具体的实现

private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
    int runningCallsCount;
    Runnable idleCallback;
    synchronized (this) {
      //执行结束,将请求从队列中移除
      if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
      // promoteCalls 同步请求为false,异步请求为true,调用promoteCalls来判断是否可以将等待队列的请求放到运行队列
      if (promoteCalls) promoteCalls();
      runningCallsCount = runningCallsCount(); //获取请求的数量
      idleCallback = this.idleCallback;
    }

    if (runningCallsCount == 0 && idleCallback != null) {  //如果请求队列中没有数据,也就是说Dispatcher处于空闲状态,调用idleCallback的run方法
      idleCallback.run();
    }
  }

promoteCalls()方法的实现

private void promoteCalls() {
    //运行队列的数量是否大于最大请求数量
    if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
    // 等到队列是否为空
    if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
    // 迭代等待队列
    for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
      AsyncCall call = i.next();
      // 判定同一域名的请求是否消息最大数量
      if (runningCallsForHost(call) < maxRequestsPerHost) {
      // 移除等待队列数据
        i.remove();
      //加入运行中队列
        runningAsyncCalls.add(call);
        // 加入线程池
        executorService().execute(call);
      }
      // 这句代码个人感觉冗余了
      if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
    }
  }
  • Dispatcher线程池的配置
 public synchronized ExecutorService executorService() {
    if (executorService == null) {
      // 第一个参数 核心线程数
      // 第二个参数  最大线程数
      // 第三个参数  线程保活等到时间
     // 第四个参数 单位秒
     // 第五个参数 线程阻塞队列
     // 第六个参数 线程构造工厂
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
  }

1.线程池的核心线程数为什么设置成0,最小线程数设置成最大值
由于okHttp已经自己利用readyAsyncCallsrunningAsyncCalls管理请求队列和等待队列,没必要再通过线程池来控制了
2.采用SynchronousQueue来实现等待队列
SynchronousQueue是无界的,是一种无缓冲的等待队列,但是由于该Queue本身的特性,在某次添加元素后必须等待其他线程取走后才能继续添加;可以认为SynchronousQueue是一个缓存值为1的阻塞队列,但是 isEmpty()方法永远返回是true,remainingCapacity() 方法永远返回是0,remove()和removeAll() 方法永远返回是false,iterator()方法永远返回空,peek()方法永远返回null。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,445评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,889评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,047评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,760评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,745评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,638评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,011评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,669评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,923评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,655评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,740评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,406评论 4 320
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,995评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,961评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,023评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,483评论 2 342

推荐阅读更多精彩内容