ThreadPoolExecutor

理解线程池的原理

一、线程池简介

作用:
线程池作用就是限制系统中执行线程的数量,重复利用已创建的线程来降低系统开销。

使用线程池的好处:
1、减少在创建和销毁线程上的资源消耗
2、限制系统中执行线程的数量,避免程序创建大量线程而导致系统性能下降甚至崩溃

二、JAVA实现

1、重点类ThreadPoolExecutor

public ThreadPoolExecutor(int corePoolSize,    //核心线程的数量
                          int maximumPoolSize,    //最大线程数量
                          long keepAliveTime,    //超出核心线程数量以外的线程空余存活时间
                          TimeUnit unit,    //存活时间的单位
                          BlockingQueue<Runnable> workQueue,    //保存待执行任务的队列
                          ThreadFactory threadFactory,    //创建新线程使用的工厂
                          RejectedExecutionHandler handler // 当任务无法执行时的处理器
                          ) 

//目前正在运行的工作线程,Worker实现Runnable接口,继承AbstractQueuedSynchronizer同步队列
private final HashSet<Worker> workers = new HashSet<>(); 

private final BlockingQueue<Runnable> workQueue; //待执行任务的队列

corePoolSize:核心线程池数量

在线程数少于核心数量时,有新任务进来就新建一个线程,即使有的线程没事干
等超出核心数量后,就不会新建线程了,空闲的线程就得去任务队列里取任务执行了

maximumPoolSize:最大线程数量

包括核心线程池数量 + 核心以外的数量
如果任务队列满了,并且池中线程数小于最大线程数,会再创建新的线程执行任务

keepAliveTime:核心池以外的线程存活时间

如果给线程池设置 allowCoreThreadTimeOut(true),则核心线程在空闲时头上也会响起死亡的倒计时
如果任务是多而容易执行的,可以调大这个参数,那样线程就可以在存活的时间里有更大可能接受新任务

TimeUnit unit 存活时间的单位,枚举

workQueue:保存待执行任务的阻塞队列

线程池中使用的队列是 BlockingQueue 接口,常用的实现有如下几种:

ThreadPoolExecutor的三种队列SynchronousQueue,LinkedBlockingQueue,ArrayBlockingQueue

ArrayBlockingQueue:基于数组、有界,按 FIFO(先进先出)原则对元素进行排序

LinkedBlockingQueue:基于链表,按FIFO (先进先出) 排序元素
吞吐量通常要高于 ArrayBlockingQueue
Executors.newFixedThreadPool() 使用了这个队列

SynchronousQueue:不存储元素的阻塞队列
每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态
吞吐量通常要高于 LinkedBlockingQueue
Executors.newCachedThreadPool使用了这个队列

PriorityBlockingQueue:具有优先级的、无限阻塞队列

threadFactory:每个线程创建的工厂

可以给线程命名,设置个优先级

handler:饱和策略

ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
也可以自己实现

2、主要流程

image.png

当currentSize<corePoolSize时,直接启动一个核心线程并执行任务。
当currentSize>=corePoolSize、并且workQueue未满时,添加进来的任务会被安排到workQueue中等待执行。
当workQueue已满,但是currentSize<maximumPoolSize时,会立即开启一个非核心线程来执行任务。
当currentSize>=corePoolSize、workQueue已满、并且currentSize>maximumPoolSize时,调用handler默认抛出RejectExecutionExpection异常。

//ThreadPoolExecutor.execute
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();

    int c = ctl.get();
    //1.当前池中线程比核心数少,新建一个线程执行任务
    if (workerCountOf(c) < corePoolSize) {   
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    //2.核心池已满,但任务队列未满,添加到队列中
    if (isRunning(c) && workQueue.offer(command)) {   
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))    //如果这时被关闭了,拒绝任务
            reject(command);
        else if (workerCountOf(recheck) == 0)    //如果之前的线程已被销毁完,新建一个线程
            addWorker(null, false);
    }
    //3.核心池已满,队列已满,试着创建一个新线程
    else if (!addWorker(command, false))
        reject(command);    //如果创建新线程失败了,说明线程池被关闭或者线程池完全满了,拒绝任务
}

Worker.run -> ThreadPoolExecutor.runWork -> ThreadPoolExecutor.getTask获取阻塞列队中的任务

//ThreadPoolExecutor.getTask
private Runnable getTask() {  
       boolean timedOut = false; // Did the last poll() time out?  
  
       retry:  
       for (;;) {  
           int c = ctl.get();  
           int rs = runStateOf(c);  
  
           // Check if queue empty only if necessary.  
           if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {  
        //如果线程池处于关闭之后或已关闭任务队列为空,则重置工作线程数  
               decrementWorkerCount();  
               return null;//返回null任务  
           }  
  
           boolean timed;      // Are workers subject to culling?  
  
           for (;;) {  
               int wc = workerCountOf(c);  
    //如果线程池正在运行,根据是否允许空闲线程等待任务和  
    //当前工作线程与核心线程池数量比较值,判断是否需要超时等待任务  
               timed = allowCoreThreadTimeOut || wc > corePoolSize;  
               if (wc <= maximumPoolSize && ! (timedOut && timed))  
        //如果当前工作线程数,小于最大线程数,空闲工作线程不需要超时等待任务,  
        //则跳出自旋,即在当前工作线程小于最大线程池的情况下,有工作线程可用,  
        //任务队列为空。  
                   break;  
               if (compareAndDecrementWorkerCount(c))  
        //减少工作线程数量失败,返回null  
                   return null;  
               c = ctl.get();  // Re-read ctl  
               if (runStateOf(c) != rs)  
        //如果与自旋前状态不一致,跳出本次自旋  
                   continue retry;  
               // else CAS failed due to workerCount change; retry inner loop  
           }  
  
           try {  
        //如果非超时则直接take,否则等待keepAliveTime时间,poll任务  
               Runnable r = timed ?  
                   workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :  
                   workQueue.take();  
               if (r != null)  
                   return r;  
               timedOut = true;  
           } catch (InterruptedException retry) {  
               timedOut = false;  
           }  
       }  
   }  

3、JAVA建议使用的线程池实现类

Executors.newCachedThreadPool(); //创建一个缓冲池,缓冲池容量大小为Integer.MAX_VALUE
Executors.newSingleThreadExecutor(); //创建容量为1的缓冲池
Executors.newFixedThreadPool(int); //创建固定容量大小的缓冲池

public static ExecutorService newFixedThreadPool(int nThreads) {  
    return new ThreadPoolExecutor(nThreads, nThreads,  
                                  0L, TimeUnit.MILLISECONDS,  
                                  new LinkedBlockingQueue<Runnable>());  
}  
public static ExecutorService newSingleThreadExecutor() {  
    return new FinalizableDelegatedExecutorService  
        (new ThreadPoolExecutor(1, 1,  
                                0L, TimeUnit.MILLISECONDS,  
                                new LinkedBlockingQueue<Runnable>()));  
}  
public static ExecutorService newCachedThreadPool() {  
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,  
                                  60L, TimeUnit.SECONDS,  
                                  new SynchronousQueue<Runnable>());  
}  

从它们的具体实现来看,它们实际上也是调用了ThreadPoolExecutor,只不过参数都已配置好了。

newFixedThreadPool创建的线程池corePoolSize和maximumPoolSize值是相等的,它使用的LinkedBlockingQueue;

newSingleThreadExecutor将corePoolSize和maximumPoolSize都设置为1,也使用的LinkedBlockingQueue;

newCachedThreadPool将corePoolSize设置为0,将maximumPoolSize设置为Integer.MAX_VALUE,使用的SynchronousQueue,也就是说来了任务就创建线程运行,当线程空闲超过60秒,就销毁线程。

另外阿里编码规约中强制要求线程池不允许使用 Executors 去创建,这边做下了解


image.png
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,589评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,615评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,933评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,976评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,999评论 6 393
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,775评论 1 307
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,474评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,359评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,854评论 1 317
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,007评论 3 338
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,146评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,826评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,484评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,029评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,153评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,420评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,107评论 2 356

推荐阅读更多精彩内容

  • 为什么使用线程池 当我们在使用线程时,如果每次需要一个线程时都去创建一个线程,这样实现起来很简单,但是会有一个问题...
    闽越布衣阅读 4,292评论 10 45
  • 文章摘要:在业务系统中,线程池框架技术一直是用来解决多线程并发的一种有效方法。 在JDK中,J.U.C并发包下的T...
    癫狂侠阅读 2,091评论 2 21
  • 阿呆的项目经理给阿呆分配了一个统计点击量的问题。情景是这样的:每个广告位上的创意都可以点击,点击过后会经过服务器跳...
    等风的猪_阅读 709评论 0 3
  • 在工作之中使用git,除了常用的clone,add,commit,push,fetch,pull,merge等还会...
    haiyangjiajian阅读 244评论 0 1
  • 《山水间》 迷雾云间绕, 江中山影渺。 渔家爱意浓, 水上孤舟袅。
    黄尚彪阅读 199评论 0 1