线程池与Glide网络请求并发调度

Executors类提供了4种不同的线程池:newCachedThreadPool、newFixedThreadPool、 newScheduledThreadPool和newSingleThreadExecutor,它们都是直接或间接通过ThreadPoolExecutor实现的。
*ThreadPoolExecutor:

    // Public constructors and methods

    /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters and default thread factory and rejected execution handler.
     * It may be more convenient to use one of the {@link Executors} factory
     * methods instead of this general purpose constructor.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue)

ThreadPoolExecutor的构造方法有以下几个重要参数:
corePoolSize:核心线程数。核心线程会在线程池中一直存活,即时它们处于闲置状态,例外情况是allowCoreThreadTimeOut被设置为true。
maximumPoolSize:最大线程数。
keepAliveTime:线程闲置时的超时时长,超时后线程会被回收。
unit:keepAliveTime的时间单位。
workQueue:存放等待执行的任务的阻塞队列。
队列与线程池按照以下规则进行交互:
如果运行的线程数小于核心线程数(corePoolSize),则首选添加新线程而不排队。如果运行的线程数等于或者大于核心线程数(corePoolSize),则首选将请求加入队列而不添加新线程。如果请求无法加入队列,则创建新线程;如果这将导致超出最大线程数(maximumPoolSize),则任务将被拒绝执行。
*Executors:

    /**
     * Creates a thread pool that reuses a fixed number of threads
     * operating off a shared unbounded queue.  At any point, at most
     * {@code nThreads} threads will be active processing tasks.
     * If additional tasks are submitted when all threads are active,
     * they will wait in the queue until a thread is available.
     * If any thread terminates due to a failure during execution
     * prior to shutdown, a new one will take its place if needed to
     * execute subsequent tasks.  The threads in the pool will exist
     * until it is explicitly {@link ExecutorService#shutdown shutdown}.
     *
     * @param nThreads the number of threads in the pool
     * @return the newly created thread pool
     * @throws IllegalArgumentException if {@code nThreads <= 0}
     */
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

可以看到FixedThreadPool只有固定数量的核心线程,任务队列是基于链表的无界阻塞队列。当所有线程都在运行时,新任务都会放到任务队列中等待。
默认情况下,Glide的网络请求是在EngineJob中的sourceExecutor中执行的,而这个sourceExecutor是通过GlideExecutor的newSourceExecutor方法实例化的。
*GlideExecutor:

/**
   * Returns a new fixed thread pool with the default thread count returned from
   * {@link #calculateBestThreadCount()}, the {@link #DEFAULT_SOURCE_EXECUTOR_NAME} thread name
   * prefix, and the
   * {@link com.bumptech.glide.load.engine.executor.GlideExecutor.UncaughtThrowableStrategy#DEFAULT}
   * uncaught throwable strategy.
   *
   * <p>Source executors allow network operations on their threads.
   */
  public static GlideExecutor newSourceExecutor() {
    return newSourceExecutor(calculateBestThreadCount(), DEFAULT_SOURCE_EXECUTOR_NAME,
        UncaughtThrowableStrategy.DEFAULT);
  }

  /**
   * Returns a new fixed thread pool with the given thread count, thread name prefix,
   * and {@link com.bumptech.glide.load.engine.executor.GlideExecutor.UncaughtThrowableStrategy}.
   *
   * <p>Source executors allow network operations on their threads.
   *
   * @param threadCount The number of threads.
   * @param name The prefix for each thread name.
   * @param uncaughtThrowableStrategy The {@link
   * com.bumptech.glide.load.engine.executor.GlideExecutor.UncaughtThrowableStrategy} to use to
   *                                  handle uncaught exceptions.
   */
  public static GlideExecutor newSourceExecutor(int threadCount, String name,
      UncaughtThrowableStrategy uncaughtThrowableStrategy) {
    return new GlideExecutor(threadCount, name, uncaughtThrowableStrategy,
        false /*preventNetworkOperations*/, false /*executeSynchronously*/);
  }

  // Visible for testing.
  GlideExecutor(int poolSize, String name,
      UncaughtThrowableStrategy uncaughtThrowableStrategy, boolean preventNetworkOperations,
      boolean executeSynchronously) {
    this(
        poolSize /* corePoolSize */,
        poolSize /* maximumPoolSize */,
        0 /* keepAliveTimeInMs */,
        name,
        uncaughtThrowableStrategy,
        preventNetworkOperations,
        executeSynchronously);
  }

  GlideExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTimeInMs, String name,
      UncaughtThrowableStrategy uncaughtThrowableStrategy, boolean preventNetworkOperations,
      boolean executeSynchronously) {
    this(
        corePoolSize,
        maximumPoolSize,
        keepAliveTimeInMs,
        name,
        uncaughtThrowableStrategy,
        preventNetworkOperations,
        executeSynchronously,
        new PriorityBlockingQueue<Runnable>());
  }

  GlideExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTimeInMs, String name,
      UncaughtThrowableStrategy uncaughtThrowableStrategy, boolean preventNetworkOperations,
      boolean executeSynchronously, BlockingQueue<Runnable> queue) {
    super(
        corePoolSize,
        maximumPoolSize,
        keepAliveTimeInMs,
        TimeUnit.MILLISECONDS,
        queue,
        new DefaultThreadFactory(name, uncaughtThrowableStrategy, preventNetworkOperations));
    this.executeSynchronously = executeSynchronously;
  }

  /**
   * Determines the number of cores available on the device.
   *
   * <p>{@link Runtime#availableProcessors()} returns the number of awake cores, which may not
   * be the number of available cores depending on the device's current state. See
   * http://goo.gl/8H670N.
   */
  public static int calculateBestThreadCount() {
    // We override the current ThreadPolicy to allow disk reads.
    // This shouldn't actually do disk-IO and accesses a device file.
    // See: https://github.com/bumptech/glide/issues/1170
    ThreadPolicy originalPolicy = StrictMode.allowThreadDiskReads();
    File[] cpus = null;
    try {
      File cpuInfo = new File(CPU_LOCATION);
      final Pattern cpuNamePattern = Pattern.compile(CPU_NAME_REGEX);
      cpus = cpuInfo.listFiles(new FilenameFilter() {
        @Override
        public boolean accept(File file, String s) {
          return cpuNamePattern.matcher(s).matches();
        }
      });
    } catch (Throwable t) {
      if (Log.isLoggable(TAG, Log.ERROR)) {
        Log.e(TAG, "Failed to calculate accurate cpu count", t);
      }
    } finally {
      StrictMode.setThreadPolicy(originalPolicy);
    }

    int cpuCount = cpus != null ? cpus.length : 0;
    int availableProcessors = Math.max(1, Runtime.getRuntime().availableProcessors());
    return Math.min(MAXIMUM_AUTOMATIC_THREAD_COUNT, Math.max(availableProcessors, cpuCount));
  }

GlideExecutor的newSourceExecutor与Executors的newFixedThreadPool类似,都是固定大小的线程池,不过任务队列不同。线程池大小为calculateBestThreadCount,该值为设备可用核心数但最大不超过4。任务队列为PriorityBlockingQueue,一种基于优先级的无界阻塞队列,插入元素需要实现Comparable接口的compareTo方法来提供排序依据。
*DecodeJob:

  @Override
  public int compareTo(DecodeJob<?> other) {
    int result = getPriority() - other.getPriority();
    if (result == 0) {
      result = order - other.order;
    }
    return result;
  }

Glide的Runnable实现类是DecodeJob,它的compareTo方法的逻辑是:优先级(共IMMEDIATE/HIGH/NORMAL/LOW四种,依次降低,默认为NORMAL)高的优先,若优先级相同则顺序在前的优先(先进先出)。
*RequestBuilder:

  /**
   * Set the target the resource will be loaded into.
   *
   * @param target The target to load the resource into.
   * @return The given target.
   * @see RequestManager#clear(Target)
   */
  public <Y extends Target<TranscodeType>> Y into(@NonNull Y target) {
    Util.assertMainThread();
    Preconditions.checkNotNull(target);
    if (!isModelSet) {
      throw new IllegalArgumentException("You must call #load() before calling #into()");
    }

    Request previous = target.getRequest();

    if (previous != null) {
      requestManager.clear(target);
    }

    requestOptions.lock();
    Request request = buildRequest(target);
    target.setRequest(request);
    requestManager.track(target, request);

    return target;
  }

*HttpUrlFetcher:


  @Override
  public void loadData(Priority priority, DataCallback<? super InputStream> callback) {
    long startTime = LogTime.getLogTime();
    final InputStream result;
    try {
      result = loadDataWithRedirects(glideUrl.toURL(), 0 /*redirects*/, null /*lastUrl*/,
          glideUrl.getHeaders());
    } catch (IOException e) {
      if (Log.isLoggable(TAG, Log.DEBUG)) {
        Log.d(TAG, "Failed to load data for url", e);
      }
      callback.onLoadFailed(e);
      return;
    }

    if (Log.isLoggable(TAG, Log.VERBOSE)) {
      Log.v(TAG, "Finished http url fetcher fetch in " + LogTime.getElapsedMillis(startTime)
          + " ms and loaded " + result);
    }
    callback.onDataReady(result);
  }

  private InputStream loadDataWithRedirects(URL url, int redirects, URL lastUrl,
      Map<String, String> headers) throws IOException {
    if (redirects >= MAXIMUM_REDIRECTS) {
      throw new HttpException("Too many (> " + MAXIMUM_REDIRECTS + ") redirects!");
    } else {
      // Comparing the URLs using .equals performs additional network I/O and is generally broken.
      // See http://michaelscharf.blogspot.com/2006/11/javaneturlequals-and-hashcode-make.html.
      try {
        if (lastUrl != null && url.toURI().equals(lastUrl.toURI())) {
          throw new HttpException("In re-direct loop");

        }
      } catch (URISyntaxException e) {
        // Do nothing, this is best effort.
      }
    }

    urlConnection = connectionFactory.build(url);
    for (Map.Entry<String, String> headerEntry : headers.entrySet()) {
      urlConnection.addRequestProperty(headerEntry.getKey(), headerEntry.getValue());
    }
    urlConnection.setConnectTimeout(timeout);
    urlConnection.setReadTimeout(timeout);
    urlConnection.setUseCaches(false);
    urlConnection.setDoInput(true);

    // Stop the urlConnection instance of HttpUrlConnection from following redirects so that
    // redirects will be handled by recursive calls to this method, loadDataWithRedirects.
    urlConnection.setInstanceFollowRedirects(false);

    // Connect explicitly to avoid errors in decoders if connection fails.
    urlConnection.connect();
    if (isCancelled) {
      return null;
    }
    final int statusCode = urlConnection.getResponseCode();
    if (statusCode / 100 == 2) {
      return getStreamForSuccessfulRequest(urlConnection);
    } else if (statusCode / 100 == 3) {
      String redirectUrlString = urlConnection.getHeaderField("Location");
      if (TextUtils.isEmpty(redirectUrlString)) {
        throw new HttpException("Received empty or null redirect url");
      }
      URL redirectUrl = new URL(url, redirectUrlString);
      return loadDataWithRedirects(redirectUrl, redirects + 1, url, headers);
    } else if (statusCode == -1) {
      throw new HttpException(statusCode);
    } else {
      throw new HttpException(urlConnection.getResponseMessage(), statusCode);
    }
  }

  @Override
  public void cancel() {
    // TODO: we should consider disconnecting the url connection here, but we can't do so
    // directly because cancel is often called on the main thread.
    isCancelled = true;
  }

Glide将加载请求和Target(ImageView)关联,开始某个ImageView的加载请求前会先将该ImageView关联的请求清除。此时在线程池中的关联的DecodeJob,正在进行的网络请求不会被中断,在等待队列里的也不会被直接从线程池移除,而是移除回调并设置取消标志位,让未开始的后续加载步骤的逻辑不会被执行。
当列表(ListView/RecyclerView)快速滚动时,同时执行的网络请求数量不会超过设备可用核心数,其余请求会放到队列中等待执行。虽然队列长度可能会一下增加到几十,但随着列表复用View,队列中的大部分请求都会被取消掉,之后执行时不会发起网络请求,并迅速让位于等待中的请求。也就是说,快速滚动过程的中间很多个列表项的请求都会被略过。这样的机制保证了不会过度消耗资源导致滑动卡顿。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,185评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,445评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,684评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,564评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,681评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,874评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,025评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,761评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,217评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,545评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,694评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,351评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,988评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,778评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,007评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,427评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,580评论 2 349

推荐阅读更多精彩内容