Java线程池

为什么需要线程池

对象复用思想在编程中有很多应用,不论是线程池还是连接池都是一种对象复用的思想。今天来谈谈Java里面的线程池。
Java中创建和销毁一个线程是比较昂贵的操作,需要系统调用。频繁创建和销毁线程会影响系统性能。于是线程池应运而生。

Executor框架

Executor框架是java中的线程池实现。Executor是最顶层的接口定义,它的子类和实现主要包括ExecutorService,ScheduledExecutorService,ThreadPoolExecutor,ScheduledThreadPoolExecutor,ForkJoinPool等。其结构如下图所示:

ScheduledThreadPoolExecutor

Executor:Executor是一个接口,其只定义了一个execute()方法:void execute(Runnable command);,只能提交Runnable形式的任务,不支持提交Callable带有返回值的任务。
ExecutorService:ExecutorService在Executor的基础上加入了线程池的生命周期管理,我们可以通过ExecutorService#shutdown或者ExecutorService#shutdownNow方法来关闭我们的线程池。ExecutorService支持提交Callable形式的任务,提交完Callable任务后我们拿到一个Future,它代表一个异步任务执行的结果。关于shutdown和shutdownNow方法我们需要注意的是:这两个方法是非阻塞的,调用后立即返回,不会等待线程池关闭完成。如果我们需要等待线程池处理完成再返回可以使用ExecutorService#awaitTermination来完成。
shutdown方法会等待线程池中已经运行的任何和阻塞队列中等待执行的任务执行完成,而shutdownNow则不会,shutdownNow方法会尝试中断线程池中已经运行的任务,阻塞队列中等待的任务不会再被执行,阻塞队列中等待执行的任务会作为返回值返回。
ThreadPoolExecutor:是线程池中最核心的类,这里着重说一下这个类的各个构造参数:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
  1. corePoolSize:线程池的核心线程数目,当一个请求进来时如果当前线程池中线程数量小于这个值,则直接通过ThreadFactory新建一个线程来处理这个请求,如果已有线程数量大于等于这个值则将请求放入阻塞队列中。
  2. maximumPoolSize:线程池的最大线程数目,当线程池数量已经等于corePoolSize并且阻塞队列也已经满了,则看线程数量是否小于maximumPoolSize:如果小于则创建一个线程来处理请求,否则使用“饱和策略”来拒绝这个请求。对于大于corePoolSize部分的线程,称作这部分线程为“idle threads”,这部分线程会有一个最大空闲时间,如果超过这个空闲时间还没有任务进来则将这些空闲线程回收。
  3. keepAliveTime和unit:这两个参数主要用来控制idle threads的最大空闲时间,超过这个空闲时间空闲线程将被回收。这里有一点需要注意,ThreadPoolExecutor中有一个属性:private volatile boolean allowCoreThreadTimeOut;,这个用来指定是否允许核心线程空闲超时回收,默认为false,即不允许核心线程超时回收,核心线程将一直等待新任务。如果设置这个参数为true,核心线程空闲超时后也可以被回收。
  4. workQueue:阻塞队列,超过corePoolSize部分的请求放入这个阻塞队列中等待执行。阻塞队列分为有界阻塞队列和无界阻塞队列。在创建阻塞队列时如果我们指定了这个队列的“capacity”则这个队列就是有界的,否则是无界的。这里有一点需要注意:使用线程池之前请明确是否真的需要无界阻塞队列,如果阻塞队列是无界的,会导致大量的请求堆积,进而造成内存溢出系统崩溃。
  5. threadFactory:是一个线程池工厂,主要用来为线程池创建线程,我们可以定制一个ThreadFactory来达到统一命名我们线程池中的线程的目的。
  6. handler:饱和策略,用来拒绝多余的请求。饱和策略有:CallerRunsPolicy:请求脱离线程池运行(调用者caller线程来运行这个任务);AbortPolicy:抛出RejectedExecutionException异常;DiscardPolicy:丢弃这个任务,即什么也不做;DiscardOldestPolicy:将阻塞队列中等待时间最久的任务删除(即队列头部的任务),将新的任务加入队尾。
threadpoolexecutor

ScheduledThreadPoolExecutor:ThreadPoolExecutor子类,它在ThreadPoolExecutor基础上加入了任务定时执行的功能。

Executors

Executors是一个工厂类,主要用来创建ExecutorService,ScheduledExecutorService等线程池。

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

这个静态方法用来创建固定线程数目的线程池,可以看到其设置的corePoolSize和maximumPoolSize都是nThreads,其设定的阻塞队列是无界的,也就说多余的请求将一直积压在队列中进行等待,<font color="#FF0000">有可能造成内存溢出</font>。这也是阿里编码规范不推荐用Executors来创建ThreadPoolExecutor的原因。

public static ExecutorService newWorkStealingPool(int parallelism) {
    return new ForkJoinPool
        (parallelism,
         ForkJoinPool.defaultForkJoinWorkerThreadFactory,
         null, true);
}

这个方法用来创建一个“工作窃取(work stealing)”ForkJoinPool线程池,这里不做展开,后面会介绍ForkJoinPool。

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

注意到这个方法创建的核心线程数是0,maximumPoolSize是一个最大值,空闲线程的最大空闲时间是一分钟,阻塞队列是一个SynchronousQueue。这个类其实挺有意思的,SynchronousQueue主要拥有数据交换而不是队列。当SynchronousQueue中有一个元素时,put请求会被阻塞住,直到有消费者从队列中拿走这个元素,同理如果队列为空,消费者也必须阻塞等待有生产者放入新的元素。

注意到这些静态方法可以方便我们创建线程池,但是你必须清楚每个方法创建的线程池的各个参数设置,就像newFixedThreadPool这个方法创建的线程池的阻塞队列是无界的。所以我更倾向推荐大家自己直接用ThreadPoolExecutor来定制自己的线程池。

ForkJoinPool

Doug Lea在JDK7中引入了Fork/Join框架,ForkJoinPool不同于ThreadPoolExecutor,它是一种基于"分治"思想的计算框架。java8的stream API中很多地方都有用到ForkJoinPool。ForkJoinPool中的工作线程会对自己的任务按照一定的粒度进行拆分,一个大任务拆分成多个子任务之后,子任务放入工作队列中等待执行。当一个线程的工作队列为空是可以从其他线程的工作队列中steal任务执行。这也是"work-stealing"的由来。

基本思想

  1. ForkJoinPool 的每个工作线程都维护着一个工作队列(WorkQueue),这是一个双端队列(Deque),里面存放的对象是任务(ForkJoinTask)。
  2. 每个工作线程在运行中产生新的任务(通常是因为调用了 fork())时,会放入工作队列的队尾,并且工作线程在处理自己的工作队列时,使用的是"LIFO"方式,也就是说每次从队尾取出任务来执行。
  3. 每个工作线程在处理自己的工作队列同时,会尝试窃取一个任务(或是来自于刚刚提交到 pool 的任务,或是来自于其他工作线程的工作队列),窃取的任务位于其他线程的工作队列的队首,也就是说工作线程在窃取其他工作线程的任务时,使用的是 FIFO 方式。
  4. 在遇到 join() 时,如果需要 join 的任务尚未完成,则会先处理其他任务,并等待其完成。
  5. 在既没有自己的任务,也没有可以窃取的任务时,进入休眠。
FJ线程池结构

fork

fork将任务放入任务隶属的工作线程的工作队列中。

public final ForkJoinTask<V> fork() {
    Thread t;
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
        ((ForkJoinWorkerThread)t).workQueue.push(this);
    else
        ForkJoinPool.common.externalPush(this);
    return this;
}

join

  1. 检查调用 join() 的线程是否是 ForkJoinThread 线程。如果不是(例如 main 线程),则阻塞当前线程,等待任务完成。如果是,则不阻塞。
  2. 查看任务的完成状态,如果已经完成,直接返回结果。
  3. 如果任务尚未完成,但处于自己的工作队列内,则完成它。
  4. 如果任务已经被其他的工作线程偷走,则窃取这个小偷的工作队列内的任务(以 FIFO 方式),执行,以期帮助它早日完成欲 join 的任务。
  5. 如果偷走任务的小偷也已经把自己的任务全部做完,正在等待需要 join 的任务时,则找到小偷的小偷,帮助它完成它的任务。
  6. 递归地执行第5步。

<div align="center">
<img src="{{ site.url }}/images/posts/java/flowchart-of-join.png" alt="join"/>
</div>

以上就是 fork() 和 join() 的原理,这可以解释 ForkJoinPool 在递归过程中的执行逻辑,但还有一个问题:
最初的任务是 push 到哪个线程的工作队列里的?这就涉及到 submit() 函数的实现方法了。

submit

其实除了前面介绍过的每个工作线程自己拥有的工作队列以外,ForkJoinPool 自身也拥有工作队列,这些工作队列的作用是用来接收由外部线程(非 ForkJoinThread 线程)提交过来的任务,而这些工作队列被称为 submitting queue 。

submit() 和 fork() 其实没有本质区别,只是提交对象变成了 submitting queue 而已(还有一些同步,初始化的操作)。submitting queue 和其他 work queue 一样,是工作线程”窃取“的对象,因此当其中的任务被一个工作线程成功窃取时,就意味着提交的任务真正开始进入执行阶段。

总结

  1. Java中线程是一个比较昂贵的对象,线程的频繁创建和销毁会影响性能,因此借助"对象复用"思想产生线程池。由线程池来管理我们的线程。
  2. 使用Executors创建线程池时要明确创建的阻塞队列是否有界,如果是无界队列则饱和策略将失效,所有请求将一直排队等待被执行,可能会产生内存溢出的风险。最好自己创建ThreadPoolExecutor。
  3. ForkJoinPool一般配合stream API来使用,它适合做一些CPU密集型任务而不是I/O密集型任务。
  4. 线程池不要申明为本地变量,如果每次请求都要创建和销毁线程池,那么线程池也就失去了它的意义,应该将线程池申明为全局的。

参考资料

A Java Fork/Join Framework

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,558评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,002评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,036评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,024评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,144评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,255评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,295评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,068评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,478评论 1 305
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,789评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,965评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,649评论 4 336
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,267评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,982评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,223评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,800评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,847评论 2 351

推荐阅读更多精彩内容

  • 1.介绍 这篇文章将介绍一下java中的线程池-一开始我们先看一下java标准库中的实现,之后再看一下Google...
    大风过岗阅读 1,242评论 0 0
  • 一.Java中的ThreadPoolExecutor类 java.uitl.concurrent.ThreadPo...
    谁在烽烟彼岸阅读 643评论 0 0
  • Java线程池 [toc] 什么是线程池 线程池就是有N个子线程共同在运行的线程组合。 举个容易理解的例子:有个线...
    石家志远阅读 1,304评论 0 6
  • 线程池是什么? 线程池用于多线程处理中,它可以根据系统的情况,可以有效控制线程执行的数量,优化运行效果。线程池做的...
    懒癌正患者阅读 2,738评论 2 80
  • 我用力睁开沉重的眼皮,四周已是一片压抑的黑暗,闭上眼睛或许还要亮点。我瞎子摸灯一样,尽力伸展我灵敏的触觉和手臂,...
    无所谓春秋阅读 161评论 0 0