并发编程(三)-线程池与Future

1. 线程池的实现原理

下图所示为线程池的实现原理:调用方不断地向线程池中提交任务;线程池中有一组线程,不断地
从队列中取任务,这是一个典型的生产者—消费者模型。

image.png

要实现这样一个线程池,有几个问题需要考虑:

  1. 队列设置多长?如果是无界的,调用方不断地往队列中放任务,可能导致内存耗尽。如果是有
    界的,当队列满了之后,调用方如何处理?
  2. 线程池中的线程个数是固定的,还是动态变化的?
  3. 每次提交新任务,是放入队列?还是开新线程?
  4. 当没有任务的时候,线程是睡眠一小段时间?还是进入阻塞?如果进入阻塞,如何唤醒?
    有3种做法:
      1. 不使用阻塞队列,只使用一般的线程安全的队列,也无阻塞/唤醒机制。当队列为空时,线程
        池中的线程只能睡眠一会儿,然后醒来去看队列中有没有新任务到来,如此不断轮询。
      1. 不使用阻塞队列,但在队列外部、线程池内部实现了阻塞/唤醒机制。
      1. 使用阻塞队列。(最优

很显然,做法3最完善,既避免了线程池内部自己实现阻塞/唤醒机制的麻烦,也避免了做法1的睡
眠/轮询带来的资源消耗和延迟。 ThreadPoolExector/ScheduledThreadPoolExecutor都是基于阻塞队列来实现的

2. 线程池的类继承体系

image.png

其中这两个是核心的类

  • ThreadPoolExector
    可以执行某个任务
  • ScheduledThreadPoolExecutor
    可以执行某个任务,还可以周期性地执行任务

3. ThreadPoolExecutor

3.1. 核心数据结构

public class ThreadPoolExecutor extends AbstractExecutorService {
   //... 
   private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); 
   // 存放任务的阻塞队列 
   private final BlockingQueue<Runnable> workQueue; 
   // 对线程池内部各种变量进行互斥访问控制 
   private final ReentrantLock mainLock = new ReentrantLock(); 
   // 线程集合 
   private final HashSet<Worker> workers = new HashSet<Worker>();
   //... 
}

每一个线程是一个Worker对象。Worker是ThreadPoolExector的内部类,核心数据结构如下:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable { 
   // ... 
   final Thread thread; 
   // Worker封装的线程 
   Runnable firstTask; 
   // Worker接收到的第1个任务 
   volatile long completedTasks; 
   // Worker执行完毕的任务个数
   // ... 
}

核心配置参数解释

ThreadPoolExecutor在其构造方法中提供了几个核心配置参数,来配置不同策略的线程池。

   public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
    1. corePoolSize:在线程池中始终维护的线程个数。
    1. maxPoolSize:在corePooSize已满、队列也满的情况下,扩充线程至此值。
    1. keepAliveTime/TimeUnit:maxPoolSize 中的空闲线程,销毁所需要的时间,总线程数收缩回corePoolSize。
    1. blockingQueue:线程池所用的队列类型。
    1. threadFactory:线程创建工厂,可以自定义,有默认值Executors.defaultThreadFactory() 。
    1. RejectedExecutionHandler:corePoolSize已满,队列已满,maxPoolSize 已满,最后的拒绝策略。

3.3. 线程池的优雅关闭

线程池的关闭,较之线程的关闭更加复杂。当关闭一个线程池的时候,有的线程还正在执行某个任
务,有的调用者正在向线程池提交任务,并且队列中可能还有未执行的任务。因此,关闭过程不可能是
瞬时的,而是需要一个平滑的过渡,这就涉及线程池的完整生命周期管理。

image.png

正确关闭线程池的步骤

关闭线程池的过程为:在调用 shutdown()或者shutdownNow()之后,线程池并不会立即关闭,接
下来需要调用 awaitTermination() 来等待线程池关闭。关闭线程池的正确步骤如下:

// executor.shutdownNow(); 
executor.shutdown(); 
try {
  boolean flag = true; 
  do {
      flag = ! executor.awaitTermination(500, TimeUnit.MILLISECONDS); 
  } while (flag); 
 } catch (InterruptedException e) {
 // ...
 }

任务的提交过程分析

任务的执行过程分析

线程池的4种拒绝策略

默认策略
AbortPolicy

  • CallerRunsPolicy
    策略1:调用者直接在自己的线程里执行,线程池不处理,比如到医院打点滴,医院没地方了,到你家自己操作吧:
  • AbortPolicy
    策略2:线程池抛异常:
  • DiscardPolicy
    策略3:线程池直接丢掉任务,神不知鬼不觉:
  • DiscardOldestPolicy
    策略4:删除队列中最早的任务,将当前任务入队列:

4. Executors工具类

concurrent包提供了Executors工具类,利用它可以创建各种不同类型的线程池。

线程池类型

# 固定大小线程池(newFixedThreadPool),核心线程数是固定的
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

# 单线程线程池(newSigleThradExecutor)
public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

# 缓存线程池(newCachedThreadPool),根据任务量增加
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

# 定时、周期性线程池(newScheduledThreadPool),适合时间轮,定时任务触发
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

 public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }
  • 固定大小线程池(newFixedThreadPool),核心线程数是固定的
  • 缓存线程池(newCachedThreadPool),根据任务量增加
  • 单线程线程池(newSigleThradExecutor)
  • 定时、周期性线程池(newScheduledThreadPool),适合时间轮,定时任务触发

最佳实践

不同类型的线程池,其实都是由前面的几个关键配置参数配置而成的。
在《阿里巴巴Java开发手册》中,明确禁止使用Executors创建线程池,并要求开发者直接使用ThreadPoolExector或ScheduledThreadPoolExecutor进行创建。这样做是为了强制开发者明确线程池的运行策略,使其对线程池的每个配置参数皆做到心中有数,以规避因使用不当而造成资源耗尽的风险。

5. ScheduledThreadPoolExecutor

  • 周期性单线程线程池(newSingleThreadScheduledExecutor)

6. CompletableFuture用法

7. ForkJoinPool

简介

ForkJoinPool就是JDK7提供的一种“分治算法”的多线程并行计算框架。Fork意为分叉,Join意为合
并,一分一合,相互配合,形成分治算法。此外,也可以将ForkJoinPool看作一个单机版的
Map/Reduce,多个线程并行计算。
相比于ThreadPoolExecutor,ForkJoinPool可以更好地实现计算的负载均衡,提高资源利用率。
假设有5个任务,在ThreadPoolExecutor中有5个线程并行执行,其中一个任务的计算量很大,其余
4个任务的计算量很小,这会导致1个线程很忙,其他4个线程则处于空闲状态。
利用ForkJoinPool,可以把大的任务拆分成很多小任务,然后这些小任务被所有的线程执行,从而
实现任务计算的负载均衡。

核心数据结构

工作窃取队列

ForkJoinPool状态控制

Worker线程的阻塞-唤醒机制

任务的提交过程分析

工作窃取算法:任务的执行过程分析

ForkJoinTask的fork/join

ForkJoinPool的优雅关闭

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,711评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,079评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,194评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,089评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,197评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,306评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,338评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,119评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,541评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,846评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,014评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,694评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,322评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,026评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,257评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,863评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,895评论 2 351

推荐阅读更多精彩内容