基于OkHttp的Http指标监控

Http请求过程

image

指标数据

1.入队到请求结束耗时
2.dns查询耗时
3.socket connect耗时
4.tls连接的耗时
5.请求发送耗时
6.响应传输耗时
7.首包耗时
8.响应解析耗时
9.Http错误,区分业务错误和请求错误

采集到以上指标,结合数据可视化的工具,可以对Http个阶段的耗时和错误分布有直观的感受,同时对优化业务Http请求提供数据支持。

如何获取指标

获取指标数据首先需要找到产生指标数据的关键代码,然后插入收集代码即可。
如果业务中使用的框架没有源码或者不重新打包源码的情况下,如何插入代码?
这个就需要使用到能够实现AOP的工具,在前面分享的Monitor中的提供注解和配置文件的方式,在指定函数中插入相关代码的功能。这样的实现方式也可以使监控代码和业务代码分离。

OkHttp框架

OkHttp是Android上最常用的Http请求框架,OkHttp的最新版本已经升级到4.0.x,实现也全部由java替换到了Kotlin,API的一些使用也会有些不同。由于4.x的设备不是默认支持TLSV1.2版本,OkHttp 3.13.x以上的版本需要在Android 5.0+(API level 21+)和Java 1.8的环境开发,不过OkHttp也为了支持4.x设备单独创立了3.12.x分支,本文中使用的OkHttp版本为3.12.3版本。

OkHttp整体流程

先引用个别人画流程图(原图来自

image

请求过程分析

1.创建 OkHttpClient

new OkHttpClient()中会调用new OkHttpClient.Builder()方法,Builder()会设置一些的默认值。OkHttpClient()会把OkHttpClient.Builder()产生的对象中字段复制到对应OkHttpClient对象的字段上,其中sslSocketFactory如果没有在Builder中设置,OkHttp会获取系统默认的sslSocketFactory。

public OkHttpClient.Builder(){
  /**
   *异步请求的分发器,其中使用 不限制线程数,存活时间为60s的线程池来执行异步请求
  * 默认限制同时执行的异步请求不操过64个,每个host的同时执行的异步请求不操过5个
    *超过限制新的请求需要等待。
    * */
   dispatcher = new Dispatcher();
  //支持的协议类型 Http1.0/1.1/2,QUIC 
   protocols = DEFAULT_PROTOCOLS;
   //支持TLS和ClearText
   connectionSpecs = DEFAULT_CONNECTION_SPECS;
   //请求事件通知器,大部分指标数据都可以度过EventListener来获取
   eventListenerFactory = EventListener.factory(EventListener.NONE);
  //系统级代理服务器
   proxySelector = ProxySelector.getDefault();
   if(proxySelector == null){
     proxySelector = new NullProxySelector();
   }
   //默认不使用Cookie
   cookieJar = CookieJar.NO_COOKIES;
  //socket工厂
   socketFactory = SocketFactory.getDefault();
  //用于https主机名验证
   hostnameVerifier = OkHostnameVerifier.INSTANCE;
  //用于约束哪些证书是可信的,可以用来证书固定
   certificatePinner = CertificatePinner.DEFAULT;
  //实现HTTP协议的信息认证
   proxyAuthenticator = Authenticator.NONE;
   //实现HTTP协议的信息认证
   authenticator = Authenticator.NONE;
   /**
    *实现多路复用机制连接池,最多保持5个空闲连接
   *每个空闲连接最多保持五分钟
    * */
   connectionPool = new ConnectionPool();
  //dns解析,默认使用系统InetAddress.getAllByName(hostname)
   dns = Dns.SYSTEM;
  //支持ssl重定向
   followSslRedirects = true;
  //支持重定向
   followRedirects = true;
  //连接失败是否重试
   retryOnConnectionFailure = true;
  //请求超时时间,0为不超时
   callTimeout = 0;
  //连接超时时间
   connectTimeout = 10_000;
  //socket读超时时间
   readTimeout = 10_000;
  //socket写超时时间
   writeTimeout = 10_000;
  //websocket 心跳间隔
   pingInterval = 0;
}

//获取默认的sslSocketFactory
X509TrustManager trustManager = Util.platformTrustManager();
this.sslSocketFactory = newSslSocketFactory(trustManager);
this.certificateChainCleaner = CertificateChainCleaner.get(trustManager);

2.Request执行过程

从上面的流程图可以看出,不管同步请求还是异步请求,最终都会调用到 RealCall.getResponseWithInterceptorChain(),getResponseWithInterceptorChain() 再调用RealInterceptorChain.proceed(Request request)方法发起最终请求,下面我们来分析一下这两个方法的具体代码。

 Response RealCall.getResponseWithInterceptorChain() throws IOException {
 //组装所有的Interceptors
  List<Interceptor> interceptors = new ArrayList<>();
  //业务自定的Interceptors,通过OkHttpClient.Builid.addInterceptor添加
  interceptors.addAll(client.interceptors());
 //其他功能interceptors
  interceptors.add(retryAndFollowUpInterceptor);
  interceptors.add(new BridgeInterceptor(client.cookieJar()));
  interceptors.add(new CacheInterceptor(client.internalCache()));
  interceptors.add(new ConnectInterceptor(client));
 //如果不是forWebSocket请求,添加通过OkHttpClient.Builid.addNetworkInterceptor添加的Interceptor
  if (!forWebSocket) {
    interceptors.addAll([client.networkInterceptors](http://client.networkinterceptors/)());
  }
 //真正发起网络请求的Interceptor
  interceptors.add(new CallServerInterceptor(forWebSocket));
 //创建RealInterceptorChain
  Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
      originalRequest, this, eventListener, client.connectTimeoutMillis(),
      client.readTimeoutMillis(), client.writeTimeoutMillis());
  //开始执行请求
  Response response = chain.proceed(originalRequest);
 //如果请求被取消
  if (retryAndFollowUpInterceptor.isCanceled()) {
    closeQuietly(response);
    throw new IOException("Canceled");
  }
  return response;
}

接下来再看RealInterceptorChain.proceed(Request request)代码

public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec, 
    RealConnection connection) throws IOException { 
  /*
    *index代表当前应该执行的Interceptor在Interceptor列表中的位置,如果超过 
    *Interceptor列表size,报错
   *在RealCall.getResponseWithInterceptorChain()第一次调用proceed方法时传递的index值为0
  */
  if (index >= interceptors.size()) throw new AssertionError(); 
 //执行次数+1
  calls++; 
 //httpCodec时在ConnectInterceptor创建的,会对应一个socket连接
  if (this.httpCodec != null && !this.connection.supportsUrl(request.url())) { 
    throw new IllegalStateException("network interceptor " + interceptors.get(index - 1) 
        + " must retain the same host and port"); 
  } 
  // If we already have a stream, confirm that this is the only call to chain.proceed(). 
  if (this.httpCodec != null && calls > 1) { 
    throw new IllegalStateException("network interceptor " + interceptors.get(index - 1) 
        + " must call proceed() exactly once"); 
  } 
  //创建新的RealInterceptorChain,通过改变index的值来实现调用Interceptor列表下一个位置的Interceptor
  RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec, 
      connection, index + 1, request, call, eventListener, connectTimeout, readTimeout, 
      writeTimeout); 
  //获取当前index位置的Interceptor
  Interceptor interceptor = interceptors.get(index); 
 //执行当前位置的interceptor,同时传递新创建的RealInterceptorChain,新的RealInterceptorChain中的index值是当前Interceptor列表中下一个位置
  //interceptor.intercept中会调用新的RealInterceptorChain的proceed方法,实现向后调用
  Response response = interceptor.intercept(next); 
  // 如果当前RealInterceptorChain的httpCodec不为空,确保下一个位置的Interceptor只被调用一次,httpCodec是在ConnectInterceptor中被赋值
  if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1) { 
    throw new IllegalStateException("network interceptor " + interceptor 
        + " must call proceed() exactly once"); 
  } 
  // response为空报错
  if (response == null) { 
    throw new NullPointerException("interceptor " + interceptor + " returned null"); 
  } 
 // response.body为空报错
  if (response.body() == null) { 
    throw new IllegalStateException( 
        "interceptor " + interceptor + " returned a response with no body"); 
  } 
  //返回response
  return response; 
}

从代码可以看出 Interceptor 是 OkHttp 最核心的功能类,Interceptor 把实际的网络请求、缓存、透明压缩等功能都统一了起来,每一个功能都实现为一个Interceptor,它们最终组成一个了Interceptor.Chain的责任链,其中每个 Interceptor 都可能完成 Request到 Response 转变的任务,循着责任链让每个 Interceptor 自行决定能否完成任务以及怎么完成任务,完成网络请求这件事也从 RealCall 类中剥离了出来,简化了各自的责任和逻辑,代码变得优雅。
这些Interceptor最为关键的两个Interceptor是ConnectInterceptor和CallServerInterceptor,ConnectInterceptor的主要功能是在连接池里找到可复用连接,如果没有,就创建新的socket,进行tls握手,将socket用Okio进行包裹,创建HttpCodec。CallServerInterceptor使用HttpCodec进行相关协议的传输和解析。下面对ConnectInterceptor中findConnect过程和CallServerInterceptor请求过程做一个分析。

3.ConnectInterceptor findConnection过程分析

在ConnectInterceptor中,会为本次请求创建可用的RealConnection,首先会从连接池中找到能够复用的连接,如果没有就创建新的socket,然后使用RealConnection创建HttpCodec。创建RealConnection的方法调用链路为StreamAllocation.newStream()-> StreamAllocation.findHealthyConnection()->StreamAllocation.findConnection(),findConnection()是创建连接的主要代码。

private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
    boolean foundPooledConnection = false;
    //最终找到的connection
    RealConnection result = null;
    Route selectedRoute = null;
    //需要释放的connection
    Connection releasedConnection;
    //需要关闭的socket
    Socket toClose;
    synchronized (connectionPool) {
        if (released) throw new IllegalStateException("released");
        if (codec != null) throw new IllegalStateException("codec != null");
        if (canceled) throw new IOException("Canceled");
        /**
         *如果遇到重定向到同一个地址的情况下,在RetryAndFollowUpInterceptor中会使用已经分配的StreamAllocation
         *进行重定向请求,这个时候的connection不为空,但是这个connection不一定时有效的连接。
         * **/
        releasedConnection = this.connection;
        /**
         *如果已经存在RealConnection,但是不能用来创建新的Stream
         *就设置this.connection=null,同时返回要关闭的socket
         *当前请求第一次执行时releaseIfNoNewStreams()不进行任何操作
         * */
        toClose = releaseIfNoNewStreams();
        /**
         *
         * 这时候this.connection不为空
         * 表示原来的connection可以用来创建新的Stream
         * 当前请求第一次执行时this.connection=null
         *
         * */
        if (this.connection != null) {
            // We had an already-allocated connection and it's good.
            result = this.connection;
            releasedConnection = null;
        }
        /**
         * reportedAcquired会在新建socket或者从连接池
         * 获取到有效RealConnection时赋值为true
         * 当前请求第一次执行时reportedAcquired=fasle
         * */
        if (!reportedAcquired) {
            // If the connection was never reported acquired, don't report it as released!
            releasedConnection = null;
        }
        /**
         * 如果还没找到目标RealConnection
         * 尝试从连接池中获取
         * */
        if (result == null) {
            /**
             * 对于非http/2协议,如果已经存在不超过过RealConnection复用的最大值且协议,证书都一致
             * 这个RealConnection可以用来复用
             * 如果从连接池中获取RealConnection,会调用
             * streamAllocation.acquire()设置connection为新值
             * */
            Internal.instance.get(connectionPool, address, this, null);
            /**
             * connection != null表示从连接池获取到合适的
             * RealConnection,设置foundPooledConnection = true;
             * */
            if (connection != null) {
                foundPooledConnection = true;
                result = connection;
            } else {
                selectedRoute = route;
            }
        }
    }
    /**
     * close当前的需要关闭的socket
     * */
    closeQuietly(toClose);
    /**
     * 如果当前的RealConnection需要释放,调用eventListener
     * */
    if (releasedConnection != null) {
        eventListener.connectionReleased(call, releasedConnection);
    }
    /**
     * 如果从连接池获取到RealConnection,调用eventListener
     * */
    if (foundPooledConnection) {
        eventListener.connectionAcquired(call, result);
    }
    /**
     * 当前RealConnection可以继续使用或者从连接池中找到合适的RealConnection
     * 返回这个RealConnection
     * */
    if (result != null) {
        // If we found an already-allocated or pooled connection, we're done.
        return result;
    }
    // If we need a route selection, make one. This is a blocking operation.
    /**
     * routeSelector在StreamAllocation构造方法中被创建
     * 连接池重新寻找
     * 请求第一次执行时routeSelector.next()会进行域名解析工作
     * */
    boolean newRouteSelection = false;
    if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
        newRouteSelection = true;
        routeSelection = routeSelector.next();
    }
    synchronized (connectionPool) {
        if (canceled) throw new IOException("Canceled");
        if (newRouteSelection) {
            // Now that we have a set of IP addresses, make another attempt at getting a connection from
            // the pool. This could match due to connection coalescing.
            List<Route> routes = routeSelection.getAll();
            for (int i = 0, size = routes.size(); i < size; i++) {
                Route route = routes.get(i);
                Internal.instance.get(connectionPool, address, this, route);
                if (connection != null) {
                    foundPooledConnection = true;
                    result = connection;
                    this.route = route;
                    break;
                }
            }
        }
        if (!foundPooledConnection) {
            if (selectedRoute == null) {
                selectedRoute = routeSelection.next();
            }
            // Create a connection and assign it to this allocation immediately. This makes it possible
            // for an asynchronous cancel() to interrupt the handshake we're about to do.
            route = selectedRoute;
            refusedStreamCount = 0;
            result = new RealConnection(connectionPool, selectedRoute);
            acquire(result, false);
        }
    }
    // If we found a pooled connection on the 2nd time around, we're done.
    /**
     * 如果在连接池重新找到合适的RealConnection,返回
     * */
    if (foundPooledConnection) {
        eventListener.connectionAcquired(call, result);
        return result;
    }
    /**
     * 如果还没有找到,就需要创建新的RealConnect
     * 生成新的socket,建立Tls,并加入ConnectPool
     * */
    // Do TCP + TLS handshakes. This is a blocking operation.
    result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
            connectionRetryEnabled, call, eventListener);
    routeDatabase().connected(result.route());
    Socket socket = null;
    synchronized (connectionPool) {
        reportedAcquired = true;
        // Pool the connection.
        Internal.instance.put(connectionPool, result);
        // If another multiplexed connection to the same address was created concurrently, then
        // release this connection and acquire that one.
        if (result.isMultiplexed()) {
            socket = Internal.instance.deduplicate(connectionPool, address, this);
            result = connection;
        }
    }
    closeQuietly(socket);
    eventListener.connectionAcquired(call, result);
    return result;
}

4.CallServerInterceptor程分析

public Response intercept(Chain chain) throws IOException {
  RealInterceptorChain realChain = (RealInterceptorChain) chain;
 //Http解析器,在ConncetInterceptor中创建
  HttpCodec httpCodec = realChain.httpStream();
  StreamAllocation streamAllocation = realChain.streamAllocation();
  /**
   *请求的使用的连接,在ConncetInterceptor中产生
   *连接可能是从ConnectPool中选择或者重新创建出来
  **/
  RealConnection connection = (RealConnection) realChain.connection();
  Request request = realChain.request();
  long sentRequestMillis = System.currentTimeMillis();
  realChain.eventListener().requestHeadersStart(realChain.call());
  /**
   * 通过httpCodec中用Okio包裹的socket写请求头
  **/
  httpCodec.writeRequestHeaders(request);
  realChain.eventListener().requestHeadersEnd(realChain.call(), request);
  Response.Builder responseBuilder = null;
  /**
   * 如果请求有请求body,发送body
   **/
  if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
 request body.
 /**
   * 请求头是Expect: 100-continue,先读响应头
 *    httpCodec.readResponseHeaders方法读取到状态码100时, 会返回null
**/
    if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
      httpCodec.flushRequest();
      realChain.eventListener().responseHeadersStart(realChain.call());
      responseBuilder = httpCodec.readResponseHeaders(true);
    }
 /**
   * 如果正常发送请求body部分
   *  请求头有Expect: 100-continue,但是服务器没有返回状态码100,且不是Http/2协议,关闭当前连接
**/
    if (responseBuilder == null) {
      realChain.eventListener().requestBodyStart(realChain.call());
      long contentLength = request.body().contentLength();
      CountingSink requestBodyOut =
          new CountingSink(httpCodec.createRequestBody(request, contentLength));
      BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
      request.body().writeTo(bufferedRequestBody);
      bufferedRequestBody.close();
      realChain.eventListener()
          .requestBodyEnd(realChain.call(), requestBodyOut.successfulCount);
    } else if (!connection.isMultiplexed()) {
      streamAllocation.noNewStreams();
    }
  }
  httpCodec.finishRequest();
 /**
   * 正常情况下,读响应headers
**/
  if (responseBuilder == null) {
    realChain.eventListener().responseHeadersStart(realChain.call());
    responseBuilder = httpCodec.readResponseHeaders(false);
  }
  Response response = responseBuilder
      .request(request)
      .handshake(streamAllocation.connection().handshake())
      .sentRequestAtMillis(sentRequestMillis)
      .receivedResponseAtMillis(System.currentTimeMillis())
      .build();
  int code = response.code();
  if (code == 100) {
    responseBuilder = httpCodec.readResponseHeaders(false);
    response = responseBuilder
            .request(request)
            .handshake(streamAllocation.connection().handshake())
            .sentRequestAtMillis(sentRequestMillis)
            .receivedResponseAtMillis(System.currentTimeMillis())
            .build();
    code = response.code();
  }
 /**
   * 读响应headers结束
**/
  realChain.eventListener()
          .responseHeadersEnd(realChain.call(), response);
/**
   * 如果是forWebSocket,且 code == 101返回空响应
*   其他返回 RealResponseBody对象
**/
  if (forWebSocket && code == 101) {
    // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
    response = response.newBuilder()
        .body(Util.EMPTY_RESPONSE)
        .build();
  } else {
   /**
  *将
  *无响应内容
  * chunk内容,
  * 存在contenlength内容
  * 不存在contenlength内容
  *  的响应body包裹成 RealResponseBody对象
 *
**/
    response = response.newBuilder()
        .body(httpCodec.openResponseBody(response))
        .build();
  }
  /**
  * 服务端关闭要求连接
**/
  if ("close".equalsIgnoreCase(response.request().header("Connection"))
      || "close".equalsIgnoreCase(response.header("Connection"))) {
    streamAllocation.noNewStreams();
  }
   /**
    * code是204/205,contentLength()还大于0,抛出协议错误
   **/
  if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
    throw new ProtocolException(
        "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
  }
  return response;
}

CallServerInterceptor执行完成后返回的是一个只读取了响应headers,但是还没有读取body的Response,OkHttp网络请求部分的代码到此就结束了,后续的parseReponse都在更上层的框架中,比如Retrofit就是在OkHttpCall.parseReponse()方法中调用serviceMethod.toResponse(catchingBody)中调用GsonConvter或者其他Convertor来进行处理。

获取指标具体实现

对于Http 请求耗时,异常,数据大小,状态码 的获取,直接使用前面实现的MAOP,拦截OkHttpClient.Builder的build方法加入统计Interceptor ,DNSLookUp 耗时,连接耗时,ssl耗时,通过设置EventListener.Factory,可以直接收集。解析耗时需要拦截上层框架的parseReponse方法进行收集。
首包时间需要拦截OkHttp读请求数据的方法来实现,OKHttpClient 最终调用CallServerInterceptor,关键代码就是读取readResponseHeaders的时机。


image

MAOP 实现

使用前面提供的MAOP功能,在AOP配置文件中加入,拦截OkHttpClient的builder方法和Http1Codec的readHeaderLine方法和okhttp3.internal.http2.Http2Stream的takeResponseHeaders方法的配置

image
image

在拦截OkHttpClient的Builder的build()方法中加入统计Interceptor和EventListenerFactory

image

首包的时间通过:认为第一次读响应头返回时为首包时间,拦截okhttp3.internal.http1.Http1Code.readHeaderLine的方法和okhttp3.internal.http2.Http2Stream.takeResponseHeaders计算首包时间


image

Retrofit parse耗时收集

AOP配置文件中加入对retrofit2.OKHttp.parseResponse拦截的配置


20_32_45__09_01_2019.jpg

Method回掉中处理相关的数据


20_35_17__09_01_2019.jpg

综上,这个方案基本能实现Http基本指标的获取,但是有些细节还需完善,可以微信关注知识星球共同交流


最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。