Java线程池

本文主要介绍java线程池相关的内容,包括线程池的几种常用方式

线程池存在的意义

先抛一个问题,Thread能直接新建并使用,为什么还要使用线程池?

如果看过我之前写的博客的话可以知道,一个线程从新建到死亡,总共有5个状态。创建、运行包括死亡都会消耗cpu等系统资源。如果子线程每次执行的任务量很小,但是数量很大时,你会发现,基本上所有的系统资源都消耗在线程管理上。同时线程之间的执行权竞争也会消耗一定的系统资源,这就导致了程序的执行效率降低。另外还有一点就是,随意新建的线程无法做到统一管理。线程池的存在就是为了解决上面提到的问题,以提高程序的执行效率。

Executor

Executor其实更准确的解释是执行器,定义了一个线程执行的规范,跟我们常说的线程池相差有点远。

An object that executes submitted Runnable tasks. This interface provides a way of decoupling task submission from the mechanics of how each task will be run, including details of thread use, scheduling, etc. An Executor is normally used instead of explicitly creating threads.

这段话的意思是,Executor是用来执行被提交的Runnable任务的。这个接口提供了一种将任务提交与每个任务如何运行的机制解耦的方式,包括线程使用,调度等细节。通常Executor用来代替显示的创建线程。

上面是官方给的注释,那Executor这跟我们今天要说的线程有什么关系呢。

其实Executor是线程池的父类,最终的实现类有两个ThreadPoolExecutor以及ScheduledThreadPoolExecutor,其中ThreadPoolExecutor就是我们常说的线程池。

在ThreadPoolExecutor之上还有一个接口ExecutorService,定义了任务提交、执行等一些相关接口,继承结构图如下所示:

线程池继承结构图.jpeg

ExecutorService

相对于Executor,ExecutorService更多是功能的定义,提供了诸如

  1. <T> Future<T> submit(Callable<T> task);
  2. Future<?> submit(Runnable task);

的方法,可以让线程执行返回结果。

ThreadPoolExecutor

这个才是我们常用的“线程池”,主要来看一下构造函数。

ThreadPoolExecutor有4个构造函数,

public ThreadPoolExecutor(int corePoolSize,                             
                          int maximumPoolSize,                          
                          long keepAliveTime,                           
                          TimeUnit unit,                                
                          BlockingQueue<Runnable> workQueue) {          
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 
         Executors.defaultThreadFactory(), defaultHandler);             
}

...

public ThreadPoolExecutor(int corePoolSize,                             
                          int maximumPoolSize,                          
                          long keepAliveTime,                           
                          TimeUnit unit,                                
                          BlockingQueue<Runnable> workQueue,            
                          ThreadFactory threadFactory) {                
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 
         threadFactory, defaultHandler);                                
}                                                                       

...

public ThreadPoolExecutor(int corePoolSize,                             
                          int maximumPoolSize,                          
                          long keepAliveTime,                           
                          TimeUnit unit,                                
                          BlockingQueue<Runnable> workQueue,            
                          RejectedExecutionHandler handler) {           
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 
         Executors.defaultThreadFactory(), handler);                    
} 

... 

public ThreadPoolExecutor(int corePoolSize,                            
                          int maximumPoolSize,                         
                          long keepAliveTime,                          
                          TimeUnit unit,                               
                          BlockingQueue<Runnable> workQueue,           
                          ThreadFactory threadFactory,                 
                          RejectedExecutionHandler handler) {          
    if (corePoolSize < 0 ||                                            
        maximumPoolSize <= 0 ||                                        
        maximumPoolSize < corePoolSize ||                              
        keepAliveTime < 0)                                             
        throw new IllegalArgumentException();                          
    if (workQueue == null || threadFactory == null || handler == null) 
        throw new NullPointerException();                              
    this.acc = System.getSecurityManager() == null ?                   
            null :                                                     
            AccessController.getContext();                             
    this.corePoolSize = corePoolSize;                                  
    this.maximumPoolSize = maximumPoolSize;                            
    this.workQueue = workQueue;                                        
    this.keepAliveTime = unit.toNanos(keepAliveTime);                  
    this.threadFactory = threadFactory;                                
    this.handler = handler;                                            
}                                                                                                                                                                                                                   

最后都是调用到了参数最长的那个,我们来看一下里面个各个参数的意思:

  • corePoolSize : 核心线程的数量
  • maximumPoolSize : 最大线程数量,包括核心线程和非核心线程
  • keepAliveTime : 非核心线程空闲时存活的时间
  • unit : 存活时间的时间单位
  • workQueue : 任务队列
  • threadFactory : 创建线程的工厂类
  • handler : 当任务无法被处理的对象

线程数量

关于corePoolSize、maximumPoolSize和workQueue这里要单独拿出来讲一下。

首先要明确两个概念:核心线程非核心线程

核心线程是指线程空闲时也保存的线程池中的线程,最大数量就是corePoolSize个,当然如果设置了allowCoreThreadTimeOut,那么核心线程也会被回收。

非核心线程是指核心线程不够处理任务,同时线程池稍微到达maximumPoolSize时添加的线程,这些线程会在空闲keepAliveTime后被回收。

但是保存在线程池中的线程并没有真正的核心线程和非核心线程的区别,只是开始回收线程时,线程数量达到核心线程的数量便不再回收。

知道上面这个两个我们就可以接着往下讲了。

什么时候新建核心线程

当线程池的线程数量少于corePoolSize时,不管是否有核心线程空闲,只要来新任务,都是直接新建核心线程用来出来任务。

什么时候新建非核心线程

这个问题比较复杂一点,因为涉及到workQueue的类型以及容量问题,直接一点的的回答就是:

  • 当workQueue被塞满时,这时来新的任务才会新建非核心线程进行处理,如果线程池中的线程数量达到了maximumPoolSize,这时再来新任务将被拒绝,同时抛出一个RejectedExecutionException。

  • 但是存在一个特殊的情况,那就是设置的核心线程数量为0,这个时候会新建一个非核心线程用于处理任务

什么时候将任务放入workQueue

当线程池中的核心线程达到了corePoolSize,同时没有核心线程空闲,这时来新的任务都会被放入workQueue,当workQueue被塞满时就会尝试新建非核心线程。

所以我们可以看到,一个线程池中线程池中能存在的最大线程个数就是maximumPoolSize,但是能保存的最大任务数就会比这个大一点,是maximumPoolSize + workQueue.size() 之和。

大致的流程下图所示:

线程池任务分配.png

下面是demo验证一下正常情况:

public class ExecutorTest {

    public static void main(String[] args) {

try {

            ThreadPoolExecutor executor = new ThreadPoolExecutor(1, Integer.MAX_VALUE, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<>());

            for (int i = 0; i < 10; i++) {

                final int index = i;

                executor.execute(() -> {
                    try {
                        Thread.sleep(200);
                        System.out.println("runnable index is " + index
                                + " and time is " + System.currentTimeMillis());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });
            }


            //workers 在线程池中用保存有新建完的线程,由于不存在直接获取的方法,这里用反射获取
            Class clazz = executor.getClass();
            Field workers = clazz.getDeclaredField("workers");
            workers.setAccessible(true);

            while (true) {

                HashSet hashSet = (HashSet) workers.get(executor);
                System.out.println("Queue size is " + executor.getQueue().size()
                        + " and thread count is " + hashSet.size());


                Thread.sleep(1000);
            }
        } catch (Exception ignore) {

        }
    }
}

打印如下:

time is 1525846211057 and thread count is 1
runnable index is 0 and time is 1525846211260
runnable index is 1 and time is 1525846211464
runnable index is 2 and time is 1525846211665
runnable index is 3 and time is 1525846211870
time is 1525846212062 and thread count is 1
runnable index is 4 and time is 1525846212074
time is 1525846213064 and thread count is 1
time is 1525846214069 and thread count is 1
time is 1525846215070 and thread count is 1
time is 1525846216074 and thread count is 1
time is 1525846217076 and thread count is 1
time is 1525846218079 and thread count is 1
time is 1525846219084 and thread count is 1
time is 1525846220089 and thread count is 1
time is 1525846221092 and thread count is 1

由于没有提供直接获取线程池中当前线程数量方法,用反射的方法来获取。

可以看到,就算非核心线程数量设置到了Integer.MAX_VALUE,但是任务队列并没有满,还是只有1个核心线程在执行任务。并且过了超时时间,线程并没有被销毁。

再来一个demo验证一下核心线程设置为0的情况:

public class ExecutorTest {

    public static void main(String[] args) {

try {

            ThreadPoolExecutor executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<>());

            for (int i = 0; i < 10; i++) {

                final int index = i;

                executor.execute(() -> {
                    try {
                        Thread.sleep(200);
                        System.out.println("runnable index is " + index
                                + " and time is " + System.currentTimeMillis());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });
            }


            //workers 在线程池中用保存有新建完的线程,由于不存在直接获取的方法,这里用反射获取
            Class clazz = executor.getClass();
            Field workers = clazz.getDeclaredField("workers");
            workers.setAccessible(true);

            while (true) {

                HashSet hashSet = (HashSet) workers.get(executor);
                System.out.println("Queue size is " + executor.getQueue().size()
                        + " and thread count is " + hashSet.size());


                Thread.sleep(1000);
            }
        } catch (Exception ignore) {

        }
    }
}

打印如下:

time is 1525846316180 and thread count is 1
runnable index is 0 and time is 1525846316383
runnable index is 1 and time is 1525846316589
runnable index is 2 and time is 1525846316793
runnable index is 3 and time is 1525846316999
time is 1525846317184 and thread count is 1
runnable index is 4 and time is 1525846317201
time is 1525846318188 and thread count is 1
time is 1525846319193 and thread count is 1
time is 1525846320197 and thread count is 1
time is 1525846321198 and thread count is 1
time is 1525846322202 and thread count is 1
time is 1525846323206 and thread count is 0
time is 1525846324206 and thread count is 0
time is 1525846325211 and thread count is 0

当核心线程设置为0时,提交任务给线程池,会新建一个非核心线程用于处理任务,过了超时时间后线程就被回收了。

Executors提供的4种默认线程池

Executors是一个线程池的工具类,最主要的用处就是给我们提供了4种常用的线程池的创建方式。

newFixedThreadPool(int nThreads)

这个静态返回一个固定线程数量的线程池,来看一下里面的实现。

public static ExecutorService newFixedThreadPool(int nThreads) {          
    return new ThreadPoolExecutor(nThreads, nThreads,                     
                                  0L, TimeUnit.MILLISECONDS,              
                                  new LinkedBlockingQueue<Runnable>());   
}                                                                         

这里就开始用到文章上面所解释的参数的意义,这里可以看到,核心线程和最大线程数量都为指定的nThreads,非核心线程超市时间为0,也就是默认值,任务队列为一个LinkedBlockingQueue。

解释一下为什么这是一个固定线程的线程池,结合上面的任务分配图可以很清楚的知道,

  1. 当新任务来时,最开始肯定是新建核心线程进行任务直接处理
  2. 当核心线程满了之后就是任务入队了,但是LinkedBlockingQueue理论上是一个无限大的队列,可以一直存放任务,这个线程池中不会存在非核心线程,线程的数量也就固定了。

newSingleThreadExecutor()

这个就是新建一个只有一个核心线程的线程池。

public static ExecutorService newSingleThreadExecutor() {              
    return new FinalizableDelegatedExecutorService                     
        (new ThreadPoolExecutor(1, 1,                                  
                                0L, TimeUnit.MILLISECONDS,             
                                new LinkedBlockingQueue<Runnable>())); 
}                                                                      

这个就不多解释了,看上面就知道了,只是把核心线程数指定为了1。

newCachedThreadPool()

这就是不限制线程的数量,但是线程会被回收。

public static ExecutorService newCachedThreadPool() {                
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,              
                                  60L, TimeUnit.SECONDS,             
                                  new SynchronousQueue<Runnable>()); 
}                                                                    

还是套用上面的任务分配图:

  1. 核心线程数量为0,以为这所有的任务都先进任务队列
  2. 但是SynchronousQueue是一个同步的队列,也就是SynchronousQueue中不会存在任务,只要有新任务都是直接分配出去处理。
  3. 最大线程数量为Integer.MAX_VALUE,意味着可以处理的任务是无限量的。
  4. 看完上面几点可以知道,当有任务来时都是交给非核心线程处理的,线程数量不够时直接新建非核心线程。所有线程空闲超过60秒将会被回收,这也是Cached的含义。

newScheduledThreadPool(int corePoolSize)

这个线程池主要用于执行需要重复执行的任务上,可以指定任务循环执行的模式。

public static ScheduledExecutorService newScheduledThreadPool(          
        int corePoolSize, ThreadFactory threadFactory) {                
    return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}                                                                       
public ScheduledThreadPoolExecutor(int corePoolSize) {                  
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,              
          new DelayedWorkQueue());                                      
}                                                                       

可以看到最后其实是新建了一个ThreadPoolExecutor,设置的核心线程数为corePoolSize,队列为一个无限大小的延时队列。

三个方法说明一下:

  1. schedule(Runnable command,long delay,TimeUnit unit):就是一个只执行一次的延时任务,这个方法接受3个参数

    • 执行的任务
    • 延时的时间
    • 时间单位
  2. scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit): 这是一个无限次执行的延时任务,这个方法接受4个参数

    • 执行的任务
    • 第一次开始执行的延时
    • 以后每次开始的延时
    • 时间单位
  3. scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit):这是一个无限次执行的延时任务,只不过每次新任务总在上一个任务执行完毕开始,这方法接受4个参数

    • 执行的任务
    • 第一次开始执行的延时
    • 上个任务结束后,开始下个任务的延时
    • 时间单位

总结

这篇基本上算都是干货了,总结没啥好写的,再看一遍文章,能记住的都记住吧,特别是文中提到的几个问题。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,142评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,298评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,068评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,081评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,099评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,071评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,990评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,832评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,274评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,488评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,649评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,378评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,979评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,625评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,796评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,643评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,545评论 2 352

推荐阅读更多精彩内容