(四)JDK并发包——线程池

为了避免系统频繁的创建和销毁线程,可以使用线程池来管理线程,以实现线程的复用。同时,线程池还可以帮助管理系统中的线程数量,防止过多的并发线程耗尽系统的资源。

Executor框架

Executor框架

ThreadPoolExecutor表示一个线程池,而Executors可以当作是一个线程池工厂类,用于方便的生产线程池,可以通过Executors的静态方法获得几种预先设定的线程池。从上图可知,ThreadPoolExecutor类实现了Executor接口,通过这个接口,线程池ThreadPoolExecutor可以接受并运行任何实现Runnable接口的对象。

public static ExecutorService newFixedThreadPool(int nThreads)
public static ExecutorService newSingleThreadExecutor() 
public static ExecutorService newCachedThreadPool()
public static ScheduledExecutorService newSingleThreadScheduledExecutor()
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
······

1.几种经典线程池

  • FixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

通过Executors.FixedThreadPool(n)方法生成的固定线程数量为n的线程池,该线程池中的线程数量始终不变,当有一个新的任务提交时,如果线程中有空闲的线程,则立即执行该任务;如果没有空闲线程,则新的任务会缓存在一个任务队列中,等到线程池中有空闲线程了再执行。线程池使用的是无界队列LinkedBlockingQueue,任务可以无限的添加进队列中,但是也有耗尽系统资源的风险。

  • CachedThreadPool
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

通过Executors.newCachedThreadPool()方法生成的可根据实际情况调整线程数量的线程池。线程中的数量不固定,如果有空闲线程则优先使用空闲线程,如果所有线程都忙碌则会创建一个新的线程处理任务。线程组将corePoolSize设为0,而maximumPoolSize设为无穷大,也就是说平时线程组中没有线程,如果有任务提交而所有线程繁忙时,线程组会无限制的创建线程,同样可能会耗尽系统资源。线程组使用的SynchronousQueue容量为0,无法存储任务,只是提交任务的中介,因此所有任务都会立刻提交成功,不会被拒绝。

  • SingleThreadExecutor
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

通过Executors.newSingleThreadExecutor()方法生成线程数量永远为1的线程池。

  • SingleThreadScheduledExecutor
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1));
    }

通过Executors.newSingleThreadScheduledExecutor()方法生成的线程数量永远为1的线程池。该线程池可以控制线程的启动时间,达到延时启动的效果。

  • ScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

通过Executors.newThreadScheduledExecutor(n)方法生成的固定线程数量为n的线程池。该线程池也可以控制线程的启动时间。

2.ScheduledThreadPool的使用

public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
  • schedule()方法会在给定时间后调用一次指定方法。
  • scheduleAtFixedRate()会周期性的调用指定方法,在initialDelay之后执行第一次方法,在上一次方法开始执行后period时间后再执行下一次方法。上一次任务的执行时长不会影响下一次任务开始执行的时间。
  • scheduleWithFixedDelay()会周期性的调用指定方法,在initialDelay之后执行第一次方法,在上一次方法结束执行后delay时间后再执行下一次方法。上一次任务的执行时长会影响下一次任务开始执行的时间。

3.创建自定义的ThreadPoolExecutor线程池

ThreadPoolExecutor的构造方法如下,

public ThreadPoolExecutor(
                              // 线程池中稳定线程数量
                              int corePoolSize,
                              // 线程池中最大线程数量
                              int maximumPoolSize,
                              // 线程超过稳定数量时,多余的线程的存活时间,
                              // 超时的线程将被回收
                              long keepAliveTime,
                              // 超时时间的单位
                              TimeUnit unit,
                              // 任务队列,被提交但尚未被执行的任务存放的地方
                              BlockingQueue<Runnable> workQueue,
                              // 线程工厂,用于创建线程
                              ThreadFactory threadFactory,
                              // 拒绝策略,当任务太多来不及处理时,如何拒绝任务
                              RejectedExecutionHandler handler)
任务队列(workQueue)
  • SynchronousQueue直接提交的队列:SynchronousQueue没有容量,每一个插入操作都要等待一个删除操作,也就是说SynchronousQueue只是任务提交的中介,提交的任务不会被真实的保存,而是直接提交给线程组执行,如果没有空闲线程则创建新线程,如果进程数量达到最大值则拒绝任务。因此使用SynchronousQueue总要配合很大的maximumPoolSize值以防止任务被拒绝。
  • ArrayBlockingQueue有界的任务队列:使用ArrayBlockingQueue时,如果线程池实际线程数 n 小于稳定数量corePoolSize,则会优先创建新的线程;如果 n 大于corePoolSize,则将任务存入队列中等待;如果队列存满了,而 n 小于最大线程数maximumPoolSize,就创建新线程处理任务;如果队列已满,而线程数也达到最大值,则拒绝任务。也就是说,除非系统非常繁忙,否则ArrayBlockingQueue会使线程数稳定在corePoolSize
  • LinkedBlockingQueue无界的任务队列:除非系统资源耗尽,否则任务可以一直加入队列中。当有新任务到来时,如果线程池实际线程数 n 小于稳定数量corePoolSize,则会创建新的线程;而如果 n 大于corePoolSize,则将任务存入队列中等待。也就是说,线程中的数量永远不会超过corePoolSize,而线程组的拒绝策略也永远不会触发。而如果队列中的任务数量持续增长,将会耗尽系统资源。
  • PriorityBlockingQueue优先任务队列:优先队列可以控制任务的执行顺序,按照任务的优先级执行任务。

4.线程组执行逻辑

ThreadPoolExecutor类的核心方法execute()的执行逻辑如下,源码的英文注解也描述了线程组是如何执行任务的:

  1. 如果线程组线程数少于稳定数corePoolSize,方法会直接尝试调用addWorker()方法创建一个新的线程来执行任务,此时会原子性的检查runStateworkerCount的值,如果不满足则会返回false来阻止线程组创建新线程。
  2. 如果线程数高于corePoolSize或创建线程失败,则方法会尝试将任务加入等待队列中,如果加入成功,则任务在队列中等待执行。
  3. 如果加入队列失败,则方法会尝试再次调用addWorker()方法提交线程,这次如果线程数小于maximumPoolSize则会为任务创建线程执行。而如果这次提交失败,则方法会根据拒绝策略拒绝任务。
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

5.拒绝策略

JDK内置四种拒绝策略:

  • AbortPolicy:该策略会直接抛出异常,阻止系统正常工作。
  • CallerRunsPolicy:只要线程池未关闭,该策略会直接在调用者线程中运行被拒绝的任务。
  • DiscardOldestPolicy:该策略将丢弃最老的一个任务,也就是即将被执行的那一个任务,然后尝试再次提交当前任务。
  • DiscardOldestPolicy:该策略会直接丢弃当前这个被拒绝的任务。

6.线程工厂(ThreadFactory)

ThreadFactory是一个接口,用于描述如何生成线程。Executor中的线程池构造方法都有额外接受一个ThreadFactory参数的版本。建议使用自定义的ThreadFactory方法来为创建的线程起个名字,这是ThreadFactory最简单也是最有用的功能,免得在排查线程问题时面对报错信息无从下手。

7.最优的线程池线程数

线程池的大小对系统性能的影响并不大,因此大小并不需要过分精确,只需要避免极大和极小的极端情况。
线程池线程数 = CPU数量 * CPU使用率 * (1 + 等待时间/计算时间)

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,923评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,154评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,775评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,960评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,976评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,972评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,893评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,709评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,159评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,400评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,552评论 1 346
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,265评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,876评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,528评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,701评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,552评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,451评论 2 352

推荐阅读更多精彩内容