JDK并发包-线程复用:线程池

为了避免系统频繁地创建和销毁线程,我们可以让创建的线程进行复用。线程池中,总有那么几个活跃线程。当你需要使用线程时,可以从池子中随便拿一个空闲线程,当完成工作时,并不急着关闭线程,而是将整个线程退回到池子,方便其他人使用。

1.1 JDK对线程池的支持

JDK提供一套Executor框架,帮助开发人员更好的有效的控制多线程。

其中ThreadPoolExecutor表示一个线程池。Executors类则扮演这线程池工厂的角色。通过Executors可以取得一个拥有特定功能的线程池。ThreadPoolExecutor类实现了Executor接口,因此通过这个接口,任何Runnable的对象都可以被ThreadPoolExecutor线程池调度。

Executor框架提供了各种类型的线程池,主要有以下工厂方法:

public static ExecutorService newFixedThreadPool(int nThreads)
public static ExecutorService newSingleThreadExecutor()
public static ExecutorService newCachedThreadPool()
public static ScheduledExecutorService newSingleThreadScheduledExecutor()
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
  • newFixedThreadPool:返回一个固定数量的线程池。当一个新任务提交时,如果有空闲线程,则执行。否则新任务暂存在一个任务队列中,待有空闲时,便处理在任务队列中的任务。
  • newSingleThreadExecutor:返回一个线程的线程池。当多余一个新任务提交时,会暂存在一个任务队列中,待有空闲时,按先入先出的顺序处理在任务队列中的任务。
  • newCachedThreadPool:返回一个可根据实际情况调整线程数量的线程池,线程数量不确定,若有空闲,则会有限复用线程。否则创建新线程处理任务。所有线程在当前任务执行完后,将返回线程池待复用。
  • newSingleThreadScheduledExecutor:返回一个ScheduledExecutorService对象,线程池大小为1。ScheduledExecutorService在Executor接口之上扩展了在给定时间执行某任务的功能。如果在某个固定的延时之后执行,或周期性执行某个任务。可以用这个工厂。
  • newScheduledThreadPool,返回一个ScheduledExecutorService对象,但该线程可以指定线程数量。

1.2 核心线程池的内部实现

对于核心的几个线程池,无论是newFixedThreadPool()方法、newSingleThreadExecutor()还是newCachedThreadPool()方法,其内部实现均使用了ThreadPoolExecutor实现。下面给出三个线程池的实现方式:

public static ExecutorService newSingleThreadExecutor() {   
    return new FinalizableDelegatedExecutorService   
        (new ThreadPoolExecutor(1, 1,   
                                0L, TimeUnit.MILLISECONDS,   
                                new LinkedBlockingQueue<Runnable>()));   
}

public static ExecutorService newFixedThreadPool(int nThreads) {     
eturn new ThreadPoolExecutor(nThreads, nThreads,     
  0L, TimeUnit.MILLISECONDS,     
                new LinkedBlockingQueue<Runnable>());     
}  

public static ExecutorService newCachedThreadPool() {   
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,   
                                  60L, TimeUnit.SECONDS,   
                                  new SynchronousQueue<Runnable>());   
}

由以上线程池实现代码可以看到,它们都只是ThreadPoolExecutor类的封装。ThreadPoolExecutor最重要的构造函数如下:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)
  • corePoolSize:指定了线程池中的线程数量。
  • maximumPoolSize:指定了线程池中的最大线程数量。
  • keepAliveTime:当前线程池数量超过corePoolSize时,多余的空闲线程的存活时间,即多次时间内会被销毁。
  • unit:keepAliveTime的单位。
  • workQueue:任务队列,被提交但尚未被执行的任务。
  • threadFactory:线程工厂,用于创建线程,一般用默认的即可。
  • handler:拒绝策略,当任务太多来不及处理,如何拒绝任务。

参数workQueue指被提交但未执行的任务队列,它是一个BlockingQueue接口的对象,仅用于存放Runnable对象。根据队列功能分类,在ThreadPoolExecutor的构造函数中可使用一下几种BlockingQueue。

  1. 直接提交的队列:该功能由synchronousQueue对象提供,synchronousQueue对象是一个特殊的BlockingQueue。synchronousQueue没有容量,每一个插入操作都要等待一个响应的删除操作,反之每一个删除操作都要等待对应的插入操作。如果使用synchronousQueue,提交的任务不会被真实的保存,而总是将新任务提交给线程执行,如果没有空闲线程,则尝试创建线程,如果线程数量已经达到了最大值,则执行拒绝策略,因此,使用synchronousQueue队列,通常要设置很大的maximumPoolSize值,否则很容易执行拒绝策略。
  2. 有界的任务队列:有界任务队列可以使用ArrayBlockingQueue实现。ArrayBlockingQueue构造函数必须带有一个容量参数,表示队列的最大容量。
public ArrayBlockingQueue(int capacity)。

当使用有界任务队列时,若有新任务需要执行时,如果线程池的实际线程数量小于corePoolSize,则会优先创建线程。若大于corePoolSize,则会将新任务加入等待队列。若等待队列已满,无法加入,则在总线程数不大于maximumPoolSize的前提下,创建新的线程执行任务。若大于maximumPoolSize,则执行拒绝策略。可见有界队列仅当在任务队列装满后,才可能将线程数量提升到corePoolSize以上,换言之,除非系统非常繁忙,否则确保核心线程数维持在corePoolSize。

  1. 无界的任务队列:无界队列可以通过LinkedBlockingQueue类实现。与有界队列相比,除非系统资源耗尽,无界队列的任务队列不存在任务入队失败的情况。若有新任务需要执行时,如果线程池的实际线程数量小于corePoolSize,则会优先创建线程执行。但当系统的线程数量达到corePoolSize后就不再创建了,这里和有界任务队列是有明显区别的。若后续还有新任务加入,而又没有空闲线程资源,则任务直接进入队列等待。若任务创建和处理的速度差异很大,无界队列会保持快速增长,知道耗尽系统内存。
  2. 优先任务队列:带有优先级别的队列,它通过PriorityBlokingQueue实现,可以控制任务执行的优先顺序。它是一个特殊的无界队列。无论是ArrayBlockingQueue还是LinkedBlockingQueue实现的队列,都是按照先进先出的算法处理任务,而PriorityBlokingQueue根据任务自身优先级顺序先后执行,在确保系统性能同时,也能很好的质量保证(总是确保高优先级的任务优先执行)。

newFixedThreadPool()方法的实现,它返回一个corePoolSize和maximumPoolSize一样的,并使用了LinkedBlockingQueue任务队列(无界队列)的线程池。当任务提交非常频繁时,该队列可能迅速膨胀,从而系统资源耗尽。

newSingleThreadExecutor()返回单线程线程池,是newFixedThreadPool()方法的退化,只是简单的将线程池数量设置为1。

newCachedThreadPool()方法返回corePoolSize为0而maximumPoolSize无穷大的线程池,这意味着没有任务的时候线程池内没有现场,而当任务提交时,该线程池使用空闲线程执行任务,若无空闲则将任务加入SynchronousQueue队列,而SynchronousQueue队列是直接提交队列,它总是破事线程池增加新的线程来执行任务。当任务执行完后由于corePoolSize为0,因此空闲线程在指定时间内(60s)被回收。对于newCachedThreadPool(),如果有大量任务提交,而任务又不那么快执行时,那么系统变回开启等量的线程处理,这样做法可能会很快耗尽系统的资源,因为它会增加无穷大数量的线程。

1.3 拒绝策略

当任务数量超过系统实际承载能力时,需要拒绝策略机制解决这个问题。JDK内置的拒绝策略如下:

  1. AbortPolicy : 直接抛出异常,阻止系统正常运行。
  2. CallerRunsPolicy : 只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务。显然这样做不会真的丢弃任务,但是,任务提交线程的性能极有可能会急剧下降。
  3. DiscardOldestPolicy : 丢弃最老的一个请求,也就是即将被执行的一个任务,并尝试再次提交当前任务。
  4. DiscardPolicy : 该策略默默地丢弃无法处理的任务,不予任何处理。如果允许任务丢失,这是最好的一种方案。

以上内置拒绝策略均实现了RejectedExecutionHandler接口,若以上策略仍无法满足实际需要,完全可以自己扩展RejectedExecutionHandler接口。RejectedExecutionHandler的定义如下:

public interface RejectedExecutionHandler() {
  public void rejectedExecution(Runnable r, ThreadPoolExecutor  executor);
});

下面代码演示自定义线程池和拒绝策略的使用:

public class RejectThreadPoolDemo {
    public static class MyTask implements Runnable {
        @Override
        public void run() {
            System.out.println(System.currentTimeMillis() + ":Thread ID:"
                    + Thread.currentThread().getId());

            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    public static void main(String[] args) throws InterruptedException {
        MyTask task = new MyTask();
        ExecutorService es = new ThreadPoolExecutor(5, 5,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingDeque<Runnable>(10),
                Executors.defaultThreadFactory(),
                new RejectedExecutionHandler() {
                    @Override
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor  executor) {    
                        System.out.println(r.toString() + "is discard");
                    }
                });

        for(int i=0; i<Integer.MAX_VALUE; i++)  {
            es.submit(task);
            Thread.sleep(10);
        }
    }
}

1.4 ThreadFactory自定义线程创建

ThreadFactory是一个接口,它只有一个方法,用来创建线程:

Thread newThread(Runnable r);

下面是使用自定义ThreadFactory的案例,一方面记录线程的创建,另一方面将所有线程设置为守护线程,这样主线程退出,将会强制销毁线程池。


public static void main(String[] args) throws InterruptedException {
    MyTask task = new MyTask();
    ExecutorService es = new ThreadPoolExecutor(5,5,
            0L, TimeUnit.MILLISECONDS,
            new SynchronousQueue<Runnable>(),
            new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread t = new Thread(r);
                    t.setDaemon(true);
                    System.out.println("create " + t);
                    return t;
                }
            });
    for(int i=0; i<5; i++) {
        es.submit(task);
    }
    Thread.sleep(2000);
}

1.5 扩展线程池

虽然JDK实现了非常稳定的高性能线程池,但是我们可以对这个线程池做一些扩展,增强线程池的功能。ThreadPoolExecutor是一个可以扩展的线程池。它提供了beforeExecute()、afterExecute()、terminated()三个接口对线程池进行控制。这三个方法分别用于记录一个任务的开始、结束和整个线程池的退出。下面演示了对线程池的扩展,在这个拓展中,我们将记录每一个任务的执行日志。

public class ExtThreadPool {
    public static class MyTask implements Runnable {
        public String name;
        public MyTask(String name) {
            this.name = name;
        }
        @Override
        public void run() {
            System.out.println("正在执行" + ":Thread ID:" + Thread.currentThread().getId()
                    + ",Task Name=" + name);
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = new ThreadPoolExecutor(5,5,0L,TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>()) {
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                System.out.println("准备执行:" + ((MyTask)r).name);
            }
            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                System.out.println("执行完成:" + ((MyTask)r).name);
            }
            @Override
            protected void terminated() {
                System.out.println("线程池退出");
            }
        };
        for(int i=0; i<5; i++) {
            MyTask task = new MyTask("TASK-GEYM-" + i);
            es.execute(task);
            Thread.sleep(10);
        }
        es.shutdown();
    }
}

1.6 优化线程池线程数量

线程池的大小对系统性能有一定的影响,过大或过小的线程数量都无法发挥最优的系统性能,因此要避免极大和极小两种情况。 在《java Concurrency in Practice》中给出了一个估算线程池大小的经验公式:

Ncpu = CPU数量
Ucpu = 目标CPU的使用率(0 ≤ Ucpu ≤ 1 )
W/C = 等待时间与计算时间的比率
最优的池大小等于
Nthreads = Ncpu * Ucpu * (1+W/C)

在java中可以通过Runtime.getRuntime().availableProcessors()取得可用CPU数量。

1.7 在线程池中寻找堆栈

向线程池讨回异常堆栈的方法。一种最简单的方法,就是放弃submit(),改用execute()。将任务提交的代码改成:

pools.execute(new DivTask(100,i));

或者使用下面的方法改造submit():

Future re = pools.submit(new DivTask(100,i));
re.get();

上述方法,从异常堆栈中只能知道异常在哪里抛出的。还希望知道这个任务在哪里提交的,需要我们扩展ThreadPoolExecutor线程池,让它在调度之前,先保存一下提交任务线程的堆栈信息。如下所示:

public class TraceThreadPoolExecutor extends ThreadPoolExecutor {
    public TraceThreadPoolExecutor(int corePoolSize,int maximumPoolSize,
            long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) {
        super(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue);
    }
    @Override
    public void execute(Runnable task) {
        // TODO Auto-generated method stub
        super.execute(wrap(task, clientTrace(), Thread.currentThread().getName()));
    }

    @Override
    public Future<?> submit(Runnable task) {
        // TODO Auto-generated method stub
        return super.submit(wrap(task, clientTrace(), Thread.currentThread().getName()));
    }
    private Exception clientTrace() {
        return new Exception("Client stack trace");
    }
    private Runnable wrap(final Runnable task,final Exception clientStack, String clientThreadName) {
        return new Runnable() {
            @Override
            public void run() {
                try {
                    task.run();
                } catch (Exception e) {
                    clientStack.printStackTrace();
                    try {
                        throw e;
                    } catch (Exception e1) {
                        // TODO Auto-generated catch block
                        e1.printStackTrace();
                    }
                }
            }
        };
    }
    public static class DivTask implements Runnable {
        int a,b;
        public DivTask(int a,int b) {
            this.a = a;
            this.b = b;
        }
        @Override
        public void run() {
            double re = a/b;
            System.out.println(re);
        }
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pools = new TraceThreadPoolExecutor(0, Integer.MAX_VALUE,
                0L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        for(int i=0; i<5; i++)
            pools.execute(new DivTask(100, i));
    }
}

1.8 Fork/Join框架

分而治之一直是一个非常有效处理大量数据的方法。fork/join框架是ExecutorService接口的一个实现,可以帮助开发人员充分利用多核处理器的优势,编写出并行执行的程序,提高应用程序的性能;设计的目的是为了处理那些可以被递归拆分的任务。
在实际使用中,如果毫无顾忌地使用fork()开启线程进行处理,那么很有可能导致系统开启过多的线程而严重影响性能。所以,在JDK中,给出了一个ForkJoinPool线程池,对于fork()方法并不急于开启线程,而是提交给ForkJoinPool()线程池进行处理,以节省系统资源。可以向ForkJoinPool线程池提交一个ForkJoinTask任务。所谓ForkJoinTask任务就是支持fork()分解以及join()等待的任务。ForkJoinTask有两个重要的子类,RecursiveAction和RecursiveTask。它们分别代表没有返回值的任务和可以携带返回值的任务。
下面简单地展示Fork/Join框架的使用,这里用来计算数列求和:

public class CountTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 10000;
    private long start;
    private long end;
    public CountTask(long start,long end) {
        this.start = start;
        this.end = end;
    }
    public Long compute() {
        long sum = 0;
        boolean canCompute = (end - start) < THRESHOLD;
        if(canCompute) {
            for(long i=start; i<=end; i++) {
                sum += i;
            }
        } else {
            long step = (start + end)/100;
            ArrayList<CountTask> subTasks = new ArrayList<CountTask>();
            long pos = start;
            for(int i=0; i<100; i++) {
                long lastOne = pos + step;
                if(lastOne > end) lastOne = end;
                CountTask subTask = new CountTask(pos, lastOne);
                pos += step+1;
                subTasks.add(subTask);
                subTask.fork();
            }
            for(CountTask t:subTasks) {
                sum += t.join();
            }
        }
        return sum;
    }
    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        CountTask task = new CountTask(0,200000L);
        ForkJoinTask<Long> result = forkJoinPool.submit(task);
        try {
            long res = result.get();
            System.out.println("sum="+res);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}

在使用ForkJoin时需要注意,如果任务的划分层次很深,一直得不到返回,那么可能出现两种情况:第一,系统内的线程数量越积越多,导致性能严重下降。第二,函数的调用层次变得很深,最终导致栈溢出。
此外,ForkJoin线程池使用一个无锁是栈来管理空闲线程。如果一个工作线程暂时取不到可用的任务,则可能会被挂起,挂起的线程将会被压入由线程池维护的栈中。待将来有任务可用时,再从栈中唤醒这些线程。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 220,367评论 6 512
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,959评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,750评论 0 357
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,226评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,252评论 6 397
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,975评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,592评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,497评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,027评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,147评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,274评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,953评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,623评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,143评论 0 23
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,260评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,607评论 3 375
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,271评论 2 358

推荐阅读更多精彩内容

  • 我问浪氏爷爷:我有很多很多的烦恼,你能帮我办法解决吗? 浪氏爷爷说:其实烦恼的答案你已经知道了,只不过你需要一个人...
    妫夏阅读 300评论 0 0
  • 4月3日21:01周二,麦医生主讲《症状与配方》笔记: 一、先沟通观念,问生活习惯,切忌不要直接给配方 1、消化道...
    冠丽阅读 945评论 0 1