java多线程系列:ThreadPoolExecutor

ThreadPoolExecutor自定义线程池

开篇一张图(图片来自阿里巴巴Java开发手册(详尽版)),后面全靠编

[图片上传失败...(image-8bb565-1529197579747)]

好了要开始编了,从图片中就可以看到这篇博文的主题了,ThreadPoolExecutor自定义线程池。

目录

  1. ThreadPoolExecutor构造函数介绍
  2. 核心线程数corePoolSize
  3. 最大线程数maximumPoolSize
  4. 线程存活时间keepAliveTime
  5. 线程存活时间单位unit
  6. 创建线程的工厂threadFactory
  7. 队列
  8. 拒绝策略
  9. 线程池扩展

ThreadPoolExecutor构造函数介绍

在介绍穿件线程池的方法之前要先介绍一个类ThreadPoolExecutor,因为Executors工厂大部分方法都是返回ThreadPoolExecutor对象,先来看看它的构造函数吧

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {...}

参数介绍

参数 类型 含义
corePoolSize int 核心线程数
maximumPoolSize int 最大线程数
keepAliveTime long 存活时间
unit TimeUnit 时间单位
workQueue BlockingQueue<Runnable> 存放线程的队列
threadFactory ThreadFactory 创建线程的工厂
handler RejectedExecutionHandler 多余的的线程处理器(拒绝策略)

核心线程数corePoolSize

这个参数表示线程池中的基本线程数量也就是核心线程数量。

最大线程数maximumPoolSize

这个参数是线程池中允许创建的最大线程数量,当使用有界队列时,且队列存放的任务满了,那么线程池会创建新的线程(最大不会超过这个参数所设置的值)。需要注意的是,当使用无界队列时,这个参数是无效的。

线程存活时间keepAliveTime

这个就是线程空闲时可以存活的时间,一旦超过这个时间,线程就会被销毁。

线程存活时间单位unit

线程存活的时间单位,有NANOSECONDS(纳秒)、MICROSECONDS(微秒)、MILLISECONDS(毫秒)、SECONDS(秒)、MINUTES(分钟)、HOURS(小时)、DAYS(天)。TimeUnit代码如下

public enum TimeUnit {
    NANOSECONDS {...},

    MICROSECONDS {...},

    MILLISECONDS {...},

    SECONDS {...},

    MINUTES {...},

    HOURS {...},

    DAYS {...};
}

创建线程的工厂threadFactory

创建线程的工厂,一般都是采用Executors.defaultThreadFactory()方法返回的DefaultThreadFactory,当然也可以用其他的来设置更有意义的名称。

DefaultThreadFactory类如下

/**
 * The default thread factory
 */
static class DefaultThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    DefaultThreadFactory() {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
                              Thread.currentThread().getThreadGroup();
        namePrefix = "pool-" +
                      poolNumber.getAndIncrement() +
                     "-thread-";
    }

    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r,
                              namePrefix + threadNumber.getAndIncrement(),
                              0);
        if (t.isDaemon())
            t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}

队列

分为有界队列和无界队列,用于存放等待执行的任务的阻塞队列。有SynchronousQueue、ArrayBlockingQueue、LinkedBlockingQueue、DelayQueue、PriorityBlockingQueue、LinkedTransferQueue、DelayedWorkQueue、LinkedBlockingDeque。下面将介绍有界和无界两种常用的队列。BlockingQueue类图如下

[图片上传失败...(image-d009ef-1529197579747)]

有界队列

当使用有界队列时,如果有新的任务需要添加进来时,如果线程池实际线程数小于corePoolSize(核心线程数),则优先创建线程,如果线程池实际线程数大于corePoolSize(核心线程数),则会将任务加入队列,若队列已满,则在中现场数不大于maximumPoolSize(最大线程数)的前提下,创建新的线程,若线程数大于maximumPoolSize(最大线程数),则执行拒绝策略。

无界队列

当使用无界队列时,maximumPoolSize(最大线程数)和拒绝策略便会失效,因为队列是没有限制的,所以就不存在队列满的情况。和有界队列相比,当有新的任务添加进来时,都会进入队列等待。但是这也会出现一些问题,例如线程的执行速度比任务提交速度慢,会导致无界队列快速增长,直到系统资源耗尽。

拒绝策略

当使用有界队列时,且队列任务被填满后,线程数也达到最大值时,拒绝策略开始发挥作用。ThreadPoolExecutor默认使用AbortPolicy拒绝策略。RejectedExecutionHandler类图如下

[图片上传失败...(image-3ff1fa-1529197579747)]

我们来看看ThreadPoolExecutor是如何调用RejectedExecutionHandler的,可以直接查看execute方法

public class ThreadPoolExecutor extends AbstractExecutorService {
    
    public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();

            int c = ctl.get();
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }else if (!addWorker(command, false))
                //拒绝线程
                reject(command);
        }
}

可以看到经过一系列的操作,不符合条件的会调用reject方法,那我么接着来看看reject方法

final void reject(Runnable command) {
    handler.rejectedExecution(command, this);
}

可以看到调用了RejectedExecutionHandler接口的rejectedExecution方法。好了,现在来看看jdk提供的几个拒绝策略。

拒绝策略的测试代码在这

注:后续会写一篇ThreadPoolExecutor源码解析,专门介绍ThreadPoolExecutor各个流程

AbortPolicy

从下面代码可以看到直接抛出异常信息,但是线程池还是可以正常工作的。

public static class AbortPolicy implements RejectedExecutionHandler {
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
}

示例代码

线程类

public class Task implements Runnable{

   private int id ;

   public Task(int id){
      this.id = id;
   }

   public int getId() {
      return id;
   }
   public void setId(int id) {
      this.id = id;
   }

   @Override
   public void run() {
      //
      System.out.println(LocalTime.now()+" 当前线程id和名称为:" + this.id);
      try {
         Thread.sleep(1000);
      } catch (Exception e) {
         e.printStackTrace();
      }
   }


   public String toString(){
      return "当前线程的内容为:{ id : " + this.id + "}";
   }

}

测试代码

public class TestAbortPolicy {

    public static void main(String[] args) {
        //定义了1个核心线程数,最大线程数1个,队列长度2个
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1,
                1,
                60,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(2),
                new ThreadPoolExecutor.AbortPolicy());


        //直接提交4个线程
        executor.submit(new Task(1));
        executor.submit(new Task(2));
        executor.submit(new Task(3));
        //提交第四个抛异常
        executor.submit(new Task(4));

    }
}

执行结果

当前线程id和名称为:1
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@1540e19d rejected from java.util.concurrent.ThreadPoolExecutor@677327b6[Running, pool size = 1, active threads = 1, queued tasks = 2, completed tasks = 0]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
    at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
    at com.learnConcurrency.executor.customThreadPool.testRejectedExecutionHandler.TestAbortPolicy.main(TestAbortPolicy.java:25)
当前线程id和名称为:2
当前线程id和名称为:3

可以看到添加第四个线程是抛出异常

CallerRunsPolicy

首先判断线程池是否关闭,如果未关闭,则直接执行该线程。关闭则不做任何事情。

public static class CallerRunsPolicy implements RejectedExecutionHandler {
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}

代码和上面的差不多就不贴了,想要查看的可以到github上查看TestCallerRunsPolicy,执行结果如下

14:58:19.462 当前线程id和名称为:4
14:58:19.462 当前线程id和名称为:1
14:58:20.464 当前线程id和名称为:5
14:58:20.464 当前线程id和名称为:2
14:58:21.464 当前线程id和名称为:3
14:58:22.464 当前线程id和名称为:6

DiscardPolicy

可以看到里面没有任何代码,也就是这个被拒绝的线程任务被丢弃了,不作任何处理。

public static class DiscardPolicy implements RejectedExecutionHandler {
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
}

DiscardOldestPolicy

首先判断线程池是否关闭,如果未关闭,丢弃最老的一个请求,尝试再次提交当前任务。 关闭则不做任何事情。

public static class DiscardOldestPolicy implements RejectedExecutionHandler {
   
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
        }
    }
}

代码和上面的差不多就不贴了,想要查看的可以到github上查看TestDiscardOldestPolicy,执行结果如下

15:02:28.484 当前线程id和名称为:1
15:02:29.486 当前线程id和名称为:5
15:02:30.487 当前线程id和名称为:6

可以看到线程2、3、4都被替换了

自定义拒绝策略

实现RejectedExecutionHandle接口即可,如下MyRejected

public class MyRejected implements RejectedExecutionHandler{

   @Override
   public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
      System.out.println("自定义处理:开始记录日志");
      System.out.println(r.toString());
      System.out.println("自定义处理:记录日志完成");
   }

}

测试代码

public class TestCustomeRejectedPolicy {

    public static void main(String[] args) {
        //定义了1个核心线程数,最大线程数1个,队列长度2个
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1,
                1,
                60,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(2),
                new MyRejected());


        executor.execute(new Task(1));
        executor.execute(new Task(2));
        executor.execute(new Task(3));
        executor.execute(new Task(4));
        executor.execute(new Task(5));
        executor.execute(new Task(6));


        executor.shutdown();
    }
}

输出结果

自定义处理:开始记录日志
当前线程的内容为:{ id : 4}
自定义处理:记录日志完成
自定义处理:开始记录日志
当前线程的内容为:{ id : 5}
自定义处理:记录日志完成
自定义处理:开始记录日志
当前线程的内容为:{ id : 6}
自定义处理:记录日志完成
15:12:39.267 当前线程id和名称为:1
15:12:40.268 当前线程id和名称为:2
15:12:41.268 当前线程id和名称为:3

Process finished with exit code 0

这里如果有仔细观察的你可能会有所好奇,为什么这里用execute方法而不是用submit?

这时因为用submit方法后,传入的线程会被封装成RunnableFuture,而我写的MyRejected有调用到toString方法,Task类有重写toString方法,但是被封装成RunnableFuture会输入如下内容

自定义处理:开始记录日志
java.util.concurrent.FutureTask@1540e19d
自定义处理:记录日志完成
自定义处理:开始记录日志
java.util.concurrent.FutureTask@677327b6
自定义处理:记录日志完成
自定义处理:开始记录日志
java.util.concurrent.FutureTask@14ae5a5
自定义处理:记录日志完成
15:18:17.262 当前线程id和名称为:1
15:18:18.263 当前线程id和名称为:2
15:18:19.264 当前线程id和名称为:3

Process finished with exit code 0

线程池扩展

ThreadPoolExecutor类中有三个方法是空方法,可以通过继承来重写这三个方法对线程进行监控。通过重写beforeExecute和afterExecute方法,可以添加日志、计时、监控等等功能。terminated方法是在线程关闭时调用的,可以在这里面进行通知、日志等操作。

//任务执行前
protected void beforeExecute(Thread t, Runnable r) { }
//任务执行后
protected void afterExecute(Runnable r, Throwable t) { }
//线程池关闭
protected void terminated() { }

示例代码

public class Main {

    public static void main(String[] args) {
        ThreadPoolExecutor pool = new MyThreadPoolExecutor(
                2,              //coreSize
                4,              //MaxSize
                60,          //60
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(4));

        for (int i = 0; i < 8; i++) {
            int finalI = i + 1;
            pool.submit(() -> {
                try {
                    Thread.sleep(new Random().nextInt(1000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

        pool.shutdown();
    }

    static class MyThreadPoolExecutor extends ThreadPoolExecutor{
        private final AtomicInteger tastNum = new AtomicInteger();
        private final ThreadLocal<Long> startTime = new ThreadLocal<>();

        public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        }

        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            super.beforeExecute(t, r);
            startTime.set(System.nanoTime());
            System.out.println(LocalTime.now()+" 执行之前-任务:"+r.toString());
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            long endTime = System.nanoTime();
            long time = endTime - startTime.get();
            tastNum.incrementAndGet();
            System.out.println(LocalTime.now()+" 执行之后-任务:"+r.toString()+",花费时间(纳秒):"+time);
            super.afterExecute(r, t);
        }

        @Override
        protected void terminated() {
            System.out.println("线程关闭,总共执行线程数:"+tastNum.get());
            super.terminated();
        }
    }

}

执行结果

15:43:23.329 执行之前-任务:java.util.concurrent.FutureTask@469dad33
15:43:23.329 执行之前-任务:java.util.concurrent.FutureTask@1446b68c
15:43:23.329 执行之前-任务:java.util.concurrent.FutureTask@5eefc31e
15:43:23.329 执行之前-任务:java.util.concurrent.FutureTask@33606b2
15:43:23.513 执行之后-任务:java.util.concurrent.FutureTask@33606b2,花费时间(纳秒):216399556
15:43:23.513 执行之前-任务:java.util.concurrent.FutureTask@236e71ad
15:43:23.601 执行之后-任务:java.util.concurrent.FutureTask@1446b68c,花费时间(纳秒):304505594
15:43:23.601 执行之前-任务:java.util.concurrent.FutureTask@107920dc
15:43:23.733 执行之后-任务:java.util.concurrent.FutureTask@5eefc31e,花费时间(纳秒):436283680
15:43:23.733 执行之前-任务:java.util.concurrent.FutureTask@502826b3
15:43:23.808 执行之后-任务:java.util.concurrent.FutureTask@469dad33,花费时间(纳秒):512242583
15:43:23.808 执行之前-任务:java.util.concurrent.FutureTask@96741ab
15:43:23.924 执行之后-任务:java.util.concurrent.FutureTask@107920dc,花费时间(纳秒):322900976
15:43:24.059 执行之后-任务:java.util.concurrent.FutureTask@236e71ad,花费时间(纳秒):546324680
15:43:24.498 执行之后-任务:java.util.concurrent.FutureTask@502826b3,花费时间(纳秒):765309335
15:43:24.594 执行之后-任务:java.util.concurrent.FutureTask@96741ab,花费时间(纳秒):785868205
线程关闭,总共执行线程数:8

代码位置

GitHub地址

地址在这

觉得不错的点个star

参考资料

[1] Java 并发编程的艺术

[2] Java 并发编程实战

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,884评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,347评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,435评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,509评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,611评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,837评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,987评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,730评论 0 267
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,194评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,525评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,664评论 1 340
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,334评论 4 330
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,944评论 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,764评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,997评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,389评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,554评论 2 349

推荐阅读更多精彩内容

  • 为什么使用线程池 当我们在使用线程时,如果每次需要一个线程时都去创建一个线程,这样实现起来很简单,但是会有一个问题...
    闽越布衣阅读 4,281评论 10 45
  • 线程池的概念和定义 在服务器端的业务应用开发中,Web服务器(诸如Tomcat、Jetty)需要接受并处理http...
    dtdh阅读 831评论 0 1
  • 线程池作为Java中一个重要的知识点,看了很多文章,在此以Java自带的线程池为例,记录分析一下。本文参考了Jav...
    峡客阅读 993评论 0 11
  • 其实,忘了自己什么时候决定考研得了 我只记得 查了一个多星期的考研资料和有关信息,在多得数不清(至少我数不清)院校...
    时与不时阅读 124评论 0 0
  • 今年最后一天, 反复想了几遍。 好像啥都没做, 闲话聊个没完。
    墨趣356阅读 156评论 8 9