本文希望你在读完之后能够清楚的事情:一次http请求的经历,这期间会遇到什么问题,okhttp怎么解决的,在它的责任链中的那一部分解决的这个问题,怎样监控我们自己的网络请求,怎样监控网络状况。
一次http网络请求的历程
网络请求要依次经历 DNS解析、创建连接、收发数据、关闭连接几个过程。下图是其他教程里的一张图,画的非常清晰:这期间需要应对的问题有:
1、DNS劫持。即使我们现在几万用户的小体量app,每月也能碰到几起DNS劫持。除了DNS劫持,这部分还需要考虑IP选取策略、DNS缓存、容灾等问题,如果必要的话,可以对其进行优化,参考百度的DNS优化,以及美图案例(考虑了非okhttp的情况)。
2、连接复用。http基于TCP,所以连接要经历三次握手,关闭连接要经历4次握手,TCP连接在发送数据的时候,在起初会限制连接速度,随着传输的成功和时间的推移逐渐提高速度(防拥塞),再加上TLS密匙协商,如果每次网络请求都要经历创建连接的过程,带来的开销是非常大的。
3、I/O问题。客户端会等待服务器的返回数据,数据收到后还要把数据从内核copy到用户空间,期间根据网络的阻塞模型(基本有五种,常见的有阻塞I/O、非阻塞I/O、多路复用I/O),会遇到不同程度的阻塞。
4、数据压缩和加密。减少数据体积,对数据进行加密。
对于上述问题的方案:
1、okhttp提供了自定义DNS解析的接口。
2、持久连接。http1.1支持事务结束之后将TCP保持在打开状态(http1.1默认将Keep-Alive首部开启,用于客户端和服务器通信连接的保存时间,TCP中有Keep-Alive报文,来定时探测通信双方是否存活,但是这一部分内容用于长连接时会存在问题),对http1.1进行连接复用,将连接放入连接池。支持http2.0,http2.0使用多路复用,支持一条连接同时处理多个请求,请求可以并发进行,一个域名会保留一条连接(一条连接即一个TCP连接,收发只有一根管道,并不是真正意义上的并发,而是利用TCP把数据拆分装包加标签的特性实现的复用),能有效降低延时(也有特殊情况,比如一个域名的数据请求特别多,或者服务端对单个连接有速度限制,如视频流)。长连接(推荐读下这篇文章,如果没有从事过长连接开发的话)也是一种方案,在某些场景下非常有效,但是okhttp不支持。
3、OKhttp使用非阻塞I/O模型OKio(算是nio吧,和我们理解的nio不太一致,理解的nio定时去检查是否有数据到来,有的话就读,没有就返回,但是okio的实现是定时去检查是否已经读写完成,没完成就认为超时,close掉该socket),该I/O框架的内存表现也很好(mars使用epoll)。
4、http2.0协议本身对头部有压缩。对于body的压缩okhttp提供了Gzip压缩的支持。
OKhttp实现分析
网上找到的一个okhttp整体调用图,由于okhttp的分析已经很多,尽量以少代码多总结的方式来阐述这部分内容。主要讲解okhttp应对上述问题的具体实现。
设计结构十分清晰,通过责任链将请求发送任务进行拆解。
1、okhttp中自定义DNS解析
OkHttpClient client = new OkHttpClient.Builder()
.dns(new Dns(){
@Override
public List<InetAddress> lookup(String hostname) throws UnknownHostException {
return Arrays.asList(InetAddress.getAllByName(DNSHelper.getIpByHost(hostname)));
}
})
.build();
DNSHelper可以通过ip直连的方式访问自己设置的DNS服务器。也可以通过这种方式接入一些第三方对外提供的DNS服务。
在RetryAndFollowUpInterceptor这个拦截器中,将会创建Address对象,该对象收集网络请求需要的配置信息,包括DNS、host、port、proxy等。在ConnectIntercepter这一层创建了RouteSelecter,用于路由选择,持有Address对象,调用其next函数选择路由时会调用DNS对象的lookup函数返回host的ip。
2、连接复用
这部分需要操心的事情一个是连接的管理,一个是对网络请求的流的管理。连接的管理交由ConnectionPool,内部含有一个保存了RealConnection对象的队列。http2.0一个连接对应多个流,在RealConnection内保存了一个代表流的StreamAllocation对象的list。
在ConnectIntercepter这一层调用StreamAllocation的newStrem,尝试在连接池里找到一个RealConnection,没找到则创建一个,并调用acquire添加一个自身的弱引用到RealConnection的流引用List中。newStrem最终返回一个httpcodec接口的实现,代表了具体的http协议内容创建规则,有两种实现,对应了okhttp适配的两个http版本,然后传递给下一级。当然流在读写完成后也是需要被清理的,清理函数deallocate,一个连接的流都被清理掉之后,通知ConnectionPool判断连接的kepp-alive时间,以及空闲连接数量,移除超时或者超出数量限制后空闲时间最久的连接。
如下是调用流和连接的绑定过程(省略了路由选择过程),新创建的连接会执行socket的connect,connect的时候会判断http协议是哪个版本,然后新创建的RealConnection会添加到连接池里。
// Attempt to use an already-allocated connection.
RealConnection allocatedConnection = this.connection;
if (allocatedConnection != null && !allocatedConnection.noNewStreams) {
return allocatedConnection;
}
// Attempt to get a connection from the pool.
Internal.instance.get(connectionPool, address, this, null);
if (connection != null) {
return connection;
}
RealConnection result;
synchronized (connectionPool) {
if (canceled) throw new IOException("Canceled");
// Create a connection and assign it to this allocation immediately. This makes it possible
// for an asynchronous cancel() to interrupt the handshake we're about to do.
result = new RealConnection(connectionPool, selectedRoute);
acquire(result);
}
// Do TCP + TLS handshakes. This is a blocking operation.
result.connect(connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled);
routeDatabase().connected(result.route());
Socket socket = null;
synchronized (connectionPool) {
// Pool the connection.
Internal.instance.put(connectionPool, result);
// If another multiplexed connection to the same address was created concurrently, then
// release this connection and acquire that one.
if (result.isMultiplexed()) {
socket = Internal.instance.deduplicate(connectionPool, address, this);
result = connection;
}
}
closeQuietly(socket);
return result;
如下可以看出连接池对空闲时间和空闲连接数量的限制(顺带一提,okhttp的线程池也是有数量限制的,大约在60个左右,如果项目网络库比较乱,使用线程也不太注意,线程过多,超过500个,在一些华为手机上会因为申请不到线程而崩溃)。
private final Deque<RealConnection> connections = new ArrayDeque<>();
final RouteDatabase routeDatabase = new RouteDatabase();
boolean cleanupRunning;
/**
* Create a new connection pool with tuning parameters appropriate for a single-user application.
* The tuning parameters in this pool are subject to change in future OkHttp releases. Currently
* this pool holds up to 5 idle connections which will be evicted after 5 minutes of inactivity.
*/
public ConnectionPool() {
this(5, 5, TimeUnit.MINUTES);
}
public ConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) {
this.maxIdleConnections = maxIdleConnections;
this.keepAliveDurationNs = timeUnit.toNanos(keepAliveDuration);
// Put a floor on the keep alive duration, otherwise cleanup will spin loop.
if (keepAliveDuration <= 0) {
throw new IllegalArgumentException("keepAliveDuration <= 0: " + keepAliveDuration);
}
}
如下是具体的清理逻辑:
synchronized (this) {
for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
RealConnection connection = i.next();
// If the connection is in use, keep searching.
if (pruneAndGetAllocationCount(connection, now) > 0) {
inUseConnectionCount++;
continue;
}
idleConnectionCount++;
// If the connection is ready to be evicted, we're done.
long idleDurationNs = now - connection.idleAtNanos;
if (idleDurationNs > longestIdleDurationNs) {
longestIdleDurationNs = idleDurationNs;
longestIdleConnection = connection;
}
}
if (longestIdleDurationNs >= this.keepAliveDurationNs
|| idleConnectionCount > this.maxIdleConnections) {
// We've found a connection to evict. Remove it from the list, then close it below (outside
// of the synchronized block).
connections.remove(longestIdleConnection);
} else if (idleConnectionCount > 0) {
// A connection will be ready to evict soon.
return keepAliveDurationNs - longestIdleDurationNs;
} else if (inUseConnectionCount > 0) {
// All connections are in use. It'll be at least the keep alive duration 'til we run again.
return keepAliveDurationNs;
} else {
// No connections, idle or in use.
cleanupRunning = false;
return -1;
}
}
3、I/O优化
a、okio的使用:
Okio.buffer(Okio.sink(socket)) .writeUtf8("write string by utf-8.\n") .writeInt(1234).close();
b、okio没有使用java提供的select(多路复用),而是自定义了nio实现。个人猜测这样实现的原因是多路复用实际在网络连接非常多的时候表现更好,对于客户端来讲不一定适用,反倒会增加大量的select/epoll系统调用,更多用于服务器。
设置一个watchdog,将一次事件(读、写)封装到AsyncTimeout中,AsyncTimeout持有一个static链表,Watchdog定期检测链表。
private static final class Watchdog extends Thread {
Watchdog() {
super("Okio Watchdog");
setDaemon(true);
}
public void run() {
while (true) {
try {
AsyncTimeout timedOut;
synchronized (AsyncTimeout.class) {
timedOut = awaitTimeout();
// Didn't find a node to interrupt. Try again.
if (timedOut == null) continue;
// The queue is completely empty. Let this thread exit and let another watchdog thread
// get created on the next call to scheduleTimeout().
if (timedOut == head) {
head = null;
return;
}
}
// Close the timed out node.
timedOut.timedOut();
} catch (InterruptedException ignored) {
}
}
}
}
awaitTimeout()函数读取链表,设置等待事件。到时间后,返回链表中的一个AsyncTimeout 对象,并调用该对象的timedOut()函数。
static @Nullable AsyncTimeout awaitTimeout() throws InterruptedException {
// Get the next eligible node.
AsyncTimeout node = head.next;
// The queue is empty. Wait until either something is enqueued or the idle timeout elapses.
if (node == null) {
long startNanos = System.nanoTime();
AsyncTimeout.class.wait(IDLE_TIMEOUT_MILLIS);
return head.next == null && (System.nanoTime() - startNanos) >= IDLE_TIMEOUT_NANOS
? head // The idle timeout elapsed.
: null; // The situation has changed.
}
long waitNanos = node.remainingNanos(System.nanoTime());
// The head of the queue hasn't timed out yet. Await that.
if (waitNanos > 0) {
// Waiting is made complicated by the fact that we work in nanoseconds,
// but the API wants (millis, nanos) in two arguments.
long waitMillis = waitNanos / 1000000L;
waitNanos -= (waitMillis * 1000000L);
AsyncTimeout.class.wait(waitMillis, (int) waitNanos);
return null;
}
// The head of the queue has timed out. Remove it.
head.next = node.next;
node.next = null;
return node;
}
而若这个事件已经完成则会调用exit()函数,将该事件在队列中移除。
/** Returns true if the timeout occurred. */
public final boolean exit() {
if (!inQueue) return false;
inQueue = false;
return cancelScheduledTimeout(this);
}
/** Returns true if the timeout occurred. */
private static synchronized boolean cancelScheduledTimeout(AsyncTimeout node) {
// Remove the node from the linked list.
for (AsyncTimeout prev = head; prev != null; prev = prev.next) {
if (prev.next == node) {
prev.next = node.next;
node.next = null;
return false;
}
}
// The node wasn't found in the linked list: it must have timed out!
return true;
}
超时后认为连接不可用,调用Sockect对象的close函数关闭该连接。
c、缓存
是okio对java I/O 做的最重要的优化。主要思想是buffer复用,而不是创建大量的朝生夕死的buffer对象,防止频繁GC。这部分内容可以对比BufferedInputStream的实现(BufferedInputStream内部结构和Segment类似,当其设置的初始缓存byte数组大小不够时,新申请一个更大容量的数组,并将原缓存数组的内容copy过来,舍弃原数组)。
代码思路:Segment对象为byte数组的封装,是数据的容器,是一个双向链表中的节点,可以有插入、删除、拆分、合并、复制几个操作。SegmentPool缓存了不用的segment,是一个静态的单链表,需要时调用take获取Segment,不需要时调用recycle回收。Buffer对象封装了这两者的使用,例如使用okio调用writeString函数时的实现如下:
@Override
public Buffer writeString(String string, int beginIndex, int endIndex, Charset charset) {
if (string == null) throw new IllegalArgumentException("string == null");
if (beginIndex < 0) throw new IllegalAccessError("beginIndex < 0: " + beginIndex);
if (endIndex < beginIndex) {
throw new IllegalArgumentException("endIndex < beginIndex: " + endIndex + " < " + beginIndex);
}
if (endIndex > string.length()) {
throw new IllegalArgumentException(
"endIndex > string.length: " + endIndex + " > " + string.length());
}
if (charset == null) throw new IllegalArgumentException("charset == null");
if (charset.equals(Util.UTF_8)) return writeUtf8(string, beginIndex, endIndex);
byte[] data = string.substring(beginIndex, endIndex).getBytes(charset);
return write(data, 0, data.length);
}
@Override public Buffer write(byte[] source, int offset, int byteCount) {
if (source == null) throw new IllegalArgumentException("source == null");
checkOffsetAndCount(source.length, offset, byteCount);
int limit = offset + byteCount;
while (offset < limit) {
Segment tail = writableSegment(1);
int toCopy = Math.min(limit - offset, Segment.SIZE - tail.limit);
System.arraycopy(source, offset, tail.data, tail.limit, toCopy);
offset += toCopy;
tail.limit += toCopy;
}
size += byteCount;
return this;
}
Segment writableSegment(int minimumCapacity) {
if (minimumCapacity < 1 || minimumCapacity > Segment.SIZE) throw new IllegalArgumentException();
if (head == null) {
head = SegmentPool.take(); // Acquire a first segment.
return head.next = head.prev = head;
}
Segment tail = head.prev;
if (tail.limit + minimumCapacity > Segment.SIZE || !tail.owner) {
tail = tail.push(SegmentPool.take()); // Append a new empty segment to fill up.
}
return tail;
}
核心逻辑是缓存大小的管理,然后调用System.arraycopy将数据复制到容器中。
4、数据的压缩加密
对这一部分的处理主要在BridgeInterceptor中。会在头部自动添加Accept-Encoding: gzip,并自动对response的进行解压缩,若手动添加了,则不处理response的数据。
对于发送数据的body,官方推荐自定义拦截器实现。拦截器内选用Gzip或者其他的压缩算法对数据进行压缩。
除了单纯的压缩,使用protobuffer代替json也是一种选择,除了压缩率和速度,protobuffer对数据是一种天然的混淆,更安全一些,但是使用起来比json要麻烦。
同样的手段,也可以插入一个自定义的拦截器来对数据进行加密。
网络请求的监控
okhttp在3.11版本开始提供了一个网络时间监控的回调接口HttpEventListener,能进行一些耗时和事件统计。
360的方案加入拦截器统计响应时间和上下行流量。
网络质量的监控
okhttp没有这部分内容,但是有一些工具可以用,可以执行linux的ping命令,在socket连接前后加入计时,使用tracerout(利用ICMP协议来查看到目标机器链路中的节点的可达性,其报文内会含有目标网络、主机、端口可达性等一系列信息,再加上ip协议的TTL来遍历当前节点到目标节点的链路信息),程序实现参考《traceroute程序-c语言实现》
弱网优化和失败处理
这部分就留坑吧。okhttp对网络失败做了处理,但是说到针对弱网的优化,还是要去翻看mars。