线程池


Java线程 一般采用new Thread().start();的方式开启一个新的线程池实例。但是这样的会无节制的创建线程、回收线程,造成频繁的GC。

线程池的由来就是为了解决此问题的。线程池旨在线程的复用,这就可以节约我们用以往的方式创建线程和销毁所消耗的时间,减少线程频繁调度的开销,从而节约系统资源,提高系统吞吐量。

在Java线程池概念中,ExecutorService,它是一个接口,其实如果要从真正意义上来说,它可以叫做线程池的服务,因为它提供了众多接口api来控制线程池中的线程,而真正意义上的线程池就是:ThreadPoolExecutor,它实现了ExecutorService接口,并封装了一系列的api使得它具有线程池的特性,其中包括工作队列、核心线程数、最大线程数等。

ThreadPoolExecutor

ThreadPoolExecutor参数

  • int corePoolSize ——线程池中核心线程的数量。
  • int maximumPoolSize ——线程池中最大线程数量。
  • long keepAliveTime——非核心线程的超时时长,当系统中非核心线程闲置时间超过keepAliveTime之后,则会被回收。如果ThreadPoolExecutor的allowCoreThreadTimeOut属性设置为true,则该参数也表示核心线程的超时时长。
  • TimeUnit unit ——时间单位,有纳秒、微秒、毫秒、秒、分、时、天等。
  • BlockingQueue< Runnable > workQueue ——线程池中的任务队列,该队列主要用来存储已经被提交但是尚未执行的任务。
  • ThreadFactory threadFactory —— 线程工厂,为了给线程池提供创建线程的功能。
  • RejectedExecutionHandler handler——拒绝策略,当线程无法执行新任务时(一般是由于线程池中的线程数量已经达到最大数或者线程池关闭导致的),默认情况下,当线程池无法处理新线程时,会抛出一个RejectedExecutionException。

任务队列

用于保存等待执行的任务的阻塞队列,可以选择以下几个:

  • ArrayBlockingQueue:基于数组的阻塞队列,按照FIFO原则进行排序。
  • LinkedBlockingQueue:基于链表的阻塞队列,按照FIFO原则对元素进行排序。
  • SynchronousQueue:一个不储存元素的阻塞队列,每一个插入操作必须等到另外一个线程调用移除操作,否则插入操作一直处于阻塞状态。
  • PriorityBlockingQueue:一个具有优先级的无限阻塞队列。

任务拒绝策略

  • ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
  • ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
  • ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
  • ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

ThreadPoolExecutor工作规则

  • 如果线程池中的线程数未达到核心线程数,则会立马启用一个核心线程去执行。
  • 如果线程池中的线程数已经达到核心线程数,且workQueue未满,则将新线程放入workQueue中等待执行。
  • 如果线程池中的线程数已经达到核心线程数且workQueue已满,但未超过非核心线程数,则开启一个非核心线程来执行任务。
  • 如果线程池中的线程数已经超过非核心线程数,则拒绝执行该任务。

注:核心线程、非核心线程和线程队列,是三个概念哦,别混为一谈。

ThreadPoolExecutor封装案例

/**
 * 类描述:线程池管理器
 *
 * @author jinzifu
 * @Email jinzifu123@163.com
 * @date 2017/12/7 1150
 */

public class ThreadPoolManager {
    /**
     * 核心线程的数量。当前设备可用处理器核心数*2 + 1,能够让cpu的效率得到最大程度执行。
     */
    private int corePoolSize;
    /**
     * 最大线程数量,表示当缓冲队列满的时候能继续容纳的等待任务的数量。
     */
    private int maximumPoolSize;
    /**
     * 非核心线程的超时时长,当系统中非核心线程闲置时间超过keepAliveTime之后,则会被回收。
     * 如果ThreadPoolExecutor的allowCoreThreadTimeOut属性设置为true,则该参数也表示核心线程的超时时长。
     */
    private long keepAliveTime = 1;
    /**
     * 时间单位,有纳秒、微秒、毫秒、秒、分、时、天等。
     */
    private TimeUnit timeUnit = TimeUnit.HOURS;
    private ThreadPoolExecutor threadPoolExecutor;
    private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();

    private ThreadPoolManager() {
        init();
    }

    private static class SingleClass {
        private static final ThreadPoolManager THREAD_POOL_MANAGER = new ThreadPoolManager();
    }

    /**
     * 静态内部类单例模式
     *
     * @return
     */
    public static ThreadPoolManager getInstance() {
        return SingleClass.THREAD_POOL_MANAGER;
    }

    /**
     * 配置线程池属性
     * 部分参考AsyncTask的配置设计
     */
    private void init() {
        corePoolSize = CPU_COUNT + 1;
        maximumPoolSize = CPU_COUNT * 2 + 1;
        threadPoolExecutor = new ThreadPoolExecutor(
                corePoolSize,
                maximumPoolSize,
                keepAliveTime,
                timeUnit,
                new LinkedBlockingDeque<Runnable>(),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());
    }

    /**
     * 执行任务
     *
     * @param runnable
     */
    public void execute(Runnable runnable) {
        if (runnable != null) {
            threadPoolExecutor.execute(runnable);
        }
    }

    /**
     * 从线程池移除任务
     *
     * @param runnable
     */
    public void remove(Runnable runnable) {
        if (runnable != null) {
            threadPoolExecutor.remove(runnable);
        }
    }
}

FixedThreadPool

   /**
     * Creates a thread pool that reuses a fixed number of threads
     * operating off a shared unbounded queue.  At any point, at most
     * {@code nThreads} threads will be active processing tasks.
     * If additional tasks are submitted when all threads are active,
     * they will wait in the queue until a thread is available.
     * If any thread terminates due to a failure during execution
     * prior to shutdown, a new one will take its place if needed to
     * execute subsequent tasks.  The threads in the pool will exist
     * until it is explicitly {@link ExecutorService#shutdown shutdown}.
     *
     * @param nThreads the number of threads in the pool
     * @return the newly created thread pool
     * @throws IllegalArgumentException if {@code nThreads <= 0}
     */
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

从方法注释和参数配置来看,核心线程数和最大线程数相同(意味着没有非核心线程,且线程池处于保活状态以等待任务,除非调用shutdown shutdown关闭线程队列中的任务),超过核心线程数的任务都要进入线程队列中等待,当核心线程处于闲置状态时就继续执行队列里的任务。线程池队列是没有参数,说明队列长度是默认的Integer.MAX_VALUE(2的31次方减1)。

特点:固定设置线程数,快速响应。

外部调用示例:

 ExecutorService executorService = Executors.newFixedThreadPool(3);
                executorService.execute(new Runnable() {
                    @Override
                    public void run() {
                        for (int i = 0; i < 20; i++) {
                            try {
                                Thread.sleep(1000);
                                Log.d(TAG, "" + i);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                });

SingleThreadExecutor

 /**
     * Creates an Executor that uses a single worker thread operating
     * off an unbounded queue. (Note however that if this single
     * thread terminates due to a failure during execution prior to
     * shutdown, a new one will take its place if needed to execute
     * subsequent tasks.)  Tasks are guaranteed to execute
     * sequentially, and no more than one task will be active at any
     * given time. Unlike the otherwise equivalent
     * {@code newFixedThreadPool(1)} the returned executor is
     * guaranteed not to be reconfigurable to use additional threads.
     *
     * @return the newly created single-threaded Executor
     */
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

从方法注释上看,和FixedThreadPool几乎一样,只不过该线程池的固定线程个数是1。这样的好处是避免了线程间的同步问题。

CachedThreadPool

  /**
     * Creates a thread pool that creates new threads as needed, but
     * will reuse previously constructed threads when they are
     * available.  These pools will typically improve the performance
     * of programs that execute many short-lived asynchronous tasks.
     * Calls to {@code execute} will reuse previously constructed
     * threads if available. If no existing thread is available, a new
     * thread will be created and added to the pool. Threads that have
     * not been used for sixty seconds are terminated and removed from
     * the cache. Thus, a pool that remains idle for long enough will
     * not consume any resources. Note that pools with similar
     * properties but different details (for example, timeout parameters)
     * may be created using {@link ThreadPoolExecutor} constructors.
     *
     * @return the newly created thread pool
     */
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

从方法注释里看,该线程池没有核心线程,所有非核心线程数就是最大线程数。由于最大线程数为无限大,所以每当我们添加一个新任务进来的时候,如果线程池中有空闲的线程,则由该空闲的线程执行新任务,如果没有空闲线程,则创建新线程来执行任务。根据CachedThreadPool的特点,我们可以在有大量任务请求的时候使用CachedThreadPool,因为当CachedThreadPool中没有新任务的时候,它里边所有的线程都会因为超时(60秒)而被终止。

这里它使用了SynchronousQueue作为线程队列。

特点:CachedTreadPool一个最大的优势是它可以根据程序的运行情况自动来调整线程池中的线程数量。

ScheduledThreadPool

   /**
     * Creates a new {@code ScheduledThreadPoolExecutor} with the
     * given core pool size.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     */
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE,
              DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
              new DelayedWorkQueue());
    }

从方法注释里可以看到,它的核心线程数量是固定的(我们在构造的时候传入的),但是非核心线程是无穷大,当非核心线程闲置时,则会被立即回收。

ScheduledExecutorService扩展了ExecutorService接口,提供时间排程的功能。

public interface ScheduledExecutorService extends ExecutorService {

    /**
     * Creates and executes a one-shot action that becomes enabled
     * after the given delay.
     * 翻译:创建并执行在给定延迟后启用的一次性操作。
     */
    public ScheduledFuture<?> schedule(Runnable command,
                                     long delay, TimeUnit unit);

    /**
     * Creates and executes a ScheduledFuture that becomes enabled after the
     * given delay.
     * 翻译:创建并执行在给定延迟后启用的操作。
     */
    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay, TimeUnit unit);

    /**
     * Creates and executes a periodic action that becomes enabled first
     * after the given initial delay, and subsequently with the given
     * period; that is, executions will commence after
     * 参数initialDelay:表示第一次延时的时间
     * 参数period:表示任务执行开始后距下一个任务开始执行的时间间隔。
     */
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);

    /**
     * Creates and executes a periodic action that becomes enabled first
     * after the given initial delay, and subsequently with the
     * given delay between the termination of one execution and the
     * commencement of the next.
     * 参数initialDelay:表示第一次延时的时间
     * 参数delay:表示任务执行结束后距下一个任务开始执行的时间间隔
     */
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);

}

情景代码测试1executorService.scheduleWithFixedDelay(new MyRunnable(5), 1, 3, TimeUnit.SECONDS);

class MyRunnable implements Runnable {
        private int count;
        final int[] i = {0};

        public MyRunnable(int i) {
            this.count = i;
        }

        @Override
        public void run() {
            try {
                i[0] = i[0] + 1;
                if (i[0] < count) {
                    Log.d(TAG, "开始第" + i[0] + "次打印");
                    Thread.sleep(1000);
                    Log.d(TAG, "结束第" + i[0] + "次打印");
                } else {
                    Log.d(TAG, "停止延时执行...");
                    executorService.shutdownNow();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

Log日志如下

12-07 14:24:41.556 9263-9263 D/ThreadFragment: 开始延时执行...
12-07 14:24:42.557 9263-9353 D/ThreadFragment: 开始第1次打印
12-07 14:24:43.559 9263-9353 D/ThreadFragment: 结束第1次打印
12-07 14:24:46.561 9263-9353 D/ThreadFragment: 开始第2次打印
12-07 14:24:47.562 9263-9353 D/ThreadFragment: 结束第2次打印
12-07 14:24:50.564 9263-9353 D/ThreadFragment: 开始第3次打印
12-07 14:24:51.567 9263-9353 D/ThreadFragment: 结束第3次打印
12-07 14:24:54.569 9263-9353 D/ThreadFragment: 开始第4次打印
12-07 14:24:55.570 9263-9353 D/ThreadFragment: 结束第4次打印
12-07 14:24:58.572 9263-9353 D/ThreadFragment: 停止延时执行...

从log日志可以看出:

  • 14:24:41到14:24:42是延迟1秒启动的。
  • 结束第1次打印时间与开始第2次打印时间是相隔3秒,验证了每一次执行终止和下一次执行开始之间都存在给定的延迟delay。

情景代码测试2executorService.scheduleAtFixedRate(new MyRunnable(5), 1, 3, TimeUnit.SECONDS);

Log日志如下(MyRunnable内代码一样)

12-07 14:23:21.371 8012-8012 D/ThreadFragment: 开始延时执行...
12-07 14:23:22.373 8012-8115 D/ThreadFragment: 开始第1次打印
12-07 14:23:23.374 8012-8115 D/ThreadFragment: 结束第1次打印
12-07 14:23:25.373 8012-8115 D/ThreadFragment: 开始第2次打印
12-07 14:23:26.375 8012-8115 D/ThreadFragment: 结束第2次打印
12-07 14:23:28.374 8012-8115 D/ThreadFragment: 开始第3次打印
12-07 14:23:29.375 8012-8115 D/ThreadFragment: 结束第3次打印
12-07 14:23:31.374 8012-8115 D/ThreadFragment: 开始第4次打印
12-07 14:23:32.376 8012-8115 D/ThreadFragment: 结束第4次打印
12-07 14:23:34.372 8012-8115 D/ThreadFragment: 停止延时执行...

从log日志可以看出:

  • 14:23:21到14:23:22是延迟1秒启动的。
  • 开始第1次打印时间与开始第2次打印时间是period = 3秒。

注意:通过ScheduledExecutorService执行的周期任务,如果任务执行过程中抛出了异常,那么过ScheduledExecutorService就会停止执行任务,且也不会再周期地执行该任务了。所以你如果想保住任务都一直被周期执行,那么catch一切可能的异常。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,658评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,482评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,213评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,395评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,487评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,523评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,525评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,300评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,753评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,048评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,223评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,905评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,541评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,168评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,417评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,094评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,088评论 2 352

推荐阅读更多精彩内容