线程池原理详解

线程池的使用

背景: 为什么使用线程池?

        并发的线程数量很多,并且每个执行时间都很短,这样频繁创建线程和销毁线程需要时间。所以java通过线程池达到这种效果。用线程池控制线程数量,其他线程排队等候。一个任务执行完毕,再从队列的中取最前面的任务开始执行。若队列中没有等待进程,线程池的这一资源处于等待。当一个新任务需要运行时,如果线程池中有等待的工作线程,就可以开始运行了;否则进入等待队列。

##1. 线程池构造

​        线程池就是管理线程的池子,避免增加线程和销毁线程的资源损耗。同时线程可以被重复利用。

###1.1 Executor接口

​        **Executor**是最基础的执行接口只提供一个方法,主要执行已经提交Runnable任务对象,可以理解将任务提交和任务执行解耦的方法。

```java

void execute(Runnable command);

```

​        **ExecutorService**接口是继承Executor并对其进行扩展。ExecutorService接口主要扩展如下接口方法:

```java

void shutdown(); //启动一次有序的关闭,之前提交的任务执行,但不接受新任务

boolean isShutdown();

Future<?> submit(Runnable task); //提交一个可执行的任务,返回一个Future代表这个任务

<T> Future<T> submit(Runnable task, T result);//提交一个可以执行的任务,返回一个Future代表这个任务.任务执行完成future.get()返回给定的result

boolean isTerminated();  //如果所有任务都已经被终止,返回true

```

​      **TheadPoolExecutor**类继承AbstractExecutorService,是线程池进行提交任务、停止等操作的具体实现; 

### 1.2  Executors类

​        Executors可以构造多种类型的线程池newFixedThreadPool()、newCachedThreadPool()、newScheduledThreadPool(int corePoolSize),但最终都是调用下面的构造函数:

```java

public ThreadPoolExecutor(int corePoolSize,

                          int maximumPoolSize,

                          long keepAliveTime,

                          TimeUnit unit, 

                          BlockingQueue<Runnable> workQueue,

                          ThreadFactory threadFactory, 

                          RejectedExecutionHandler handler)

```

**corePoolSize**:核心线程数,线程池初始化就有这么多线程

**maximumPoolSize**:最大线程数大小

**keepAliveTime**:非核心线程空闲时存活时间

**unit**: 线程空闲存活时间单位

**workQueue**:存放任务的阻塞队列

**threadFactory**:创建线程的工厂

**handler**:线程池饱和策略事件

### 1.3 **四种拒绝策略**

(1)AbortPolicy(抛出一个异常,默认的)

(2)DiscardPolicy(直接丢弃任务)

(3)DiscardOldestPolicy(丢弃队列里最老的任务,将当前这个任务继续提交给线程池)

(4)CallerRunsPolicy(交给线程池调用所在的线程进行处理)

###1.4.线程池中队列排队的策略

(1)不排队,直接提交

​      将任务直接交给线程处理

(2)有界队列

​      可以使用ArrayBlockingQueue(基于数组结构的有界队列,FIFO),并指定队列的最大长度

(3)无界队列

​        可以使用LinkedBlockingQueue(基于链表的有界队列,FIFO),理论上是该队列可以对无限多的任务排队

##2. 线程池执行过程:

​        首先有请求过来提交任务,核心线程池看是否已满,没有满则使用核心线程去处理提交的任务;如果核心线程数已满,新提交的任务则会被放在任务队列中排队等待;当核心线程数已满并且任务队列也满了,判断线程数是否达到maximumPoolSize,没有达到则会激活最大线程数,进行处理提交任务;如果最大线程数也满了,新来的任务则直接使用拒绝策略处理。

###2.1 线程池状态

​    在ThreadPoolExecutor实现类中首先会看到开头定义了ctl原子类,构造函数中调用ctlOf()主要维护了线程池状态(高3位)和线程池数量(低29位)

```java

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

private static final int COUNT_BITS = Integer.SIZE - 3;

private static final int CAPACITY  = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits

private static final int RUNNING    = -1 << COUNT_BITS;

private static final int SHUTDOWN  =  0 << COUNT_BITS;

private static final int STOP      =  1 << COUNT_BITS;

private static final int TIDYING    =  2 << COUNT_BITS;

private static final int TERMINATED =  3 << COUNT_BITS;

// Packing and unpacking ctl

private static int runStateOf(int c)    { return c & ~CAPACITY; }

private static int workerCountOf(int c)  { return c & CAPACITY; }

private static int ctlOf(int rs, int wc) { return rs | wc; }

```

runStateOf(int c)  用于获取高3位线程池状态

workerCountOf(int c) 用于获取低29位线程数量

ctlOf(int rs, int wc) 将高3位和低29位合并

###2.2  execute()任务提交

1. 获取线程数如果小于核心线程数,使用新的线程去处理任务,创建成功则直接返回

2. 如果创建worker线程失败,判断线程池是否是running状态并将任务加入到阻塞队列中。如果成功则进行double-check,主要判断刚加入的workQueue阻塞队列是否被执行。

  如果已经不是running状态,则直接删除任务,拒绝添加新的任务

  如果线程池处于running状态,如果当前worker数量为0,通过addWorker(null, false)创建一个线程,其任务为null

3. 如果线程池不是running状态或者无法入队列,直接拒绝当前任务

```java

public void execute(Runnable command) {

    if (command == null)

        throw new NullPointerException();

    int c = ctl.get();

    if (workerCountOf(c) < corePoolSize) {

        if (addWorker(command, true))

            return;

        c = ctl.get();

    }

    if (isRunning(c) && workQueue.offer(command)) {

        int recheck = ctl.get();

        if (! isRunning(recheck) && remove(command))

            reject(command);

        else if (workerCountOf(recheck) == 0)

            addWorker(null, false);

    }

    else if (!addWorker(command, false))

        reject(command);

}

```

###2.3 addWork(Runnable firstTask, boolean core)  - 添加worker线程

firstTask:是worker线程的初始任务;

core: true是corePoolSize做为上限,false为maximumPoolSize做为上限。

1. 判断线程池状态,如果线程池大于shutdown状态并且任务为空并队列不为空,直接返回

2. 判断线程池数量,如果超过核心线程数或者最大线程数则直接返回,如果没超过则线程数加1,直接跳出循环。如果加1失败,再次获取线程数继续执行循环。

3. 循环结束后,继续往下走,ReentrantLock保证线程安全,接下来主要添加新的work实例,完成后解锁并启动work线程,成功后返回true,失败后则调用addWorkerFailed()

```java

private boolean addWorker(Runnable firstTask, boolean core) {

    retry:

    for (;;) {

        int c = ctl.get();

        int rs = runStateOf(c);

        // Check if queue empty only if necessary.

        if (rs >= SHUTDOWN &&

            ! (rs == SHUTDOWN &&

              firstTask == null &&

              ! workQueue.isEmpty()))

            return false;

        for (;;) {

            int wc = workerCountOf(c);

            if (wc >= CAPACITY ||

                wc >= (core ? corePoolSize : maximumPoolSize))

                return false;

            if (compareAndIncrementWorkerCount(c))

                break retry;

            c = ctl.get();  // Re-read ctl

            if (runStateOf(c) != rs)

                continue retry;

            // else CAS failed due to workerCount change; retry inner loop

        }

    }

    boolean workerStarted = false;

    boolean workerAdded = false;

    Worker w = null;

    try {

        w = new Worker(firstTask);

        final Thread t = w.thread;

        if (t != null) {

            final ReentrantLock mainLock = this.mainLock;

            mainLock.lock();

            try {

                // Recheck while holding lock.

                // Back out on ThreadFactory failure or if

                // shut down before lock acquired.

                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||

                    (rs == SHUTDOWN && firstTask == null)) {

                    if (t.isAlive()) // precheck that t is startable

                        throw new IllegalThreadStateException();

                    workers.add(w);

                    int s = workers.size();

                    if (s > largestPoolSize)

                        largestPoolSize = s;

                    workerAdded = true;

                }

            } finally {

                mainLock.unlock();

            }

            if (workerAdded) {

                t.start();

                workerStarted = true;

            }

        }

    } finally {

        if (! workerStarted)

            addWorkerFailed(w);

    }

    return workerStarted;

}

```

###2.3内部类worker

​      worker类继承AbstractQueuedSynchronizer达到线程安全效果,实现了Runnable类的run()方法。

​      构造方法Worker(Runnable firstTask)根据当前worker创建一个线程对象,然后firstTask调用run()方法执行业务任务

```java

private final class Worker

    extends AbstractQueuedSynchronizer

    implements Runnable

{

    /**

    * This class will never be serialized, but we provide a

    * serialVersionUID to suppress a javac warning.

    */

    private static final long serialVersionUID = 6138294804551838833L;


    final Thread thread;

    /** Initial task to run.  Possibly null. */

    Runnable firstTask;

    /** Per-thread task counter */

    volatile long completedTasks;


    Worker(Runnable firstTask) {

        setState(-1); // inhibit interrupts until runWorker

        this.firstTask = firstTask;

        this.thread = getThreadFactory().newThread(this);

    }

    /** Delegates main run loop to outer runWorker  */

    public void run() {

        runWorker(this);

    }


    protected boolean isHeldExclusively() {

        return getState() != 0;

    }

    protected boolean tryAcquire(int unused) {

        if (compareAndSetState(0, 1)) {

            setExclusiveOwnerThread(Thread.currentThread());

            return true;

        }

        return false;

    }

    protected boolean tryRelease(int unused) {

        setExclusiveOwnerThread(null);

        setState(0);

        return true;

    }

    public void lock()        { acquire(1); }

    public boolean tryLock()  { return tryAcquire(1); }

    public void unlock()      { release(1); }

    public boolean isLocked() { return isHeldExclusively(); }

    void interruptIfStarted() {

        Thread t;

        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {

            try {

                t.interrupt();

            } catch (SecurityException ignore) {

            }

        }

    }

}

```

###2.4 runWorker 执行任务

​    Work线程启动后,调用内部类worker的runWorker(this)

1. worker.unlock(),将AQS的state置为0,允许中断当前worker线程

2. 执行任务前加锁,防止在执行任务的时候被线程池的一些中断操作中断

3. 加锁后如果线程池停止或者线程中断,则任务中断

4. 执行beforeExecute()、task.run()、afterExecute()完成线程执行,如果异常后则抛异常,执行processWorkerExit()处理worker退出的流程

5. getTask()从阻塞队列中获取新的任务,当队列没有任务获取任务超时则当前woek退出流程

```java

final void runWorker(Worker w) {

    Thread wt = Thread.currentThread();

    Runnable task = w.firstTask;

    w.firstTask = null;

    w.unlock(); // allow interrupts

    boolean completedAbruptly = true;

    try {

        while (task != null || (task = getTask()) != null) {

            w.lock();

            // If pool is stopping, ensure thread is interrupted;

            // if not, ensure thread is not interrupted.  This

            // requires a recheck in second case to deal with

            // shutdownNow race while clearing interrupt

            if ((runStateAtLeast(ctl.get(), STOP) ||

                (Thread.interrupted() &&

                  runStateAtLeast(ctl.get(), STOP))) &&

                !wt.isInterrupted())

                wt.interrupt();

            try {

                beforeExecute(wt, task);

                Throwable thrown = null;

                try {

                    task.run();

                } catch (RuntimeException x) {

                    thrown = x; throw x;

                } catch (Error x) {

                    thrown = x; throw x;

                } catch (Throwable x) {

                    thrown = x; throw new Error(x);

                } finally {

                    afterExecute(task, thrown);

                }

            } finally {

                task = null;

                w.completedTasks++;

                w.unlock();

            }

        }

        completedAbruptly = false;

    } finally {

        processWorkerExit(w, completedAbruptly);

    }

}

```

###2.5 getTask()获取任务

1. 判断线程池状态为SHUTDOWN并且stop或者workQueue为空的时候,直接返回null,workerCount减一

2. 获取线程池数量,如果超过maximumPoolSize或者timedOut 和 timed 同时为true并且线程数大于1或线程池队列为空,线程池数量加1

3. 如果满足获取任务条件,根据是否需要定时获取调用不同方法:

      workQueue.poll():如果在keepAliveTime时间内,阻塞队列还是没有任务,返回null

      workQueue.take():如果阻塞队列为空,当前线程会被挂起等待;当队列中有任务加入时,线程被唤醒,take方法返回任务

```java

private Runnable getTask() {

    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {

        int c = ctl.get();

        int rs = runStateOf(c);

        // Check if queue empty only if necessary.

        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {

            decrementWorkerCount();

            return null;

        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?

        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))

            && (wc > 1 || workQueue.isEmpty())) {

            if (compareAndDecrementWorkerCount(c))

                return null;

            continue;

        }

        try {

            Runnable r = timed ?

                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :

                workQueue.take();

            if (r != null)

                return r;

            timedOut = true;

        } catch (InterruptedException retry) {

            timedOut = false;

        }

    }

}

```

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容