OkHttp 4.0 Kotlin源码分析 (三) Dispatcher分发器流程控制

前言

随着OkHttp 4.0 版本的发布,从这一节起我们将以OkHttp 4.0 为分析素材。 官方介绍说一切都变了,一切又都没变,说人话就是从4.0开始Okhttp从java迁移到了Kotlin。 Okhttp的迁移也激发了更多android开发者向Kotlin转换的信心。

implementation("com.squareup.okhttp3:okhttp:4.4.0")

其他相关文章

OkHttp 4.0 Kotlin源码分析 (一) 同步和异步请求基本用法

OkHttp 4.0 Kotlin源码分析 (二) 基本的数据对象以及Call类分析

OkHttp 4.0 Kotlin源码分析 (三) Dispatcher分发器流程控制

Dispatcher 调度

前面章节介绍了,OKhttp的基本用法和同步异步请求流程的区别,其中涉及到一个很重要的概念叫做Dispatcher。 通过前面的文章我们可以知道Dispatch完成了对同步和异步请求的分发和调度。 那么这个Dispather 是怎么完成这些工作的呢?

同步和异步请求结构

先看张图,让我们逐步分析首先我们看下同步请求时候Dispatcher做了些什么?

Dispatcher同步请求的调度

我们知道在执行同步请求的时候,实际执行的的是RealCall中的execute方法其中有9、10、12三行非常关键的代码。。

  override fun execute(): Response {
    synchronized(this) {
      check(!executed) { "Already Executed" }
      executed = true
    }
    timeout.enter()
    callStart()
    try {
      client.dispatcher.executed(this)
      return getResponseWithInterceptorChain()
    } finally {
      client.dispatcher.finished(this)
    }
  }

同步请求非常简单,我们看到在同步请求中,我们知道getResponseWithInterceptorChain()是真正的网络请求执行单元并且使用了责任链模式实现的拦截器。 前后分别执行了Dispatcher的 executed 和 finish方法。 我们可以猜想,除了执行请求还需要做些什么呢?

没错还需要维护请求状态,因为我们知道我们的请求个数和请求的Host的连接数都是有限制的。毕竟我们可以在多个线程中同时执行多个同步请求,而且还有异步请求。

@get:Synchronized var maxRequests = 64
@get:Synchronized var maxRequestsPerHost = 5

那么在Dispatcher中是怎么维护这个请求状态的呢?
首先我们看下Dispatcher中有这么几个比较重要的双端队列ArrayDeque,分别是一个同步请求的队列和两个异步请求的队列,为什么异步请求有两个呢,其实,其中一个是正在执行的队列runningAsyncCalls,另一个是执行等待队列readyAsyncCalls

  /** Ready async calls in the order they'll be run. */
  private val readyAsyncCalls = ArrayDeque<AsyncCall>()

  /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
  private val runningAsyncCalls = ArrayDeque<AsyncCall>()

  /** Running synchronous calls. Includes canceled calls that haven't finished yet. */
  private val runningSyncCalls = ArrayDeque<RealCall>()

Dispatcher中还有很多函数用于判断当前队列的数量情况

/** Returns a snapshot of the calls currently being executed. */
 //实际执行的同步和异步请求Call列表
  @Synchronized fun runningCalls(): List<Call{
    return Collections.unmodifiableList(runningSyncCalls + runningAsyncCalls.map { it.call })
  }
  // 异步请求等待队列的请求数
  @Synchronized fun queuedCallsCount(): Int = readyAsyncCalls.size
 // 获取正在执行的同步请求和异步请求数
  @Synchronized fun runningCallsCount(): Int = runningAsyncCalls.size + runningSyncCalls.size

那么继续看同步请求中,这个Dispatcher的executed和finish方法到底做了什么,先上代码:

/** Used by `Call#execute` to signal it is in-flight. */
  @Synchronized internal fun executed(call: RealCall) {
    //注意这里操作的是同步请求队列
    runningSyncCalls.add(call)
  }

/** Used by `Call#execute` to signal completion. */
  internal fun finished(call: RealCall) {
    //注意这里操作的是同步请求队列
    finished(runningSyncCalls, call)
  }

private fun <Tfinished(calls: Deque<T>, call: T) {
    val idleCallback: Runnable?
    synchronized(this) {
      if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")
      idleCallback = this.idleCallback
    }

    val isRunning = promoteAndExecute()

    if (!isRunning && idleCallback != null) {
      idleCallback.run()
    }
  }

看到这里已经很明确了,同步请求 在Dispatcher中操作了同步请求队列runningSyncCalls 分别执行了简单的添加和移除操作。 到此我们同步请求也就分析完成了。

如果不出意外,异步请求时候也会去操作对应的等待和执行队列,这猜想是正确的,相比于同步请求异步请求会削微复杂一点。我们接下了这个章节分析下异步请求到底做了哪些事情,到底怎么调度的?

Dispatcher异步请求的调度

在分析之前,削微透露一下,其实异步请求是将异步请求从准备队列中移除然后加入到执行队列中,并使用一个线程池去执行请求也就是AsyncCall
为什么线程池能够执行AsyncCall,我想我也不用多说了,其实它就是Runnable。

internal inner class AsyncCall(
    private val responseCallback: Callback
  ) : Runnable 

说了这么多,我们看下enqueue的函数到底是怎么样的呢?

internal fun enqueue(call: AsyncCall) {
    synchronized(this) {
      readyAsyncCalls.add(call)

      // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
      // the same host.
      if (!call.call.forWebSocket) {
        val existingCall = findExistingCallWithHost(call.host)
        if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
      }
    }
    promoteAndExecute()
  }

这个函数好像比同步请求并没复杂,就是加入到了异步请求队列而已吧。 是的!
这不还有一个重要的函数promoteAndExecute()这里面执行了异步请求的重要逻辑。

  /**
   * Promotes eligible calls from [readyAsyncCalls] to [runningAsyncCalls] and runs them on the
   * executor service. Must not be called with synchronization because executing calls can call
   * into user code.
   *
   * @return true if the dispatcher is currently running calls.
   */
  private fun promoteAndExecute(): Boolean {
    this.assertThreadDoesntHoldLock()

    val executableCalls = mutableListOf<AsyncCall>()
    val isRunning: Boolean
    synchronized(this) {
      val i = readyAsyncCalls.iterator()
      while (i.hasNext()) {
        val asyncCall = i.next()

        if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
        if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.

        i.remove()
        asyncCall.callsPerHost.incrementAndGet()
        executableCalls.add(asyncCall)
        runningAsyncCalls.add(asyncCall)
      }
      isRunning = runningCallsCount() 0
    }

    for (i in 0 until executableCalls.size) {
      val asyncCall = executableCalls[i]
      asyncCall.executeOn(executorService)
    }

    return isRunning
  }

前面的一堆逻辑就干了这些内容。判断正在执行的请求是不是超出了最大请求或最大Host数,没有的话就把等待队列中的AsyncCall对象移除,加入到执行队列中,然后呢就从执行队列中拿到线程池中执行。

asyncCall.executeOn(executorService)

  @get:JvmName("executorService") val executorService: ExecutorService
    get() {
      if (executorServiceOrNull == null) {
        executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
            SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
      }
      return executorServiceOrNull!!
    }

这里这个ThreadPoolExecutor很有意思, 我们看第一个参数,核心线程数是0,为啥呢?我们知道核心线程是不会被销毁的,这里设置为0,是想让线程池在不用的时候将线程销毁掉。

那这个线程执行了啥? 当然是asyncCall的run方法。那我们就看看这个RealCall.kt中的AsyncCallrun()方法干了什么?

override fun run() {
  threadName("OkHttp ${redactedUrl()}") {
    var signalledCallback = false
    timeout.enter()
    try {
      val response = getResponseWithInterceptorChain()
      signalledCallback = true
      responseCallback.onResponse(this@RealCall, response)
    } catch (e: IOException) {
      if (signalledCallback) {
        // Do not signal the callback twice!
        Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
      } else {
        responseCallback.onFailure(this@RealCall, e)
      }
    } catch (t: Throwable) {
      cancel()
      if (!signalledCallback) {
        val canceledException = IOException("canceled due to $t")
        canceledException.addSuppressed(t)
        responseCallback.onFailure(this@RealCall, canceledException)
      }
      throw t
    } finally {
      client.dispatcher.finished(this)
    }
  }
}

不出所料,这里执行了val response = getResponseWithInterceptorChain()
拦截器执行真正的网络请求,同时对返回结果进行了回调处理。而且我们注意到和同步请求一样这里还是调用了这个client.dispatcher.finished(this)

  /** Used by `AsyncCall#run` to signal completion. */
  internal fun finished(call: AsyncCall) {
    call.callsPerHost.decrementAndGet()
    finished(runningAsyncCalls, call)
  }

这个finish是参数重载的方法,和同步请求的finish是不同的。 但是执行内容我们可以预见是一致的。

好了到此我们同步请求和异步请求都分析完了。

总结

Dispatcher是Okhttp中很重要的调度单元,根据我们定义的请求限制,完成了对同步和异步请求的调度。主要做的工作是操作了三个队列,有等待队列和执行队列,并在合适的时机调用对应的getResponseWithInterceptorChain()执行请求。

那么getResponseWithInterceptorChain()是什么?它是怎么完成请求的?又有哪些巧妙的地方,为什么这个函数承载了整个Okhttp的设计精髓?我们将在下一篇中继续进行分析。

如果对你削微有点帮助,给个赞是对我最大的鼓励。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,717评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,501评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,311评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,417评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,500评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,538评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,557评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,310评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,759评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,065评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,233评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,909评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,548评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,172评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,420评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,103评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,098评论 2 352

推荐阅读更多精彩内容