


HTTP 1.1 默认启用长TCP连接,但所有的请求-响应都是按序进行的(这里的长连接可理解成半双工协议。即便是HTTP1.1引入了管道机制,也是如此)。复用同一个TCP连接期间,即便是通过管道同时发送了多个请求,服务端也是按请求的顺序依次给出响应的;而客户端在未收到之前所发出所有请求的响应之前,将会阻塞后面的请求(排队等待),这称为"队头堵塞"(Head-of-line blocking)。
HTTP2.0复用TCP连接则不同,虽然依然遵循请求-响应模式,但客户端发送多个请求和服务端给出多个响应的顺序不受限制,这样既避免了"队头堵塞",又能更快获取响应。在复用同一个TCP连接时,服务器同时(或先后)收到了A、B两个请求,先回应A请求,但由于处理过程非常耗时,于是就发送A请求已经处理好的部分, 接着回应B请求,完成后,再发送A请求剩下的部分。HTTP2.0长连接可以理解成全双工的协议。
HTTP2.0 使用 多路复用 的技术,多个 stream 可以共用一个 socket 连接。每个 tcp连接都是通过一个 socket 来完成的,socket 对应一个 host 和 port,如果有多个stream(即多个 Request) 都是连接在一个 host 和 port上,那么它们就可以共同使用同一个 socket ,这样做的好处就是 可以减少TCP的一个三次握手的时间。
在OKHttp里面,负责连接的是 RealConnection 。



public final class RealConnection extends Http2Connection.Listener implements Connection {
  private static final String NPE_THROW_WITH_NULL = "throw with null exception";
  private static final int MAX_TUNNEL_ATTEMPTS = 21;
  private final ConnectionPool connectionPool;//连接池
  private final Route route;//路由

  // The fields below are initialized by connect() and never reassigned.
  /** The low-level TCP socket. */
  private Socket rawSocket; //底层socket
   * The application layer socket. Either an {@link SSLSocket} layered over {@link #rawSocket}, or
   * {@link #rawSocket} itself if this connection does not use SSL.
  private Socket socket;  //应用层socket
  private Handshake handshake;
  private Protocol protocol;
   // http2的链接
  private Http2Connection http2Connection;
  private BufferedSource source;
  private BufferedSink sink;

  // The fields below track connection state and are guarded by connectionPool.
  //下面这个字段是 属于表示链接状态的字段,并且有connectPool统一管理
  /** If true, no new streams can be created on this connection. Once true this is always true. */
  public boolean noNewStreams;
  public int successCount;

   * The maximum number of concurrent streams that can be carried by this connection. If {@code
   * allocations.size() < allocationLimit} then new streams can be created on this connection.
  public int allocationLimit = 1;
  /** Current streams carried by this connection. */
  public final List<Reference<StreamAllocation>> allocations = new ArrayList<>();

  /** Nanotime timestamp when {@code allocations.size()} reached zero. */
  public long idleAtNanos = Long.MAX_VALUE;
   public RealConnection(ConnectionPool connectionPool, Route route) {
    this.connectionPool = connectionPool;
    this.route = route;


public void connect( int connectTimeout, int readTimeout, int writeTimeout, boolean connectionRetryEnabled) {
    if (protocol != null) throw new IllegalStateException("already connected");
     // 线路的选择
    RouteException routeException = null;
    List<ConnectionSpec> connectionSpecs = route.address().connectionSpecs();
    ConnectionSpecSelector connectionSpecSelector = new ConnectionSpecSelector(connectionSpecs);

    if (route.address().sslSocketFactory() == null) {
      if (!connectionSpecs.contains(ConnectionSpec.CLEARTEXT)) {
        throw new RouteException(new UnknownServiceException(
            "CLEARTEXT communication not enabled for client"));
      String host = route.address().url().host();
      if (!Platform.get().isCleartextTrafficPermitted(host)) {
        throw new RouteException(new UnknownServiceException(
            "CLEARTEXT communication to " + host + " not permitted by network security policy"));
    // 连接开始,注意这是个死循环,创建连接成功才会跳出
    while (true) {
      try {
        // 如果要求隧道模式,建立通道连接,通常不是这种
        if (route.requiresTunnel()) {
          connectTunnel(connectTimeout, readTimeout, writeTimeout);
        } else {
           // 一般都走这条逻辑了,实际上很简单就是socket的连接
          connectSocket(connectTimeout, readTimeout);
        // 建立协议,构造读写桥梁,很重要的方法
      } catch (IOException e) {
        socket = null;
        rawSocket = null;
        source = null;
        sink = null;
        handshake = null;
        protocol = null;
        http2Connection = null;

        if (routeException == null) {
          routeException = new RouteException(e);
        } else {

        if (!connectionRetryEnabled || !connectionSpecSelector.connectionFailed(e)) {
          throw routeException;

    if (http2Connection != null) {
      synchronized (connectionPool) {
        allocationLimit = http2Connection.maxConcurrentStreams();


     * Does all the work to build an HTTPS connection over a proxy tunnel. The catch here is that a
     * proxy server can issue an auth challenge and then close the connection.
     * 是否通过代理隧道建立HTTPS连接的所有工作。 这里的问题是代理服务器可以发出一个验证质询,然后关闭连接。
    private void connectTunnel(int connectTimeout, int readTimeout, int writeTimeout, Call call,
                               EventListener eventListener) throws IOException {
        //1-构造一个 建立隧道连接 请求。
        Request tunnelRequest = createTunnelRequest();
        HttpUrl url = tunnelRequest.url();
        for (int i = 0; i < MAX_TUNNEL_ATTEMPTS; i++) {
            //2 与HTTP代理服务器建立TCP连接。
            connectSocket(connectTimeout, readTimeout, call, eventListener);
            //3 创建隧道。这主要是将 建立隧道连接 请求发送给HTTP代理服务器,并处理它的响应
            tunnelRequest = createTunnel(readTimeout, writeTimeout, tunnelRequest, url);

            if (tunnelRequest == null) break; // Tunnel successfully created.

            // The proxy decided to close the connection after an auth challenge. We need to create a new
            // connection, but this time with the auth credentials.
            rawSocket = null;
            sink = null;
            source = null;
            eventListener.connectEnd(call, route.socketAddress(), route.proxy(), null);


     * Does all the work necessary to build a full HTTP or HTTPS connection on a raw socket.
     * 完成在原始套接字上构建完整的HTTP或HTTPS连接所需的所有工作。
    private void connectSocket(int connectTimeout, int readTimeout, Call call,
                               EventListener eventListener) throws IOException {
        Proxy proxy = route.proxy();
        Address address = route.address();
       //是普通的创建new Socket(host, port, clientAddress, clientPort);否则用代理
        rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
                ? address.socketFactory().createSocket()
                : new Socket(proxy);
        eventListener.connectStart(call, route.socketAddress(), proxy);
        try {
            //建立Socket连接,实际调用的就是socket.connect(address, connectTimeout);
            Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
        } catch (ConnectException e) {
            ConnectException ce = new ConnectException("Failed to connect to " + route.socketAddress());
            throw ce;

        // The following try/catch block is a pseudo hacky way to get around a crash on Android 7.0
        // More details:
        // https://github.com/square/okhttp/issues/3245
        // https://android-review.googlesource.com/#/c/271775/
        try {
            //okio 拿到输入流,最终的目的就是建立了管道流
            source = Okio.buffer(Okio.source(rawSocket));
           // 输出流
            sink = Okio.buffer(Okio.sink(rawSocket));
        } catch (NullPointerException npe) {
            if (NPE_THROW_WITH_NULL.equals(npe.getMessage())) {
                throw new IOException(npe);

public class Platform {
  public void connectSocket(Socket socket, InetSocketAddress address,
      int connectTimeout) throws IOException {
    socket.connect(address, connectTimeout);


private void establishProtocol(ConnectionSpecSelector connectionSpecSelector,
      int pingIntervalMillis, Call call, EventListener eventListener) throws IOException {
  //   一些协议的设置,主要方法我们看http2.0的startHttp2()方法
 if (route.address().sslSocketFactory() == null) {
      if (route.address().protocols().contains(Protocol.H2_PRIOR_KNOWLEDGE)) {
        socket = rawSocket;
        protocol = Protocol.H2_PRIOR_KNOWLEDGE;
      socket = rawSocket;
      protocol = Protocol.HTTP_1_1;

    eventListener.secureConnectEnd(call, handshake);
    if (protocol == Protocol.HTTP_2) {
  //开启了http2Connection 线程,这是个流读写的线程,就是服务器客户端直接的通道交互
  private void startHttp2(int pingIntervalMillis) throws IOException {
    socket.setSoTimeout(0); // HTTP/2 connection timeouts are set per-stream.
    http2Connection = new Http2Connection.Builder(true)
        .socket(socket, route.address().url().host(), source, sink)


   * Sends any initial frames and starts reading frames from the remote peer. This should be called
   * after {@link Builder#build} for all new connections.
  public void start() throws IOException {

   * @param sendConnectionPreface true to send connection preface frames. This should always be true
   *     except for in tests that don't check for a connection preface.
  void start(boolean sendConnectionPreface) throws IOException {
    if (sendConnectionPreface) {
      int windowSize = okHttpSettings.getInitialWindowSize();
      if (windowSize != Settings.DEFAULT_INITIAL_WINDOW_SIZE) {
        writer.windowUpdate(0, windowSize - Settings.DEFAULT_INITIAL_WINDOW_SIZE);
    new Thread(readerRunnable).start(); // Not a daemon thread.


class ReaderRunnable extends NamedRunnable implements Http2Reader.Handler {
    final Http2Reader reader;

    ReaderRunnable(Http2Reader reader) {
      super("OkHttp %s", hostname);
      this.reader = reader;

   protected void execute() {
      ErrorCode connectionErrorCode = ErrorCode.INTERNAL_ERROR;
      ErrorCode streamErrorCode = ErrorCode.INTERNAL_ERROR;
      try {
        while (reader.nextFrame(false, this)) {
        connectionErrorCode = ErrorCode.NO_ERROR;
        streamErrorCode = ErrorCode.CANCEL;
      } catch (IOException e) {
        connectionErrorCode = ErrorCode.PROTOCOL_ERROR;
        streamErrorCode = ErrorCode.PROTOCOL_ERROR;
      } finally {
        try {
          close(connectionErrorCode, streamErrorCode);
        } catch (IOException ignored) {

从Realconnection调用connect()创建了socket连接之后(这里讨论走http2.0协议分支),创建了一个http2Connection 对象,启用了一个readerRunnable的线程,run()方法的主要工作是循环地执行reader.nextFrame()方法。

 public boolean nextFrame(boolean requireSettings, Handler handler) throws IOException {
    //不停的在读数据帧,直到流关闭(发生IOException )返回false
    try {
      source.require(9); // Frame header size
    } catch (IOException e) {
      return false; // This might be a normal socket close.

    //  0                   1                   2                   3
    //  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
    // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
    // |                 Length (24)                   |
    // +---------------+---------------+---------------+
    // |   Type (8)    |   Flags (8)   |
    // +-+-+-----------+---------------+-------------------------------+
    // |R|                 Stream Identifier (31)                      |
    // +=+=============================================================+
    // |                   Frame Payload (0...)                      ...
    // +---------------------------------------------------------------+
    int length = readMedium(source);
    if (length < 0 || length > INITIAL_MAX_FRAME_SIZE) {
      throw ioException("FRAME_SIZE_ERROR: %s", length);
    byte type = (byte) (source.readByte() & 0xff);
    if (requireSettings && type != TYPE_SETTINGS) {
      throw ioException("Expected a SETTINGS frame but was %s", type);
    byte flags = (byte) (source.readByte() & 0xff);
   //streamId 很重要,用来追踪是哪次请求流的
   //Map<Integer, Http2Stream> streams ,Http2Connection里维护的一个map,用来保存各个请求流
    int streamId = (source.readInt() & 0x7fffffff); // Ignore reserved bit.
    if (logger.isLoggable(FINE)) logger.fine(frameLog(true, streamId, length, type, flags));
    //醍醐灌顶,特别是那个readHeaders(handler, length, flags, streamId)方法
    switch (type) {
      case TYPE_DATA:
        readData(handler, length, flags, streamId);

      case TYPE_HEADERS:
        readHeaders(handler, length, flags, streamId);

      case TYPE_PRIORITY:
        readPriority(handler, length, flags, streamId);

      case TYPE_RST_STREAM:
        readRstStream(handler, length, flags, streamId);

      case TYPE_SETTINGS:
        readSettings(handler, length, flags, streamId);

        readPushPromise(handler, length, flags, streamId);

      case TYPE_PING:
        readPing(handler, length, flags, streamId);

      case TYPE_GOAWAY:
        readGoAway(handler, length, flags, streamId);

        readWindowUpdate(handler, length, flags, streamId);

        // Implementations MUST discard frames that have unknown or unsupported types.
    return true;

再来重点看看 readHeaders(handler, length, flags, streamId)方法,因为在后面的CallServerInterceptor拦截器会追踪到,提前了解一下,是怎么读取response的headers的:

  private void readHeaders(Handler handler, int length, byte flags, int streamId)
      throws IOException {
    //streamId 很重要,用来追踪是哪次请求的流
    if (streamId == 0) throw ioException("PROTOCOL_ERROR: TYPE_HEADERS streamId == 0");

    boolean endStream = (flags & FLAG_END_STREAM) != 0;

    short padding = (flags & FLAG_PADDED) != 0 ? (short) (source.readByte() & 0xff) : 0;

    if ((flags & FLAG_PRIORITY) != 0) {
      readPriority(handler, streamId);
      length -= 5; // account for above read.

    length = lengthWithoutPadding(length, flags, padding);

    List<Header> headerBlock = readHeaderBlock(length, padding, flags, streamId);
    handler.headers(endStream, streamId, -1, headerBlock);

 public void headers(boolean inFinished, int streamId, int associatedStreamId,
        List<Header> headerBlock) {
      if (pushedStream(streamId)) {
        pushHeadersLater(streamId, headerBlock, inFinished);
      Http2Stream stream;
      synchronized (Http2Connection.this) {
        stream = getStream(streamId);

        if (stream == null) {
          // If we're shutdown, don't bother with this stream.
          if (shutdown) return;

          // If the stream ID is less than the last created ID, assume it's already closed.
          if (streamId <= lastGoodStreamId) return;

          // If the stream ID is in the client's namespace, assume it's already closed.
          if (streamId % 2 == nextStreamId % 2) return;

          // Create a stream.
          Headers headers = Util.toHeaders(headerBlock);
          final Http2Stream newStream = new Http2Stream(streamId, Http2Connection.this,
              false, inFinished, headers);
          lastGoodStreamId = streamId;
          streams.put(streamId, newStream);
          listenerExecutor.execute(new NamedRunnable("OkHttp %s stream %d", hostname, streamId) {
            @Override public void execute() {
              try {
              } catch (IOException e) {
                Platform.get().log(INFO, "Http2Connection.Listener failure for " + hostname, e);
                try {
                } catch (IOException ignored) {

      // Update an existing stream.
      if (inFinished) stream.receiveFin();

   * Accept headers from the network and store them until the client calls {@link #takeHeaders}, or
   * {@link FramingSource#read} them.
  void receiveHeaders(List<Header> headers) {
    assert (!Thread.holdsLock(Http2Stream.this));
    boolean open;
    synchronized (this) {
      hasResponseHeaders = true;
      open = isOpen();
    if (!open) {
  //取headers 这个也是stream的方法,stream是什么时候创建的呢,下篇讲CallServerInterceptor的时候会讲到
  public synchronized Headers takeHeaders() throws IOException {
    try {
      while (headersQueue.isEmpty() && errorCode == null) {
    } finally {
    if (!headersQueue.isEmpty()) {
      return headersQueue.removeFirst();
    throw new StreamResetException(errorCode);


public final class ConnectionPool {
   * Background threads are used to cleanup expired connections. There will be at most a single
   * thread running per connection pool. The thread pool executor permits the pool itself to be
   * garbage collected.
  private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
      Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
      new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp ConnectionPool", true));

  /** The maximum number of idle connections for each address. */
  private final int maxIdleConnections;
  private final long keepAliveDurationNs;
  private final Runnable cleanupRunnable = new Runnable() {
    @Override public void run() {
      while (true) {
        long waitNanos = cleanup(System.nanoTime());
        if (waitNanos == -1) return;
        if (waitNanos > 0) {
          long waitMillis = waitNanos / 1000000L;
          waitNanos -= (waitMillis * 1000000L);
          synchronized (ConnectionPool.this) {
            try {
              //里的notifyAll方法被唤醒来使用,详见 https://www.jianshu.com/p/c518f9c07a80
              ConnectionPool.this.wait(waitMillis, (int) waitNanos);
            } catch (InterruptedException ignored) {
  private final Deque<RealConnection> connections = new ArrayDeque<>();
  final RouteDatabase routeDatabase = new RouteDatabase();
  boolean cleanupRunning;

   * Create a new connection pool with tuning parameters appropriate for a single-user application.
   * The tuning parameters in this pool are subject to change in future OkHttp releases. Currently
   * this pool holds up to 5 idle connections which will be evicted after 5 minutes of inactivity.
  public ConnectionPool() {
    this(5, 5, TimeUnit.MINUTES);

  public ConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) {
    this.maxIdleConnections = maxIdleConnections;
    this.keepAliveDurationNs = timeUnit.toNanos(keepAliveDuration);

    // Put a floor on the keep alive duration, otherwise cleanup will spin loop.
    if (keepAliveDuration <= 0) {
      throw new IllegalArgumentException("keepAliveDuration <= 0: " + keepAliveDuration);

先搞明白那个清除cleanup(long now)方法:

    //返回纳秒级的睡眠持续时间,直到下次预定调用此方法为止。 如果不需要进一步清理,则返回-1。
    long cleanup(long now) {
        int inUseConnectionCount = 0;
        int idleConnectionCount = 0;
        RealConnection longestIdleConnection = null;
        long longestIdleDurationNs = Long.MIN_VALUE;

        // Find either a connection to evict, or the time that the next eviction is due.
        synchronized (this) {
            for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
                RealConnection connection = i.next();

                // If the connection is in use, keep searching.
                //1. 查询此连接内部的StreanAllocation的引用数量,大于0则跳过这个连接
                if (pruneAndGetAllocationCount(connection, now) > 0) {


                // If the connection is ready to be evicted, we're done.
                //2. 标记闲置最久的空闲连接
                long idleDurationNs = now - connection.idleAtNanos;
                if (idleDurationNs > longestIdleDurationNs) {
                    longestIdleDurationNs = idleDurationNs;
                    longestIdleConnection = connection;

           //3. 如果空闲连接超过5个或者keepalive时间大于5分钟,则将该连接清理掉。
            if (longestIdleDurationNs >= this.keepAliveDurationNs
                    || idleConnectionCount > this.maxIdleConnections) {
                //只有这个分支才会清理连接,清理后需要关闭链接,最终return 0
            } else if (idleConnectionCount > 0) {
                // A connection will be ready to evict soon.
                //4. 返回此连接的到期时间,供下次进行清理。
                return keepAliveDurationNs - longestIdleDurationNs;
            } else if (inUseConnectionCount > 0) {
                // All connections are in use. It'll be at least the keep alive duration 'til we run again.
                //5. 全部都是活跃连接,5分钟时候再进行清理。
                return keepAliveDurationNs;
            } else {
                // No connections, idle or in use.
                //6. 没有任何连接,跳出循环。
                cleanupRunning = false;
                return -1;
        //7. 关闭连接,返回时间0,立即再次进行清理。

        // Cleanup again immediately.
        return 0;


   * Prunes any leaked allocations and then returns the number of remaining live allocations on
   * {@code connection}. Allocations are leaked if the connection is tracking them but the
   * application code has abandoned them. Leak detection is imprecise and relies on garbage
   * collection.
  private int pruneAndGetAllocationCount(RealConnection connection, long now) {
    List<Reference<StreamAllocation>> references = connection.allocations;
    for (int i = 0; i < references.size(); ) {
      Reference<StreamAllocation> reference = references.get(i);
      if (reference.get() != null) {
      // We've discovered a leaked allocation. This is an application bug.
      StreamAllocation.StreamAllocationReference streamAllocRef =
          (StreamAllocation.StreamAllocationReference) reference;
      String message = "A connection to " + connection.route().address().url()
          + " was leaked. Did you forget to close a response body?";
      Platform.get().logCloseableLeak(message, streamAllocRef.callStackTrace);
      connection.noNewStreams = true;

      // If this was the last allocation, the connection is eligible for immediate eviction.
      if (references.isEmpty()) {
        connection.idleAtNanos = now - keepAliveDurationNs;
        return 0;
    return references.size();


  void put(RealConnection connection) {
    assert (Thread.holdsLock(this));
    //cleanupRunning 这个变量是在上面的cleanup()方法里控制的,这个变量保证了同时间只会有一个清理任务在跑
    if (!cleanupRunning) {
      cleanupRunning = true;
  1. 利用一个线程池在不断监控当前的闲置链接数量和链接闲置的时长,当数量和时长出现超载的情况的时候就会执行清除动作。
  2. 每当往连接池加入一个链接的时候,会根据当前是否有清理线程来决定是否开启一个新的清理线程,保证始终只有一个清理线程任务在跑。
 Internal.instance = new Internal() {
      @Override public boolean connectionBecameIdle(
          ConnectionPool pool, RealConnection connection) {
        return pool.connectionBecameIdle(connection);

      @Override public RealConnection get(ConnectionPool pool, Address address,
          StreamAllocation streamAllocation, Route route) {
        return pool.get(address, streamAllocation, route);

      @Override public Socket deduplicate(
          ConnectionPool pool, Address address, StreamAllocation streamAllocation) {
        return pool.deduplicate(address, streamAllocation);

      @Override public void put(ConnectionPool pool, RealConnection connection) {

      @Override public RouteDatabase routeDatabase(ConnectionPool connectionPool) {
        return connectionPool.routeDatabase;


   * Returns a recycled connection to {@code address}, or null if no such connection exists. The route is null if the address has not yet been routed.
   * 返回一个可复用链接,如果还没有被创建则为null
  RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
    assert (Thread.holdsLock(this));
    // 遍历已有连接集合
    for (RealConnection connection : connections) { 
      if (connection.isEligible(address, route)) {
        return connection;
    return null;


   * Notify this pool that {@code connection} has become idle. Returns true if the connection has
   * been removed from the pool and should be closed.
  boolean connectionBecameIdle(RealConnection connection) {
    assert (Thread.holdsLock(this));
    if (connection.noNewStreams || maxIdleConnections == 0) {
      return true;
    } else {
      //唤醒cleanupRunnable 线程来清理他
      return false;


   * Replaces the connection held by {@code streamAllocation} with a shared connection if possible.
   * This recovers when multiple multiplexed connections are created concurrently.
 //如果可能,用共享连接替换 {@code streamAllocation} 持有的连接。
   //  当同时创建多个多路复用连接时会恢复。
  Socket deduplicate(Address address, StreamAllocation streamAllocation) {
    assert (Thread.holdsLock(this));
    for (RealConnection connection : connections) {
      //connection 能让address复用&&connection支持多路复用(http2.0就能支持)&&
      //connection 不等于streamAllocation持有的connection 
      if (connection.isEligible(address, null)
          && connection.isMultiplexed()
          && connection != streamAllocation.connection()) {
        return streamAllocation.releaseAndAcquire(connection);
    return null;


  /** Close and remove all idle connections in the pool. */
  public void evictAll() {
    List<RealConnection> evictedConnections = new ArrayList<>();
    synchronized (this) {
      for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
        RealConnection connection = i.next();
        if (connection.allocations.isEmpty()) {
          connection.noNewStreams = true;
    for (RealConnection connection : evictedConnections) {




public final class StreamAllocation {
 public final Address address;//地址
  private Route route; //路由
  private final ConnectionPool connectionPool;  //连接池
  private final Object callStackTrace; //日志

  // State guarded by connectionPool.
  private final RouteSelector routeSelector; //路由选择器
  private int refusedStreamCount;  //拒绝的次数
  private RealConnection connection;  //连接
  private boolean released;  //是否已经被释放
  private boolean canceled  //是否被取消了

  public StreamAllocation(ConnectionPool connectionPool, Address address, Object callStackTrace) {
    this.connectionPool = connectionPool;
    this.address = address;
    this.routeSelector = new RouteSelector(address, routeDatabase());
    this.callStackTrace = callStackTrace;


  public HttpCodec newStream(OkHttpClient client, boolean doExtensiveHealthChecks) {
    int connectTimeout = client.connectTimeoutMillis();
    int readTimeout = client.readTimeoutMillis();
    int writeTimeout = client.writeTimeoutMillis();
    boolean connectionRetryEnabled = client.retryOnConnectionFailure();

    try {
      RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
          writeTimeout, connectionRetryEnabled, doExtensiveHealthChecks);
      HttpCodec resultCodec = resultConnection.newCodec(client, this);

      synchronized (connectionPool) {
        codec = resultCodec;
        return resultCodec;
    } catch (IOException e) {
      throw new RouteException(e);


   * Finds a connection and returns it if it is healthy. If it is unhealthy the process is repeated
   * until a healthy connection is found.
  private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
      int writeTimeout, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks)
      throws IOException {
    while (true) {
      RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
      // If this is a brand new connection, we can skip the extensive health checks.
      synchronized (connectionPool) {
        if (candidate.successCount == 0) {
          return candidate;
      // Do a (potentially slow) check to confirm that the pooled connection is still good. If it
      // isn't, take it out of the pool and start again.
      if (!candidate.isHealthy(doExtensiveHealthChecks)) {
      return candidate;


   * Returns a connection to host a new stream. This prefers the existing connection if it exists,
   * then the pool, finally building a new connection.
  private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
      boolean connectionRetryEnabled) throws IOException {
    Route selectedRoute;
    synchronized (connectionPool) {
      if (released) throw new IllegalStateException("released");
      if (codec != null) throw new IllegalStateException("codec != null");
      if (canceled) throw new IOException("Canceled");
      // Attempt to use an already-allocated connection.
      RealConnection allocatedConnection = this.connection;
      if (allocatedConnection != null && !allocatedConnection.noNewStreams) {
          // 如果已经存在的连接满足要求,则使用已存在的连接
        return allocatedConnection;
      //2. 尝试从链接池中去取,这个get()方法我们在前面讲到过,实际调用的是connectionPool的get()方法
      // 最终会调用到StreamAllocation里的acquire()方法,这个方法会给connection变量赋值
      Internal.instance.get(connectionPool, address, this, null);
      if (connection != null) {
        return connection;

      selectedRoute = route;
       // 线路的选择,多ip的支持
    // If we need a route, make one. This is a blocking operation.
    if (selectedRoute == null) {
      selectedRoute = routeSelector.next();

    RealConnection result;
    synchronized (connectionPool) {
      if (canceled) throw new IOException("Canceled");

      // Now that we have an IP address, make another attempt at getting a connection from the pool.
      // This could match due to connection coalescing.
      Internal.instance.get(connectionPool, address, this, selectedRoute);
      if (connection != null) return connection;

      // Create a connection and assign it to this allocation immediately. This makes it possible
      // for an asynchronous cancel() to interrupt the handshake we're about to do.
      route = selectedRoute;
      refusedStreamCount = 0;
     // 以上都不符合,创建一个连接
      result = new RealConnection(connectionPool, selectedRoute);
    // Do TCP + TLS handshakes. This is a blocking operation.
    result.connect(connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled);

    Socket socket = null;
    synchronized (connectionPool) {
      // Pool the connection.
      Internal.instance.put(connectionPool, result);
      // If another multiplexed connection to the same address was created concurrently, then
      // release this connection and acquire that one.
      if (result.isMultiplexed()) {
        //socket 不为空则代表有确实有重复的socket,下面会把他关掉达到复用不浪费资源的目的
        socket = Internal.instance.deduplicate(connectionPool, address, this);
        result = connection;
    return result;

   * Use this allocation to hold {@code connection}. Each call to this must be paired with a call to
   * {@link #release} on the same connection.
  public void acquire(RealConnection connection) {
    assert (Thread.holdsLock(connectionPool));
    //此时connection 必须是空的,才能被赋值,如果不为空会报非法异常
    if (this.connection != null) throw new IllegalStateException();
    this.connection = connection;
    connection.allocations.add(new StreamAllocationReference(this, callStackTrace));

  public Socket releaseAndAcquire(RealConnection newConnection) {
    assert (Thread.holdsLock(connectionPool));
    if (codec != null || connection.allocations.size() != 1) throw new IllegalStateException();

    // Release the old connection.
    Reference<StreamAllocation> onlyAllocation = connection.allocations.get(0);
    Socket socket = deallocate(true, false, false);

    // Acquire the new connection.
    this.connection = newConnection;

    return socket;

   * Releases resources held by this allocation. If sufficient resources are allocated, the
   * connection will be detached or closed. Callers must be synchronized on the connection pool.
   * <p>Returns a closeable that the caller should pass to {@link Util#closeQuietly} upon completion
   * of the synchronized block. (We don't do I/O while synchronized on the connection pool.)
  private Socket deallocate(boolean noNewStreams, boolean released, boolean streamFinished) {
    assert (Thread.holdsLock(connectionPool));

    if (streamFinished) {
      this.codec = null;
    if (released) {
      this.released = true;
    Socket socket = null;
    if (connection != null) {
      if (noNewStreams) {
        connection.noNewStreams = true;
      if (this.codec == null && (this.released || connection.noNewStreams)) {
        if (connection.allocations.isEmpty()) {
          connection.idleAtNanos = System.nanoTime();
          if (Internal.instance.connectionBecameIdle(connectionPool, connection)) {
            socket = connection.socket();
        connection = null;
    return socket;



最终得到了一个可用的链接connection,并且在newStream()方法后还用这个链接新建了一个Http2Codec(http2.0) ,Http2Codec 的作用主要是对请求进行编码和对response进行解码,可以理解成对流的一些操作封装。


