An HttpClient Bug in Practice

Business Scenario

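The business needs to pull the comments on every Weibo post through the Weibo API. Before pulling comments, the service must make sure its Weibo authorization is still valid. If it has expired, the service uses HttpClient to send requests that simulate a Weibo login and obtain a new authorization. The job runs as a scheduled service every half hour, and each run pulls comments for the posts published from three days ago up to today. Each run currently has to pull comments for roughly 3,000 posts at the low end and up to 6,000 at the peak. A run uses 30 threads to fetch comments, and a dispatcher assigns the posts to these 30 threads.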

The Incident

[Screenshot: error log of the connection timeouts]

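One day errors suddenly appeared: a large number of connection timeouts, as shown above. At the same time, another Weibo-related service also started reporting timeouts, so the failure was spreading. The problem had to be dealt with immediately, and the first step was an emergency rollback of the code.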

Analysis

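We started from the error logs: a large number of requests had failed. The errors were thrown from CloseableHttpClient and, digging further, from PoolingHttpClientConnectionManager. At first glance the logs pointed to HttpClient's connection pool: requests were timing out while trying to obtain a connection.
Since the problem looked pool-related, the next step was to use jstack to inspect the JVM's thread states. Sure enough, a large number of threads were blocked. At this point it was fairly clear the problem lay in HttpClient's connection pool. To work out how to fix it, we needed to look at the HttpClient configuration.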

Inside HttpClient

CloseableHttpClient

Let's start with the CloseableHttpClient class.

@ThreadSafe
public abstract class CloseableHttpClient implements HttpClient, Closeable {
    private final Log log = LogFactory.getLog(this.getClass());

    public CloseableHttpClient() {
    }

    protected abstract CloseableHttpResponse doExecute(HttpHost var1, HttpRequest var2, HttpContext var3) throws IOException, ClientProtocolException;

    public CloseableHttpResponse execute(HttpHost target, HttpRequest request, HttpContext context) throws IOException, ClientProtocolException {
        return this.doExecute(target, request, context);
    }

    public CloseableHttpResponse execute(HttpUriRequest request, HttpContext context) throws IOException, ClientProtocolException {
        Args.notNull(request, "HTTP request");
        return this.doExecute(determineTarget(request), request, context);
    }

    private static HttpHost determineTarget(HttpUriRequest request) throws ClientProtocolException {
        HttpHost target = null;
        URI requestURI = request.getURI();
        if (requestURI.isAbsolute()) {
            target = URIUtils.extractHost(requestURI);
            if (target == null) {
                throw new ClientProtocolException("URI does not specify a valid host name: " + requestURI);
            }
        }

        return target;
    }

    public CloseableHttpResponse execute(HttpUriRequest request) throws IOException, ClientProtocolException {
        return this.execute(request, (HttpContext)null);
    }

    public CloseableHttpResponse execute(HttpHost target, HttpRequest request) throws IOException, ClientProtocolException {
        return this.doExecute(target, request, (HttpContext)null);
    }

    public <T> T execute(HttpUriRequest request, ResponseHandler<? extends T> responseHandler) throws IOException, ClientProtocolException {
        return this.execute((HttpUriRequest)request, (ResponseHandler)responseHandler, (HttpContext)null);
    }

    public <T> T execute(HttpUriRequest request, ResponseHandler<? extends T> responseHandler, HttpContext context) throws IOException, ClientProtocolException {
        HttpHost target = determineTarget(request);
        return this.execute(target, request, responseHandler, context);
    }

    public <T> T execute(HttpHost target, HttpRequest request, ResponseHandler<? extends T> responseHandler) throws IOException, ClientProtocolException {
        return this.execute(target, request, responseHandler, (HttpContext)null);
    }

    public <T> T execute(HttpHost target, HttpRequest request, ResponseHandler<? extends T> responseHandler, HttpContext context) throws IOException, ClientProtocolException {
        Args.notNull(responseHandler, "Response handler");
        CloseableHttpResponse response = this.execute(target, request, context);

        Object var8;
        try {
            HttpEntity entity;
            try {
                T result = responseHandler.handleResponse(response);
                entity = response.getEntity();
                EntityUtils.consume(entity);
                var8 = result;
            } catch (ClientProtocolException var14) {
                entity = response.getEntity();

                try {
                    EntityUtils.consume(entity);
                } catch (Exception var13) {
                    this.log.warn("Error consuming content after an exception.", var13);
                }

                throw var14;
            }
        } finally {
            response.close();
        }

        return var8;
    }
}
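The ResponseHandler overload above consumes the entity and calls response.close() in its finally block, so the leased connection always goes back to the pool. The sketch below assumes HttpClient 4.5.x is on the classpath and uses the JDK's built-in com.sun.net.httpserver server as a stand-in target; the class and method names are hypothetical. It caps the pool at a single connection and shows that sequential requests still succeed, because each call returns its connection before the next one starts.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.BasicResponseHandler;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;

public class ResponseHandlerDemo {
    // Throwaway local server that answers "ok" on every path (stand-in for a real API).
    static HttpServer startServer() throws IOException {
        HttpServer server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
        server.createContext("/", exchange -> {
            byte[] body = "ok".getBytes(StandardCharsets.UTF_8);
            exchange.sendResponseHeaders(200, body.length);
            try (OutputStream os = exchange.getResponseBody()) {
                os.write(body);
            }
        });
        server.start();
        return server;
    }

    // Sends n sequential requests through a pool that allows only ONE connection.
    static int fetchMany(int n) throws IOException {
        HttpServer server = startServer();
        PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
        cm.setMaxTotal(1);
        cm.setDefaultMaxPerRoute(1);
        RequestConfig config = RequestConfig.custom()
                .setConnectionRequestTimeout(1000) // fail after 1 s instead of waiting forever
                .build();
        String url = "http://127.0.0.1:" + server.getAddress().getPort() + "/";
        int ok = 0;
        try (CloseableHttpClient client = HttpClients.custom()
                .setConnectionManager(cm)
                .setDefaultRequestConfig(config)
                .build()) {
            for (int i = 0; i < n; i++) {
                // execute(request, handler) consumes the entity and closes the response,
                // so the single pooled connection is free again for the next iteration.
                String body = client.execute(new HttpGet(url), new BasicResponseHandler());
                if ("ok".equals(body)) {
                    ok++;
                }
            }
        } finally {
            server.stop(0);
        }
        return ok;
    }

    public static void main(String[] args) throws IOException {
        System.out.println(fetchMany(5));
    }
}
```

If the code instead called execute(request) and never closed the returned CloseableHttpResponse, the second request would be left waiting for the only connection and, with the 1 s connectionRequestTimeout, fail with a pool timeout.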

The abstract class CloseableHttpClient implements the HttpClient interface. The concrete class used in the project is InternalHttpClient, which extends CloseableHttpClient.

@ThreadSafe
class InternalHttpClient extends CloseableHttpClient implements Configurable {
    private final Log log = LogFactory.getLog(this.getClass());
    private final ClientExecChain execChain;
    private final HttpClientConnectionManager connManager;
    private final HttpRoutePlanner routePlanner;
    private final Lookup<CookieSpecProvider> cookieSpecRegistry;
    private final Lookup<AuthSchemeProvider> authSchemeRegistry;
    private final CookieStore cookieStore;
    private final CredentialsProvider credentialsProvider;
    private final RequestConfig defaultConfig;
    private final List<Closeable> closeables;
.......
}

Among its fields are several key configuration properties: HttpClientConnectionManager, HttpRoutePlanner, and RequestConfig.

HttpClientConnectionManager

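HttpClientConnectionManager is the HTTP connection manager. It creates new HTTP connections, manages their life cycle, and guarantees that a given HTTP connection is used by only one thread at a time.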

public interface HttpClientConnectionManager {
    ConnectionRequest requestConnection(HttpRoute var1, Object var2);

    void releaseConnection(HttpClientConnection var1, Object var2, long var3, TimeUnit var5);

    void connect(HttpClientConnection var1, HttpRoute var2, int var3, HttpContext var4) throws IOException;

    void upgrade(HttpClientConnection var1, HttpRoute var2, HttpContext var3) throws IOException;

    void routeComplete(HttpClientConnection var1, HttpRoute var2, HttpContext var3) throws IOException;

    void closeIdleConnections(long var1, TimeUnit var3);

    void closeExpiredConnections();

    void shutdown();
}

It defines how connections are obtained, released, closed, and used.
It has two main implementations, PoolingHttpClientConnectionManager and BasicHttpClientConnectionManager. Given the problem at hand, we focus on PoolingHttpClientConnectionManager.

PoolingHttpClientConnectionManager

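BasicHttpClientConnectionManager manages only a single connection, so it can be used by only one thread at a time. PoolingHttpClientConnectionManager, by contrast, manages a connection pool and can serve many threads concurrently. When a new request arrives and the pool already holds an available connection for the same route, the pool reuses that connection. If there is no connection for that route, a new one is created to serve the request. If the pool is full, the request waits until it is served or times out. If HttpClientBuilder is not given a connection manager, the client it builds also uses a PoolingHttpClientConnectionManager. In our application the pool lives as long as the application: it is instantiated when the application container starts, and CloseableHttpClient.close() shuts it down when the application exits.
PoolingHttpClientConnectionManager has two maximum-connection settings: one caps the total number of connections, the other caps the number of connections per route. If they are not set explicitly, each route is allowed at most 2 connections and the total is capped at 20 (these are the 2 and 20 passed to CPool in the constructor below). These defaults are too small for many highly concurrent applications and must be sized to the actual workload, much like sizing a thread pool. If all requests go to the same host (that is, the same route), MaxPerRoute can be set equal to MaxTotal so that connections are reused more efficiently.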

public PoolingHttpClientConnectionManager(HttpClientConnectionOperator httpClientConnectionOperator, HttpConnectionFactory<HttpRoute, ManagedHttpClientConnection> connFactory, long timeToLive, TimeUnit tunit) {
        this.log = LogFactory.getLog(this.getClass());
        this.configData = new ConfigData();
        this.pool = new CPool(new InternalConnectionFactory(this.configData, connFactory), 2, 20, timeToLive, tunit);
        this.pool.setValidateAfterInactivity(2000);
        this.connectionOperator = (HttpClientConnectionOperator)Args.notNull(httpClientConnectionOperator, "HttpClientConnectionOperator");
        this.isShutDown = new AtomicBoolean(false);
    }
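To see why the per-route default of 2 matters, here is a toy model of the two limits built on java.util.concurrent.Semaphore. It is not how CPool is implemented internally; it only mimics the MaxTotal and MaxPerRoute caps plus a bounded wait for a lease. The class name, route string, and the 20 ms simulated request are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Toy model of the two pool limits; NOT HttpClient's actual CPool implementation. */
public class ToyRoutePool {
    private final Semaphore total;                 // plays the role of MaxTotal
    private final int maxPerRoute;                 // plays the role of MaxPerRoute
    private final Map<String, Semaphore> perRoute = new ConcurrentHashMap<>();

    public ToyRoutePool(int maxTotal, int maxPerRoute) {
        this.total = new Semaphore(maxTotal, true);
        this.maxPerRoute = maxPerRoute;
    }

    /** Waits up to timeoutMs for a slot on the route; false models a pool timeout. */
    public boolean lease(String route, long timeoutMs) throws InterruptedException {
        Semaphore routeSlots = perRoute.computeIfAbsent(route, r -> new Semaphore(maxPerRoute, true));
        if (!routeSlots.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS)) {
            return false;
        }
        if (!total.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS)) {
            routeSlots.release();
            return false;
        }
        return true;
    }

    public void release(String route) {
        total.release();
        perRoute.get(route).release();
    }

    /** Starts `threads` workers that all hit one route; returns the peak number of concurrent leases. */
    public static int peakConcurrency(int maxTotal, int maxPerRoute, int threads) throws InterruptedException {
        String route = "https://api.weibo.com:443";
        ToyRoutePool pool = new ToyRoutePool(maxTotal, maxPerRoute);
        AtomicInteger inUse = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        ExecutorService workers = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            workers.submit(() -> {
                start.await();                       // release all workers at once
                if (pool.lease(route, 5_000)) {
                    try {
                        peak.accumulateAndGet(inUse.incrementAndGet(), Math::max);
                        Thread.sleep(20);            // simulate a slow HTTP call
                    } finally {
                        inUse.decrementAndGet();
                        pool.release(route);
                    }
                }
                return null;
            });
        }
        start.countDown();
        workers.shutdown();
        workers.awaitTermination(30, TimeUnit.SECONDS);
        return peak.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(peakConcurrency(200, 2, 30));  // never exceeds 2
        System.out.println(peakConcurrency(200, 30, 30)); // can rise well above 2
    }
}
```

With maxPerRoute set to 2, the 30 workers hitting one route proceed only two at a time even though MaxTotal is 200, which mirrors the situation in this incident.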

HttpRoutePlanner

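HttpRoutePlanner computes the route from the client to the server based on the HTTP context; without a proxy there is usually no need to configure it. A key concept here is the Route: in HttpClient, a route is the path from the machine running the client to the target host. Roughly speaking, if two URLs share the same target host (scheme, host name, and port), they share the same route.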
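The "same host, same route" rule can be illustrated with a simplified key built from java.net.URI. This is a hypothetical helper, not HttpRoute itself: a real HttpRoute also tracks the proxy chain, local address, and tunnelling/layering, which this sketch ignores.

```java
import java.net.URI;
import java.util.Locale;

/** Simplified route key: scheme + host + port only (a real HttpRoute tracks more). */
public final class RouteKey {
    private RouteKey() {}

    public static String of(String url) {
        URI uri = URI.create(url);
        String scheme = uri.getScheme().toLowerCase(Locale.ROOT);
        int port = uri.getPort();
        if (port == -1) {
            // No explicit port: fall back to the scheme's default port.
            port = "https".equals(scheme) ? 443 : 80;
        }
        return scheme + "://" + uri.getHost().toLowerCase(Locale.ROOT) + ":" + port;
    }

    public static void main(String[] args) {
        String login = of("https://api.weibo.com/oauth2/authorize");
        String other = of("https://api.weibo.com/some/other/path"); // hypothetical path on the same host
        System.out.println(login);               // https://api.weibo.com:443
        System.out.println(login.equals(other)); // true
    }
}
```

Because the login URL and any other path on api.weibo.com map to the same key, all of those requests compete for the same per-route connection limit.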

RequestConfig

public class RequestConfig implements Cloneable {

    public static final RequestConfig DEFAULT = new Builder().build();

    private final boolean expectContinueEnabled;
    private final HttpHost proxy;
    private final InetAddress localAddress;
    private final boolean staleConnectionCheckEnabled;
    private final String cookieSpec;
    private final boolean redirectsEnabled;
    private final boolean relativeRedirectsAllowed;
    private final boolean circularRedirectsAllowed;
    private final int maxRedirects;
    private final boolean authenticationEnabled;
    private final Collection<String> targetPreferredAuthSchemes;
    private final Collection<String> proxyPreferredAuthSchemes;
    private final int connectionRequestTimeout;
    private final int connectTimeout;
    private final int socketTimeout;
    private final boolean contentCompressionEnabled;

    protected RequestConfig() {
        this(false, null, null, false, null, false, false, false, 0, false, null, null, 0, 0, 0, true);
    }

    RequestConfig(
            final boolean expectContinueEnabled,
            final HttpHost proxy,
            final InetAddress localAddress,
            final boolean staleConnectionCheckEnabled,
            final String cookieSpec,
            final boolean redirectsEnabled,
            final boolean relativeRedirectsAllowed,
            final boolean circularRedirectsAllowed,
            final int maxRedirects,
            final boolean authenticationEnabled,
            final Collection<String> targetPreferredAuthSchemes,
            final Collection<String> proxyPreferredAuthSchemes,
            final int connectionRequestTimeout,
            final int connectTimeout,
            final int socketTimeout,
            final boolean contentCompressionEnabled) {
        super();
        this.expectContinueEnabled = expectContinueEnabled;
        this.proxy = proxy;
        this.localAddress = localAddress;
        this.staleConnectionCheckEnabled = staleConnectionCheckEnabled;
        this.cookieSpec = cookieSpec;
        this.redirectsEnabled = redirectsEnabled;
        this.relativeRedirectsAllowed = relativeRedirectsAllowed;
        this.circularRedirectsAllowed = circularRedirectsAllowed;
        this.maxRedirects = maxRedirects;
        this.authenticationEnabled = authenticationEnabled;
        this.targetPreferredAuthSchemes = targetPreferredAuthSchemes;
        this.proxyPreferredAuthSchemes = proxyPreferredAuthSchemes;
        this.connectionRequestTimeout = connectionRequestTimeout;
        this.connectTimeout = connectTimeout;
        this.socketTimeout = socketTimeout;
        this.contentCompressionEnabled = contentCompressionEnabled;
    }
.......
}

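RequestConfig defines many per-request parameters. As the code shows, the three timeouts below default to 0, which means waiting indefinitely:

  • connectionRequestTimeout: the timeout for obtaining a connection from the pool
    When no connection is available in the pool, the request blocks and waits; if the wait exceeds connectionRequestTimeout, a ConnectionPoolTimeoutException is thrown.
  • connectTimeout: the connection timeout
    The maximum time allowed to establish a connection to the remote server; if it is exceeded, a ConnectTimeoutException is thrown.
  • socketTimeout: the read (socket) timeout
    The maximum time to wait for data from the server once the connection is established (more precisely, the maximum inactivity between two consecutive data packets); if it is exceeded, a SocketTimeoutException is thrown.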
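A minimal sketch of setting all three timeouts explicitly, assuming HttpClient 4.3 or later (where RequestConfig.custom() is available). The class name and the millisecond values are placeholders, not recommendations; the comments note which exception each timeout produces.

```java
import org.apache.http.client.config.RequestConfig;

public class TimeoutConfig {
    /** Hypothetical values; tune them to the real latency of the target API. */
    public static RequestConfig build() {
        return RequestConfig.custom()
                // Max wait for a free pooled connection -> ConnectionPoolTimeoutException
                .setConnectionRequestTimeout(3_000)
                // Max time to establish the TCP connection -> ConnectTimeoutException
                .setConnectTimeout(5_000)
                // Max inactivity between two data packets -> SocketTimeoutException
                .setSocketTimeout(10_000)
                .build();
    }
}
```

Passing this object to HttpClientBuilder.setDefaultRequestConfig(...) makes it the default for every request sent by that client; a single request can still override it with HttpRequestBase.setConfig(...).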

Resolution

At this point the problem was clear. Here is the configuration in the project:

private static final String HTTP = "http";
private static final String HTTPS = "https";
private static SSLConnectionSocketFactory sslsf = null;
private static PoolingHttpClientConnectionManager cm = null;
private static SSLContextBuilder builder = null;
public static CookieStore cookieStore = null;

private static final String URL = "https://api.weibo.com/oauth2/authorize";

private static final String CLIENT_ID = "********";
private static final String REDIRECT_URI = "http%3a%2f%2femarketing.wenwo.com%2fcms%2fweibo%2fafterLogin";
private static final String REFERER =
        "https://api.weibo.com/oauth2/authorize?client_id=3174813925&response_type=code&display=pc&forcelogin=true&redirect_uri="
                + "XXXXXXXXXX%2fcms%2fweibo%2fafterLogin";

static {
  try {
    cookieStore = new BasicCookieStore();
    builder = new SSLContextBuilder();
    builder.loadTrustMaterial(null, new TrustStrategy() {
      @Override
      public boolean isTrusted(X509Certificate[] x509Certificates, String s)
              throws CertificateException {
        return true;
      }
    });
    sslsf =
            new SSLConnectionSocketFactory(builder.build(), new String[] {"SSLv2Hello", "SSLv3",
                    "TLSv1", "TLSv1.2"}, null, NoopHostnameVerifier.INSTANCE);
    Registry<ConnectionSocketFactory> registry =
            RegistryBuilder.<ConnectionSocketFactory>create()
                    .register(HTTP, new PlainConnectionSocketFactory()).register(HTTPS, sslsf)
                    .build();
    cm = new PoolingHttpClientConnectionManager(registry);
    cm.setMaxTotal(200);// max connection
  } catch (Exception e) {
    e.printStackTrace();
  }
}

/**
 * Returns the shared HttpClient.
 * 
 * @return
 * @throws Exception
 */
public static CloseableHttpClient getHttpClient() throws Exception {
  CloseableHttpClient httpClient =
          HttpClients.custom().setSSLSocketFactory(sslsf).setConnectionManager(cm)
                  .setConnectionManagerShared(true).setDefaultCookieStore(cookieStore).build();
  return httpClient;
}

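cm = new PoolingHttpClientConnectionManager(registry);
cm.setMaxTotal(200);// max connection
Only the total maximum was set (to 200); the per-route maximum was never configured, and without an explicit setting each route is allowed at most 2 connections. In this scenario the 30 threads all need the login route at the same time, which takes far more than 2 connections, so every other request has to wait. Moreover, getHttpClient() does not configure a RequestConfig at all.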
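So by default all three timeouts described above are 0 (if no RequestConfig is set for the request, execute() fills in the defaults via HttpClientParamConfig.getRequestConfig). That means waiting indefinitely, so it is easy for every request to end up blocked here forever.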
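Another service, which publishes Weibo posts, also needs to simulate a Weibo login, so it requests the same route. If the comment-fetching job's HTTP requests have not finished yet, the posting service cannot obtain a connection from the pool either, and its requests fail.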

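So these parameters must be set sensibly. In this scenario all requests target the same route, and this special case calls for specially tuned values.
It is also worth weighing the business requirements: does the job really need high throughput and freshness? If not, it could run with just a few threads during off-peak hours at night. Server resources are precious, and mishandling this kind of workload can have disastrous consequences.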
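A minimal sketch of a corrected setup, assuming HttpClient 4.5.x. The class name, the per-route limit of 30 (mirroring the 30 worker threads), the extra headroom for the hot route, and the timeout values are all hypothetical. For brevity it uses the manager's default socket-factory registry; the project's own registry (with its custom SSLConnectionSocketFactory) can be passed to the constructor exactly as in the original code.

```java
import java.net.InetAddress;
import org.apache.http.HttpHost;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.conn.routing.HttpRoute;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;

public class WeiboHttpClientFactory {
    // Hypothetical sizing: 30 worker threads all talk to the same host.
    static final int WORKER_THREADS = 30;

    // For https targets the default route planner marks the route as secure,
    // so a route-specific limit must be keyed on an equal HttpRoute to take effect.
    static final HttpRoute WEIBO_ROUTE =
            new HttpRoute(new HttpHost("api.weibo.com", 443, "https"), (InetAddress) null, true);

    static PoolingHttpClientConnectionManager buildManager() {
        PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
        cm.setMaxTotal(200);                            // cap across all routes (as in the original)
        cm.setDefaultMaxPerRoute(WORKER_THREADS);       // the missing setting; the default is 2
        cm.setMaxPerRoute(WEIBO_ROUTE, WORKER_THREADS + 10); // headroom for other services on this host
        return cm;
    }

    static CloseableHttpClient buildClient(PoolingHttpClientConnectionManager cm) {
        RequestConfig config = RequestConfig.custom()
                .setConnectionRequestTimeout(3_000)     // never wait forever for a pooled connection
                .setConnectTimeout(5_000)
                .setSocketTimeout(10_000)
                .build();
        return HttpClients.custom()
                .setConnectionManager(cm)
                .setConnectionManagerShared(true)       // as in the original: close() leaves cm alive
                .setDefaultRequestConfig(config)
                .build();
    }
}
```

With a finite connectionRequestTimeout, a saturated pool surfaces as a fast ConnectionPoolTimeoutException in the job's own logs instead of threads silently piling up and starving the other service.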

Follow-up Optimizations

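After the request-timeout problem was fixed, there was still the question of writing to the database. The data can be fetched by many threads, but the inserts must also be careful about how much database capacity they consume. To reduce database load we adopted a producer-consumer pattern: a data queue is maintained, the fetching threads put their results into the queue, and the database-writing threads consume from it with a thread count sized to the actual situation.
Queue capacity also matters: an unbounded queue is clearly inappropriate, so we used a bounded queue. When the queue is full, the fetching threads block temporarily and insert again once space frees up.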
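The bounded producer-consumer pipeline can be sketched with ArrayBlockingQueue, whose put() blocks while the queue is full. This is an illustration under assumptions, not the project's code: the class name is hypothetical, the "fetch" just builds a string, and a synchronized list stands in for the database insert.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Bounded producer-consumer sketch: fetchers put(), DB writers take(). */
public class CommentPipeline {
    private static final String POISON = "__END__"; // sentinel telling one writer to stop

    public static List<String> run(List<String> weiboIds, int fetchers, int writers, int capacity)
            throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(capacity); // put() blocks while full
        List<String> stored = Collections.synchronizedList(new ArrayList<>()); // stands in for the DB
        ExecutorService writePool = Executors.newFixedThreadPool(writers);
        ExecutorService fetchPool = Executors.newFixedThreadPool(fetchers);

        for (int i = 0; i < writers; i++) {
            writePool.submit(() -> {
                while (true) {
                    String batch = queue.take();   // blocks while the queue is empty
                    if (POISON.equals(batch)) {
                        return null;
                    }
                    stored.add(batch);             // hypothetical: insert the comments into the DB
                }
            });
        }
        for (String id : weiboIds) {
            fetchPool.submit(() -> {
                // hypothetical: call the Weibo API for this post's comments
                queue.put("comments-of-" + id);    // blocks the fetcher while the queue is full
                return null;
            });
        }
        fetchPool.shutdown();
        fetchPool.awaitTermination(1, TimeUnit.MINUTES);
        for (int i = 0; i < writers; i++) {
            queue.put(POISON);                     // one sentinel per writer, after all real data
        }
        writePool.shutdown();
        writePool.awaitTermination(1, TimeUnit.MINUTES);
        return stored;
    }
}
```

The number of writer threads is chosen independently of the number of fetchers, which is what lets the database load be sized separately from the HTTP concurrency; the queue capacity bounds how much fetched data can pile up in memory.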
