线程池简易实现和线程池源码

线程池简单实现

线程池简要图示
package Thread;

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ThreadPoolDemo {
    //1.阻塞队列
    private BlockQueue<Runnable> taskQueue;

    //2.核心线程数
    private  int coreSize;

    //3.获取任务的超时时间
    private long timeout;


    //4.时间转换
    private TimeUnit timeUnit;

    //5、线程集合
    HashSet<Worker> workers = new HashSet<>();


    //6.拒绝策略
    private RejectPolicyDemo<Runnable> rejectPolicy;


    public ThreadPoolDemo(int coreSize, long timeout, TimeUnit timeUnit , int queueCap , RejectPolicyDemo<Runnable> rejectPolicy) {
        this.coreSize = coreSize;
        this.timeout = timeout;
        this.timeUnit = timeUnit;
        this.taskQueue = new BlockQueue<Runnable>(queueCap);
        this.rejectPolicy = rejectPolicy;
    }

    class  Worker extends  Thread{
        private Runnable task;

        public Worker(Runnable task) {
            this.task = task;
        }

        @Override
        public void run() {
            //1.当task 不为空,执行任务
            //2.当task 执行完毕,再接着从任务队列获取任务继续执行
//            while(task != null || (task = taskQueue.take()) != null){  //该策略会死等 ,就算线程池为空,也会一直等待
            while(task != null || (task = taskQueue.poll(timeout,timeUnit)) != null){
                try{
                    task.run();
                }catch (Exception e){
                    e.printStackTrace();
                }finally {
                    task = null; //没有任务了 将task置为null
                }
            }

            synchronized (workers){
                workers.remove(this);
            }

        }
    }




    //执行任务
    public  void excute(Runnable task){
        //如果任务数没有超过 coresize 时 ,直接交给worker对象执行
        //如果超过了 coresize时 ,将任务加入到阻塞队列中
      synchronized (workers){

          if (workers.size() < coreSize) {
              Worker worker = new Worker(task);
              workers.add(worker);
              worker.start();
          }
          else {
              //可选择阻塞队列满足后 选择拒绝策略
//              taskQueue.put(task);


              //1.死等
              //2 带超时等待
              //3 让调用者放弃任务
              //4 让调用者抛出异常
              //5 让调用者自己执行任务

              taskQueue.tryPut(rejectPolicy , task);

          }
      }
    }

    public static void main(String[] args) {
        ThreadPoolDemo threadPool = new ThreadPoolDemo(2 , 1000 , TimeUnit.MILLISECONDS , 100 ,
                (queue, task)->{
                    //1.死等
//                    queue.put(task);
                    //2 带超时等待
//                    queue.offer(task , 500 ,TimeUnit.MILLISECONDS );
                    //3 让调用者放弃任务
//                    System.out.println("放弃任务");
                    //4 让调用者抛出异常
//                    throw new RuntimeException("运行失败"+task); 抛出异常后 后续的任务不会再执行
                    //5 让调用者自己执行任务
                task.run();
                });
    }

}


@FunctionalInterface
interface  RejectPolicyDemo<T>{
    void reject(BlockQueue<T>  queue , T task);
}

class BlockQueue<T> {
    public BlockQueue() {

    }

    public BlockQueue(int capcity) {
        this.capcity = capcity;
    }

    // 1.阻塞队列
    private Deque<T> queue = new ArrayDeque<>();

    //2.锁
    private ReentrantLock lock = new  ReentrantLock();

    //3. 生产者变量
    private Condition fullWaitSet = lock.newCondition();

    //4.消费者变量
    private Condition emptyWaitSet = lock.newCondition();

    //5.容量
    private  int capcity;


    //带超时的阻塞获取
    public  T poll(long timeout , TimeUnit unit){
        lock.lock();
        try {
            //将timeout 统一转换成 纳秒
            long nanos = unit.toNanos(timeout);
            while (queue.isEmpty()) {
                try {
                    if(nanos <= 0){
                        //超时
                        return null;
                    }
                    //防止虚假唤醒,返回的是剩余时间
                    nanos = emptyWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            T t = queue.removeFirst();
            fullWaitSet.signal();
            return t;
        }
        finally {
            lock.unlock();
        }

    }


    //阻塞获取
    public T take(){
        lock.lock();
        try {
            while (queue.isEmpty()) {
                try {
                    emptyWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            T t = queue.removeFirst();
            fullWaitSet.signal();
            return t;
        }
        finally {
            lock.unlock();
        }
    }

    //带超时时间的阻塞添加
    public boolean offer(T task , long timeout , TimeUnit unit){
        lock.lock();
        try {
            long nanos = unit.toNanos(timeout);

            while(capcity == queue.size()){
                try {
                    if(nanos <= 0){
                        return false;
                    }
                    fullWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            queue.addLast(task);
            emptyWaitSet.signal();
            return true;

        } finally {
            lock.unlock();
        }
    }

    //阻塞添加
    public  void put(T element){
        lock.lock();
        try {
            while(capcity == queue.size()){
                try {
                    fullWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            queue.addLast(element);
            emptyWaitSet.signal();

        } finally {
            lock.unlock();
        }
    }



    public int size(){
        lock.unlock();
        try {
            return queue.size();
        } finally {
            lock.unlock();
        }
    }

    public void tryPut(RejectPolicyDemo<T> rejectPolicy, T task) {
        lock.lock();
        try {
            //判断队列是否已满
            if(queue.size() == capcity){
                rejectPolicy.reject(this , task);
            }else { //有空闲
                queue.addLast(task);
                emptyWaitSet.signal();
            }

        }finally {
            lock.unlock();
        }


    }
}

源码

ThreadPoolExecutor 使用 int 的高 3 位来表示线程池状态,低 29 位表示线程数量,ThreadPoolExecutor 类中的线程状态变量如下:

    // Integer.SIZE 值为 32 
    private static final int COUNT_BITS = Integer.SIZE - 3;
    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;
状态名 高三位 接受新任务 处理阻塞队列任务 说明
RUNNING 111 Y Y 接收新任务,同时处理任务队列中的任务
SHUTDOWN 000 N Y 不接受新任务,但是处理任务队列中的任务
STOP 001 N N 中断正在执行的任务,同时抛弃阻塞队列中的任务
TIDYING 010 - - 任务执行完毕,活动线程为0时,即将进入终结阶段
TERMINATED 011 - - 终结状态

从数字上面比较,TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING

线程池状态和线程池中线程的数量由一个原子整型ctl来共同表示

  • 使用一个数来表示两个值的主要原因时:可以通过一次cas同时更改两个属性的值

构造方法

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

构造参数解释:

  • corePoolSize:核心线程数
  • maximumPoolSize:最大线程数
    • maximumPoolSize - corePoolSize = 救急线程数
  • keepAliveTime:救急线程空闲时的最大生存时间
  • unit:时间单位
  • workQueue:阻塞队列(存放任务)
    • 有界阻塞队列 ArrayBlockingQueue
    • 无界阻塞队列 LinkedBlockingQueue
    • 最多只有一个同步元素的队列 SynchronousQueue
    • 优先队列 PriorityBlockingQueue
  • threadFactory:线程工厂(给线程取名字)
  • handler:拒绝策略

救急线程在核心线程和阻塞队列都放不下了才会使用

工作方式:

  1. 线程池中刚开始没有线程,当一个任务提交给线程池后,线程池会创建一个新线程来执行任务。
  2. 当线程数达到 corePoolSize 并没有线程空闲,这时再加入任务,新加的任务会被加入 workQueue 队列排 队,直到有空闲的线程。
  3. 如果队列选择了有界队列,那么任务超过了队列大小时,会创建 maximumPoolSize - corePoolSize 数目的线 程来救急。
  4. 如果线程到达 maximumPoolSize 仍然有新任务这时会执行拒绝策略。拒绝策略 jdk 提供了 下面的前 4 种实现,其它著名框架也提供了实现
    1. ThreadPoolExecutor.AbortPolicy 让调用者抛出RejectedExecutionException 异常,这是默认策略
    2. ThreadPoolExecutor.CallerRunsPolicy 让调用者运行任务
    3. ThreadPoolExecutor.DiscardPolicy 放弃本次任务
    4. ThreadPoolExecutor.DiscardOldestPolicy 放弃队列中最早的任务,本任务取而代之
    5. Dubbo 的实现,在抛出 RejectedExecutionException 异常之前会记录日志,并 dump 线程栈信息,方 便定位问题
    6. Netty 的实现,是创建一个新线程来执行任务
    7. ActiveMQ 的实现,带超时等待(60s)尝试放入队列,类似我们之前自定义的拒绝策略
    8. PinPoint 的实现,它使用了一个拒绝策略链,会逐一尝试策略链中每种拒绝策略
  5. 当高峰过去后,超过 corePoolSize 的救急线程如果一段时间没有任务做,需要结束节省资源,这个时间由 keepAliveTime 和 unit 来控制。


    jdk线程池决拒绝策略

总览

线程池类结构

其他工具类

ThreadPoolExecutor

ThreadPoolExecutor 是 JDK 中的线程池实现,这个类实现了一个线程池需要的各个方法,它实现了任务提交、线程管理、监控等等方法。
我们经常会使用Executors这个工具类来快速构造一个线程池,对于初学者而言,这种工具类是很有用的,开发者不需要关注太多的细节,只要知道自己需要一个线程池,仅仅提供必需的参数就可以了,其他参数都采用作者提供的默认值。



public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

这两个方法都会进行使用ThreadPoolExecutor来创建一个ThreadPoolExecutor实例(具体可见前面构造方法)

Doug Lea 采用一个 32 位的整数来存放线程池的状态和当前池中的线程数,其中高 3 位用于存放线程池状态,低 29 位表示线程数(即使只有 29 位,也已经不小了,大概 5 亿多,现在还没有哪个机器能起这么多线程的吧)。

核心方法 execute()

 public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
      
        int c = ctl.get();
 // 如果当前线程数少于核心线程数,那么直接添加一个 worker 来执行任务,
    // 创建一个新的线程,并把当前任务 command 作为这个线程的第一个任务(firstTask)
        if (workerCountOf(c) < corePoolSize) {
         // 添加任务成功,那么就结束了。提交任务嘛,线程池已经接受了这个任务,这个方法也就可以返回了
        // 至于执行的结果,到时候会包装到 FutureTask 中。
        // 返回 false 代表线程池不允许提交任务
            if (addWorker(command, true))
                return;
           // 前面说的那个表示 “线程池状态” 和 “线程数” 的整数
            c = ctl.get();
        }

      // 到这里说明,要么当前线程数大于等于核心线程数,要么刚刚 addWorker 失败了

   // 如果线程池处于 RUNNING 状态,把这个任务添加到任务队列 workQueue 中
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
     // 如果线程池已不处于 RUNNING 状态,那么移除已经入队的这个任务,并且执行拒绝策略
            if (! isRunning(recheck) && remove(command))
                reject(command);
       // 如果线程池还是 RUNNING 的,并且线程数为 0,那么开启新的线程
        // 到这里,我们知道了,这块代码的真正意图是:担心任务提交到队列中了,但是线程都关闭了
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }

 // 如果 workQueue 队列满了,那么进入到这个分支
    // 以 maximumPoolSize 为界创建新的 worker,
    // 如果失败,说明当前线程数已经达到 maximumPoolSize,执行拒绝策略
        else if (!addWorker(command, false))
            reject(command);
    }

四个拒绝策略的具体实现

// 只要线程池没有被关闭,那么由提交任务的线程自己来执行这个任务。
public static class CallerRunsPolicy implements RejectedExecutionHandler {
    public CallerRunsPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}

// 不管怎样,直接抛出 RejectedExecutionException 异常
// 这个是默认的策略,如果我们构造线程池的时候不传相应的 handler 的话,那就会指定使用这个
public static class AbortPolicy implements RejectedExecutionHandler {
    public AbortPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
}

// 不做任何处理,直接忽略掉这个任务
public static class DiscardPolicy implements RejectedExecutionHandler {
    public DiscardPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
}

// 这个相对霸道一点,如果线程池没有被关闭的话,
// 把队列队头的任务(也就是等待了最长时间的)直接扔掉,然后提交这个任务到等待队列中
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    public DiscardOldestPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
        }
    }
}

Executor生成不一样的连接池

  • 生成一个固定大小的线程池:
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

最大线程数设置为与核心线程数相等,此时 keepAliveTime 设置为 0(因为这里它是没用的,即使不为 0,线程池默认也不会回收 corePoolSize 内的线程),任务队列采用 LinkedBlockingQueue,无界队列。

过程分析:刚开始,每提交一个任务都创建一个 worker,当 worker 的数量达到 nThreads 后,不再创建新的线程,而是把任务提交到 LinkedBlockingQueue 中,而且之后线程数始终为 nThreads。

  • 生成只有一个线程的固定线程池,这个更简单,和上面的一样,只要设置线程数为 1 就可以了:
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
  • 生成一个需要的时候就创建新的线程,同时可以复用之前创建的线程(如果这个线程当前没有任务)的线程池:
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

核心线程数为 0,最大线程数为 Integer.MAX_VALUE,keepAliveTime 为 60 秒,任务队列采用 SynchronousQueue。于 corePoolSize 是 0,那么提交任务的时候,直接将任务提交到队列中。

总结

  • java线程池七大属性

corePoolSize,
maximumPoolSize,
workQueue,
keepAliveTime,
unit,
threadFactory,
rejectedExecutionHandler

  • 线程池中的线程创建时机
  • 如果当前线程数少于 corePoolSize,那么提交任务的时候创建一个新的线程,并由这个线程执行这个任务;
  • 如果当前线程数已经达到 corePoolSize,那么将提交的任务添加到队列中,等待线程池中的线程去队列中取任务;
  • 如果队列已满,那么创建新的线程来执行任务,需要保证池中的线程数不会超过 maximumPoolSize,如果此时线程数超过了 maximumPoolSize,那么执行拒绝策略。
  • 任务执行过程中发生异处理

如果某个任务执行出现异常,那么执行任务的线程会被关闭,而不是继续接收其他任务。然后会启动一个新的线程来代替它。

  • 执行拒绝策略的时机
  • workers 的数量达到了 corePoolSize(任务此时需要进入任务队列),任务入队成功,与此同时线程池被关闭了,而且关闭线程池并没有将这个任务出队,那么执行拒绝策略。这里说的是非常边界的问题,入队和关闭线程池并发执行,读者仔细看看 execute 方法是怎么进到第一个 reject(command) 里面的。
  • workers 的数量大于等于 corePoolSize,将任务加入到任务队列,可是队列满了,任务入队失败,那么准备开启新的线程,可是线程数已经达到 maximumPoolSize,那么执行拒绝策略。

参考

https://www.javadoop.com/post/java-thread-pool

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,222评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,455评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,720评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,568评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,696评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,879评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,028评论 3 409
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,773评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,220评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,550评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,697评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,360评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,002评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,782评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,010评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,433评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,587评论 2 350

推荐阅读更多精彩内容