Okhttp4源码分析(一)

前言

文章会分为两部分介绍,这一主要篇介绍从发起请求到调用getResponseWithInterceptorChain方法为止。
从getResponseWithInterceptorChain方法里开始,就开启了okhttp中的责任链模式,这个之后的文章再去做分析

流程图

Okhttp的使用

OkHttpClient client = new OkHttpClient();
Request request = new Request.Builder().url("http://wwww.baidu.com").build();
//同步请求
Response syncResponse = client.newCall(request).execute();
//异步请求
client.newCall(request).enqueue(new Callback() {
    @Override
    public void onFailure(@NotNull Call call, @NotNull IOException e) { }
    @Override
    public void onResponse(@NotNull Call call, @NotNull Response response) throws IOException { }
});

请求从OkhttpClient发起,调用OkhttpClient#newCall方法,传入Request请求参数,返回一个Call,实际返回的是RealCall,再调用RealCall的execute或enqueue发起请求

OkHttpClient

open class OkHttpClient internal constructor(
  builder: Builder
) : Cloneable, Call.Factory, WebSocket.Factory {
  @get:JvmName("dispatcher") val dispatcher: Dispatcher = builder.dispatcher//调度器,每一个请求都会记录在里面,里面包含了一个三个队列和一个线程池。
  ...省略部分代码...
  @get:JvmName("minWebSocketMessageToCompress")//这里@get:JvmName的目的是为了兼容OKHTTP3,其实就是取了个别名
  val minWebSocketMessageToCompress: Long = builder.minWebSocketMessageToCompress
  val routeDatabase: RouteDatabase = builder.routeDatabase ?: RouteDatabase()
  constructor() : this(Builder())

OkHttpClient可以直接创建或者通过Builder方式创建。里面没什么代码,只是根据建造者模式传入的Builder进行赋值,初始化操作都是在Builder里进行
再来看Builder

  class Builder constructor() {
    internal var dispatcher: Dispatcher = Dispatcher()//Dispatcher就是在这里初始化
    internal var connectionPool: ConnectionPool = ConnectionPool()
    internal val interceptors: MutableList<Interceptor> = mutableListOf()
    internal val networkInterceptors: MutableList<Interceptor> = mutableListOf()
    internal var eventListenerFactory: EventListener.Factory = EventListener.NONE.asFactory()
    internal var retryOnConnectionFailure = true
    internal var authenticator: Authenticator = Authenticator.NONE
    internal var followRedirects = true
    internal var followSslRedirects = true
    internal var cookieJar: CookieJar = CookieJar.NO_COOKIES
    internal var cache: Cache? = null
    internal var dns: Dns = Dns.SYSTEM
    internal var proxy: Proxy? = null
    internal var proxySelector: ProxySelector? = null
    internal var proxyAuthenticator: Authenticator = Authenticator.NONE
    internal var socketFactory: SocketFactory = SocketFactory.getDefault()
    internal var sslSocketFactoryOrNull: SSLSocketFactory? = null
    internal var x509TrustManagerOrNull: X509TrustManager? = null
    internal var connectionSpecs: List<ConnectionSpec> = DEFAULT_CONNECTION_SPECS
    internal var protocols: List<Protocol> = DEFAULT_PROTOCOLS
    internal var hostnameVerifier: HostnameVerifier = OkHostnameVerifier
    internal var certificatePinner: CertificatePinner = CertificatePinner.DEFAULT
    internal var certificateChainCleaner: CertificateChainCleaner? = null
    internal var callTimeout = 0
    internal var connectTimeout = 10_000
    internal var readTimeout = 10_000
    internal var writeTimeout = 10_000
    internal var pingInterval = 0
    internal var minWebSocketMessageToCompress = RealWebSocket.DEFAULT_MINIMUM_DEFLATE_SIZE
    internal var routeDatabase: RouteDatabase? = null
}

Builder里面进行参数的初始化,像拦截器就是通过Builder设置,这里主要知道Dispatcher是在这里初始化的就行。

Request

  open class Builder {
    internal var url: HttpUrl? = null
    internal var method: String
    internal var headers: Headers.Builder
    internal var body: RequestBody? = null

    /** A mutable map of tags, or an immutable empty map if we don't have any. */
    internal var tags: MutableMap<Class<*>, Any> = mutableMapOf()

    constructor() {
      this.method = "GET"
      this.headers = Headers.Builder()
    }
}

Request里默认请求方式为get,通过建造者模式配置请求地址、请求方式、请求头等参数。

RealCall

请求第一步是client.newCall(request),看看newCall是什么

  override fun newCall(request: Request): Call = RealCall(this, request, forWebSocket = false)

newCall返回了一个RealCall,RealCall是继承自Call接口
来看看Call接口定义了些什么方法

interface Call : Cloneable {
  fun request(): Request
  @Throws(IOException::class)
  fun execute(): Response//同步请求
  fun enqueue(responseCallback: Callback)//异步请求
  fun cancel()
  fun isExecuted(): Boolean
  fun isCanceled(): Boolean
  fun timeout(): Timeout
  public override fun clone(): Call

  fun interface Factory {//OkHttpClient实现了这个工厂接口用来创建RealCall
    fun newCall(request: Request): Call
  }
}

OK,发现里面定义了execute和enqueue两个请求方法,那么网络请求应该就是由client.newCall(request)构建一个RealCall,再由RealCall调用execute和enqueue发起请求了。

看看RealCall怎么实现的execute同步请求

  override fun execute(): Response {
    check(executed.compareAndSet(false, true)) { "Already Executed" }//判断请求是否已经执行

    timeout.enter()//超时逻辑
    callStart()
    try {
      client.dispatcher.executed(this)//往dispatcher的同步队列里添加当前任务
      return getResponseWithInterceptorChain()//通过责任链进行网络请求等逻辑
    } finally {
      client.dispatcher.finished(this)
    }
  }

同步方法非常简单,往dispatcher里面的同步队列添加了一个任务,然后直接在当前线程调用getResponseWithInterceptorChain开始进行网络请求。

再来看看异步请求

  override fun enqueue(responseCallback: Callback) {
    check(executed.compareAndSet(false, true)) { "Already Executed" }

    callStart()
    client.dispatcher.enqueue(AsyncCall(responseCallback))////往dispatcher的同步队列里添加AsyncCall任务
  }

这次没有直接调用getResponseWithInterceptorChain方法,而是只调用了dispatcher.enqueue,并且传入的不是RealCall而是一个AsyncCall

//AsyncCall是RealCall的一个内部类
  internal inner class AsyncCall(
    private val responseCallback: Callback
  ) : Runnable {

    //供外部调用,传入一个线程池,在线程池里执行这个任务
    fun executeOn(executorService: ExecutorService) {
      try {
        executorService.execute(this)
      } catch (e: RejectedExecutionException) {
        responseCallback.onFailure(this@RealCall, ioException)
      } finally {
      }
    }

    override fun run() {
        try {
          val response = getResponseWithInterceptorChain()//请求最终也是调用的这个方法
          //相关回调
          responseCallback.onResponse(this@RealCall, response)
        } catch (e: IOException) {
            responseCallback.onFailure(this@RealCall, e)
        } catch (t: Throwable) {
          responseCallback.onFailure(this@RealCall, canceledException)
        } finally {
          client.dispatcher.finished(this)
        }
      }
    }
  }

上面是精简过的代码,AsyncCall是RealCall的一个内部类,实现了Runnable 接口。通过executeOn方法传入一个线程池执行this,由于AsyncCall是一个Runnable,所以会在子线程中执行run方法,run方法里调用getResponseWithInterceptorChain这个方法进行网络请求,拿到请求结果后回调responseCallback。

可以这样认为,RealCall代表一个同步请求,AsyncCall代表一个异步请求

由于AsyncCall是传入到了Dispatcher里面,所以AsyncCall#executeOn方法肯定是在Dispatcher里面调用的。
下面看看Dispatcher是怎么去执行executed和enqueue这两个方法的

Dispatcher

class Dispatcher constructor() {
  //#1
  @get:Synchronized var maxRequests = 64//最大请求数
    set(maxRequests) {
      require(maxRequests >= 1) { "max < 1: $maxRequests" }
      synchronized(this) {
        field = maxRequests
      }
      promoteAndExecute()
    }

  @get:Synchronized var maxRequestsPerHost = 5//最大并发数
    set(maxRequestsPerHost) {
      require(maxRequestsPerHost >= 1) { "max < 1: $maxRequestsPerHost" }
      synchronized(this) {
        field = maxRequestsPerHost
      }
      promoteAndExecute()
    }

  @set:Synchronized
  @get:Synchronized
  var idleCallback: Runnable? = null

  //这里定义了两个线程池executorServiceOrNull和executorService,实际上可以认为只定义了一个executorService
  //当调用executorService线程池时,会触发get()方法,里面会初始化executorServiceOrNull线程池并将其返回
  //对kotlin get() set()不了解的可以查看相关资料https://blog.csdn.net/c1392851600/article/details/80933130

  private var executorServiceOrNull: ExecutorService? = null

  @get:Synchronized
  @get:JvmName("executorService") val executorService: ExecutorService
    get() {
      if (executorServiceOrNull == null) {
        executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
            SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
      }
      return executorServiceOrNull!!
    }

  private val readyAsyncCalls = ArrayDeque<AsyncCall>()//准备执行的异步任务
  private val runningAsyncCalls = ArrayDeque<AsyncCall>()//正在执行的异步任务
  private val runningSyncCalls = ArrayDeque<RealCall>()//正在执行的同步任务

  //#2
  //添加任务到同步队列
  @Synchronized internal fun executed(call: RealCall) {
    runningSyncCalls.add(call)
  }
  //添加任务到异步等待队列
  internal fun enqueue(call: AsyncCall) {
    synchronized(this) {
      readyAsyncCalls.add(call)
      if (!call.call.forWebSocket) {
        val existingCall = findExistingCallWithHost(call.host)
        if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
      }
    }
    promoteAndExecute()
  }
  //执行异步任务
  private fun promoteAndExecute(): Boolean {
    this.assertThreadDoesntHoldLock()

    val executableCalls = mutableListOf<AsyncCall>()//新建一个列表,用来存放需要加入线程池的任务
    val isRunning: Boolean
    synchronized(this) {
      val i = readyAsyncCalls.iterator()
      while (i.hasNext()) {
        val asyncCall = i.next()

        if (runningAsyncCalls.size >= this.maxRequests) break //是否达到最大请求数
        if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // 是否达到最大并发数

        i.remove()//从等待队列中移除
        asyncCall.callsPerHost.incrementAndGet()
        executableCalls.add(asyncCall)//加入缓存列表
        runningAsyncCalls.add(asyncCall)//加入正在执行的队列
      }
      isRunning = runningCallsCount() > 0
    }

    for (i in 0 until executableCalls.size) {//遍历缓存列表
      val asyncCall = executableCalls[i]
      asyncCall.executeOn(executorService)//执行异步请求
    }

    return isRunning
  }
}

代码看着很长,但是并不复杂。
#1-#2处的代码定义了线程池参数然后初始化了线程池,线程池只有异步任务会用到,还初始化了三个队列。
先来说三个队列,这三个队列的目的是为了对请求进行管理。
首先是两个异步的队列,由于线程池有上限,肯定不是所有任务都是正在执行,readyAsyncCalls就是用来放准备执行的异步请求,runningAsyncCalls 用来放正在执行的异步请求(已经添加到线程池当中的请求)。
同步任务都是同步执行的,也就不存在等待队列,故所有同步任务都是正在执行的任务,都放在runningSyncCalls 里面。这里注意下,同步任务并不是一定都在主线程开启,可能同时创建多个线程在这些线程里同时执行同步任务,所以同步任务也需要有一个队列来对请求进行管理。

executed方法往runningSyncCalls队列添加一个RealCall同步请求。
enqueue方法往readyAsyncCalls队列添加一个AsyncCall异步请求,然后调用promoteAndExecute方法判断是否执行异步任务
promoteAndExecute方法将缓存队列的请求加入到正在执行的队列中,然后调用asyncCall.executeOn(executorService)执行这些异步请求。

不论同步还是异步任务,最终都会调用getResponseWithInterceptorChain方法开始真正的网络请求

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容