线程池原理详解

线程池的使用

背景: 为什么使用线程池?

        并发的线程数量很多,并且每个执行时间都很短,这样频繁创建线程和销毁线程需要时间。所以java通过线程池达到这种效果。用线程池控制线程数量,其他线程排队等候。一个任务执行完毕,再从队列的中取最前面的任务开始执行。若队列中没有等待进程,线程池的这一资源处于等待。当一个新任务需要运行时,如果线程池中有等待的工作线程,就可以开始运行了;否则进入等待队列。

##1. 线程池构造

​        线程池就是管理线程的池子,避免增加线程和销毁线程的资源损耗。同时线程可以被重复利用。

###1.1 Executor接口

​        **Executor**是最基础的执行接口只提供一个方法,主要执行已经提交Runnable任务对象,可以理解将任务提交和任务执行解耦的方法。

```java

void execute(Runnable command);

```

​        **ExecutorService**接口是继承Executor并对其进行扩展。ExecutorService接口主要扩展如下接口方法:

```java

void shutdown(); //启动一次有序的关闭,之前提交的任务执行,但不接受新任务

boolean isShutdown();

Future<?> submit(Runnable task); //提交一个可执行的任务,返回一个Future代表这个任务

<T> Future<T> submit(Runnable task, T result);//提交一个可以执行的任务,返回一个Future代表这个任务.任务执行完成future.get()返回给定的result

boolean isTerminated();  //如果所有任务都已经被终止,返回true

```

​      **TheadPoolExecutor**类继承AbstractExecutorService,是线程池进行提交任务、停止等操作的具体实现; 

### 1.2  Executors类

​        Executors可以构造多种类型的线程池newFixedThreadPool()、newCachedThreadPool()、newScheduledThreadPool(int corePoolSize),但最终都是调用下面的构造函数:

```java

public ThreadPoolExecutor(int corePoolSize,

                          int maximumPoolSize,

                          long keepAliveTime,

                          TimeUnit unit, 

                          BlockingQueue<Runnable> workQueue,

                          ThreadFactory threadFactory, 

                          RejectedExecutionHandler handler)

```

**corePoolSize**:核心线程数,线程池初始化就有这么多线程

**maximumPoolSize**:最大线程数大小

**keepAliveTime**:非核心线程空闲时存活时间

**unit**: 线程空闲存活时间单位

**workQueue**:存放任务的阻塞队列

**threadFactory**:创建线程的工厂

**handler**:线程池饱和策略事件

### 1.3 **四种拒绝策略**

(1)AbortPolicy(抛出一个异常,默认的)

(2)DiscardPolicy(直接丢弃任务)

(3)DiscardOldestPolicy(丢弃队列里最老的任务,将当前这个任务继续提交给线程池)

(4)CallerRunsPolicy(交给线程池调用所在的线程进行处理)

###1.4.线程池中队列排队的策略

(1)不排队,直接提交

​      将任务直接交给线程处理

(2)有界队列

​      可以使用ArrayBlockingQueue(基于数组结构的有界队列,FIFO),并指定队列的最大长度

(3)无界队列

​        可以使用LinkedBlockingQueue(基于链表的有界队列,FIFO),理论上是该队列可以对无限多的任务排队

##2. 线程池执行过程:

​        首先有请求过来提交任务,核心线程池看是否已满,没有满则使用核心线程去处理提交的任务;如果核心线程数已满,新提交的任务则会被放在任务队列中排队等待;当核心线程数已满并且任务队列也满了,判断线程数是否达到maximumPoolSize,没有达到则会激活最大线程数,进行处理提交任务;如果最大线程数也满了,新来的任务则直接使用拒绝策略处理。

###2.1 线程池状态

​    在ThreadPoolExecutor实现类中首先会看到开头定义了ctl原子类,构造函数中调用ctlOf()主要维护了线程池状态(高3位)和线程池数量(低29位)

```java

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

private static final int COUNT_BITS = Integer.SIZE - 3;

private static final int CAPACITY  = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits

private static final int RUNNING    = -1 << COUNT_BITS;

private static final int SHUTDOWN  =  0 << COUNT_BITS;

private static final int STOP      =  1 << COUNT_BITS;

private static final int TIDYING    =  2 << COUNT_BITS;

private static final int TERMINATED =  3 << COUNT_BITS;

// Packing and unpacking ctl

private static int runStateOf(int c)    { return c & ~CAPACITY; }

private static int workerCountOf(int c)  { return c & CAPACITY; }

private static int ctlOf(int rs, int wc) { return rs | wc; }

```

runStateOf(int c)  用于获取高3位线程池状态

workerCountOf(int c) 用于获取低29位线程数量

ctlOf(int rs, int wc) 将高3位和低29位合并

###2.2  execute()任务提交

1. 获取线程数如果小于核心线程数,使用新的线程去处理任务,创建成功则直接返回

2. 如果创建worker线程失败,判断线程池是否是running状态并将任务加入到阻塞队列中。如果成功则进行double-check,主要判断刚加入的workQueue阻塞队列是否被执行。

  如果已经不是running状态,则直接删除任务,拒绝添加新的任务

  如果线程池处于running状态,如果当前worker数量为0,通过addWorker(null, false)创建一个线程,其任务为null

3. 如果线程池不是running状态或者无法入队列,直接拒绝当前任务

```java

public void execute(Runnable command) {

    if (command == null)

        throw new NullPointerException();

    int c = ctl.get();

    if (workerCountOf(c) < corePoolSize) {

        if (addWorker(command, true))

            return;

        c = ctl.get();

    }

    if (isRunning(c) && workQueue.offer(command)) {

        int recheck = ctl.get();

        if (! isRunning(recheck) && remove(command))

            reject(command);

        else if (workerCountOf(recheck) == 0)

            addWorker(null, false);

    }

    else if (!addWorker(command, false))

        reject(command);

}

```

###2.3 addWork(Runnable firstTask, boolean core)  - 添加worker线程

firstTask:是worker线程的初始任务;

core: true是corePoolSize做为上限,false为maximumPoolSize做为上限。

1. 判断线程池状态,如果线程池大于shutdown状态并且任务为空并队列不为空,直接返回

2. 判断线程池数量,如果超过核心线程数或者最大线程数则直接返回,如果没超过则线程数加1,直接跳出循环。如果加1失败,再次获取线程数继续执行循环。

3. 循环结束后,继续往下走,ReentrantLock保证线程安全,接下来主要添加新的work实例,完成后解锁并启动work线程,成功后返回true,失败后则调用addWorkerFailed()

```java

private boolean addWorker(Runnable firstTask, boolean core) {

    retry:

    for (;;) {

        int c = ctl.get();

        int rs = runStateOf(c);

        // Check if queue empty only if necessary.

        if (rs >= SHUTDOWN &&

            ! (rs == SHUTDOWN &&

              firstTask == null &&

              ! workQueue.isEmpty()))

            return false;

        for (;;) {

            int wc = workerCountOf(c);

            if (wc >= CAPACITY ||

                wc >= (core ? corePoolSize : maximumPoolSize))

                return false;

            if (compareAndIncrementWorkerCount(c))

                break retry;

            c = ctl.get();  // Re-read ctl

            if (runStateOf(c) != rs)

                continue retry;

            // else CAS failed due to workerCount change; retry inner loop

        }

    }

    boolean workerStarted = false;

    boolean workerAdded = false;

    Worker w = null;

    try {

        w = new Worker(firstTask);

        final Thread t = w.thread;

        if (t != null) {

            final ReentrantLock mainLock = this.mainLock;

            mainLock.lock();

            try {

                // Recheck while holding lock.

                // Back out on ThreadFactory failure or if

                // shut down before lock acquired.

                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||

                    (rs == SHUTDOWN && firstTask == null)) {

                    if (t.isAlive()) // precheck that t is startable

                        throw new IllegalThreadStateException();

                    workers.add(w);

                    int s = workers.size();

                    if (s > largestPoolSize)

                        largestPoolSize = s;

                    workerAdded = true;

                }

            } finally {

                mainLock.unlock();

            }

            if (workerAdded) {

                t.start();

                workerStarted = true;

            }

        }

    } finally {

        if (! workerStarted)

            addWorkerFailed(w);

    }

    return workerStarted;

}

```

###2.3内部类worker

​      worker类继承AbstractQueuedSynchronizer达到线程安全效果,实现了Runnable类的run()方法。

​      构造方法Worker(Runnable firstTask)根据当前worker创建一个线程对象,然后firstTask调用run()方法执行业务任务

```java

private final class Worker

    extends AbstractQueuedSynchronizer

    implements Runnable

{

    /**

    * This class will never be serialized, but we provide a

    * serialVersionUID to suppress a javac warning.

    */

    private static final long serialVersionUID = 6138294804551838833L;


    final Thread thread;

    /** Initial task to run.  Possibly null. */

    Runnable firstTask;

    /** Per-thread task counter */

    volatile long completedTasks;


    Worker(Runnable firstTask) {

        setState(-1); // inhibit interrupts until runWorker

        this.firstTask = firstTask;

        this.thread = getThreadFactory().newThread(this);

    }

    /** Delegates main run loop to outer runWorker  */

    public void run() {

        runWorker(this);

    }


    protected boolean isHeldExclusively() {

        return getState() != 0;

    }

    protected boolean tryAcquire(int unused) {

        if (compareAndSetState(0, 1)) {

            setExclusiveOwnerThread(Thread.currentThread());

            return true;

        }

        return false;

    }

    protected boolean tryRelease(int unused) {

        setExclusiveOwnerThread(null);

        setState(0);

        return true;

    }

    public void lock()        { acquire(1); }

    public boolean tryLock()  { return tryAcquire(1); }

    public void unlock()      { release(1); }

    public boolean isLocked() { return isHeldExclusively(); }

    void interruptIfStarted() {

        Thread t;

        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {

            try {

                t.interrupt();

            } catch (SecurityException ignore) {

            }

        }

    }

}

```

###2.4 runWorker 执行任务

​    Work线程启动后,调用内部类worker的runWorker(this)

1. worker.unlock(),将AQS的state置为0,允许中断当前worker线程

2. 执行任务前加锁,防止在执行任务的时候被线程池的一些中断操作中断

3. 加锁后如果线程池停止或者线程中断,则任务中断

4. 执行beforeExecute()、task.run()、afterExecute()完成线程执行,如果异常后则抛异常,执行processWorkerExit()处理worker退出的流程

5. getTask()从阻塞队列中获取新的任务,当队列没有任务获取任务超时则当前woek退出流程

```java

final void runWorker(Worker w) {

    Thread wt = Thread.currentThread();

    Runnable task = w.firstTask;

    w.firstTask = null;

    w.unlock(); // allow interrupts

    boolean completedAbruptly = true;

    try {

        while (task != null || (task = getTask()) != null) {

            w.lock();

            // If pool is stopping, ensure thread is interrupted;

            // if not, ensure thread is not interrupted.  This

            // requires a recheck in second case to deal with

            // shutdownNow race while clearing interrupt

            if ((runStateAtLeast(ctl.get(), STOP) ||

                (Thread.interrupted() &&

                  runStateAtLeast(ctl.get(), STOP))) &&

                !wt.isInterrupted())

                wt.interrupt();

            try {

                beforeExecute(wt, task);

                Throwable thrown = null;

                try {

                    task.run();

                } catch (RuntimeException x) {

                    thrown = x; throw x;

                } catch (Error x) {

                    thrown = x; throw x;

                } catch (Throwable x) {

                    thrown = x; throw new Error(x);

                } finally {

                    afterExecute(task, thrown);

                }

            } finally {

                task = null;

                w.completedTasks++;

                w.unlock();

            }

        }

        completedAbruptly = false;

    } finally {

        processWorkerExit(w, completedAbruptly);

    }

}

```

###2.5 getTask()获取任务

1. 判断线程池状态为SHUTDOWN并且stop或者workQueue为空的时候,直接返回null,workerCount减一

2. 获取线程池数量,如果超过maximumPoolSize或者timedOut 和 timed 同时为true并且线程数大于1或线程池队列为空,线程池数量加1

3. 如果满足获取任务条件,根据是否需要定时获取调用不同方法:

      workQueue.poll():如果在keepAliveTime时间内,阻塞队列还是没有任务,返回null

      workQueue.take():如果阻塞队列为空,当前线程会被挂起等待;当队列中有任务加入时,线程被唤醒,take方法返回任务

```java

private Runnable getTask() {

    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {

        int c = ctl.get();

        int rs = runStateOf(c);

        // Check if queue empty only if necessary.

        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {

            decrementWorkerCount();

            return null;

        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?

        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))

            && (wc > 1 || workQueue.isEmpty())) {

            if (compareAndDecrementWorkerCount(c))

                return null;

            continue;

        }

        try {

            Runnable r = timed ?

                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :

                workQueue.take();

            if (r != null)

                return r;

            timedOut = true;

        } catch (InterruptedException retry) {

            timedOut = false;

        }

    }

}

```

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 221,820评论 6 515
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 94,648评论 3 399
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 168,324评论 0 360
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,714评论 1 297
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,724评论 6 397
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 52,328评论 1 310
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,897评论 3 421
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,804评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,345评论 1 318
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,431评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,561评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 36,238评论 5 350
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,928评论 3 334
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,417评论 0 24
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,528评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,983评论 3 376
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,573评论 2 359

推荐阅读更多精彩内容