庖丁解牛:
okhttpclient,okhttp的工厂类,用于创建calls(用于创建请求和返回数据。)
默认构造函数,是new了个Builder。
public OkHttpClient() {
this(new Builder());
}
OkHttpClient(Builder builder) {
this.dispatcher = builder.dispatcher;
this.proxy = builder.proxy;
this.protocols = builder.protocols;
this.connectionSpecs = builder.connectionSpecs;
this.interceptors = Util.immutableList(builder.interceptors);
this.networkInterceptors = Util.immutableList(builder.networkInterceptors);
this.eventListenerFactory = builder.eventListenerFactory;
this.proxySelector = builder.proxySelector;
this.cookieJar = builder.cookieJar;
this.cache = builder.cache;
this.internalCache = builder.internalCache;
this.socketFactory = builder.socketFactory;
boolean isTLS = false;
for (ConnectionSpec spec : connectionSpecs) {
isTLS = isTLS || spec.isTls();
}
if (builder.sslSocketFactory != null || !isTLS) {
this.sslSocketFactory = builder.sslSocketFactory;
this.certificateChainCleaner = builder.certificateChainCleaner;
} else {
X509TrustManager trustManager = systemDefaultTrustManager();
this.sslSocketFactory = systemDefaultSslSocketFactory(trustManager);
this.certificateChainCleaner = CertificateChainCleaner.get(trustManager);
}
this.hostnameVerifier = builder.hostnameVerifier;
this.certificatePinner = builder.certificatePinner.withCertificateChainCleaner(
certificateChainCleaner);
this.proxyAuthenticator = builder.proxyAuthenticator;
this.authenticator = builder.authenticator;
this.connectionPool = builder.connectionPool;
this.dns = builder.dns;
this.followSslRedirects = builder.followSslRedirects;
this.followRedirects = builder.followRedirects;
this.retryOnConnectionFailure = builder.retryOnConnectionFailure;
this.connectTimeout = builder.connectTimeout;
this.readTimeout = builder.readTimeout;
this.writeTimeout = builder.writeTimeout;
this.pingInterval = builder.pingInterval;
}
sslSocketFactory和certificateChainCleaner 用于ssl通讯加解密,如果builder未指定,则使用系统默认的。后面再看。
这一步,注意点:
1)this.interceptors,和this.networkInterceptors在builder创建之后就不不可变了。没理解原由!!!
按理说,初始化client都在builder类中创建,创建好后,都不可修改。为什么只是拦截器不可修改。
2) client的关键成员变量都是builder生成。
public Builder() {
dispatcher = new Dispatcher();
protocols = DEFAULT_PROTOCOLS;
connectionSpecs = DEFAULT_CONNECTION_SPECS;
eventListenerFactory = EventListener.factory(EventListener.NONE);
proxySelector = ProxySelector.getDefault();
cookieJar = CookieJar.NO_COOKIES;
socketFactory = SocketFactory.getDefault();
hostnameVerifier = OkHostnameVerifier.INSTANCE;
certificatePinner = CertificatePinner.DEFAULT;
proxyAuthenticator = Authenticator.NONE;
authenticator = Authenticator.NONE;
connectionPool = new ConnectionPool();
dns = Dns.SYSTEM;
followSslRedirects = true;
followRedirects = true;
retryOnConnectionFailure = true;
connectTimeout = 10_000;
readTimeout = 10_000;
writeTimeout = 10_000;
pingInterval = 0;
}
下面轮流解释builder类里成员变量的作用。
1. Dispatcher:异步请求执行策略。本质是ExecutorService::run calls.
如果是自定义的executor,应该正确的调用getMaxRequests。
(ExecutorService)见{}
注:
- private int maxRequests = 64;// 默认线请求数
- private int maxRequestsPerHost = 5; //每个主机最多请求5
3。默认 核心ThreadPoolExecutor,默认空闲线程为0,最大线程数Integer.MAX_VALUE,空闲线程等待时间60秒。队列是SynchronousQueue。
* Policy on when async requests are executed.
*
* <p>Each dispatcher uses an {@link ExecutorService} to run calls internally. If you supply your
* own executor, it should be able to run {@linkplain #getMaxRequests the configured maximum} number
* of calls concurrently.
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
/** Ready async calls in the order they'll be run. */
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
/** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
//包括取消未结束的异步任务。
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
/** Running synchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
SynchronousQueue特点:
1.一个线程插入操作insert,必须等待另一个线程remove。
一个线程remove,必须等待另一个线程insert。
2.不能peek
3.不能iterate
4.不允许空元素
5.类似于CSP(内存安全策略)
ArrayDeque特点:
1)Resizable-array
2)This class is likely to be faster than Stack when used as a stack, and faster than LinkedList when used as a queue.
3)线程不安全。
先看下RealCall中的执行方法:
- 同步方法,execute()
2.异步方法,enqueue()
@Override protected void execute() {
boolean signalledCallback = false;
try {
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
responseCallback.onFailure(RealCall.this, e);
}
} finally {
client.dispatcher().finished(this);
}
}
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
1.同步执行逻辑(表示是当前线程):添加任务(RealCall)到runningSyncCalls,并在fianlly调用Dispatcher的finished().如果
/** Used by {@code Call#execute} to signal it is in-flight. */
synchronized void executed(RealCall call) {
runningSyncCalls.add(call);
}
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
if (promoteCalls) promoteCalls();
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
}
if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();
}
}
2.异步执行逻辑:最终调用的Dispatcher中的enqueue,如果小于总的请求数并且小于maxRequestsPerHost,则立即执行;否则,添加到等待队列readyAsyncCalls。
synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);
executorService().execute(call);
} else {
readyAsyncCalls.add(call);
}
}
默认协议:HTTP_1_1("http/1.1"),HTTP_2("h2")
static final List<Protocol> DEFAULT_PROTOCOLS = Util.immutableList(
Protocol.HTTP_2, Protocol.HTTP_1_1)
public enum Protocol {
/**
* An obsolete plaintext framing that does not use persistent sockets by default.
*/
HTTP_1_0("http/1.0"),
/**
* A plaintext framing that includes persistent connections.
*
* <p>This version of OkHttp implements <a href="http://www.ietf.org/rfc/rfc2616.txt">RFC
* 2616</a>, and tracks revisions to that spec.
*/
HTTP_1_1("http/1.1"),
/**
* Chromium's binary-framed protocol that includes header compression, multiplexing multiple
* requests on the same socket, and server-push. HTTP/1.1 semantics are layered on SPDY/3.
*
* <p>Current versions of OkHttp do not support this protocol.
*
* @deprecated OkHttp has dropped support for SPDY. Prefer {@link #HTTP_2}.
*/
SPDY_3("spdy/3.1"),
/**
* The IETF's binary-framed protocol that includes header compression, multiplexing multiple
* requests on the same socket, and server-push. HTTP/1.1 semantics are layered on HTTP/2.
*
* <p>HTTP/2 requires deployments of HTTP/2 that use TLS 1.2 support {@linkplain
* CipherSuite#TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256} , present in Java 8+ and Android 5+. Servers
* that enforce this may send an exception message including the string {@code
* INADEQUATE_SECURITY}.
*/
HTTP_2("h2");
private final String protocol;
Protocol(String protocol) {
this.protocol = protocol;
}
默认连接说明:支持非加密,和tls版本的加密套件
浏览器查看是否支持tsl1.3:(访问https://tls13.crypto.mozilla.org/)
static final List<ConnectionSpec> DEFAULT_CONNECTION_SPECS = Util.immutableList(
ConnectionSpec.MODERN_TLS, ConnectionSpec.CLEARTEXT);
/** A modern TLS connection with extensions like SNI and ALPN available. */
public static final ConnectionSpec MODERN_TLS = new Builder(true)
.cipherSuites(APPROVED_CIPHER_SUITES)
.tlsVersions(TlsVersion.TLS_1_3, TlsVersion.TLS_1_2, TlsVersion.TLS_1_1, TlsVersion.TLS_1_0)
.supportsTlsExtensions(true)
.build();
/** Unencrypted, unauthenticated connections for {@code http:} URLs. */
public static final ConnectionSpec CLEARTEXT = new Builder(false).build();
private static final CipherSuite[] APPROVED_CIPHER_SUITES = new CipherSuite[] {
CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
CipherSuite.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
CipherSuite.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
CipherSuite.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305_SHA256,
CipherSuite.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_SHA256,
// Note that the following cipher suites are all on HTTP/2's bad cipher suites list. We'll
// continue to include them until better suites are commonly available. For example, none
// of the better cipher suites listed above shipped with Android 4.4 or Java 7.
CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA,
CipherSuite.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA,
CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA,
CipherSuite.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA,
CipherSuite.TLS_RSA_WITH_AES_128_GCM_SHA256,
CipherSuite.TLS_RSA_WITH_AES_256_GCM_SHA384,
CipherSuite.TLS_RSA_WITH_AES_128_CBC_SHA,
CipherSuite.TLS_RSA_WITH_AES_256_CBC_SHA,
CipherSuite.TLS_RSA_WITH_3DES_EDE_CBC_SHA,
};
关于密码套件说明:参考https://kb.cnblogs.com/page/530044/
1.一般情况下,浏览器会按顺序遍历服务端支持的算法,如果发现客户端支持就立即返回。比如:客户端 CipherSuite(c1,c2,c3),服务端(c4,c2,c1),最终协商的结果是按照c2来加解密。所以,一般情况下,服务端会将比叫强的加密算法放在前面。
- CipherSuite.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA解释:
TLS:代表支持的协议
ECDHE_RSA:作为密钥交换算法
AES_256:加密算法,密钥和初始向量的长度都是256
CBC:填充方式
SHA:MAC算法是hash算法
注:每种Cipher的名字里包含了四部分信息,分别是:
- 密钥交换算法,用于决定客户端与服务器之间在握手的过程中如何认证,用到的算法包括RSA,Diffie-Hellman,ECDH,PSK等
- 加密算法,用于加密消息流,该名称后通常会带有两个数字,分别表示密钥的长度和初始向量的长度,比如DES 56/56, RC2 56/128, RC4 128/128, AES 128/128, AES 256/256
- 报文认证信息码(MAC)算法,用于创建报文摘要,确保消息的完整性(没有被篡改),算法包括MD5,SHA等。
- PRF(伪随机数函数),用于生成“master secret”。
aes 5种加密模式:
1.电码本模式(Electronic Codebook Book (ECB));
2.密码分组链接模式(Cipher Block Chaining (CBC));
3.计算器模式(Counter (CTR));
4.密码反馈模式(Cipher FeedBack (CFB));
5.输出反馈模式(Output FeedBack (OFB))。
eventListenerFactory默认创建空的事件监听器:EventListener.NONE
代理默认jdk系统级别的:ProxySelector.getDefault(),
public static ProxySelector getDefault() {
SecurityManager sm = System.getSecurityManager();
if (sm != null) {
sm.checkPermission(SecurityConstants.GET_PROXYSELECTOR_PERMISSION);
}
return theProxySelector;
}
//默认通过反射获取sun.net.spi.DefaultProxySelector
//theProxySelector为在连接到由URL引用的远程对象时,选择要使用的代理服务器(如果有)的系统范围代理选择器。异常就为null。
static {
try {
Class<?> c = Class.forName("sun.net.spi.DefaultProxySelector");
if (c != null && ProxySelector.class.isAssignableFrom(c)) {
theProxySelector = (ProxySelector) c.newInstance();
}
} catch (Exception e) {
theProxySelector = null;
}
}
默认cookies为null,自己实现需要复写saveFromResponse和loadForRequest方法。
CookieJar NO_COOKIES = new CookieJar() {
@Override public void saveFromResponse(HttpUrl url, List<Cookie> cookies) {
}
@Override public List<Cookie> loadForRequest(HttpUrl url) {
return Collections.emptyList();
}
};
系统默认java.net.socket 默认工厂不了解策略,例如通过防火墙(例如SOCKS v4或v5)或通过防火墙(例如使用SSL)进行隧道挖掘,或者某些端口保留用于SSL。
// The default factory has NO intelligence about policies like tunneling
// out through firewalls (e.g. SOCKS V4 or V5) or in through them
// (e.g. using SSL), or that some ports are reserved for use with SSL.
//
// Note that at least JDK 1.1 has a low level "plainSocketImpl" that
// knows about SOCKS V4 tunneling, so this isn't a totally bogus default.
//
// ALSO: we may want to expose this class somewhere so other folk
// can reuse it, particularly if we start to add highly useful features
// such as ability to set connect timeouts.
class DefaultSocketFactory extends SocketFactory {
public Socket createSocket() {
return new Socket();
}
public Socket createSocket(String host, int port)
throws IOException, UnknownHostException
{
return new Socket(host, port);
}
public Socket createSocket(InetAddress address, int port)
throws IOException
{
return new Socket(address, port);
}
public Socket createSocket(String host, int port,
InetAddress clientAddress, int clientPort)
throws IOException, UnknownHostException
{
return new Socket(host, port, clientAddress, clientPort);
}
public Socket createSocket(InetAddress address, int port,
InetAddress clientAddress, int clientPort)
throws IOException
{
return new Socket(address, port, clientAddress, clientPort);
}
}
主机校验 OkHostnameVerifier.INSTANCE,之后再看。
CertificatePinner默认CertificatePinner .DEFAULT
约束哪些证书可信。固定证书可以抵御对证书颁发机构的攻击。它还防止通过中间人证书颁发机构(应用程序用户已知或未知)进行连接.
固定公钥的方式比固定证书要好,以防证书变动。
CertificatePinner目前使用的就是固定公钥的方式
* <p>{@link CertificatePinner} can not be used to pin self-signed certificate if such certificate
* is not accepted by {@link javax.net.ssl.TrustManager}.
https://www.imperialviolet.org/2011/05/04/pinning.html
https://tools.ietf.org/html/rfc7469(HTTP Public Key Pinning (HPKP))
https://src.chromium.org/viewvc/chrome/trunk/src/net/http/transport_security_state_static.certs(sha1)
https://www.owasp.org/index.php/Certificate_and_Public_Key_Pinning(开放式Web应用程序安全项目owasp)
认证器
proxyAuthenticator = Authenticator.NONE;
authenticator = Authenticator.NONE;
ConnectionPool连接池
public ConnectionPool() {
this(5, 5, TimeUnit.MINUTES);
}
public ConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) {
this.maxIdleConnections = maxIdleConnections;
this.keepAliveDurationNs = timeUnit.toNanos(keepAliveDuration);
// Put a floor on the keep alive duration, otherwise cleanup will spin loop.
if (keepAliveDuration <= 0) {
throw new IllegalArgumentException("keepAliveDuration <= 0: " + keepAliveDuration);
}
}
@Nullable RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
assert (Thread.holdsLock(this));
for (RealConnection connection : connections) {
if (connection.isEligible(address, route)) {
streamAllocation.acquire(connection);
return connection;
}
}
return null;
}
默认dns
Lookup all addresses for {@code hostname} on the given {@code netId}.
如:InetAddress.getAllByName(“baidu.com”)获取item格式为:
baidu.com/123.125.115.110
baidu.com/220.181.57.216
Dns SYSTEM = new Dns() {
@Override public List<InetAddress> lookup(String hostname) throws UnknownHostException {
if (hostname == null) throw new UnknownHostException("hostname == null");
return Arrays.asList(InetAddress.getAllByName(hostname));
}
};
static final InetAddressImpl impl = new Inet6AddressImpl();
public static InetAddress[] getAllByName(String host)
throws UnknownHostException {
return impl.lookupAllHostAddr(host, NETID_UNSET).clone();
}
@Override
public InetAddress[] lookupAllHostAddr(String host, int netId) throws UnknownHostException {
if (host == null || host.isEmpty()) {
// Android-changed: Return both the Inet4 and Inet6 loopback addresses
// when host == null or empty.
return loopbackAddresses();
}
//Inet6AddressImpl
// Is it a numeric address?
InetAddress result = InetAddress.parseNumericAddressNoThrow(host);
if (result != null) {
result = InetAddress.disallowDeprecatedFormats(host, result);
if (result == null) {
throw new UnknownHostException("Deprecated IPv4 address format: " + host);
}
return new InetAddress[] { result };
}
return lookupHostByName(host, netId);
}
重定向默认true
followSslRedirects=true;
followRedirects = true;
失败重联
retryOnConnectionFailure = true;
连接超时10秒,读超时10秒,写10秒,ping间隔0
connectTimeout = 10_000;
readTimeout = 10_000;
writeTimeout = 10_000;
pingInterval = 0;
分析了工厂类okhttphttpclient,下面看调用:
(支持websocket)
@Override public Call newCall(Request request) {
return new RealCall(this, request, false /* for web socket */);
}
/**
* Uses {@code request} to connect a new web socket.
*/
@Override public WebSocket newWebSocket(Request request, WebSocketListener listener) {
RealWebSocket webSocket = new RealWebSocket(request, listener, new Random());
webSocket.connect(this);
return webSocket;
}
先看RealCall如何执行请求
RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
final EventListener.Factory eventListenerFactory = client.eventListenerFactory();
this.client = client;
this.originalRequest = originalRequest;
this.forWebSocket = forWebSocket;
this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client, forWebSocket);
// TODO(jwilson): this is unsafe publication and not threadsafe.
this.eventListener = eventListenerFactory.create(this);
}
@Override public Response execute() throws IOException {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
try {
client.dispatcher().executed(this);
Response result = getResponseWithInterceptorChain();
if (result == null) throw new IOException("Canceled");
return result;
} finally {
client.dispatcher().finished(this);
}
}
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
先看下这个是啥?captureCallStackTrace();
static CloseGuard get() {
Method getMethod;
Method openMethod;
Method warnIfOpenMethod;
try {
Class<?> closeGuardClass = Class.forName("dalvik.system.CloseGuard");
getMethod = closeGuardClass.getMethod("get");
openMethod = closeGuardClass.getMethod("open", String.class);
warnIfOpenMethod = closeGuardClass.getMethod("warnIfOpen");
} catch (Exception ignored) {
getMethod = null;
openMethod = null;
warnIfOpenMethod = null;
}
return new CloseGuard(getMethod, openMethod, warnIfOpenMethod);
}
private void captureCallStackTrace() {
Object callStackTrace = Platform.get().getStackTraceForCloseable("response.body().close()");
retryAndFollowUpInterceptor.setCallStackTrace(callStackTrace);
}
最终调用的是“dalvik.system.CloseGuard”类的get(null).open("response.body().close()")
@Override public Object getStackTraceForCloseable(String closer) {
return closeGuard.createAndOpen(closer);
}
CloseGurad:提供了一种机制或者说是一个工具类,用来记录资源泄露的场景,比如使用完的资源(比如cursor/fd)没有正常关闭。
接入CloseGurad之后,如果发生资源使用后没有正常关闭,会在finalize方法中触发CloseGuard的warnIfOpen方法。
也就是说RealCall中执行请求之前,会将生成的callStackTrace放入retryAndFollowUpInterceptor中。猜测:可能用于之后被CloseGurad监测,如果有资源没关,就报错:“response.body().close()”。(CloseGurad原理下回分解)
异步的请求被添加到okhttpclient的线程池中。与同步的区别是:会判断retryAndFollowUpInterceptor.isCanceled。其他的都会调用精彩的getResponseWithInterceptorChain,和 client.dispatcher().finished(this)。
final class AsyncCall extends NamedRunnable {
private final Callback responseCallback;
AsyncCall(Callback responseCallback) {
super("OkHttp %s", redactedUrl());
this.responseCallback = responseCallback;
}
String host() {
return originalRequest.url().host();
}
Request request() {
return originalRequest;
}
RealCall get() {
return RealCall.this;
}
@Override protected void execute() {
boolean signalledCallback = false;
try {
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
responseCallback.onFailure(RealCall.this, e);
}
} finally {
client.dispatcher().finished(this);
}
}
}
先看下NamedRunnable:用于修改线程名称,默认会设置关于url的线程名称,当线程任务执行完成,将线程名称恢复成老的线程名称。猜测:估计用于线程错误的时候,可以定位到错误的位置。
重点:拦截器,责任链设计模式,也是真正交易执行的方法入口。
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
interceptors.addAll(client.interceptors());
interceptors.add(retryAndFollowUpInterceptor);
interceptors.add(new BridgeInterceptor(client.cookieJar()));
interceptors.add(new CacheInterceptor(client.internalCache()));
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());
}
interceptors.add(new CallServerInterceptor(forWebSocket));
Interceptor.Chain chain = new RealInterceptorChain(
interceptors, null, null, null, 0, originalRequest);
return chain.proceed(originalRequest);
}
- interceptors.addAll(client.interceptors());
默认为没有任何拦截器的ArrayList,当然可以添加自己的interceptors. - interceptors.add(retryAndFollowUpInterceptor);
//RealCall 构造函数中
this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client, forWebSocket);
3.interceptors.add(new BridgeInterceptor(client.cookieJar()));
操作cookie的BridgeInterceptor,默认没有实现cookieJar
- interceptors.add(new CacheInterceptor(client.internalCache()));
缓存拦截器,CacheInterceptor
5.interceptors.add(new ConnectInterceptor(client));
添加连接拦截器
6.interceptors.addAll(client.networkInterceptors());
如果不是websocket,工厂类okhttpclient添加所有的网络拦截器。默认无 - interceptors.add(new CallServerInterceptor(forWebSocket));
最后一个拦截器,It makes a network call to the server. - RealInterceptorChain.proceed(originalRequest);
每次执行创建一个RealInterceptorChain并开始一连串的责任链掉用。
public RealInterceptorChain(List<Interceptor> interceptors, StreamAllocation streamAllocation,
HttpCodec httpCodec, RealConnection connection, int index, Request request) {
this.interceptors = interceptors;
this.connection = connection;
this.streamAllocation = streamAllocation;
this.httpCodec = httpCodec;
this.index = index;
this.request = request;
}
public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
RealConnection connection) throws IOException {
if (index >= interceptors.size()) throw new AssertionError();
calls++;
// If we already have a stream, confirm that the incoming request will use it.
if (this.httpCodec != null && !this.connection.supportsUrl(request.url())) {
throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
+ " must retain the same host and port");
}
// If we already have a stream, confirm that this is the only call to chain.proceed().
if (this.httpCodec != null && calls > 1) {
throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
+ " must call proceed() exactly once");
}
// Call the next interceptor in the chain.
RealInterceptorChain next = new RealInterceptorChain(
interceptors, streamAllocation, httpCodec, connection, index + 1, request);
Interceptor interceptor = interceptors.get(index);
Response response = interceptor.intercept(next);
// Confirm that the next interceptor made its required call to chain.proceed().
if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1) {
throw new IllegalStateException("network interceptor " + interceptor
+ " must call proceed() exactly once");
}
// Confirm that the intercepted response isn't null.
if (response == null) {
throw new NullPointerException("interceptor " + interceptor + " returned null");
}
return response;
}
上面的代码核心的内容:(后面用于拦截器循环调用)
1)new一个新的RealInterceptorChain,将拦截器集合index序号加1
2)从index=0开始,依此获取拦截器集合中的各个拦截器。
3)通过集合中当前第index个拦截器,开始拦截下一个RealInterceptorChain。
由之前的代码可知:index=0的拦截器是:client.interceptors,
如果没有定义client的interceptors,则index=0是retryAndFollowUpInterceptor
@Override public Response intercept(Chain chain) throws IOException {
Request request = chain.request();
streamAllocation = new StreamAllocation(
client.connectionPool(), createAddress(request.url()), callStackTrace);
int followUpCount = 0;
Response priorResponse = null;
while (true) {
if (canceled) {
streamAllocation.release();
throw new IOException("Canceled");
}
Response response = null;
boolean releaseConnection = true;
try {
response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null);
releaseConnection = false;
} catch (RouteException e) {
// The attempt to connect via a route failed. The request will not have been sent.
if (!recover(e.getLastConnectException(), false, request)) {
throw e.getLastConnectException();
}
releaseConnection = false;
continue;
} catch (IOException e) {
// An attempt to communicate with a server failed. The request may have been sent.
boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
if (!recover(e, requestSendStarted, request)) throw e;
releaseConnection = false;
continue;
} finally {
// We're throwing an unchecked exception. Release any resources.
if (releaseConnection) {
streamAllocation.streamFailed(null);
streamAllocation.release();
}
}
// Attach the prior response if it exists. Such responses never have a body.
if (priorResponse != null) {
response = response.newBuilder()
.priorResponse(priorResponse.newBuilder()
.body(null)
.build())
.build();
}
Request followUp = followUpRequest(response);
if (followUp == null) {
if (!forWebSocket) {
streamAllocation.release();
}
return response;
}
closeQuietly(response.body());
if (++followUpCount > MAX_FOLLOW_UPS) {
streamAllocation.release();
throw new ProtocolException("Too many follow-up requests: " + followUpCount);
}
if (followUp.body() instanceof UnrepeatableRequestBody) {
streamAllocation.release();
throw new HttpRetryException("Cannot retry streamed HTTP body", response.code());
}
if (!sameConnection(response, followUp.url())) {
streamAllocation.release();
streamAllocation = new StreamAllocation(
client.connectionPool(), createAddress(followUp.url()), callStackTrace);
} else if (streamAllocation.codec() != null) {
throw new IllegalStateException("Closing the body of " + response
+ " didn't close its backing stream. Bad interceptor?");
}
request = followUp;
priorResponse = response;
}
}
RetryAndFollowUpInterceptor做的事情如下:
1.创建StreamAllocation,他的作用是用来协调Connections,Streams,Calls
Connections:如果physical socket connections建立比较慢,就需要cancle (取消)connection。
Streams:HTTP/1.x 一个Connection一个流,HTTP/2一个Connection多个流。
Calls:逻辑上的流序列。最好是多个流共用一个Connection
2.循环 将request, streamAllocation传入下一个新的RealInterceptorChain,并用index+1(如果前面是0,这里是1)的拦截器调用当前RealInterceptorChain。
并在所有的拦截器执行完之后,获取其他chain的response,再调用followUpRequest,获取response。
response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null);
private Request followUpRequest(Response userResponse) throws IOException {
if (userResponse == null) throw new IllegalStateException();
Connection connection = streamAllocation.connection();
Route route = connection != null
? connection.route()
: null;
int responseCode = userResponse.code();
final String method = userResponse.request().method();
switch (responseCode) {
case HTTP_PROXY_AUTH:
Proxy selectedProxy = route != null
? route.proxy()
: client.proxy();
if (selectedProxy.type() != Proxy.Type.HTTP) {
throw new ProtocolException("Received HTTP_PROXY_AUTH (407) code while not using proxy");
}
return client.proxyAuthenticator().authenticate(route, userResponse);
case HTTP_UNAUTHORIZED:
return client.authenticator().authenticate(route, userResponse);
case HTTP_PERM_REDIRECT:
case HTTP_TEMP_REDIRECT:
// "If the 307 or 308 status code is received in response to a request other than GET
// or HEAD, the user agent MUST NOT automatically redirect the request"
if (!method.equals("GET") && !method.equals("HEAD")) {
return null;
}
// fall-through
case HTTP_MULT_CHOICE:
case HTTP_MOVED_PERM:
case HTTP_MOVED_TEMP:
case HTTP_SEE_OTHER:
// Does the client allow redirects?
if (!client.followRedirects()) return null;
String location = userResponse.header("Location");
if (location == null) return null;
HttpUrl url = userResponse.request().url().resolve(location);
// Don't follow redirects to unsupported protocols.
if (url == null) return null;
// If configured, don't follow redirects between SSL and non-SSL.
boolean sameScheme = url.scheme().equals(userResponse.request().url().scheme());
if (!sameScheme && !client.followSslRedirects()) return null;
// Most redirects don't include a request body.
Request.Builder requestBuilder = userResponse.request().newBuilder();
if (HttpMethod.permitsRequestBody(method)) {
final boolean maintainBody = HttpMethod.redirectsWithBody(method);
if (HttpMethod.redirectsToGet(method)) {
requestBuilder.method("GET", null);
} else {
RequestBody requestBody = maintainBody ? userResponse.request().body() : null;
requestBuilder.method(method, requestBody);
}
if (!maintainBody) {
requestBuilder.removeHeader("Transfer-Encoding");
requestBuilder.removeHeader("Content-Length");
requestBuilder.removeHeader("Content-Type");
}
}
// When redirecting across hosts, drop all authentication headers. This
// is potentially annoying to the application layer since they have no
// way to retain them.
if (!sameConnection(userResponse, url)) {
requestBuilder.removeHeader("Authorization");
}
return requestBuilder.url(url).build();
case HTTP_CLIENT_TIMEOUT:
// 408's are rare in practice, but some servers like HAProxy use this response code. The
// spec says that we may repeat the request without modifications. Modern browsers also
// repeat the request (even non-idempotent ones.)
if (userResponse.request().body() instanceof UnrepeatableRequestBody) {
return null;
}
return userResponse.request();
default:
return null;
}
}
除了以下状态吗需要特殊处理
/**
* HTTP Status-Code 407: Proxy Authentication Required.
*/
public static final int HTTP_PROXY_AUTH = 407;
/**
* HTTP Status-Code 401: Unauthorized.
*/
public static final int HTTP_UNAUTHORIZED = 401;
/** Numeric status code, 307: Temporary Redirect. */
public static final int HTTP_TEMP_REDIRECT = 307;
public static final int HTTP_PERM_REDIRECT = 308;
/**
* HTTP Status-Code 301: Moved Permanently.
*/
public static final int HTTP_MOVED_PERM = 301;
/**
* HTTP Status-Code 302: Temporary Redirect.
*/
public static final int HTTP_MOVED_TEMP = 302;
/**
* HTTP Status-Code 303: See Other.
*/
public static final int HTTP_SEE_OTHER = 303;
/**
* HTTP Status-Code 408: Request Time-Out.
*/
public static final int HTTP_CLIENT_TIMEOUT = 408;
接着看下一个拦截器BridgeInterceptor
注:请求头Range了解下。
参考:https://blog.csdn.net/hj520wj/article/details/46473921
//先添加head。body的Content-Type、Accept-Encoding、Cookie 等,接下来..
Response networkResponse = chain.proceed(requestBuilder.build());
HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());
Response.Builder responseBuilder = networkResponse.newBuilder()
.request(userRequest);
if (transparentGzip
&& "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
&& HttpHeaders.hasBody(networkResponse)) {
GzipSource responseBody = new GzipSource(networkResponse.body().source());
Headers strippedHeaders = networkResponse.headers().newBuilder()
.removeAll("Content-Encoding")
.removeAll("Content-Length")
.build();
responseBuilder.headers(strippedHeaders);
responseBuilder.body(new RealResponseBody(strippedHeaders, Okio.buffer(responseBody)));
}
return responseBuilder.build();
等待下一个拦截器获取的Response,报文头以及body的补充处理。
CacheInterceptor 设置缓存策略,关键代码如下:
cacheCandidate,可以自定义
CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
....
//如果if (networkRequest == null && cacheResponse == null) ,设置codestate=504,并返回Response,接下来...
// If we don't need the network, we're done.
if (networkRequest == null) {
return cacheResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.build();
}
networkResponse = chain.proceed(networkRequest);
//如果是304状态,有缓存,更新完Response的head等就直接返回,接下来...
Response response = networkResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();
if (cache != null) {
if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
// Offer this request to the cache.
CacheRequest cacheRequest = cache.put(response);
return cacheWritingResponse(cacheRequest, response);
}
if (HttpMethod.invalidatesCache(networkRequest.method())) {
try {
cache.remove(networkRequest);
} catch (IOException ignored) {
// The cache cannot be written.
}
}
}
return response;
注:only-if-cached等状态 参考:https://blog.csdn.net/meimeizhuzhuhua/article/details/70665241
默认是没有cache。用缓存的话,参考下面的案例
* public final OkHttpClient client = new OkHttpClient.Builder()
* .addInterceptor(new HttpLoggingInterceptor())
* .cache(new Cache(cacheDir, cacheSize))
* .build();
ConnectInterceptor
看重点方法
HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
RealConnection connection = streamAllocation.connection();
public HttpCodec newStream(OkHttpClient client, boolean doExtensiveHealthChecks) {
int connectTimeout = client.connectTimeoutMillis();
int readTimeout = client.readTimeoutMillis();
int writeTimeout = client.writeTimeoutMillis();
boolean connectionRetryEnabled = client.retryOnConnectionFailure();
try {
RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
writeTimeout, connectionRetryEnabled, doExtensiveHealthChecks);
HttpCodec resultCodec = resultConnection.newCodec(client, this);
synchronized (connectionPool) {
codec = resultCodec;
return resultCodec;
}
} catch (IOException e) {
throw new RouteException(e);
}
}
/**
* Finds a connection and returns it if it is healthy. If it is unhealthy the process is repeated
* until a healthy connection is found.
*/
private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
int writeTimeout, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks)
throws IOException {
while (true) {
RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
connectionRetryEnabled);
// If this is a brand new connection, we can skip the extensive health checks.
synchronized (connectionPool) {
if (candidate.successCount == 0) {
return candidate;
}
}
// Do a (potentially slow) check to confirm that the pooled connection is still good. If it
// isn't, take it out of the pool and start again.
if (!candidate.isHealthy(doExtensiveHealthChecks)) {
noNewStreams();
continue;
}
return candidate;
}
}
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
boolean connectionRetryEnabled) throws IOException {
Route selectedRoute;
synchronized (connectionPool) {
if (released) throw new IllegalStateException("released");
if (codec != null) throw new IllegalStateException("codec != null");
if (canceled) throw new IOException("Canceled");
// Attempt to use an already-allocated connection.
RealConnection allocatedConnection = this.connection;
if (allocatedConnection != null && !allocatedConnection.noNewStreams) {
return allocatedConnection;
}
// Attempt to get a connection from the pool.
Internal.instance.get(connectionPool, address, this, null);
if (connection != null) {
return connection;
}
selectedRoute = route;
}
// If we need a route, make one. This is a blocking operation.
if (selectedRoute == null) {
selectedRoute = routeSelector.next();
}
RealConnection result;
synchronized (connectionPool) {
if (canceled) throw new IOException("Canceled");
// Now that we have an IP address, make another attempt at getting a connection from the pool.
// This could match due to connection coalescing.
Internal.instance.get(connectionPool, address, this, selectedRoute);
if (connection != null) return connection;
// Create a connection and assign it to this allocation immediately. This makes it possible
// for an asynchronous cancel() to interrupt the handshake we're about to do.
route = selectedRoute;
refusedStreamCount = 0;
result = new RealConnection(connectionPool, selectedRoute);
acquire(result);
}
// Do TCP + TLS handshakes. This is a blocking operation.
result.connect(connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled);
routeDatabase().connected(result.route());
Socket socket = null;
synchronized (connectionPool) {
// Pool the connection.
Internal.instance.put(connectionPool, result);
// If another multiplexed connection to the same address was created concurrently, then
// release this connection and acquire that one.
if (result.isMultiplexed()) {
socket = Internal.instance.deduplicate(connectionPool, address, this);
result = connection;
}
}
closeQuietly(socket);
return result;
}
上面创建连接,和HttpCodec,接下来将他们传入新的RealInterceptorChain,并用CallServerInterceptor拦截,如果是非websocket,并且有自定义的networkInterceptors,则先用networkInterceptors拦截。
先看httpcodec
public HttpCodec newCodec(
OkHttpClient client, StreamAllocation streamAllocation) throws SocketException {
if (http2Connection != null) {
return new Http2Codec(client, streamAllocation, http2Connection);
} else {
socket.setSoTimeout(client.readTimeoutMillis());
source.timeout().timeout(client.readTimeoutMillis(), MILLISECONDS);
sink.timeout().timeout(client.writeTimeoutMillis(), MILLISECONDS);
return new Http1Codec(client, streamAllocation, source, sink);
}
}
这里支持http1和http2,先看http1的方法:writeRequestHeaders(request),如果有body,createRequestBody 最终调用如下:
/** Returns bytes of a request header for sending on an HTTP transport. */
public void writeRequest(Headers headers, String requestLine) throws IOException {
if (state != STATE_IDLE) throw new IllegalStateException("state: " + state);
sink.writeUtf8(requestLine).writeUtf8("\r\n");
for (int i = 0, size = headers.size(); i < size; i++) {
sink.writeUtf8(headers.name(i))
.writeUtf8(": ")
.writeUtf8(headers.value(i))
.writeUtf8("\r\n");
}
sink.writeUtf8("\r\n");
state = STATE_OPEN_REQUEST_BODY;
}
@Override public Sink createRequestBody(Request request, long contentLength) {
if ("chunked".equalsIgnoreCase(request.header("Transfer-Encoding"))) {
// Stream a request body of unknown length.
//分块编码,举例见:https://www.cnblogs.com/xuehaoyue/p/6639029.html
return newChunkedSink();
}
if (contentLength != -1) {
// Stream a request body of a known length.
return newFixedLengthSink(contentLength);
}
throw new IllegalStateException(
"Cannot stream a request body without chunked encoding or a known content length!");
}
说白了,如果请求带body,如果知道定长就用newFixedLengthSink,如果不知道定长,请求头带Transfer-Encoding,则用newChunkedSink.
//POST的时候, 当要POST的数据大于1024字节的时候, curl并不会直接就发起POST请求, 而是会分为2个步骤:
//1.发送一个请求, 包含一个Expect:100-continue, 询问Server使用愿意接受数据
//2.接收到Server返回的100-continue应答以后, 才把数据POST给Server
// If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
// Continue" response before transmitting the request body. If we don't get that, return what
// we did get (such as a 4xx response) without ever transmitting the request body.
if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
httpCodec.flushRequest();
responseBuilder = httpCodec.readResponseHeaders(true);
}
结束sink
@Override public void finishRequest() throws IOException {
sink.flush();
}
上面是有body的情况,如果是get请求,
if (responseBuilder == null) {
responseBuilder = httpCodec.readResponseHeaders(false);
}
接着创建Response
Response response = responseBuilder
.request(request)
.handshake(streamAllocation.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
最后处理响应码等。
int code = response.code();
if (forWebSocket && code == 101) {
// Connection is upgrading, but we need to ensure interceptors see a non-null response body.
response = response.newBuilder()
.body(Util.EMPTY_RESPONSE)
.build();
} else {
response = response.newBuilder()
.body(httpCodec.openResponseBody(response))
.build();
}
if ("close".equalsIgnoreCase(response.request().header("Connection"))
|| "close".equalsIgnoreCase(response.header("Connection"))) {
streamAllocation.noNewStreams();
}
if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
throw new ProtocolException(
"HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
}