Okhttp是一个处理网络请求的开源项目,是安卓端最火热的轻量级框架,由大名鼎鼎的Square公司贡献。
优势:
- 支持Http2并允许对同一主机的所有请求共享一个套接字
- 通过连接池,减少了请求延迟:复用socket连接,减少了3次招收4次挥手的耗时
- 默认通过Gzip压缩数据:数据量更小了
- 响应缓存,避免了重复请求的网络:重用缓存
- 请求失败自动重试主机的其他ip,自动重定向
使用方式
引入库:
implementation 'com.squareup.okhttp3:okhttp:3.2.0'
发起get请求:
分别创建一个OkHttpClient对象,一个Request对象,然后利用他们创建一个Call对象,最后调用同步请求execute()方法或者异步请求enqueue()方法来拿到Response。
OkHttpClient okHttpClient = new OkHttpClient.Builder()
.readTimeout(20, TimeUnit.SECONDS)
.build();
Request request = new Request.Builder()
.url("")
.build();
try {
Response response = okHttpClient.newCall(request).execute();
response.body().string();
} catch (IOException e) {
e.printStackTrace();
}
发起post请求:
RequestBody requestBody = new FormBody.Builder()
.add("","")
.build();
Request request = new Request.Builder()
.url("")
.addHeader("","")
.post(requestBody)
.build();
OkHttpClient okHttpClient = new OkHttpClient.Builder()
.readTimeout(20, TimeUnit.SECONDS)
.build();
try {
Response response = okHttpClient.newCall(request).execute();
response.body().string();
} catch (IOException e) {
e.printStackTrace();
}
OkHttp整体流程图:

okhttp内部封装功能:
采用了建造者模式创建,可以让用户配置一些属性。
open class OkHttpClient internal constructor(
builder: Builder
) : Cloneable, Call.Factory, WebSocket.Factory {
constructor() : this(Builder())
class Builder constructor() {
//调度器
internal var dispatcher: Dispatcher = Dispatcher()
//连接池
internal var connectionPool: ConnectionPool = ConnectionPool()
//整体流程拦截器
internal val interceptors: MutableList<Interceptor> = mutableListOf()
//网络流程拦截器
internal val networkInterceptors: MutableList<Interceptor> = mutableListOf()
//流程监听器
internal var eventListenerFactory: EventListener.Factory = EventListener.NONE.asFactory()
//连接失败时是否重连
internal var retryOnConnectionFailure = true
//服务器认证设置
internal var authenticator: Authenticator = Authenticator.NONE
//是否重定向
internal var followRedirects = true
//是否从HTTP重定向到HTTPS
internal var followSslRedirects = true
//cookie设置
internal var cookieJar: CookieJar = CookieJar.NO_COOKIES
//缓存设置
internal var cache: Cache? = null
//DNS设置
internal var dns: Dns = Dns.SYSTEM
//代理设置
internal var proxy: Proxy? = null
//代理选择器设置
internal var proxySelector: ProxySelector? = null
//代理服务器认证设置
internal var proxyAuthenticator: Authenticator = Authenticator.NONE
//socket配置
internal var socketFactory: SocketFactory = SocketFactory.getDefault()
//https socket配置
internal var sslSocketFactoryOrNull: SSLSocketFactory? = null
internal var x509TrustManagerOrNull: X509TrustManager? = null
internal var connectionSpecs: List<ConnectionSpec> = DEFAULT_CONNECTION_SPECS
//协议
internal var protocols: List<Protocol> = DEFAULT_PROTOCOLS
//域名校验
internal var hostnameVerifier: HostnameVerifier = OkHostnameVerifier
internal var certificatePinner: CertificatePinner = CertificatePinner.DEFAULT
internal var certificateChainCleaner: CertificateChainCleaner? = null
//请求超时
internal var callTimeout = 0
//连接超时
internal var connectTimeout = 10_000
//读取超时
internal var readTimeout = 10_000
//写入超时
internal var writeTimeout = 10_000
internal var pingInterval = 0
internal var minWebSocketMessageToCompress = RealWebSocket.DEFAULT_MINIMUM_DEFLATE_SIZE
internal var routeDatabase: RouteDatabase? = null
...
Call
一个Call对象表示一次请求,每一次请求都会生成一个新的Call,Call其实是一个接口,它的具体实现类是RealCall。
interface Call : Cloneable {
/** 返回发起此调用的原始请求 */
fun request(): Request
/**
* 同步请求,立即执行。
*
* 抛出两种异常:
* 1. 请求失败抛出IOException;
* 2. 如果在执行过一回的前提下再次执行抛出IllegalStateException;*/
@Throws(IOException::class)
fun execute(): Response
/**
* 异步请求,将请求安排在将来的某个时间点执行。
* 如果在执行过一回的前提下再次执行抛出IllegalStateException */
fun enqueue(responseCallback: Callback)
/** 取消请求。已经完成的请求不能被取消 */
fun cancel()
/** 是否已被执行 */
fun isExecuted(): Boolean
/** 是否被取消 */
fun isCanceled(): Boolean
/** 一个完整Call请求流程的超时时间配置,默认选自[OkHttpClient.Builder.callTimeout] */
fun timeout(): Timeout
/** 克隆这个call,创建一个新的相同的Call */
public override fun clone(): Call
/** 利用工厂模式来让 OkHttpClient 来创建 Call对象 */
fun interface Factory {
fun newCall(request: Request): Call
}
}
在 OkHttpClient 中,我们利用 newCall 方法来创建一个 Call 对象,newCall 方法返回的是一个 RealCall 对象,需要传入一个Request对象,Request对象表示用户的请求参数。
@Override public Call newCall(Request request) {
return new RealCall(this, request);
}
RealCall是Call接口的具体实现类,是应用端与网络层的连接桥,展示应用端原始的请求与连接数据,以及网络层返回的response及其它数据流。
通过使用方法也可知,创建RealCall对象后,就要调用同步或异步请求方法,所以它里面还包含同步请求 execute() 与异步请求 enqueue()方法。
Request
同样是请求参数的配置类,也同样采用了建造者模式。
class Request internal constructor(
@get:JvmName("url") val url: HttpUrl,
@get:JvmName("method") val method: String,
@get:JvmName("headers") val headers: Headers,
@get:JvmName("body") val body: RequestBody?,
internal val tags: Map<Class<*>, Any>
) {
open class Builder {
//请求的URL
internal var url: HttpUrl? = null
//请求方法,如:GET、POST..
internal var method: String
//请求头
internal var headers: Headers.Builder
//请求体
internal var body: RequestBody? = null
...
Dispatcher
调度器,用来调度Call对象,同时包含线程池与异步请求队列。
看Dispatcher类部分代码:
class Dispatcher constructor() {
@get:Synchronized
@get:JvmName("executorService") val executorService: ExecutorService
get() {
if (executorServiceOrNull == null) {
//创建一个缓存线程池,来处理请求调用
executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
}
return executorServiceOrNull!!
}
/** 已准备好的异步请求队列 */
@get:Synchronized
private val readyAsyncCalls = ArrayDeque<AsyncCall>()
/** 正在运行的异步请求队列, 包含取消但是还未finish的AsyncCall */
private val runningAsyncCalls = ArrayDeque<AsyncCall>()
/** 正在运行的同步请求队列, 包含取消但是还未finish的RealCall */
private val runningSyncCalls = ArrayDeque<RealCall>()
···省略代码···
}
执行请求流程
okhttp3中提供了两种请求方式:一种是同步请求,第二种是异步请求。同步请求调用call.execute()方法,异步请求调用call.enqueue(Callback callback)方法。
public interface Call {
/**立即调用请求,并阻塞,直到响应可以被处理或进入*/
Response execute() throws IOException;
/**
* 计划在将来某个时间执行的请求
*/
void enqueue(Callback responseCallback);
}
1.同步请求
调用调度器executed方法,就是将当前的RealCall对象加入到runningSyncCalls队列中,然后调用getResponseWithInterceptorChain方法拿到response。
override fun execute(): Response {
//CAS判断是否已经被执行了, 确保只能执行一次,如果已经执行过,则抛出异常
check(executed.compareAndSet(false, true)) { "Already Executed" }
//请求超时开始计时
timeout.enter()
//开启请求监听
callStart()
try {
//调用调度器中的 executed() 方法,调度器只是将 call 加入到了runningSyncCalls队列中
client.dispatcher.executed(this)
//调用getResponseWithInterceptorChain 方法拿到 response
return getResponseWithInterceptorChain()
} finally {
//执行完毕,调度器将该 call 从 runningSyncCalls队列中移除
client.dispatcher.finished(this)
}
}
同步请求,直接将请求加入到了runningSyncCalls中,立即执行任务,执行完成将call从runningSyncCalls移除掉。
2.异步请求
override fun enqueue(responseCallback: Callback) {
//CAS判断是否已经被执行了, 确保只能执行一次,如果已经执行过,则抛出异常
check(executed.compareAndSet(false, true)) { "Already Executed" }
//开启请求监听
callStart()
//新建一个AsyncCall对象,通过调度器enqueue方法加入到readyAsyncCalls队列中
client.dispatcher.enqueue(AsyncCall(responseCallback))
}
然后调用调度器的enqueue方法
internal fun enqueue(call: AsyncCall) {
//加锁,保证线程安全
synchronized(this) {
//将该请求调用加入到 readyAsyncCalls 队列中
readyAsyncCalls.add(call)
// Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
// the same host.
if (!call.call.forWebSocket) {
//通过域名来查找有没有相同域名的请求,有则复用。
val existingCall = findExistingCallWithHost(call.host)
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
}
}
//执行请求
promoteAndExecute()
}
private fun promoteAndExecute(): Boolean {
this.assertThreadDoesntHoldLock()
val executableCalls = mutableListOf<AsyncCall>()
//判断是否有请求正在执行
val isRunning: Boolean
//加锁,保证线程安全
synchronized(this) {
//遍历 readyAsyncCalls 队列
val i = readyAsyncCalls.iterator()
while (i.hasNext()) {
val asyncCall = i.next()
//runningAsyncCalls 的数量不能大于最大并发请求数 64
if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
//同域名最大请求数5,同一个域名最多允许5条线程同时执行请求
if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.
//从 readyAsyncCalls 队列中移除,并加入到 executableCalls 及 runningAsyncCalls 队列中
i.remove()
asyncCall.callsPerHost.incrementAndGet()
executableCalls.add(asyncCall)
runningAsyncCalls.add(asyncCall)
}
//通过运行队列中的请求数量来判断是否有请求正在执行
isRunning = runningCallsCount() > 0
}
//遍历可执行队列,调用线程池来执行AsyncCall
for (i in 0 until executableCalls.size) {
val asyncCall = executableCalls[i]
asyncCall.executeOn(executorService)
}
return isRunning
}
异步请求如何决定将请求放入ready还是running?
正在执行的请求小于maxRequests(64)个,并且与同一个服务器域名请求数小于maxRequestsPerHost(5)个,可加入到runningAsyncCalls,否则加入到readyAsyncCalls队列中。这里对客户端和服务器都做了压力上限。
把Dispatcher当成生产者,如果生产的线程小于可消费的范围,则立即加入消费队列,交个线程池执行任务;而当生产者生产的线程大于消费者所能承受的最大范围,就把未能及时执行的任务保存在readyAsyncCalls队列中。(AsyncCall是一个runnable)
最终,请求结束,调用调度器finish方法
/** 异步请求调用结束方法 */
internal fun finished(call: AsyncCall) {
call.callsPerHost.decrementAndGet()
finished(runningAsyncCalls, call)
}
/** 同步请求调用结束方法 */
internal fun finished(call: RealCall) {
finished(runningSyncCalls, call)
}
private fun <T> finished(calls: Deque<T>, call: T) {
val idleCallback: Runnable?
synchronized(this) {
//将当前请求调用从 正在运行队列 中移除
if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")
idleCallback = this.idleCallback
}
//继续执行剩余请求,将call从readyAsyncCalls中取出加入到runningAsyncCalls,然后执行
val isRunning = promoteAndExecute()
if (!isRunning && idleCallback != null) {
//如果执行完了所有请求,处于闲置状态,调用闲置回调方法
idleCallback.run()
}
}
promoteAndExecute方法会用相同判断方法判断是否需要将call从readyAsyncCalls中转移到runningAsyncCalls队列中。
获取Response
接着就是看看getResponseWithInterceptorChain方法是如何拿到response的。
采用了责任链设计模式,通过拦截器构建了以RealInterceptorChain责任链,然后执行proceed方法来得到response。
继续下一篇:
参考文章
okhttp3源码分析:架构全面解析
OkHttp源码走心解析
面试灵魂拷问:
-
Okhttp连接池和连接复用是怎么实现的
频繁的进行建立Sokcet连接(TCP三次握手)和断开Socket(TCP四次分手)是非常消耗网络资源和浪费时间的,HTTP中的keepalive连接对于 降低延迟和提升速度有非常重要的作用。
复用连接就需要对连接进行管理,这里就引入了连接池的概念。
在BridgeInterceptor的intercept()方法中requestBuilder.header("Connection", "Keep-Alive"),我们在request的请求头添加了("Connection", "Keep-Alive")的键值对,这样就能够保持长连接。而连接池ConnectionPoll就是专门负责管理这些长连接的类。需要注意的是,我们在初始化 ConnectionPoll的时候,会设置 闲置的connections 的最大数量为5个,每个最长的存活时间为5分钟。
1)首先,ConectionPool中维护了一个双端队列Deque,也就是两端都可以进出的队列,用来存储连接。
2)然后在ConnectInterceptor,也就是负责建立连接的拦截器中,首先会找可用连接,也就是从连接池中去获取连接,具体的就是会调用到ConectionPool的get方法。
3)如果没找到可用连接,就会创建新连接,并会把这个建立的连接加入到双端队列中,同时开始运行线程池中的线程,其实就是调用了ConectionPool的put方法。
public final class ConnectionPool {
void put(RealConnection connection) {
if (!cleanupRunning) {
//没有连接的时候调用
cleanupRunning = true;
executor.execute(cleanupRunnable);
}
connections.add(connection);
}
}
-
OkHttp里面用到了什么设计模式
责任链模式
OkHttp中最直接的责任链模式的使用就是Interceptor的使用。书写简单漂亮,使用也非常方便,只需要OkHttpClient.Builder调用addInterceptor()方法,将实现了Interceptor接口的类添加进去即可,扩展性和可定制化都非常方便。
OkHttpClient httpClient = new OkHttpClient.Builder()
.addInterceptor(new HeaderInterceptor())
.addInterceptor(new LogInterceptor())
.addInterceptor(new HttpLoggingInterceptor(logger)
.build();
建造者模式
OkHttp中最直接的建造者模式的使用就是XXBuilder的使用。在OkHttp中的OkHttpClient、Request、Response、HttpUrl、Headers、MultipartBody等大量使用了类似的建造者模式。
工厂模式
Call的创建通过Factory来创建。
OkHttp中的工厂模式的使用有CacheStrategy.Factory,这是一个简单工厂模式,主要也是用于生成一个CacheStrategy对象。
参考:
https://blog.csdn.net/gaolh89/article/details/104339688
okhttp线程池特点
Okhttp线程池创建代码:
@get:JvmName("executorService") val executorService: ExecutorService
get() {
if (executorServiceOrNull == null) {
executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
}
return executorServiceOrNull!!
}
在OKHttp中,创建了一个阀值是Integer.MAX_VALUE的线程池,它不保留任何最小线程,随时创建更多的线程数,而且如果线程空闲后,只能多活60秒。所以也就说如果收到20个并发请求,线程池会创建20个线程,当完成后的60秒后会自动关闭所有20个线程。他这样设计成不设上限的线程,以保证I/O任务中高阻塞低占用的过程,不会长时间卡在阻塞上。且不会触发线程池的拒绝策略。