简述OkHttp3.0

peitu.jpg

本文主要以源码形式解读OkHttp内部实现,源码基于okhttp:3.10.0。

同步请求

异步请求的例子

先看一个异步请求的例子:

//(1) builder模式配置参数构建request对象
 Request request = new Request.Builder()
                .url("http://baidu.com?key=values")
                .get()
                .build();
            //(2)builder构建OkHttpClient对象
        new OkHttpClient.Builder().build()
                .newCall(request)//(3)request入参返回RealCall
                .enqueue(new okhttp3.Callback() {//(4)请求回调
                    @Override
                    public void onFailure(Call call, IOException e) {
                        System.out.println(e.getMessage());
                    }

                    @Override
                    public void onResponse(Call call, Response response) throws IOException {
                        System.out.println("thread:"+Thread.currentThread());
                        if (response.isSuccessful()) {
                            ResponseBody responseBody = response.body();
                            data.setText(responseBody.string());
                        }

                    }
                });

上面是一个OkHttp异步请求的代码,先构建一个Request对象设置请求地址、请求方式、header以及非GET请求还可设置body,然后创建OkHttpClient对象调用newCall设置request对象得到RealCall,RealCall调用enqueue发起异步请求并设置请求回调完成了一个简单的异步请求,OkHttpClient在实际开发中需要单例,原因会在后面的内容中有答案。

(1)接下来先看下Request内部:

/**
 * An HTTP request. Instances of this class are immutable if their {@link #body} is null or itself
 * immutable.
 */
public final class Request {
  final HttpUrl url;
  final String method;
  final Headers headers;
  final @Nullable RequestBody body;
  final Object tag;

  private volatile CacheControl cacheControl; // Lazily initialized.

  Request(Builder builder) {
    this.url = builder.url;
    this.method = builder.method;
    this.headers = builder.headers.build();
    this.body = builder.body;
    this.tag = builder.tag != null ? builder.tag : this;
  }
    /**
     * Attaches {@code tag} to the request. It can be used later to cancel the request. If the tag
     * is unspecified or null, the request is canceled by using the request itself as the tag.
     */
    public Builder tag(Object tag) {
      this.tag = tag;
      return this;
    }

    public Request build() {
      if (url == null) throw new IllegalStateException("url == null");
      return new Request(this);
    }
  }
}

Request包含请求的参数url请求地址、method请求方法、header请求头数据、请求body以及tag标签。

(2)OkHttpClient主要暴露给外部调用,OkHttpClient对象的创建也通过builder模式,这里主要关注它的构造方法:

OkHttpClient(Builder builder) {
    this.dispatcher = builder.dispatcher;//(1)分发器
    this.proxy = builder.proxy;//(2)代理类
    this.protocols = builder.protocols;//(3)协议类
    this.connectionSpecs = builder.connectionSpecs;//(4)连接规模 确定TLS版本和密码套件
    this.interceptors = Util.immutableList(builder.interceptors);//(5)自定义应用拦截器
    this.networkInterceptors = Util.immutableList(builder.networkInterceptors);//(6)自定义网络拦截器
    this.eventListenerFactory = builder.eventListenerFactory;//(7)事件监听工厂
    this.proxySelector = builder.proxySelector;//(8)代理选择器
    this.cookieJar = builder.cookieJar;
    this.cache = builder.cache;//(9)缓冲类
    this.internalCache = builder.internalCache;//(10)
    this.socketFactory = builder.socketFactory;//(11)socket工厂
    
     boolean isTLS = false;
    for (ConnectionSpec spec : connectionSpecs) {
      isTLS = isTLS || spec.isTls();
    }

    if (builder.sslSocketFactory != null || !isTLS) {
      this.sslSocketFactory = builder.sslSocketFactory;
      this.certificateChainCleaner = builder.certificateChainCleaner;
    } else {
      X509TrustManager trustManager = systemDefaultTrustManager();
      this.sslSocketFactory = systemDefaultSslSocketFactory(trustManager);
      this.certificateChainCleaner = CertificateChainCleaner.get(trustManager);
    }

    this.hostnameVerifier = builder.hostnameVerifier;
    this.certificatePinner = builder.certificatePinner.withCertificateChainCleaner(
        certificateChainCleaner);
    this.proxyAuthenticator = builder.proxyAuthenticator;
    this.authenticator = builder.authenticator;
    this.connectionPool = builder.connectionPool;//(12)连接池
    this.dns = builder.dns;
    this.followSslRedirects = builder.followSslRedirects;
    this.followRedirects = builder.followRedirects;
    this.retryOnConnectionFailure = builder.retryOnConnectionFailure;
    this.connectTimeout = builder.connectTimeout;
    this.readTimeout = builder.readTimeout;
    this.writeTimeout = builder.writeTimeout;
  }

(12)OkHttpClient中创建了连接池,还维护了线程池(dispatcher中)和响应缓冲,所以在使用过程中要用单例。

(3)接着调用了OkHttpClient.newCall(req):

 /**
   * Prepares the {@code request} to be executed at some point in the future.
   */
  @Override public Call newCall(Request request) {
    return RealCall.newRealCall(this, request, false /* for web socket */);
  }

newCall方法内部调用RealCall.newRealCall方法并返回Call对象:

static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
    // Safely publish the Call instance to the EventListener.
    RealCall call = new RealCall(client, originalRequest, forWebSocket);//(1)
    call.eventListener = client.eventListenerFactory().create(call);//(2)
    return call;
  }

(1)创建RealCall对象;RealCall对象持有OkHttpClient和Request,构造方法中还创建了重试/重定向拦截器RetryAndFollowInterceptor;
(2)从OkHttpClient得到evenListener对象,

(4)调用异步请求RealCall.enqueue方法:

 @Override public void enqueue(Callback responseCallback) {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");(1)
      executed = true;
    }
    captureCallStackTrace();(2)
    eventListener.callStart(this);(3)
    client.dispatcher().enqueue(new AsyncCall(responseCallback));//(4)
  }

(1)如果call已经被执行,抛出异常;
(2) 捕获RealCall类的栈轨迹;
(3)触发监听方法callStart(),表示请求开始;
(4)执行dispatcher分发器enqueue方法,创建了AsyncCall类,AsyncCall传入responseCallback。

下面查看dispatcher的enqueue方法:

 synchronized void enqueue(AsyncCall call) {
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {//(1)
      runningAsyncCalls.add(call);//(2)
      executorService().execute(call);//(3)
    } else {
      readyAsyncCalls.add(call);//(4)
    }
  }

(1)把call添加到正在运行的队列的判断依据:如果正在执行的异步请求数小于最大请求数(默认64),并且同一个主机执行的异步请求小于单个主机运行的最大请求数(默认5)否则添加到准备队列;
(2)根据(1)把call添加到正在运行的队列;
(3)将call交线程池执行;
(4)不满足(1)把call添加到准备队列。
AsyncCall是Runnable实现类,execute方法完成请求和返回的执行。
AsyncCall. execute():

 @Override protected void execute() {
      boolean signalledCallback = false;
      try {
        Response response = getResponseWithInterceptorChain();//(1)
        if (retryAndFollowUpInterceptor.isCanceled()) {//(2)
          signalledCallback = true;
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
          signalledCallback = true;
          responseCallback.onResponse(RealCall.this, response);//(3)
        }
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          eventListener.callFailed(RealCall.this, e);//(4)
          responseCallback.onFailure(RealCall.this, e);//(5)
        }
      } finally {
        client.dispatcher().finished(this);//(6)
      }
    }

(1)getResponseWithInterceptorChain方法得到response对象,getResponseWithInterceptorChain是核心实现,后边专门展开说明;
(2)请求如果取消返回,调用responseCallback.onFailure通知处理请求失败;
(3)否则正常请求返回,调用responseCallback.onResponse返回response对象,调用方就可以拿到请求的数据返回,做具体业务处理;
(4)(5)回调执行eventListener.callFailed和responseCallback.onFailure;
(6)dispatcher执行finished方法,finish内部会调用promoteCalls方法从readyAsyncCalls队列中取出call 添加到runningAsyncCalls中,executorService().execute(call)加入线程池中执行call。添加到runningAsyncCalls中的条件是小于运行runningAsyncCalls最大call数并且同一主机call数小于maxRequestsPerHost(即同一主机最大请求数)。

  • 接下来分析核心方法getResponseWithInterceptorChain():
Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors.
    List<Interceptor> interceptors = new ArrayList<>();
    interceptors.addAll(client.interceptors());//(1)
    interceptors.add(retryAndFollowUpInterceptor);//(2)
    interceptors.add(new BridgeInterceptor(client.cookieJar()));//(3)
    interceptors.add(new CacheInterceptor(client.internalCache()));//(4)
    interceptors.add(new ConnectInterceptor(client));//(5)
    if (!forWebSocket) {
      interceptors.addAll(client.networkInterceptors());//(6)
    }
    interceptors.add(new CallServerInterceptor(forWebSocket));//(7)

    Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
        originalRequest, this, eventListener, client.connectTimeoutMillis(),
        client.readTimeoutMillis(), client.writeTimeoutMillis());

    return chain.proceed(originalRequest);//(8)
  }

(1)构建全部拦截器list,先添加自定义应用层拦截器;
(2)添加重试/重定向拦截器;
(3)添加桥接拦截器;
(4)添加缓冲拦截器;
(5)添加连接拦截器;
(6)如果不是websocket,添加自定义网络拦截器;
(7)添加请求服务拦截器;
(8)传入拦截器list,请求,call对象,事件监听,连接超时时间以及读写超时时间生成Interceptor.Chain链对象,执行chain.proceed(originalRequest)。

chain.proceed(originalRequest):

public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
      RealConnection connection) throws IOException {
    if (index >= interceptors.size()) throw new AssertionError();

    calls++;

    // If we already have a stream, confirm that the incoming request will use it.
    if (this.httpCodec != null && !this.connection.supportsUrl(request.url())) {
      throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
          + " must retain the same host and port");
    }

    // If we already have a stream, confirm that this is the only call to chain.proceed().
    if (this.httpCodec != null && calls > 1) {
      throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
          + " must call proceed() exactly once");
    }

    // Call the next interceptor in the chain.
    RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
        connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
        writeTimeout);
    Interceptor interceptor = interceptors.get(index);
    Response response = interceptor.intercept(next);//(1)

    // Confirm that the next interceptor made its required call to chain.proceed().
    if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1) {
      throw new IllegalStateException("network interceptor " + interceptor
          + " must call proceed() exactly once");
    }

    // Confirm that the intercepted response isn't null.
    if (response == null) {
      throw new NullPointerException("interceptor " + interceptor + " returned null");
    }

    if (response.body() == null) {
      throw new IllegalStateException(
          "interceptor " + interceptor + " returned a response with no body");
    }

    return response;
  }

(1)创建下一个RealInterceptorChain,将chain传入interceptor.intercept方法,intercept中会执行nextchain.proceed方法,然后再继续创建下一个RealInterceptorChain,intercept再执行下一个nextchain.proceed方法,这样循环调用所有拦截器,到最后一个拦截器CallServerInterceptor停止遍历,返回response,遍历循环流程如下图:

get_response_with_interceptor_chain.jpg

接下来阅读自带的拦截器代码:

  • RetryAndFollowUpInterceptor:
  @Override public Response intercept(Chain chain) throws IOException {
    Request request = chain.request();
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Call call = realChain.call();
    EventListener eventListener = realChain.eventListener();

    StreamAllocation streamAllocation = new StreamAllocation(client.connectionPool(),
        createAddress(request.url()), call, eventListener, callStackTrace);//(1)
    this.streamAllocation = streamAllocation;

    int followUpCount = 0;
    Response priorResponse = null;
    while (true) {
      if (canceled) {
        streamAllocation.release();//(2)
        throw new IOException("Canceled");
      }

      Response response;
      boolean releaseConnection = true;
      try {
        response = realChain.proceed(request, streamAllocation, null, null);//(3)
        releaseConnection = false;
      } catch (RouteException e) {
        // The attempt to connect via a route failed. The request will not have been sent.
        if (!recover(e.getLastConnectException(), streamAllocation, false, request)) {
          throw e.getLastConnectException();//(4)
        }
        releaseConnection = false;
        continue;
      } catch (IOException e) {
        // An attempt to communicate with a server failed. The request may have been sent.
        boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
        if (!recover(e, streamAllocation, requestSendStarted, request)) throw e;
        releaseConnection = false;//(5)
        continue;
      } finally {
        // We're throwing an unchecked exception. Release any resources.
        if (releaseConnection) {//(6)
          streamAllocation.streamFailed(null);
          streamAllocation.release();
        }
      }

      // Attach the prior response if it exists. Such responses never have a body.
      if (priorResponse != null) {//(7)
        response = response.newBuilder()
            .priorResponse(priorResponse.newBuilder()
                    .body(null)
                    .build())
            .build();
      }

      Request followUp = followUpRequest(response, streamAllocation.route());//(8)

      if (followUp == null) {//(9)
        if (!forWebSocket) {
          streamAllocation.release();
        }
        return response;
      }

      closeQuietly(response.body());//(10)

      if (++followUpCount > MAX_FOLLOW_UPS) {//(11)
        streamAllocation.release();
        throw new ProtocolException("Too many follow-up requests: " + followUpCount);
      }

      if (followUp.body() instanceof UnrepeatableRequestBody) {//(12)
        streamAllocation.release();
        throw new HttpRetryException("Cannot retry streamed HTTP body", response.code());
      }

      if (!sameConnection(response, followUp.url())) {//(13)
        streamAllocation.release();
        streamAllocation = new StreamAllocation(client.connectionPool(),
            createAddress(followUp.url()), call, eventListener, callStackTrace);
        this.streamAllocation = streamAllocation;
      } else if (streamAllocation.codec() != null) {//(14)
        throw new IllegalStateException("Closing the body of " + response
            + " didn't close its backing stream. Bad interceptor?");
      }

      request = followUp;//(15)
      priorResponse = response;//(16)
    }
  }

(1)StreamAllocation用来协调连接(Connections)、流(Screams)和请求队列(Calls);
(2)如果请求取消,调用screamAllocation.release()。release方法会关闭socket,并回调 eventListener.connectionReleased。
(3)执行realChain.proceed方法,proceed内部会创建下一个chain,再传入下一个拦截器,拦截器intercept继续执行负责的工作,并调用chain.proceed()。
(4)如果realChain.proceed方法抛出RouteException,那么调用recover方法,recover方法返回false则不会重试连接,抛出IOException异常,异常会在Call.execute方法中捕获执行eventListener.callFailed和onFailure方法,返回false的条件如下:

1.应用层禁止重试 ;
2.定义了不可重复发送的请求body ;
3.捕获的异常严重等级属于致命 ;
4.没有更多的路由可重意重试;

如果上述的四种场景,请求会被发起重试。

(5)IOException,同样调用recover方法,按照(4)中逻辑判断是否重连;
(6)如果抛出没有catch的异常则执行StreamAllocation.screamFailed()和StreamAllocation.screamFailed;
(7)priorResponse是先前得到的响应数据,如果已经先前响应不为空,response会结合先前响应;
(8)根据响应码确认请求是否需要重定向,返回null表示不需要;
(9)不需要重定向就streamAllocation.release()释放连接并返回response,否则执行下面逻辑;
(10)释放response.body对象;
(11)当前重定向数大于最大可重定向数,则释放连接,抛出异常;
(12)请求不允许重复连接,则释放连接,抛出异常;
(13)检查是否是相同的连接,不是就释放当前连接,重新创建ScreamAllocation;
(14)codec为空抛出异常;
(15)重定向request赋值request,准备执行while循环;
(16)保存当前的response。

  • BridgeInterceptor:
 @Override public Response intercept(Chain chain) throws IOException {
    Request userRequest = chain.request();
    Request.Builder requestBuilder = userRequest.newBuilder();

    RequestBody body = userRequest.body();
    if (body != null) {
      MediaType contentType = body.contentType();
      if (contentType != null) {
        requestBuilder.header("Content-Type", contentType.toString());//(1)
      }

      long contentLength = body.contentLength();
      if (contentLength != -1) {
        requestBuilder.header("Content-Length", Long.toString(contentLength));
        requestBuilder.removeHeader("Transfer-Encoding");
      } else {
        requestBuilder.header("Transfer-Encoding", "chunked");
        requestBuilder.removeHeader("Content-Length");
      }
    }

    if (userRequest.header("Host") == null) {
      requestBuilder.header("Host", hostHeader(userRequest.url(), false));
    }

    if (userRequest.header("Connection") == null) {
      requestBuilder.header("Connection", "Keep-Alive");
    }

    // If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
    // the transfer stream.
    boolean transparentGzip = false;
    if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
      transparentGzip = true;
      requestBuilder.header("Accept-Encoding", "gzip");
    }

    List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
    if (!cookies.isEmpty()) {
      requestBuilder.header("Cookie", cookieHeader(cookies));
    }

    if (userRequest.header("User-Agent") == null) {
      requestBuilder.header("User-Agent", Version.userAgent());
    }

    Response networkResponse = chain.proceed(requestBuilder.build());

    HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());

    Response.Builder responseBuilder = networkResponse.newBuilder()
        .request(userRequest);

    if (transparentGzip
        && "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
        && HttpHeaders.hasBody(networkResponse)) {
      GzipSource responseBody = new GzipSource(networkResponse.body().source());
      Headers strippedHeaders = networkResponse.headers().newBuilder()
          .removeAll("Content-Encoding")
          .removeAll("Content-Length")
          .build();
      responseBuilder.headers(strippedHeaders);
      String contentType = networkResponse.header("Content-Type");
      responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody)));
    }

    return responseBuilder.build();
  }

桥接拦截器主要功能:
1.将应用码转为网络码;
2.用户请求转为网络请求

  • CacheInterceptor
@Override public Response intercept(Chain chain) throws IOException {
    Response cacheCandidate = cache != null
        ? cache.get(chain.request())
        : null;

    long now = System.currentTimeMillis();

//(1)
    CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
    Request networkRequest = strategy.networkRequest;
    Response cacheResponse = strategy.cacheResponse;

    if (cache != null) {
      cache.trackResponse(strategy);
    }

    if (cacheCandidate != null && cacheResponse == null) {//(2)
      closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
    }

    // If we're forbidden from using the network and the cache is insufficient, fail.
    if (networkRequest == null && cacheResponse == null) {//(3)
      return new Response.Builder()
          .request(chain.request())
          .protocol(Protocol.HTTP_1_1)
          .code(504)
          .message("Unsatisfiable Request (only-if-cached)")
          .body(Util.EMPTY_RESPONSE)
          .sentRequestAtMillis(-1L)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build();
    }

    // If we don't need the network, we're done.
    if (networkRequest == null) {//(4)
      return cacheResponse.newBuilder()
          .cacheResponse(stripBody(cacheResponse))
          .build();
    }

    Response networkResponse = null;
    try {
      networkResponse = chain.proceed(networkRequest);
    } finally {
      // If we're crashing on I/O or otherwise, don't leak the cache body.
      if (networkResponse == null && cacheCandidate != null) {
        closeQuietly(cacheCandidate.body());//(5)
      }
    }

    // If we have a cache response too, then we're doing a conditional get.
    if (cacheResponse != null) {//(6)
      if (networkResponse.code() == HTTP_NOT_MODIFIED) {
        Response response = cacheResponse.newBuilder()
            .headers(combine(cacheResponse.headers(), networkResponse.headers()))
            .sentRequestAtMillis(networkResponse.sentRequestAtMillis())
            .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
            .cacheResponse(stripBody(cacheResponse))
            .networkResponse(stripBody(networkResponse))
            .build();
        networkResponse.body().close();

        // Update the cache after combining headers but before stripping the
        // Content-Encoding header (as performed by initContentStream()).
        cache.trackConditionalCacheHit();
        cache.update(cacheResponse, response);
        return response;
      } else {
        closeQuietly(cacheResponse.body());
      }
    }

//(7)
    Response response = networkResponse.newBuilder()
        .cacheResponse(stripBody(cacheResponse))
        .networkResponse(stripBody(networkResponse))
        .build();

    if (cache != null) {
      if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
        // Offer this request to the cache.
        CacheRequest cacheRequest = cache.put(response);//(8)
        return cacheWritingResponse(cacheRequest, response);
      }

      if (HttpMethod.invalidatesCache(networkRequest.method())) {
        try {
          cache.remove(networkRequest);
        } catch (IOException ignored) {
          // The cache cannot be written.
        }
      }
    }

    return response;
  }

(1)获取缓冲策略;
(2)缓冲策略不为空,缓冲响应为空时,关闭缓冲策略;
(3)网络被禁止,缓冲不存在时,返回失败;
(4)不需要网络,返回缓冲响应,缓冲生效;
(5)执行chain.proceed方法抛出异常时,关闭缓冲;
(6)有缓冲时根据条件使用缓冲响应;
(7)使用网络响应;
(8)给予本请求缓冲(添加到缓冲中);

  • ConnectInterceptor
@Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Request request = realChain.request();
    StreamAllocation streamAllocation = realChain.streamAllocation();

    // We need the network to satisfy this request. Possibly for validating a conditional GET.
    boolean doExtensiveHealthChecks = !request.method().equals("GET");
    HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
    RealConnection connection = streamAllocation.connection();//(1)

    return realChain.proceed(request, streamAllocation, httpCodec, connection);
  }

(1)获得RealConnection对象,调用下一个chain.proceed。

  • CallServerInterceptor
 @Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    HttpCodec httpCodec = realChain.httpStream();
    StreamAllocation streamAllocation = realChain.streamAllocation();
    RealConnection connection = (RealConnection) realChain.connection();
    Request request = realChain.request();

    long sentRequestMillis = System.currentTimeMillis();

    realChain.eventListener().requestHeadersStart(realChain.call());
    httpCodec.writeRequestHeaders(request);//(1)
    realChain.eventListener().requestHeadersEnd(realChain.call(), request);

    Response.Builder responseBuilder = null;
    if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
      // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
      // Continue" response before transmitting the request body. If we don't get that, return
      // what we did get (such as a 4xx response) without ever transmitting the request body.
      if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
        httpCodec.flushRequest();
        realChain.eventListener().responseHeadersStart(realChain.call());
        responseBuilder = httpCodec.readResponseHeaders(true);
      }

      if (responseBuilder == null) {
        // Write the request body if the "Expect: 100-continue" expectation was met.
        realChain.eventListener().requestBodyStart(realChain.call());
        long contentLength = request.body().contentLength();
        CountingSink requestBodyOut =
            new CountingSink(httpCodec.createRequestBody(request, contentLength));
        BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);

        request.body().writeTo(bufferedRequestBody);//(2)
        bufferedRequestBody.close();
        realChain.eventListener()
            .requestBodyEnd(realChain.call(), requestBodyOut.successfulCount);
      } else if (!connection.isMultiplexed()) {
        // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
        // from being reused. Otherwise we're still obligated to transmit the request body to
        // leave the connection in a consistent state.
        streamAllocation.noNewStreams();
      }
    }

    httpCodec.finishRequest();

    if (responseBuilder == null) {
      realChain.eventListener().responseHeadersStart(realChain.call());
      responseBuilder = httpCodec.readResponseHeaders(false);//(3)
    }

    Response response = responseBuilder
        .request(request)
        .handshake(streamAllocation.connection().handshake())
        .sentRequestAtMillis(sentRequestMillis)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build();

    int code = response.code();
    if (code == 100) {
      // server sent a 100-continue even though we did not request one.
      // try again to read the actual response
      responseBuilder = httpCodec.readResponseHeaders(false);

      response = responseBuilder
              .request(request)
              .handshake(streamAllocation.connection().handshake())
              .sentRequestAtMillis(sentRequestMillis)
              .receivedResponseAtMillis(System.currentTimeMillis())
              .build();

      code = response.code();
    }

    realChain.eventListener()
            .responseHeadersEnd(realChain.call(), response);

//(4)
    if (forWebSocket && code == 101) {
      // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
      response = response.newBuilder()
          .body(Util.EMPTY_RESPONSE)
          .build();
    } else {
      response = response.newBuilder()
          .body(httpCodec.openResponseBody(response))
          .build();
    }

    if ("close".equalsIgnoreCase(response.request().header("Connection"))
        || "close".equalsIgnoreCase(response.header("Connection"))) {
      streamAllocation.noNewStreams();
    }

    if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
      throw new ProtocolException(
          "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
    }

    return response;
  }

(1)写入请求头数据;
(2)写入请求body数据;
(3)读取响应头数据;
(4)读取响应body数据。

完整异步请求调用流程:

okhttp_async_request.jpg
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 220,137评论 6 511
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,824评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,465评论 0 357
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,131评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,140评论 6 397
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,895评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,535评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,435评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,952评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,081评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,210评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,896评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,552评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,089评论 0 23
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,198评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,531评论 3 375
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,209评论 2 357

推荐阅读更多精彩内容