【2019-06-23】线程池

1.为什么要使用线程池

直接创建线程,线程本身需要占用内存空间,大量的线程会抢占宝贵的内存资源,如果处理不当,会导致内存溢出,而且,线程回收会给GC带来很大的压力,延迟GC的停顿时间。

2.常用线程池

<1>newFixedThreadPool
返回一个固定线程数量的线程池
<2>newSingleThreadExecutor()
返回一个只有一个线程的线程池
<3>newCachedThreadPool()
返回一个根据实际情况调整数量的线程池,线程池数量不确定,有空闲线程可以复用则复用,没有就一直会创建新的
<4>newSingleThreadScheduledExecutor()
返回一个计划任务线程池,但是线程数量为1
<5>newScheduledThreadPool()
返回一个计划任务线程池

2.1计划任务

计划任务不一定会立即安排执行任务,会在指定时间内进行调度。
3个重要的方法:
<1>schedule
会在给定的时间内,对任务进行一次调度
<2>scheduleAtFirstRate
会对任务进行周期性调度,周期为上一个任务开始执行的时间+周期时间=下一个任务的开始时间
<3>scheduleWithFixedDelay()
会对任务进行周期性调度,周期为上一个任务结束时间+周期时间=下一个任务的开始时间

任务堆叠处理:
ScheduleExecutorService不会让任务出现堆叠的情况,比如任务周期为2s,但是任务执行花了8秒,上一个任务没有执行完成,下一个任务不会开始执行,造成任务堆叠;但是这里上一任务执行完成后,下一个任务会马上执行。

注意:调度程度并不保证任务会无限期的持续调用,如果任务本身出现异常,那么后续的所有执行都会被中断,因此需要对异常处理。

3.核心线程池的内部实现

线程池核心类

ThreadPoolExecutor(int corePoolSize,
     int maxinumPoolSize, 
    long keepAliveTime,
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue,
    ThreadFactory threadFactory
    RejectedExecutionHandler handler);

核心参数:

workQueue:

<1>直接提交的队列:SynchronousQueue
它没有容量,每一个插入操作都要等待一个相应的删除操作,每一个删除操作都要等待对应的插入操作。
如果没有空闲线程,则尝试创建新的线程,如果线程数量达到最大值,则使用拒绝策略;
<2>有界的任务队列:ArrayBlockingQueue
如果线程数量小于corePoolSize时,会优先创建新线程,若大于corePoolSize,则会将新任务加入到等待队列;等待队列已满,则在总线程数不大于maxinumPoolSize的前提下,创建新的线程执行任务,若大于,则执行拒绝策略;

<3>无界任务队列 LinkedBlockQueue
除非系统资源耗尽,否则无界的任务队列不存在任务入队列失败的情况;新任务到来时,如果线程数小于corePoolSize, 则创建新的线程执行任务,但当线程池中线程达到poolSize后,就不会增加;

<4>优先任务队列: PriorityBlockingQueue
可以控制任务执行的先后顺序,它是一个特殊的无界队列

拒绝策略

AbortPolicy策略:直接抛出异常,阻止系统正常运行;
CallerRunsPolicy策略:只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务,显然这样做不会真的丢弃任务,但是,任务提交线程池的性能极有可能会急剧下降;
DiscardOledestPolicy策略:丢弃最老的一个请求,也就是即将被执行的下一个,并尝试再次提交当前任务;
DiscardPolicy策略:丢弃无法处理的任务,不处理;

自定义拒绝策略:实现RejectExecutionHandler接口

自定义线程池

ThreadFactory是一个接口,只有一个方法

Thread newThread(Runnable r);

4.扩展线程池

ThreadPoolExecutor提供beforeExecutor、afterExecutor和terminated() 三个方法对线程池进行控制。

ThreadPoolExecutor.Worker是ThreadPoolExecutor的内部类,实现了Runnable接口, 执行Worker.runTask()方法内部,执行了
beforeExecutor和afterExecute方法

terminated记录了整个线程池的退出

5.优化线程数量

经验公式
Ncpu -> CPU数量
Ucpu -> 目标cpu的使用率
W/C -> 等待时间与计算时间的比率

最优线程池大小=Ncpu * Ucpu * (1 + W/C)

6.在线程池中寻找堆栈

自定义ThreadPoolExecutor 包装submit方法

    public static class TraceThreadPoolExecutor extends ThreadPoolExecutor {

        public TraceThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
                                       long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        }

        @Override
        public Future<?> submit(Runnable task) {
            return super.submit(wrap(task, clientTrace()));
        }

        private Throwable clientTrace() {
            return new Throwable("trace");
        }

        private Runnable wrap(final Runnable task, final Throwable clientTrace) {
            return new Runnable() {
                @Override
                public void run() {
                    try {
                        task.run();
                    } catch (Throwable e) {
                        e.printStackTrace();
                        throw e;
                    }
                }
            };
        }
    }

    public static void main(String[] args) {
        TraceThreadPoolExecutor executor = new TraceThreadPoolExecutor(2,4, 200, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
        for (int i = 0; i < 5; i ++) {
            executor.submit(new MyTask(i));
        }
        executor.shutdown();
    }

    public static class MyTask implements Runnable {

        private int size;

        public MyTask(int size) {
            this.size = size;
        }

        @Override
        public void run() {
            int result = 100/size;
            System.out.println(result);
        }
    }

7.fork/join框架

含义:
fork() 创建子进程,使得系统进程可以多一个执行分支;
join() 等待,等待这个执行分支执行完毕,才能得到结果

ForkJoinPool中如果A线程执行完毕,线程A会帮助线程B,从线程B的任务队列中拿一个任务过来处理,尽可能达到平衡;

注意:线程执行自己的任务从任务队列顶部拿任务,而帮助别人执行时,从队列底部拿任务,可以有效避免数据竞争;

ForkJoinPoll的重要方法:

public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task);

向ForkJoinPool线程池提交一个ForkJoinTask任务,所谓的ForkJoinTask就是支持fork分解和join等待的任务。

ForkJoinTask有2个重要的子类:RecursiveAction和RecursiveTask. 他们分别代表没有返回值的任务和可以带返回值的任务。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

  • 第一部分 来看一下线程池的框架图,如下: 1、Executor任务提交接口与Executors工具类 Execut...
    压抑的内心阅读 4,380评论 1 24
  • 前段时间遇到这样一个问题,有人问微信朋友圈的上传图片的功能怎么做才能让用户的等待时间较短,比如说一下上传9张图片,...
    加油码农阅读 1,269评论 0 2
  • 深入分析线程池 在前面的文章中,我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题: 如...
    史路比阅读 495评论 0 1
  • Java并发编程:线程池的使用 在前面的文章中,我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会...
    逗逼程序员阅读 503评论 0 2
  • 她外表坚强,内心柔软;她横行霸道,气势磅礴;她从不退缩,勇往直前。在她看似青而坚强的外壳里面,藏着一颗柔情似...
    营营苟苟阅读 432评论 0 2

友情链接更多精彩内容