HttpClient Source Code Analysis

  • Overview

    This article walks through the source code to explain how HttpClient works.

  • Usage recap

    public static void sendRequestWithHttpClient() {
            List<NameValuePair> formparams = new ArrayList<NameValuePair>();
            formparams.add(new BasicNameValuePair("account", ""));
            formparams.add(new BasicNameValuePair("password", ""));
            HttpEntity reqEntity = null;
            try {
                reqEntity = new UrlEncodedFormEntity(formparams, "utf-8");
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            }
    
            RequestConfig requestConfig = RequestConfig.custom()
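                    .setConnectTimeout(5000) // 1. Connect timeout: how long to wait for the connection to the URL to be established
                    .setSocketTimeout(5000) // 2. Socket (read) timeout: how long to wait for response data once connected
                    .setConnectionRequestTimeout(5000) // 3. Connection request timeout: how long to wait for a connection from the connection manager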
                    .build();
    
            HttpClient client = HttpClientBuilder.create().build();
            HttpPost post = new HttpPost("http://xxx.xx.xx/login");
            post.setEntity(reqEntity);
            post.setConfig(requestConfig);
            HttpResponse response = null;
            try {
                response = client.execute(post);
            } catch (IOException e) {
                e.printStackTrace();
            }
    
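            // response stays null if execute() threw, so guard against a NullPointerException
            if (response != null && response.getStatusLine().getStatusCode() == 200) {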
                HttpEntity resEntity = response.getEntity();
                String message = null;
                try {
                    message = EntityUtils.toString(resEntity, "utf-8");
                } catch (IOException e) {
                    e.printStackTrace();
                }
                System.out.println(message);
            } else {
                System.out.println("Request failed");
            }
        }
    
  • Source code analysis

    NameValuePair is an interface that holds a name/value pair:

    public interface NameValuePair {
    
        /**
         * Gets the name of this pair.
         *
         * @return the name of this pair, never {@code null}.
         */
        String getName();
    
        /**
         * Gets the value of this pair.
         *
         * @return the value of this pair, may be {@code null}.
         */
        String getValue();
    }
    

    Next, a UrlEncodedFormEntity is constructed from the name/value pairs and the charset:

    public UrlEncodedFormEntity (
        final List <? extends NameValuePair> parameters,
        final String charset) throws UnsupportedEncodingException {
        super(URLEncodedUtils.format(parameters,
                charset != null ? charset : HTTP.DEF_CONTENT_CHARSET.name()),
                ContentType.create(URLEncodedUtils.CONTENT_TYPE, charset));
    }
    

    In the call to the super constructor, URLEncodedUtils.format uses the charset to turn the name/value pairs into an encoded string:

    public static String format(
            final List <? extends NameValuePair> parameters,
            final String charset) {
        return format(parameters, QP_SEP_A, charset);
    }
    

    QP_SEP_A is "&". It is passed on to the three-argument overload:

    public static String format(
            final List <? extends NameValuePair> parameters,
            final char parameterSeparator,
            final String charset) {
        final StringBuilder result = new StringBuilder();
        for (final NameValuePair parameter : parameters) {
            final String encodedName = encodeFormFields(parameter.getName(), charset);
            final String encodedValue = encodeFormFields(parameter.getValue(), charset);
            if (result.length() > 0) {
                result.append(parameterSeparator);
            }
            result.append(encodedName);
            if (encodedValue != null) {
                result.append(NAME_VALUE_SEPARATOR);
                result.append(encodedValue);
            }
        }
        return result.toString();
    }
    

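    format encodes every name and value with the given charset and appends them to result. Pairs are separated by the "&" that was passed in, and each name is joined to its value with NAME_VALUE_SEPARATOR ("="). A pair whose value is null contributes only its name.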
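
    The joining logic described above can be sketched with the JDK alone. This is a minimal illustration, not HttpClient's implementation: java.net.URLEncoder stands in for encodeFormFields (the two may differ in which characters they leave unescaped), and the class name FormEncodeSketch and the use of a Map for the pairs are assumptions made for the example.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.LinkedHashMap;
import java.util.Map;

class FormEncodeSketch {
    // Stand-in for encodeFormFields: percent-encodes one name or value.
    static String encode(String s, String charset) {
        try {
            return URLEncoder.encode(s, charset);
        } catch (UnsupportedEncodingException e) {
            throw new IllegalArgumentException("Unsupported charset: " + charset, e);
        }
    }

    // Joins the pairs as name=value&name=value, mirroring the loop in format().
    static String format(Map<String, String> params, String charset) {
        StringBuilder result = new StringBuilder();
        for (Map.Entry<String, String> p : params.entrySet()) {
            if (result.length() > 0) {
                result.append('&');          // pair separator (QP_SEP_A)
            }
            result.append(encode(p.getKey(), charset));
            if (p.getValue() != null) {      // a null value emits the name only
                result.append('=');          // NAME_VALUE_SEPARATOR
                result.append(encode(p.getValue(), charset));
            }
        }
        return result.toString();
    }

    public static void main(String[] args) {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("account", "tom");
        params.put("password", "a b&c");
        // prints account=tom&password=a+b%26c
        System.out.println(format(params, "UTF-8"));
    }
}
```

    Because "&" and "=" inside a value are escaped, the separators in the result stay unambiguous for the server that parses the body.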

    Now look at the second argument to super, ContentType.create(URLEncodedUtils.CONTENT_TYPE, charset). The value of URLEncodedUtils.CONTENT_TYPE is application/x-www-form-urlencoded:

    public static ContentType create(
            final String mimeType, final String charset) throws UnsupportedCharsetException {
        return create(mimeType, !TextUtils.isBlank(charset) ? Charset.forName(charset) : null);
    }
    

    Charset.forName converts the charset name string into a Charset object:

    public static Charset forName(String charsetName) {
        Charset cs = lookup(charsetName);
        if (cs != null)
            return cs;
        throw new UnsupportedCharsetException(charsetName);
    }
    
    private static Charset lookup(String charsetName) {
        if (charsetName == null)
            throw new IllegalArgumentException("Null charset name");
    
    
        // First check whether the most recently returned Charset is still held in cache1
        final Map.Entry<String, Charset> cached = cache1;
        if (cached != null && charsetName.equals(cached.getKey()))
            return cached.getValue();
        // Otherwise look among the other recently used Charsets
        return lookup2(charsetName);
    }
    
    private static Charset lookup2(String charsetName) {
        Charset cs;
        synchronized (cache2) {
            if ((cs = cache2.get(charsetName)) != null) {
                // Found in cache2: return it and promote it to cache1 as the most recently used Charset
                cache1 = new AbstractMap.SimpleImmutableEntry<>(charsetName, cs);
                return cs;
            }
        }
    
        // Android-changed: Drop support for "standard" and "extended"
        // providers.
        // Not cached: ask NativeConverter first, then the CharsetProviders loaded through ServiceLoader
        if ((cs = NativeConverter.charsetForName(charsetName))  != null ||
            (cs = lookupViaProviders(charsetName))              != null)
        {
            // Store the result in both cache1 and cache2
            cache(charsetName, cs);
            return cs;
        }
    
        /* Only need to check the name if we didn't find a charset for it */
        checkName(charsetName);
        return null;
    }
    

    About cache1 and cache2:

    /* The standard set of charsets */
    // Android-removed: We use ICU's list of standard charsets.
    // private static CharsetProvider standardProvider = new StandardCharsets();
    
    // Cache of the most-recently-returned charsets,
    // along with the names that were used to find them
    //
    // cache1/2 usage is explained in the lookup method
    //
    private static volatile Map.Entry<String, Charset> cache1 = null; // "Level 1" cache
    private static final HashMap<String, Charset> cache2 = new HashMap<>(); // "Level 2" cache
    
    private static void cache(String charsetName, Charset cs) {
        synchronized(cache2) {
            String canonicalName = cs.name();
            Charset canonicalCharset = cache2.get(canonicalName);
    
            if (canonicalCharset != null) {
                cs = canonicalCharset;
            } else {
                cache2.put(canonicalName, cs);
    
                for (String alias : cs.aliases()) {
                    cache2.put(alias, cs);
                }
            }
    
            cache2.put(charsetName, cs);
        }
    
        cache1 = new AbstractMap.SimpleImmutableEntry<>(charsetName, cs);
    }
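
    The two cache levels can be illustrated with a small generic sketch: a volatile single-entry cache that is read without locking, backed by a synchronized HashMap. The class name TwoLevelCache, the loader function and the loads counter are assumptions made for the example. Unlike the real code, the sketch runs the slow lookup while holding the lock and does not register aliases.

```java
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

class TwoLevelCache<V> {
    // Level 1: the single most recently returned entry, read without locking.
    private volatile Map.Entry<String, V> cache1 = null;
    // Level 2: every entry seen so far, guarded by its own monitor.
    private final HashMap<String, V> cache2 = new HashMap<>();
    // Hypothetical slow lookup, standing in for the provider search.
    private final Function<String, V> loader;
    // Counts slow-path calls; only here to make the demo observable.
    int loads = 0;

    TwoLevelCache(Function<String, V> loader) {
        this.loader = loader;
    }

    V get(String name) {
        Map.Entry<String, V> cached = cache1;   // read the volatile field once
        if (cached != null && name.equals(cached.getKey())) {
            return cached.getValue();           // level 1 hit
        }
        V value;
        synchronized (cache2) {
            value = cache2.get(name);           // level 2 lookup
            if (value == null) {
                value = loader.apply(name);     // slow path
                loads++;
                cache2.put(name, value);
            }
        }
        // Promote the entry so that a repeated lookup skips the lock.
        cache1 = new AbstractMap.SimpleImmutableEntry<>(name, value);
        return value;
    }

    public static void main(String[] args) {
        TwoLevelCache<String> cache = new TwoLevelCache<>(String::toUpperCase);
        cache.get("utf-8");   // slow path, fills both levels
        cache.get("utf-8");   // level 1 hit
        cache.get("gbk");     // slow path, replaces level 1
        cache.get("utf-8");   // level 2 hit, promoted back to level 1
        System.out.println(cache.loads);   // prints 2
    }
}
```

    The single-entry level pays off when the same name is requested many times in a row, which is the common case for a charset lookup.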
    

    Back in create, a ContentType object is finally returned.

    Stepping into super again leads to the StringEntity constructor:

    public StringEntity(final String string, final ContentType contentType) throws UnsupportedCharsetException {
        super();
        Args.notNull(string, "Source string");
        Charset charset = contentType != null ? contentType.getCharset() : null;
        if (charset == null) {
            charset = HTTP.DEF_CONTENT_CHARSET;
        }
        this.content = string.getBytes(charset);
        if (contentType != null) {
            setContentType(contentType.toString());
        }
    }
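
    The charset fallback in this constructor matters for any text that is not plain ASCII. The sketch below uses only the JDK to show the effect; the class name EntityBytesSketch and the helper toContent are assumptions made for the example. For UrlEncodedFormEntity the string has already been percent-encoded, so the difference mostly shows up when StringEntity is used directly.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

class EntityBytesSketch {
    // Mirrors the fallback in the constructor: no charset means ISO-8859-1.
    static byte[] toContent(String s, Charset charset) {
        Charset effective = (charset != null) ? charset : StandardCharsets.ISO_8859_1;
        return s.getBytes(effective);
    }

    public static void main(String[] args) {
        String body = "name=\u00e9";   // "name=" followed by e-acute
        // UTF-8 needs two bytes for the accented letter: prints 7
        System.out.println(toContent(body, StandardCharsets.UTF_8).length);
        // ISO-8859-1 needs one byte for it: prints 6
        System.out.println(toContent(body, null).length);
        // A character outside ISO-8859-1 is replaced by '?' (0x3F): prints 63
        System.out.println(toContent("\u4e2d", null)[0]);
    }
}
```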
    

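    If no charset was set earlier, HTTP.DEF_CONTENT_CHARSET ("ISO-8859-1") is used. string is the "key1=value1&key2=value2..." text built above; it is converted to bytes with that charset and stored in content. The content type is then set from contentType.toString():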

    @Override
    public String toString() {
        final CharArrayBuffer buf = new CharArrayBuffer(64);
        buf.append(this.mimeType);
        // params is null here. If params were set, the charset would have to be carried inside them, because the two branches are an if/else
        if (this.params != null) {
            buf.append("; ");
            BasicHeaderValueFormatter.INSTANCE.formatParameters(buf, this.params, false);
        } else if (this.charset != null) {
            buf.append("; charset=");
            buf.append(this.charset.name());
        }
        return buf.toString();
    }
    

    Finally, setContentType stores the string returned by contentType.toString() as the entity's Content-Type header, for example "application/x-www-form-urlencoded; charset=UTF-8".
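
    The if/else in toString can be reproduced in a few lines. This is only a sketch: the class name ContentTypeSketch is an assumption, and a pre-formatted params string stands in for the NameValuePair array that BasicHeaderValueFormatter formats in the real code.

```java
import java.nio.charset.Charset;

class ContentTypeSketch {
    // Explicit params win; otherwise the charset is appended; otherwise only the MIME type.
    static String headerValue(String mimeType, String params, Charset charset) {
        StringBuilder buf = new StringBuilder(64);
        buf.append(mimeType);
        if (params != null) {
            buf.append("; ").append(params);
        } else if (charset != null) {
            buf.append("; charset=").append(charset.name());
        }
        return buf.toString();
    }

    public static void main(String[] args) {
        // prints application/x-www-form-urlencoded; charset=UTF-8
        System.out.println(headerValue("application/x-www-form-urlencoded",
                null, Charset.forName("utf-8")));
        // prints text/plain; boundary=x (the charset is ignored once params are present)
        System.out.println(headerValue("text/plain", "boundary=x", Charset.forName("utf-8")));
    }
}
```

    Note that Charset.name() returns the canonical name, so a charset requested as "utf-8" is written out as "UTF-8".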

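    Back to the usage code. The next steps build the HttpClient, create the request (an HttpPost here) and attach a RequestConfig to it. We will not dig into those parameters and go straight to client.execute. The client returned by build() is an InternalHttpClient; execute is defined in its direct parent class CloseableHttpClient, and the overloads end up calling doExecute: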

    @Override
    public CloseableHttpResponse execute(
            final HttpUriRequest request,
            final HttpContext context) throws IOException, ClientProtocolException {
        Args.notNull(request, "HTTP request");
        return doExecute(determineTarget(request), request, context);
    }
    

    doExecute is implemented in InternalHttpClient:

    @Override
    protected CloseableHttpResponse doExecute(
            final HttpHost target,
            final HttpRequest request,
            final HttpContext context) throws IOException, ClientProtocolException {
        Args.notNull(request, "HTTP request");
        HttpExecutionAware execAware = null;
        if (request instanceof HttpExecutionAware) {
            execAware = (HttpExecutionAware) request;
        }
        try {
            final HttpRequestWrapper wrapper = HttpRequestWrapper.wrap(request, target);
            final HttpClientContext localcontext = HttpClientContext.adapt(
                    context != null ? context : new BasicHttpContext());
            RequestConfig config = null;
            if (request instanceof Configurable) {
                config = ((Configurable) request).getConfig();
            }
            if (config == null) {
                final HttpParams params = request.getParams();
                if (params instanceof HttpParamsNames) {
                    if (!((HttpParamsNames) params).getNames().isEmpty()) {
                        config = HttpClientParamConfig.getRequestConfig(params, this.defaultConfig);
                    }
                } else {
                    config = HttpClientParamConfig.getRequestConfig(params, this.defaultConfig);
                }
            }
            if (config != null) {
                localcontext.setRequestConfig(config);
            }
            setupContext(localcontext);
            final HttpRoute route = determineRoute(target, wrapper, localcontext);
            return this.execChain.execute(route, wrapper, localcontext, execAware);
        } catch (final HttpException httpException) {
            throw new ClientProtocolException(httpException);
        }
    }
    

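    The last step calls execChain.execute. execChain is passed in when the client is constructed, and tracing it back leads to MainClientExec (HttpClientBuilder wraps it in further executors such as ProtocolExec and RetryExec, but MainClientExec sits at the core of the chain):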

    ClientExecChain execChain = createMainExec(
            requestExecCopy,
            connManagerCopy,
            reuseStrategyCopy,
            keepAliveStrategyCopy,
            new ImmutableHttpProcessor(new RequestTargetHost(), new RequestUserAgent(userAgentCopy)),
            targetAuthStrategyCopy,
            proxyAuthStrategyCopy,
            userTokenHandlerCopy);
    
    // createMainExec, called above, is defined as:
    protected ClientExecChain createMainExec(
            final HttpRequestExecutor requestExec,
            final HttpClientConnectionManager connManager,
            final ConnectionReuseStrategy reuseStrategy,
            final ConnectionKeepAliveStrategy keepAliveStrategy,
            final HttpProcessor proxyHttpProcessor,
            final AuthenticationStrategy targetAuthStrategy,
            final AuthenticationStrategy proxyAuthStrategy,
            final UserTokenHandler userTokenHandler)
    {
        return new MainClientExec(
                requestExec,
                connManager,
                reuseStrategy,
                keepAliveStrategy,
                proxyHttpProcessor,
                targetAuthStrategy,
                proxyAuthStrategy,
                userTokenHandler);
    }
    

    Its execute method is as follows:

    @Override
    public CloseableHttpResponse execute(
            final HttpRoute route,
            final HttpRequestWrapper request,
            final HttpClientContext context,
            final HttpExecutionAware execAware) throws IOException, HttpException {
        Args.notNull(route, "HTTP route");
        Args.notNull(request, "HTTP request");
        Args.notNull(context, "HTTP context");
    
        AuthState targetAuthState = context.getTargetAuthState();
        if (targetAuthState == null) {
            targetAuthState = new AuthState();
            context.setAttribute(HttpClientContext.TARGET_AUTH_STATE, targetAuthState);
        }
        AuthState proxyAuthState = context.getProxyAuthState();
        if (proxyAuthState == null) {
            proxyAuthState = new AuthState();
            context.setAttribute(HttpClientContext.PROXY_AUTH_STATE, proxyAuthState);
        }
    
        if (request instanceof HttpEntityEnclosingRequest) {
            RequestEntityProxy.enhance((HttpEntityEnclosingRequest) request);
        }
    
        Object userToken = context.getUserToken();
    
        final ConnectionRequest connRequest = connManager.requestConnection(route, userToken);
        if (execAware != null) {
            if (execAware.isAborted()) {
                connRequest.cancel();
                throw new RequestAbortedException("Request aborted");
            }
            execAware.setCancellable(connRequest);
        }
    
        final RequestConfig config = context.getRequestConfig();
    
        final HttpClientConnection managedConn;
        try {
            final int timeout = config.getConnectionRequestTimeout();
            managedConn = connRequest.get(timeout > 0 ? timeout : 0, TimeUnit.MILLISECONDS);
        } catch(final InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new RequestAbortedException("Request aborted", interrupted);
        } catch(final ExecutionException ex) {
            Throwable cause = ex.getCause();
            if (cause == null) {
                cause = ex;
            }
            throw new RequestAbortedException("Request execution failed", cause);
        }
    
        context.setAttribute(HttpCoreContext.HTTP_CONNECTION, managedConn);
    
        if (config.isStaleConnectionCheckEnabled()) {
            // validate connection
            if (managedConn.isOpen()) {
                this.log.debug("Stale connection check");
                if (managedConn.isStale()) {
                    this.log.debug("Stale connection detected");
                    managedConn.close();
                }
            }
        }
    
        final ConnectionHolder connHolder = new ConnectionHolder(this.log, this.connManager, managedConn);
        try {
            if (execAware != null) {
                execAware.setCancellable(connHolder);
            }
    
            HttpResponse response;
            for (int execCount = 1;; execCount++) {
    
                if (execCount > 1 && !RequestEntityProxy.isRepeatable(request)) {
                    throw new NonRepeatableRequestException("Cannot retry request " +
                            "with a non-repeatable request entity.");
                }
    
                if (execAware != null && execAware.isAborted()) {
                    throw new RequestAbortedException("Request aborted");
                }
    
                if (!managedConn.isOpen()) {
                    this.log.debug("Opening connection " + route);
                    try {
                        establishRoute(proxyAuthState, managedConn, route, request, context);
                    } catch (final TunnelRefusedException ex) {
                        if (this.log.isDebugEnabled()) {
                            this.log.debug(ex.getMessage());
                        }
                        response = ex.getResponse();
                        break;
                    }
                }
                final int timeout = config.getSocketTimeout();
                if (timeout >= 0) {
                    managedConn.setSocketTimeout(timeout);
                }
    
                if (execAware != null && execAware.isAborted()) {
                    throw new RequestAbortedException("Request aborted");
                }
    
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Executing request " + request.getRequestLine());
                }
    
                if (!request.containsHeader(AUTH.WWW_AUTH_RESP)) {
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Target auth state: " + targetAuthState.getState());
                    }
                    this.authenticator.generateAuthResponse(request, targetAuthState, context);
                }
                if (!request.containsHeader(AUTH.PROXY_AUTH_RESP) && !route.isTunnelled()) {
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Proxy auth state: " + proxyAuthState.getState());
                    }
                    this.authenticator.generateAuthResponse(request, proxyAuthState, context);
                }
    
                context.setAttribute(HttpCoreContext.HTTP_REQUEST, request);
                response = requestExecutor.execute(request, managedConn, context);
    
                // The connection is in or can be brought to a re-usable state.
                if (reuseStrategy.keepAlive(response, context)) {
                    // Set the idle duration of this connection
                    final long duration = keepAliveStrategy.getKeepAliveDuration(response, context);
                    if (this.log.isDebugEnabled()) {
                        final String s;
                        if (duration > 0) {
                            s = "for " + duration + " " + TimeUnit.MILLISECONDS;
                        } else {
                            s = "indefinitely";
                        }
                        this.log.debug("Connection can be kept alive " + s);
                    }
                    connHolder.setValidFor(duration, TimeUnit.MILLISECONDS);
                    connHolder.markReusable();
                } else {
                    connHolder.markNonReusable();
                }
    
                if (needAuthentication(
                        targetAuthState, proxyAuthState, route, response, context)) {
                    // Make sure the response body is fully consumed, if present
                    final HttpEntity entity = response.getEntity();
                    if (connHolder.isReusable()) {
                        EntityUtils.consume(entity);
                    } else {
                        managedConn.close();
                        if (proxyAuthState.getState() == AuthProtocolState.SUCCESS
                                && proxyAuthState.isConnectionBased()) {
                            this.log.debug("Resetting proxy auth state");
                            proxyAuthState.reset();
                        }
                        if (targetAuthState.getState() == AuthProtocolState.SUCCESS
                                && targetAuthState.isConnectionBased()) {
                            this.log.debug("Resetting target auth state");
                            targetAuthState.reset();
                        }
                    }
                    // discard previous auth headers
                    final HttpRequest original = request.getOriginal();
                    if (!original.containsHeader(AUTH.WWW_AUTH_RESP)) {
                        request.removeHeaders(AUTH.WWW_AUTH_RESP);
                    }
                    if (!original.containsHeader(AUTH.PROXY_AUTH_RESP)) {
                        request.removeHeaders(AUTH.PROXY_AUTH_RESP);
                    }
                } else {
                    break;
                }
            }
    
            if (userToken == null) {
                userToken = userTokenHandler.getUserToken(context);
                context.setAttribute(HttpClientContext.USER_TOKEN, userToken);
            }
            if (userToken != null) {
                connHolder.setState(userToken);
            }
    
            // check for entity, release connection if possible
            final HttpEntity entity = response.getEntity();
            if (entity == null || !entity.isStreaming()) {
                // connection not needed and (assumed to be) in re-usable state
                connHolder.releaseConnection();
                return new HttpResponseProxy(response, null);
            }
            return new HttpResponseProxy(response, connHolder);
        } catch (final ConnectionShutdownException ex) {
            final InterruptedIOException ioex = new InterruptedIOException(
                    "Connection has been shut down");
            ioex.initCause(ex);
            throw ioex;
        } catch (final HttpException ex) {
            connHolder.abortConnection();
            throw ex;
        } catch (final IOException ex) {
            connHolder.abortConnection();
            if (proxyAuthState.isConnectionBased()) {
                proxyAuthState.reset();
            }
            if (targetAuthState.isConnectionBased()) {
                targetAuthState.reset();
            }
            throw ex;
        } catch (final RuntimeException ex) {
            connHolder.abortConnection();
            if (proxyAuthState.isConnectionBased()) {
                proxyAuthState.reset();
            }
            if (targetAuthState.isConnectionBased()) {
                targetAuthState.reset();
            }
            throw ex;
        } catch (final Error error) {
            connManager.shutdown();
            throw error;
        }
    }
    

    requestExecutor.execute(request, managedConn, context) returns a response, so this is the line that actually performs the request. request wraps the HttpPost from earlier. managedConn comes from connRequest.get(timeout > 0 ? timeout : 0, TimeUnit.MILLISECONDS), connRequest comes from connManager.requestConnection(route, userToken), and connManager is the PoolingHttpClientConnectionManager that was passed in when MainClientExec was constructed. Its requestConnection method is:

    @Override
    public ConnectionRequest requestConnection(
            final HttpRoute route,
            final Object state) {
        Args.notNull(route, "HTTP route");
        if (this.log.isDebugEnabled()) {
            this.log.debug("Connection request: " + format(route, state) + formatStats(route));
        }
        Asserts.check(!this.isShutDown.get(), "Connection pool shut down");
        final Future<CPoolEntry> future = this.pool.lease(route, state, null);
        return new ConnectionRequest() {
    
            @Override
            public boolean cancel() {
                return future.cancel(true);
            }
    
            @Override
            public HttpClientConnection get(
                    final long timeout,
                    final TimeUnit timeUnit) throws InterruptedException, ExecutionException, ConnectionPoolTimeoutException {
                final HttpClientConnection conn = leaseConnection(future, timeout, timeUnit);
                if (conn.isOpen()) {
                    final HttpHost host;
                    if (route.getProxyHost() != null) {
                        host = route.getProxyHost();
                    } else {
                        host = route.getTargetHost();
                    }
                    final SocketConfig socketConfig = resolveSocketConfig(host);
                    conn.setSocketTimeout(socketConfig.getSoTimeout());
                }
                return conn;
            }
    
        };
    
    }
    

    Calling get on the returned anonymous ConnectionRequest yields managedConn. The connection itself is obtained through leaseConnection:

    protected HttpClientConnection leaseConnection(
            final Future<CPoolEntry> future,
            final long timeout,
            final TimeUnit timeUnit) throws InterruptedException, ExecutionException, ConnectionPoolTimeoutException {
        final CPoolEntry entry;
        try {
            entry = future.get(timeout, timeUnit);
            if (entry == null || future.isCancelled()) {
                throw new ExecutionException(new CancellationException("Operation cancelled"));
            }
            Asserts.check(entry.getConnection() != null, "Pool entry with no connection");
            if (this.log.isDebugEnabled()) {
                this.log.debug("Connection leased: " + format(entry) + formatStats(entry.getRoute()));
            }
            return CPoolProxy.newProxy(entry);
        } catch (final TimeoutException ex) {
            throw new ConnectionPoolTimeoutException("Timeout waiting for connection from pool");
        }
    }
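
    leaseConnection follows a common pattern: wait on a Future with a timeout and translate the generic TimeoutException into a domain-specific one. A minimal JDK-only sketch of that pattern is shown below. PoolTimeoutException is a hypothetical stand-in for ConnectionPoolTimeoutException, and the class name LeaseSketch and the helper tryLease are assumptions made for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class LeaseSketch {
    // Hypothetical stand-in for ConnectionPoolTimeoutException.
    static class PoolTimeoutException extends Exception {
        PoolTimeoutException(String message) {
            super(message);
        }
    }

    // Waits for the future and converts a timeout into the domain exception.
    static <T> T lease(Future<T> future, long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, PoolTimeoutException {
        try {
            return future.get(timeout, unit);
        } catch (TimeoutException ex) {
            throw new PoolTimeoutException("Timeout waiting for connection from pool");
        }
    }

    // Demo helper that flattens every outcome into a string.
    static String tryLease(Future<String> future, long millis) {
        try {
            return lease(future, millis, TimeUnit.MILLISECONDS);
        } catch (PoolTimeoutException ex) {
            return "timeout";
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } catch (ExecutionException ex) {
            return "failed";
        }
    }

    public static void main(String[] args) {
        // An already completed future returns immediately: prints conn-1
        System.out.println(tryLease(CompletableFuture.completedFuture("conn-1"), 100));
        // A future that never completes runs into the timeout: prints timeout
        System.out.println(tryLease(new CompletableFuture<String>(), 50));
    }
}
```

    One difference from a standard JDK future: the pool's own Future sets a deadline only when the timeout is greater than zero, so a connection request timeout of 0 means waiting without a deadline, whereas CompletableFuture.get(0, ...) gives up immediately.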
    

    So conn, which is managedConn, is a CPoolProxy object. requestExecutor is the HttpRequestExecutor that was also passed in at construction time. Its execute method is as follows:

    public HttpResponse execute(
            final HttpRequest request,
            final HttpClientConnection conn,
            final HttpContext context) throws IOException, HttpException {
        Args.notNull(request, "HTTP request");
        Args.notNull(conn, "Client connection");
        Args.notNull(context, "HTTP context");
        try {
            HttpResponse response = doSendRequest(request, conn, context);
            if (response == null) {
                response = doReceiveResponse(request, conn, context);
            }
            return response;
        } catch (final IOException ex) {
            closeConnection(conn);
            throw ex;
        } catch (final HttpException ex) {
            closeConnection(conn);
            throw ex;
        } catch (final RuntimeException ex) {
            closeConnection(conn);
            throw ex;
        }
    }
    

    This leads to the doSendRequest method:

    protected HttpResponse doSendRequest(
            final HttpRequest request,
            final HttpClientConnection conn,
            final HttpContext context) throws IOException, HttpException {
        Args.notNull(request, "HTTP request");
        Args.notNull(conn, "Client connection");
        Args.notNull(context, "HTTP context");
    
        HttpResponse response = null;
    
        context.setAttribute(HttpCoreContext.HTTP_CONNECTION, conn);
        context.setAttribute(HttpCoreContext.HTTP_REQ_SENT, Boolean.FALSE);
    
        conn.sendRequestHeader(request);
        if (request instanceof HttpEntityEnclosingRequest) {
            // Check for expect-continue handshake. We have to flush the
            // headers and wait for an 100-continue response to handle it.
            // If we get a different response, we must not send the entity.
            boolean sendentity = true;
            final ProtocolVersion ver =
                request.getRequestLine().getProtocolVersion();
            if (((HttpEntityEnclosingRequest) request).expectContinue() &&
                !ver.lessEquals(HttpVersion.HTTP_1_0)) {
    
                conn.flush();
                // As suggested by RFC 2616 section 8.2.3, we don't wait for a
                // 100-continue response forever. On timeout, send the entity.
                if (conn.isResponseAvailable(this.waitForContinue)) {
                    response = conn.receiveResponseHeader();
                    if (canResponseHaveBody(request, response)) {
                        conn.receiveResponseEntity(response);
                    }
                    final int status = response.getStatusLine().getStatusCode();
                    if (status < 200) {
                        if (status != HttpStatus.SC_CONTINUE) {
                            throw new ProtocolException(
                                    "Unexpected response: " + response.getStatusLine());
                        }
                        // discard 100-continue
                        response = null;
                    } else {
                        sendentity = false;
                    }
                }
            }
            if (sendentity) {
                conn.sendRequestEntity((HttpEntityEnclosingRequest) request);
            }
        }
        conn.flush();
        context.setAttribute(HttpCoreContext.HTTP_REQ_SENT, Boolean.TRUE);
        return response;
    }
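
    The expect-continue branch of doSendRequest boils down to a small decision on the status code of the early response. The sketch below isolates that decision. The class name ExpectContinueSketch, the Action enum and the use of -1 for "no response arrived within the wait window" are assumptions made for the example, and IllegalStateException stands in for ProtocolException.

```java
class ExpectContinueSketch {
    enum Action { SEND_ENTITY, SKIP_ENTITY }

    // status: status code of the response that arrived while waiting,
    // or -1 if nothing arrived in time (hypothetical convention).
    static Action decide(int status) {
        if (status == -1) {
            return Action.SEND_ENTITY;   // do not wait forever: send the body anyway
        }
        if (status < 200) {
            if (status != 100) {
                // Any informational status other than 100 Continue is a protocol error here.
                throw new IllegalStateException("Unexpected response: " + status);
            }
            return Action.SEND_ENTITY;   // 100 Continue is discarded, the body follows
        }
        return Action.SKIP_ENTITY;       // a final response already arrived: keep it, skip the body
    }

    public static void main(String[] args) {
        System.out.println(decide(100));   // prints SEND_ENTITY
        System.out.println(decide(-1));    // prints SEND_ENTITY
        System.out.println(decide(417));   // prints SKIP_ENTITY
    }
}
```

    In the SKIP_ENTITY case doSendRequest returns that early response, which is why execute only calls doReceiveResponse when the returned response is null.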
    

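    The methods of CPoolProxy (the conn used here) are all implemented by delegating to the poolEntry object it holds, which is why it is called a proxy. There must therefore be a CPool object behind it, so let's keep digging. poolEntry is passed in by CPoolProxy.newProxy(entry) in leaseConnection. entry is obtained from future.get(timeout, timeUnit), and the future in turn is created by this.pool.lease(route, state, null) in requestConnection. this.pool is created in the constructor: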

    this.pool = new CPool(new InternalConnectionFactory(this.configData, connFactory), 2, 20, timeToLive, timeUnit);
    

    Its connection factory is InternalConnectionFactory(this.configData, connFactory). Calling lease on the CPool returns the future:

    @Override
    public Future<E> lease(final T route, final Object state, final FutureCallback<E> callback) {
        Args.notNull(route, "Route");
        Asserts.check(!this.isShutDown, "Connection pool shut down");
    
        return new Future<E>() {
    
            private final AtomicBoolean cancelled = new AtomicBoolean(false);
            private final AtomicBoolean done = new AtomicBoolean(false);
            private final AtomicReference<E> entryRef = new AtomicReference<E>(null);
    
            @Override
            public boolean cancel(final boolean mayInterruptIfRunning) {
                if (done.compareAndSet(false, true)) {
                    cancelled.set(true);
                    lock.lock();
                    try {
                        condition.signalAll();
                    } finally {
                        lock.unlock();
                    }
                    if (callback != null) {
                        callback.cancelled();
                    }
                    return true;
                }
                return false;
            }
    
            @Override
            public boolean isCancelled() {
                return cancelled.get();
            }
    
            @Override
            public boolean isDone() {
                return done.get();
            }
    
            @Override
            public E get() throws InterruptedException, ExecutionException {
                try {
                    return get(0L, TimeUnit.MILLISECONDS);
                } catch (final TimeoutException ex) {
                    throw new ExecutionException(ex);
                }
            }
    
            @Override
            public E get(final long timeout, final TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
                for (;;) {
                    synchronized (this) {
                        try {
                            final E entry = entryRef.get();
                            if (entry != null) {
                                return entry;
                            }
                            if (done.get()) {
                                throw new ExecutionException(operationAborted());
                            }
                            final E leasedEntry = getPoolEntryBlocking(route, state, timeout, timeUnit, this);
                            if (validateAfterInactivity > 0)  {
                                if (leasedEntry.getUpdated() + validateAfterInactivity <= System.currentTimeMillis()) {
                                    if (!validate(leasedEntry)) {
                                        leasedEntry.close();
                                        release(leasedEntry, false);
                                        continue;
                                    }
                                }
                            }
                            if (done.compareAndSet(false, true)) {
                                entryRef.set(leasedEntry);
                                done.set(true);
                                onLease(leasedEntry);
                                if (callback != null) {
                                    callback.completed(leasedEntry);
                                }
                                return leasedEntry;
                            } else {
                                release(leasedEntry, true);
                                throw new ExecutionException(operationAborted());
                            }
                        } catch (final IOException ex) {
                            if (done.compareAndSet(false, true)) {
                                if (callback != null) {
                                    callback.failed(ex);
                                }
                            }
                            throw new ExecutionException(ex);
                        }
                    }
                }
            }
    
        };
    }
    

    Inside get, getPoolEntryBlocking() is called to obtain the CPoolEntry (the type argument in Future<CPoolEntry> fixes E to CPoolEntry):

    private E getPoolEntryBlocking(
            final T route, final Object state,
            final long timeout, final TimeUnit timeUnit,
            final Future<E> future) throws IOException, InterruptedException, ExecutionException, TimeoutException {
    
        Date deadline = null;
        if (timeout > 0) {
            deadline = new Date (System.currentTimeMillis() + timeUnit.toMillis(timeout));
        }
        this.lock.lock();
        try {
            final RouteSpecificPool<T, C, E> pool = getPool(route);
            E entry;
            for (;;) {
                Asserts.check(!this.isShutDown, "Connection pool shut down");
                if (future.isCancelled()) {
                    throw new ExecutionException(operationAborted());
                }
                for (;;) {
                    entry = pool.getFree(state);
                    if (entry == null) {
                        break;
                    }
                    if (entry.isExpired(System.currentTimeMillis())) {
                        entry.close();
                    }
                    if (entry.isClosed()) {
                        this.available.remove(entry);
                        pool.free(entry, false);
                    } else {
                        break;
                    }
                }
                if (entry != null) {
                    this.available.remove(entry);
                    this.leased.add(entry);
                    onReuse(entry);
                    return entry;
                }
    
                // New connection is needed
                final int maxPerRoute = getMax(route);
                // Shrink the pool prior to allocating a new connection
                final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
                if (excess > 0) {
                    for (int i = 0; i < excess; i++) {
                        final E lastUsed = pool.getLastUsed();
                        if (lastUsed == null) {
                            break;
                        }
                        lastUsed.close();
                        this.available.remove(lastUsed);
                        pool.remove(lastUsed);
                    }
                }
    
                if (pool.getAllocatedCount() < maxPerRoute) {
                    final int totalUsed = this.leased.size();
                    final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);
                    if (freeCapacity > 0) {
                        final int totalAvailable = this.available.size();
                        if (totalAvailable > freeCapacity - 1) {
                            if (!this.available.isEmpty()) {
                                final E lastUsed = this.available.removeLast();
                                lastUsed.close();
                                final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());
                                otherpool.remove(lastUsed);
                            }
                        }
                        final C conn = this.connFactory.create(route);
                        entry = pool.add(conn);
                        this.leased.add(entry);
                        return entry;
                    }
                }
    
                boolean success = false;
                try {
                    pool.queue(future);
                    this.pending.add(future);
                    if (deadline != null) {
                        success = this.condition.awaitUntil(deadline);
                    } else {
                        this.condition.await();
                        success = true;
                    }
                    if (future.isCancelled()) {
                        throw new ExecutionException(operationAborted());
                    }
                } finally {
                    // In case of 'success', we were woken up by the
                    // connection pool and should now have a connection
                    // waiting for us, or else we're shutting down.
                    // Just continue in the loop, both cases are checked.
                    pool.unqueue(future);
                    this.pending.remove(future);
                }
                // check for spurious wakeup vs. timeout
                if (!success && (deadline != null && deadline.getTime() <= System.currentTimeMillis())) {
                    break;
                }
            }
            throw new TimeoutException("Timeout waiting for connection");
        } finally {
            this.lock.unlock();
        }
    }
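
    The lease order in getPoolEntryBlocking (reuse a free entry, otherwise create one while under the limits, otherwise wait on the condition until the deadline) can be sketched with JDK classes only. This is a simplified model under stated assumptions, not the library's code: MiniPool, lease and release are hypothetical names, connections are plain strings, and state matching, expiry checks and eviction of idle connections are left out, so maxTotal here bounds leased connections only.

```java
import java.util.ArrayDeque;
import java.util.Date;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified model of the lease loop; not the HttpCore implementation.
public class MiniPool {
    private final Lock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();
    private final Map<String, Deque<String>> free = new HashMap<>();
    private final Map<String, Integer> allocated = new HashMap<>();
    private final int maxPerRoute;
    private final int maxTotal;
    private int leasedTotal = 0;
    private int counter = 0;

    public MiniPool(int maxPerRoute, int maxTotal) {
        this.maxPerRoute = maxPerRoute;
        this.maxTotal = maxTotal;
    }

    public String lease(String route, long timeout, TimeUnit unit)
            throws InterruptedException, TimeoutException {
        // A timeout of 0 means "wait indefinitely", as in the quoted source.
        Date deadline = timeout > 0
                ? new Date(System.currentTimeMillis() + unit.toMillis(timeout))
                : null;
        lock.lock();
        try {
            for (;;) {
                // 1. Reuse an idle connection of this route if there is one.
                Deque<String> idle = free.computeIfAbsent(route, r -> new ArrayDeque<>());
                String conn = idle.pollFirst();
                if (conn != null) {
                    leasedTotal++;
                    return conn;
                }
                // 2. Otherwise create one while both limits allow it.
                int perRoute = allocated.getOrDefault(route, 0);
                if (perRoute < maxPerRoute && leasedTotal < maxTotal) {
                    allocated.put(route, perRoute + 1);
                    leasedTotal++;
                    return route + "#" + (counter++);
                }
                // 3. Otherwise wait until a release signals us or the deadline passes.
                if (deadline != null) {
                    boolean signalled = condition.awaitUntil(deadline);
                    if (!signalled && deadline.getTime() <= System.currentTimeMillis()) {
                        throw new TimeoutException("Timeout waiting for connection");
                    }
                } else {
                    condition.await();
                }
            }
        } finally {
            lock.unlock();
        }
    }

    public void release(String route, String conn) {
        lock.lock();
        try {
            free.computeIfAbsent(route, r -> new ArrayDeque<>()).addFirst(conn);
            leasedTotal--;
            condition.signalAll(); // wake up waiters so they re-run the loop
        } finally {
            lock.unlock();
        }
    }
}
```

    With new MiniPool(1, 2), a second lease on the same route blocks until the first connection is released or the timeout expires, which is the behaviour that setConnectionRequestTimeout controls in the usage example at the top of the article.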
    

    First, getPool is called to obtain the pool for this route, creating it on first use:

    private RouteSpecificPool<T, C, E> getPool(final T route) {
        RouteSpecificPool<T, C, E> pool = this.routeToPool.get(route);
        if (pool == null) {
            pool = new RouteSpecificPool<T, C, E>(route) {
    
                @Override
                protected E createEntry(final C conn) {
                    return AbstractConnPool.this.createEntry(route, conn);
                }
    
            };
            this.routeToPool.put(route, pool);
        }
        return pool;
    }
    

    Next, pool.getFree(state) looks for a reusable entry. In RouteSpecificPool:

    public E getFree(final Object state) {
        if (!this.available.isEmpty()) {
            if (state != null) {
                final Iterator<E> it = this.available.iterator();
                while (it.hasNext()) {
                    final E entry = it.next();
                    if (state.equals(entry.getState())) {
                        it.remove();
                        this.leased.add(entry);
                        return entry;
                    }
                }
            }
            final Iterator<E> it = this.available.iterator();
            while (it.hasNext()) {
                final E entry = it.next();
                if (entry.getState() == null) {
                    it.remove();
                    this.leased.add(entry);
                    return entry;
                }
            }
        }
        return null;
    }
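
    The two passes in getFree can be hard to see at a glance: an entry whose state equals the requested state is preferred, and only then is a stateless entry accepted. A minimal sketch of that lookup, with hypothetical names (FreeEntryLookup, Entry, offer) and without the leased bookkeeping:

```java
import java.util.Iterator;
import java.util.LinkedList;

// Hypothetical model of the two-pass lookup in getFree; not the HttpCore class.
public class FreeEntryLookup {
    public static final class Entry {
        public final String id;
        public final Object state; // e.g. a user principal, or null if stateless

        public Entry(String id, Object state) {
            this.id = id;
            this.state = state;
        }
    }

    private final LinkedList<Entry> available = new LinkedList<>();

    public void offer(Entry entry) {
        available.addFirst(entry);
    }

    public Entry getFree(Object state) {
        if (state != null) {
            // Pass 1: prefer an entry that carries exactly the requested state.
            Iterator<Entry> it = available.iterator();
            while (it.hasNext()) {
                Entry entry = it.next();
                if (state.equals(entry.state)) {
                    it.remove();
                    return entry;
                }
            }
        }
        // Pass 2: fall back to an entry with no state attached.
        Iterator<Entry> it = available.iterator();
        while (it.hasNext()) {
            Entry entry = it.next();
            if (entry.state == null) {
                it.remove();
                return entry;
            }
        }
        return null; // nothing reusable; the caller has to create a connection
    }
}
```

    Note that an entry bound to a different state is never handed out, which keeps a connection authenticated for one user from being reused by another.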
    

    On the first request available is still empty, so getFree returns null and getPoolEntryBlocking falls through to this branch:

    if (pool.getAllocatedCount() < maxPerRoute) {
        final int totalUsed = this.leased.size();
        final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);
        if (freeCapacity > 0) {
            final int totalAvailable = this.available.size();
            if (totalAvailable > freeCapacity - 1) {
                if (!this.available.isEmpty()) {
                    final E lastUsed = this.available.removeLast();
                    lastUsed.close();
                    final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());
                    otherpool.remove(lastUsed);
                }
            }
            final C conn = this.connFactory.create(route);
            entry = pool.add(conn);
            this.leased.add(entry);
            return entry;
        }
    }
    

    The entry is created by pool.add:

    // add method in RouteSpecificPool
    public E add(final C conn) {
        final E entry = createEntry(conn);
        this.leased.add(entry);
        return entry;
    }
    
    ......
      
    // The createEntry called by add is the one overridden in the anonymous RouteSpecificPool subclass (the pool object created in getPool)
    pool = new RouteSpecificPool<T, C, E>(route) {
            @Override
            protected E createEntry(final C conn) {
                return AbstractConnPool.this.createEntry(route, conn);
            }
    };
    
    // AbstractConnPool.this is the enclosing instance, which at runtime is the CPool, so AbstractConnPool.this.createEntry resolves to the override in CPool
    @Override
    protected CPoolEntry createEntry(final HttpRoute route, final ManagedHttpClientConnection conn) {
        final String id = Long.toString(COUNTER.getAndIncrement());
        return new CPoolEntry(this.log, id, route, conn, this.timeToLive, this.timeUnit);
    }
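
    The dispatch chain above (add calls createEntry on the anonymous subclass, which forwards to the enclosing pool through Outer.this) can be reproduced in a few lines. This is a sketch with hypothetical names (RoutePools, RoutePool) and strings in place of connections and entries:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of getPool plus the createEntry delegation; not the HttpCore classes.
public class RoutePools {
    /** Stand-in for RouteSpecificPool: knows its route, defers entry creation to a subclass. */
    public abstract static class RoutePool {
        public final String route;

        protected RoutePool(String route) {
            this.route = route;
        }

        protected abstract String createEntry(String conn);

        public String add(String conn) {
            return createEntry(conn); // resolved by the anonymous subclass below
        }
    }

    private final Map<String, RoutePool> routeToPool = new HashMap<>();
    private long counter = 0;

    /** Stand-in for the createEntry that CPool overrides. */
    protected String createEntry(String route, String conn) {
        return "entry-" + (counter++) + "[" + route + "," + conn + "]";
    }

    public RoutePool getPool(final String route) {
        RoutePool pool = routeToPool.get(route);
        if (pool == null) {
            // Created lazily, once per route.
            pool = new RoutePool(route) {
                @Override
                protected String createEntry(String conn) {
                    // RoutePools.this is the enclosing pool instance.
                    return RoutePools.this.createEntry(route, conn);
                }
            };
            routeToPool.put(route, pool);
        }
        return pool;
    }
}
```

    Calling new RoutePools().getPool("a").add("c1") ends up in the outer createEntry, just as pool.add(conn) ends up in CPool.createEntry.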
    

    This is where the CPoolEntry object finally comes from.

    Back in doSendRequest, the call to conn.flush proceeds as follows:

    @Override
    public void flush() throws IOException {
        getValidConnection().flush();
    }
    
    .....
      
    ManagedHttpClientConnection getValidConnection() {
        final ManagedHttpClientConnection conn = getConnection();
        if (conn == null) {
            throw new ConnectionShutdownException();
        }
        return conn;
    }
    
    ......
      
    ManagedHttpClientConnection getConnection() {
        final CPoolEntry local = this.poolEntry;
        if (local == null) {
            return null;
        }
        return local.getConnection();
    }
    

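    The local obtained here is the new CPoolEntry(this.log, id, route, conn, this.timeToLive, this.timeUnit) created above, and flush is ultimately executed on its conn object. Tracing upward, conn is created by this.connFactory.create(route), and connFactory turns out to be the InternalConnectionFactory that was passed in when the CPool was constructed. Its create method: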

    @Override
    public ManagedHttpClientConnection create(final HttpRoute route) throws IOException {
        ConnectionConfig config = null;
        if (route.getProxyHost() != null) {
            config = this.configData.getConnectionConfig(route.getProxyHost());
        }
        if (config == null) {
            config = this.configData.getConnectionConfig(route.getTargetHost());
        }
        if (config == null) {
            config = this.configData.getDefaultConnectionConfig();
        }
        if (config == null) {
            config = ConnectionConfig.DEFAULT;
        }
        return this.connFactory.create(route, config);
    }
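
    The create method resolves the ConnectionConfig through a fallback chain: proxy host, then target host, then the configured default, then the built-in default. A minimal sketch of the same chain, using strings for hosts and configs; ConfigFallback, setConfig, setDefaultConfig and resolve are hypothetical names used only for illustration:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the config fallback chain; not the HttpClient implementation.
public class ConfigFallback {
    public static final String BUILT_IN_DEFAULT = "built-in-default";

    private final Map<String, String> perHost = new HashMap<>();
    private String defaultConfig; // stays null unless set explicitly

    public void setConfig(String host, String config) {
        perHost.put(host, config);
    }

    public void setDefaultConfig(String config) {
        this.defaultConfig = config;
    }

    public String resolve(String proxyHost, String targetHost) {
        String config = null;
        if (proxyHost != null) {
            config = perHost.get(proxyHost); // 1. config registered for the proxy
        }
        if (config == null) {
            config = perHost.get(targetHost); // 2. config registered for the target
        }
        if (config == null) {
            config = defaultConfig;           // 3. user-supplied default
        }
        if (config == null) {
            config = BUILT_IN_DEFAULT;        // 4. library-wide default
        }
        return config;
    }
}
```

    In the usage example at the top of the article none of these are configured, so under this reading the connection is created with ConnectionConfig.DEFAULT.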
    

    It holds yet another connFactory, which is set in the constructor. null was passed in at construction time, so from:

    InternalConnectionFactory(
            final ConfigData configData,
            final HttpConnectionFactory<HttpRoute, ManagedHttpClientConnection> connFactory) {
        super();
        this.configData = configData != null ? configData : new ConfigData();
        this.connFactory = connFactory != null ? connFactory :
            ManagedHttpClientConnectionFactory.INSTANCE;
    }
    

    we can see that connFactory here is ManagedHttpClientConnectionFactory.INSTANCE, whose create method is:

    @Override
    public ManagedHttpClientConnection create(final HttpRoute route, final ConnectionConfig config) {
        final ConnectionConfig cconfig = config != null ? config : ConnectionConfig.DEFAULT;
        CharsetDecoder charDecoder = null;
        CharsetEncoder charEncoder = null;
        final Charset charset = cconfig.getCharset();
        final CodingErrorAction malformedInputAction = cconfig.getMalformedInputAction() != null ?
                cconfig.getMalformedInputAction() : CodingErrorAction.REPORT;
        final CodingErrorAction unmappableInputAction = cconfig.getUnmappableInputAction() != null ?
                cconfig.getUnmappableInputAction() : CodingErrorAction.REPORT;
        if (charset != null) {
            charDecoder = charset.newDecoder();
            charDecoder.onMalformedInput(malformedInputAction);
            charDecoder.onUnmappableCharacter(unmappableInputAction);
            charEncoder = charset.newEncoder();
            charEncoder.onMalformedInput(malformedInputAction);
            charEncoder.onUnmappableCharacter(unmappableInputAction);
        }
        final String id = "http-outgoing-" + Long.toString(COUNTER.getAndIncrement());
        return new LoggingManagedHttpClientConnection(
                id,
                log,
                headerLog,
                wireLog,
                cconfig.getBufferSize(),
                cconfig.getFragmentSizeHint(),
                charDecoder,
                charEncoder,
                cconfig.getMessageConstraints(),
                incomingContentStrategy,
                outgoingContentStrategy,
                requestWriterFactory,
                responseParserFactory);
    }
    

    So we look for the flush method of LoggingManagedHttpClientConnection. It does not define one, but its grandparent class DefaultBHttpClientConnection does:

    @Override
    public void flush() throws IOException {
        ensureOpen();
        doFlush();
    }
    

    ensureOpen is defined one level further up, in its parent class BHttpConnectionBase:

    protected void ensureOpen() throws IOException {
        final Socket socket = this.socketHolder.get();
        if (socket == null) {
            throw new ConnectionClosedException();
        }
        if (!this.inBuffer.isBound()) {
            this.inBuffer.bind(getSocketInputStream(socket));
        }
        if (!this.outbuffer.isBound()) {
            this.outbuffer.bind(getSocketOutputStream(socket));
        }
    }
    
    ......
      
    protected InputStream getSocketInputStream(final Socket socket) throws IOException {
        return socket.getInputStream();
    }
    
    protected OutputStream getSocketOutputStream(final Socket socket) throws IOException {
        return socket.getOutputStream();
    }
    

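    At this point we have reached the socket-level API. ensureOpen plays a role comparable to connect in HttpURLConnection, discussed in the previous article, with one difference: it does not open the socket itself. It expects a socket to be present in socketHolder already (otherwise it throws ConnectionClosedException) and only binds the input and output buffers to that socket's streams.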

    Then comes doFlush:

    protected void doFlush() throws IOException {
        this.outbuffer.flush();
    }
    
    ......
      
    // In SessionOutputBufferImpl
    @Override
    public void flush() throws IOException {
        // flushBuffer writes the contents of the internal buffer to the stream
        flushBuffer();
        // flushStream flushes the stream so the data is sent over the network
        flushStream();
    }
    
    ......
      
    private void flushBuffer() throws IOException {
        final int len = this.buffer.length();
        if (len > 0) {
            streamWrite(this.buffer.buffer(), 0, len);
            this.buffer.clear();
            this.metrics.incrementBytesTransferred(len);
        }
    }
    
    private void streamWrite(final byte[] b, final int off, final int len) throws IOException {
        Asserts.notNull(outStream, "Output stream");
        // outStream was assigned by the bind call in ensureOpen above
        this.outStream.write(b, off, len);
    }
    
    private void flushStream() throws IOException {
        if (this.outStream != null) {
            this.outStream.flush();
        }
    }
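
    The two-stage flush (first drain the in-memory buffer into the bound stream, then flush the stream itself) is easy to try out against a ByteArrayOutputStream. This is a sketch under assumptions: TwoStageFlush is a hypothetical class that only mirrors the shape of SessionOutputBufferImpl, with a fixed-size byte array as the buffer.

```java
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical model of a session output buffer; not the HttpCore implementation.
public class TwoStageFlush {
    private final byte[] buffer;
    private int len = 0;
    private OutputStream outStream;
    private long bytesTransferred = 0;

    public TwoStageFlush(int size) {
        this.buffer = new byte[size];
    }

    /** Corresponds to the bind call made from ensureOpen. */
    public void bind(OutputStream out) {
        this.outStream = out;
    }

    public void write(byte[] data) throws IOException {
        for (byte b : data) {
            if (len == buffer.length) {
                flushBuffer(); // buffer full: push it to the stream, keep going
            }
            buffer[len++] = b;
        }
    }

    public void flush() throws IOException {
        flushBuffer(); // stage 1: buffer -> stream
        flushStream(); // stage 2: stream -> underlying sink (the socket in HttpClient)
    }

    private void flushBuffer() throws IOException {
        if (len > 0) {
            if (outStream == null) {
                throw new IllegalStateException("Output stream not bound");
            }
            outStream.write(buffer, 0, len);
            bytesTransferred += len;
            len = 0;
        }
    }

    private void flushStream() throws IOException {
        if (outStream != null) {
            outStream.flush();
        }
    }

    public long getBytesTransferred() {
        return bytesTransferred;
    }
}
```

    Until flush is called (or the buffer fills up), nothing reaches the bound stream, which is why doSendRequest has to call conn.flush() after writing the request.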
    

    At this point the request has been sent. So how is the response obtained?

    In doSendRequest the response is obtained through receiveResponseHeader, so following the same path as before we arrive at receiveResponseHeader in DefaultBHttpClientConnection:

    @Override
    public HttpResponse receiveResponseHeader() throws HttpException, IOException {
        ensureOpen();
        final HttpResponse response = this.responseParser.parse();
        onResponseReceived(response);
        if (response.getStatusLine().getStatusCode() >= HttpStatus.SC_OK) {
            incrementResponseCount();
        }
        return response;
    }
    

    this.responseParser.parse() is ultimately found in AbstractMessageParser:

    @Override
    public T parse() throws IOException, HttpException {
        final int st = this.state;
        switch (st) {
        case HEAD_LINE:
            try {
                this.message = parseHead(this.sessionBuffer);
            } catch (final ParseException px) {
                throw new ProtocolException(px.getMessage(), px);
            }
            this.state = HEADERS;
            //$FALL-THROUGH$
        case HEADERS:
            final Header[] headers = AbstractMessageParser.parseHeaders(
                    this.sessionBuffer,
                    this.messageConstraints.getMaxHeaderCount(),
                    this.messageConstraints.getMaxLineLength(),
                    this.lineParser,
                    this.headerLines);
            this.message.setHeaders(headers);
            final T result = this.message;
            this.message = null;
            this.headerLines.clear();
            this.state = HEAD_LINE;
            return result;
        default:
            throw new IllegalStateException("Inconsistent parser state");
        }
    }
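
    The parser is a small two-state machine: HEAD_LINE reads the status line and falls through to HEADERS, which reads header lines up to the blank line and then resets the state for the next message. A minimal sketch of that shape over a BufferedReader; HeadParser and Message are hypothetical names, and headers are kept as raw strings rather than parsed Header objects:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the HEAD_LINE / HEADERS state machine; not the HttpCore parser.
public class HeadParser {
    private static final int HEAD_LINE = 0;
    private static final int HEADERS = 1;

    public static final class Message {
        public final String statusLine;
        public final List<String> headers = new ArrayList<>();

        Message(String statusLine) {
            this.statusLine = statusLine;
        }
    }

    private final BufferedReader in;
    private int state = HEAD_LINE;
    private Message message;

    public HeadParser(BufferedReader in) {
        this.in = in;
    }

    public Message parse() throws IOException {
        String line;
        switch (state) {
        case HEAD_LINE:
            line = in.readLine();
            if (line == null) {
                throw new IOException("The target server failed to respond");
            }
            message = new Message(line);
            state = HEADERS;
            // fall through
        case HEADERS:
            // Read header lines up to (and including) the blank separator line.
            while ((line = in.readLine()) != null && !line.isEmpty()) {
                message.headers.add(line);
            }
            Message result = message;
            message = null;
            state = HEAD_LINE; // ready for the next message on this connection
            return result;
        default:
            throw new IllegalStateException("Inconsistent parser state");
        }
    }
}
```

    After parse returns, the reader is positioned at the first byte of the body, so the body is still unread.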
    

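    Only the header portion is parsed here. In doSendRequest, receiveResponseHeader is followed by a call to receiveResponseEntity, made only when canResponseHaveBody(request, response) reports that the response can carry a body, which attaches the body to the response: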

    @Override
    public void receiveResponseEntity(
            final HttpResponse response) throws HttpException, IOException {
        Args.notNull(response, "HTTP response");
        ensureOpen();
        final HttpEntity entity = prepareInput(response);
        response.setEntity(entity);
    }
    
    ......
     // In BHttpConnectionBase
     protected HttpEntity prepareInput(final HttpMessage message) throws HttpException {
            final BasicHttpEntity entity = new BasicHttpEntity();
    
            final long len = this.incomingContentStrategy.determineLength(message);
            final InputStream inStream = createInputStream(len, this.inBuffer);
            if (len == ContentLengthStrategy.CHUNKED) {
                entity.setChunked(true);
                entity.setContentLength(-1);
                entity.setContent(inStream);
            } else if (len == ContentLengthStrategy.IDENTITY) {
                entity.setChunked(false);
                entity.setContentLength(-1);
                entity.setContent(inStream);
            } else {
                entity.setChunked(false);
                entity.setContentLength(len);
                entity.setContent(inStream);
            }
    
            final Header contentTypeHeader = message.getFirstHeader(HTTP.CONTENT_TYPE);
            if (contentTypeHeader != null) {
                entity.setContentType(contentTypeHeader);
            }
            final Header contentEncodingHeader = message.getFirstHeader(HTTP.CONTENT_ENCODING);
            if (contentEncodingHeader != null) {
                entity.setContentEncoding(contentEncodingHeader);
            }
            return entity;
        }
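
    prepareInput branches on the length returned by incomingContentStrategy.determineLength: chunked, identity (read until the connection closes), or a fixed Content-Length. The sketch below illustrates one plausible way such a decision can be made from the headers. It is a simplified assumption, not the library's strategy class: BodyFraming is a hypothetical name, header names are expected in the exact spelling shown, and invalid Content-Length values simply fall back to identity.

```java
import java.util.Map;

// Hypothetical, simplified length decision; not the HttpCore ContentLengthStrategy.
public class BodyFraming {
    // Same sentinel values as ContentLengthStrategy.IDENTITY and CHUNKED.
    public static final long IDENTITY = -1;
    public static final long CHUNKED = -2;

    public static long determineLength(Map<String, String> headers) {
        String transferEncoding = headers.get("Transfer-Encoding");
        if (transferEncoding != null && transferEncoding.trim().equalsIgnoreCase("chunked")) {
            return CHUNKED; // body is a sequence of length-prefixed chunks
        }
        String contentLength = headers.get("Content-Length");
        if (contentLength != null) {
            try {
                long n = Long.parseLong(contentLength.trim());
                if (n >= 0) {
                    return n; // body is exactly n bytes
                }
            } catch (NumberFormatException ignored) {
                // fall through to IDENTITY
            }
        }
        return IDENTITY; // body runs until the connection is closed
    }

    public static String describe(long len) {
        if (len == CHUNKED) {
            return "chunked";
        }
        if (len == IDENTITY) {
            return "identity";
        }
        return "fixed:" + len;
    }
}
```

    Whatever the outcome, the entity only wraps an input stream over inBuffer; the body bytes are read later, for example by EntityUtils.toString in the usage example.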
    

    Finally, control returns to doSendRequest and from there to the caller of client.execute. This completes the HttpClient call path.

  • Summary

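    Compared with HttpURLConnection, HttpClient adds another layer of encapsulation, which makes it more concise to use. The analysis also shows how much work the framework does behind that API (connection pooling with per-route and total limits, buffered output, header parsing), and that at the bottom it still communicates over a socket.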
