我们在java并发编程- 5 - 线程池下join()的替代方案:CountDownLatch、CyclicBarrier 里其实已经用过线程池了。线程池是为了让线程可重复利用,并且方便对线程的管理。
之前我们用的是Executors去创建的线程池。但是现在很多公司的编码规范里已经要求不能使用Executors了。原因是因为它使用了无界的LinkedBlockingQueue,在高负载下,很容易OOM。
推荐使用ThreadPoolExecutor。
1 基本用法:
//创建一个线程池:
    //corePooSize:线程池维护的线程的最小数量。
    //maximumPoolSize:线程池维护的线程的最大数量
    //keepAliveTime: 允许最大空闲的时间。当前线程>=corePooSize,超过该时间的空闲的线程,会被回收
    //unit: keepAliveTime的单位
    //workQueue:缓冲队列
    //申请线程时:1、如果当前线程<corePooSize,则新创建。
    //          2、如果当前线程>=corePooSize,workQueue没满,则线程进入workQueue
    //          3、如果当前线程>=corePooSize,workQueue已满,maximumPoolSize未达到,则新建线程处理任务。
    //          4、如果当前线程>=corePooSize,workQueue已满,maximumPoolSize已达到,则执行handler的模式去拒绝任务。
    //handler:线程池对拒绝任务的处理方式
    //  ThreadPoolExecutor.DiscardOldestPolicy() :抛弃旧的任务
    //  ThreadPoolExecutor.AbortPolicy() : 默认, 抛出java.util.concurrent.RejectedExecutionException异常
    //  ThreadPoolExecutor.CallerRunsPolicy() : 重试添加当前的任务,他会自动重复调用execute()方法
    //  ThreadPoolExecutor.DiscardPolicy() :抛弃当前的任务
    ThreadPoolExecutor executor=new ThreadPoolExecutor(20,50,3, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(200),new ThreadPoolExecutor.DiscardOldestPolicy());
    @Test
    public void executorTest() throws InterruptedException {
        executor.execute(()->{
            while (true){
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.info("1");
            }
        });
        executor.execute(()->{
            while (true){
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.info("2");
            }
        });
        //阻塞,直到线程池里所有任务结束
        executor.shutdown();
        boolean loop=true;
        do {    //等待所有任务完成
            loop = !executor.awaitTermination(2, TimeUnit.SECONDS);
        } while(loop);
    }
需要注意2点:
1、尽量使用有界队列。
2、如果使用默认的拒绝策略,一定要做好异常处理。
2 怎么获取到任务执行结果
前面我们用的execute()方法没办法返回任务执行结果。需要用刀Future相关:
 @Test
    public void futureTest() throws ExecutionException, InterruptedException {
        // 创建 FutureTask
        FutureTask<User> futureTask = new FutureTask<User>(()-> {
                    return new User(2222,1000);
        });
        // 创建线程池
        ThreadPoolExecutor es=new ThreadPoolExecutor(20,50,3, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(200),new ThreadPoolExecutor.DiscardOldestPolicy());
        // 提交 FutureTask
        es.submit(futureTask);
        // 获取计算结果
        User result = futureTask.get();
        log.info("result:"+result);
    }

image.png
futureTask还可以提供了:取消任务、判断是否执行完成等功能。