自定义线程池

前言

最近在看并发编程艺术这本书,对看书的一些笔记及个人工作中的总结。

hashiqi.jpeg

线程池的优势

Java中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池。在开发过程中,合理地使用线程池能够带来3个好处。
第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
第二:提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
第三:提高线程的可管理性。线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。但是,要做到合理利用线程池,必须对其实现原理了如指掌。

线程池的实现原理

提交一个新的任务到线程池中,线程池的处理流程:
1)线程池判断核心线程池里的线程是否都在执行任务。如果不是(也就是当前执行任务的线程数小于核心线程数corePoolSize),则创建一个新的线程执行任务。如果线程池中的线程都在执行任务,也就是说当前的工作线程数大于等于corePoolSize),则进入下个流程
2)线程池判断工作队列(一般是阻塞队列BlockingQueue)是否已经满了。如果工作队列没有满,则将任务增加到工作队列中。如果工作队列满了,则进入下个流程,
3)线程池判断线程池的线程是否都处于工作状态。如果没有,则创建一个新的工作线程来执行任务。如果已经满了,则交给饱和策略(或者叫做拒绝策略)来处理这个任务。

ThreadPoolExecutor执行execute方法分下面4种情况。
1)如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(注意,执行这一步骤需要获取全局锁)。
2)如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue。
3)如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来处理任务(注意,执行这一步骤需要获取全局锁)。
4)如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()方法。

下面是ThreadPoolExecutor的一个构造方法:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
}

稍微分析一下这几个参数,官方api都有讲:
corePoolSize:当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。如果调用了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有基本线程。
maximumPoolSize:线程池中允许的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。如果使用的是无界的队列,那么这个参数没有意义了。
keepAliveTime:线程池工作线程空闲后,保持的存活时间。
unit :上面时间的单位
workQueue:执行任务在执行之前加入的queue。该队列仅保存由execute方法提交的Runnable任务。
threadFactory:创建新线程的工厂方法。可以通过线程工厂给每个创建的线程设置更有意义的名字。使用开源框架guava提供的ThreadFactoryBuilder可以快速给线程池里的线程设置有意义的名字
handler:当队列满时,线程处于饱和策略。拒绝策略handler
jdk1.5提供了下面几种策略:

图片.png

AbortPolicy:直接抛异常。
DiscardPolicy:顾名思义,直接丢弃多余的任务。
DiscardOldestPolicy:丢弃队列中最老的任务,并且重试exexute方法,并发书中说丢弃最新的一个任务,这边书中应该描述错了。
CallerRunsPolicy:在调用线程中执行丢弃的任务。

也可以自定义拒绝策略,比如记录日志或者持久化处理。

执行源码:

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
}

执行线程次提交的任务

通过execute方法和submit方法(submit提供了重载方法)。execute不返回值,而submit返回值,并且可以通过返回的Future对象的get方法查看返回值。

public interface Executor {

    /**
     * Executes the given command at some time in the future.  The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the {@code Executor} implementation.
     *
     * @param command the runnable task
     * @throws RejectedExecutionException if this task cannot be
     * accepted for execution
     * @throws NullPointerException if command is null
     */
    void execute(Runnable command);
}

定义在ExecutorService类中:

<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
//BlockingQueue使用有界阻塞队列
public class ThreadPoolExecutorTest1 {
    public static void main(String[] args) {
        /**
         * 在使用有界队列时,若有新的任务需要执行,如果线程池实际线程数小于corePoolSize,则优先创建线程,
         * 若大于corePoolSize,则会将任务加入队列,
         * 若队列已满,则在总线程数不大于maximumPoolSize的前提下,创建新的线程,
         * 若线程数大于maximumPoolSize,则执行拒绝策略。或其他自定义方式。
         *
         * 这个构造方法使用默认的拒绝策略,AbortPolicy(即抛出异常)
         */
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1,              //coreSize
                2,              //MaxSize
                60,             //60
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(3)         //指定一种队列 (有界队列)
        );

        MyTask mt1 = new MyTask(1, "任务1");
        MyTask mt2 = new MyTask(2, "任务2");
        MyTask mt3 = new MyTask(3, "任务3");
        MyTask mt4 = new MyTask(4, "任务4");
        MyTask mt5 = new MyTask(5, "任务5");
        //MyTask mt6 = new MyTask(6, "任务6");

        pool.execute(mt1);
        pool.execute(mt2);
        pool.execute(mt3);
        pool.execute(mt4);
        pool.execute(mt5);
        //pool.execute(mt6);

        pool.shutdown();

    }
}
public class MyTask implements Runnable {

    private int taskId;
    private String taskName;

    public MyTask(int taskId, String taskName){
        this.taskId = taskId;
        this.taskName = taskName;
    }
    @Override
    public void run() {
        try {
            System.out.println("run taskId =" + this.taskId);
            Thread.sleep(5*1000);
            //System.out.println("end taskId =" + this.taskId);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public String toString(){
        return Integer.toString(this.taskId);
    }

}

使用无界BlockingQueue:

与有界队列相比,除非系统资源耗尽,否则无界的队列不存在任务入队失败的情况。当有新任务到来,系统的线程小于corePoolSize时,则新建线程执行任务。当达到corePoolSize后,就不会继续增加。若后续仍有新的队列加入,而没有空闲的线程资源,则队列直接进入队列等待。若任务创建和处理的速度差异很大,无界队列会保持快速增长,直到耗尽系统内存。
所以一般不建议使用无界队列

//使用无界队列的时候,最大线程数,拒绝策略等都失去了意义
public class ThreadPoolExecutorTest2 implements Runnable{
    private static AtomicInteger count = new AtomicInteger(0);

    @Override
    public void run() {
        try {
            int temp = count.incrementAndGet();
            System.out.println("任务" + temp);
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws Exception{
        BlockingQueue<Runnable> queue =
                new LinkedBlockingQueue<>();
        ExecutorService executor  = new ThreadPoolExecutor(5, 10, 120L, TimeUnit.SECONDS, queue);

        for(int i = 0 ; i < 100; i++){
            executor.execute(new ThreadPoolExecutorTest2());
        }
        Thread.sleep(1000);
        System.out.println("queue size:" + queue.size());
        Thread.sleep(2000);
    }
}

自定义拒绝策略:

//和第一个demo相比,因为默认使用的策略是AbortPolicy(抛出一个拒绝异常),而我们这个使用了自定义的拒绝策略
public class ThreadPoolExecutorTest3 {

    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1,              //coreSize
                2,              //MaxSize
                60,             //60
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(3)         //指定一种队列 (有界队列)
                , new MyRejected()
//              , new DiscardOldestPolicy()
        );

        MyTask mt1 = new MyTask(1, "任务1");
        MyTask mt2 = new MyTask(2, "任务2");
        MyTask mt3 = new MyTask(3, "任务3");
        MyTask mt4 = new MyTask(4, "任务4");
        MyTask mt5 = new MyTask(5, "任务5");
        MyTask mt6 = new MyTask(6, "任务6");

        pool.execute(mt1);
        pool.execute(mt2);
        pool.execute(mt3);
        pool.execute(mt4);
        pool.execute(mt5);
        pool.execute(mt6);

        pool.shutdown();

    }
}
public class MyRejected implements RejectedExecutionHandler {
    public MyRejected(){
    }

    //拒绝策略实际调用的方法处理丢弃的任务,这边可以持久化,可以进行日志处理
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.out.println("自定义处理..");
        System.out.println("当前被拒绝任务为:" + r.toString());
    }

}

下一篇博客将会分析一下jdk提供的Executor框架创建线程池的几种底层实现原理。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,254评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,875评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,682评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,896评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,015评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,152评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,208评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,962评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,388评论 1 304
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,700评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,867评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,551评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,186评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,901评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,142评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,689评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,757评论 2 351

推荐阅读更多精彩内容