Java 线程池创建、使用、停止

JUC 已经提供了一些现成的线程池给开发者使用,但是这些线程池或多或少不能满足具体的业务开发需求,所以在使用线程池的时候,大多是自己创建。

JUC 提供的线程池

  • newSingleThreadExecutor,创建只有一个线程的线程池。
  • newFixedThreadPool,用来创建固定大小的线程池。
  • newWorkStealingPool,创建具有抢占式操作的线程池。
  • newCachedThreadPool,创建可以自动扩容的线程池,增长没有上限。
  • newScheduledThreadPool,创建有定时任务功能的线程池。

这些线程池要么没有自动扩容的功能,要么自动扩容的大小不能控制,所以不太能够契合具体的业务场景。

创建自定义线程池

public class TestThreadPoolExecutor {
    public static void main(String[] args) {
        ExecutorService threadPool = new ThreadPoolExecutor(5, 10, 5, TimeUnit.MINUTES, new ArrayBlockingQueue(100), new ThreadPoolExecutor.DiscardPolicy());
    }
}

ThreadPoolExecutor 最多需要 7 个参数。方法分别是

  • corePoolSize,核心线程数。线程池新建的时候,是没有核心线程的,只有当任务要处理的时候才会新建线程。当线程空闲,没有任务要处理,这些线程不会被销毁。

  • maximumPoolSize,最大线程数。当核心线程被用完,并且队列满了,会新建线程,最多达到这个数。

  • keepAliveTime,保持存活时间。当线程池空闲,多于核心线程数的线程在这个时间后被销毁。

  • TimeUnit,保持存活时间的时间单位,可以设置为毫秒,秒等。

  • workQueue,任务储存队列,当核心线程数被用完,多余的任务会进入这个队列等待。

    3 种队列:

    • SynchronousQueue 是直接交换,这个队列没有容量,新的任务会直接交给线程池处理。=
    • LinkedBlockingDeque是无界队列,没有容量限制。如果任务生产的速度大于消费速度,可能会造成 OOM。
    • ArrayBlockingQueue 是有界队列,可以设置队列储存任务的数量。
  • threadFactory(可选),当线程池需要新建线程的时候会使用 threadFactory 来新建。这个一般用默认的。

  • RejectedExecutionHandler(可选),当线程池和任务对接都满了的时候,线程池会拒接新的任务,这个参数是设置拒绝策略的。

    4 种拒绝策略:

    • AbortPolicy,抛出异常
    • DiscardPolicy,丢弃任务
    • DiscardOldestPolicy,丢弃队列中最老的任务, 把位置留给新的任务。
    • CallerRunsPolicy,谁提交的任务谁自己运行。

使用线程池

public class TestThreadPoolExecutor {
    public static void main(String[] args) {
        ExecutorService threadPool = new ThreadPoolExecutor(5, 10, 5, TimeUnit.MINUTES, new ArrayBlockingQueue(100), new ThreadPoolExecutor.DiscardPolicy());
        threadPool.submit(new Runnable() {
            @Override
            public void run() {

            }
        });
    }
}

通过使用 submit 方法可以向线程池里提交任务,任务的逻辑写在 Rannable 对象的 run 方法中。

任务太多怎么拒绝

4 种拒绝策略

  • AbortPolicy,抛出异常
  • DiscardPolicy,丢弃任务
  • DiscardOldestPolicy,丢弃队列中最老的任务, 把位置留给新的任务。
  • CallerRunsPolicy,谁提交的任务谁自己运行。

停止线程池

  • shutdown,这个方法会等待存量任务执行完毕,并且不接收新的任务。直到存量任务执行完毕,线程池关闭。
  • isShutdown,这个方法用来判断线程池是否开始停止。不是指线程池完全停止,而是看线程池是否开始停止,不在接收新的任务。
  • isTerminated,这个方法用来判断线程池是否完全停止。
  • awaitTermination,等待一段时间,如果这段时间内线程池的任务执行完毕,返回 true,否则返回 false。
  • shutdownNow,这个方法用来立刻停止线程池,返回从未开始执行的任务的列表。使用需要格外谨慎,如果线程池里的任务是开发者自己编写,需要对中断做额外的处理。也就是任务正确停止,不能做没有后续处理。

示例如下:

public class TestThreadPoolExecutor {
    public static void main(String[] args) {
        ExecutorService threadPool = new ThreadPoolExecutor(5, 10, 5, TimeUnit.MINUTES, new ArrayBlockingQueue(100), new ThreadPoolExecutor.DiscardPolicy());
        threadPool.submit(new Runnable() {
            @Override
            public void run() {

            }
        });

        threadPool.shutdown();
        threadPool.isShutdown();
        threadPool.isTerminated();
        try {
            threadPool.awaitTermination(10,TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        List<Runnable> runnables = threadPool.shutdownNow();
    }
}

线程池的钩子方法,在执行之前,之后做一些事

  1. 实现自己的线程池,继承 ThreadPoolExecutor。
  2. 必须要重写默认的四个构造方法。
  3. 重写 beforeExecute 和 afterExecute,在执行之前和执行之后做一些事情。

示例:

public class PauseableTreadpool extends ThreadPoolExecutor {
    public PauseableTreadpool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public PauseableTreadpool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    public PauseableTreadpool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    public PauseableTreadpool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
    }



    public static void main(String[] args) {
        PauseableTreadpool pauseableTreadpool = new PauseableTreadpool(5, 10, 5, TimeUnit.MINUTES, new ArrayBlockingQueue(100), new ThreadPoolExecutor.DiscardPolicy());
        pauseableTreadpool.submit(new Runnable() {
            @Override
            public void run() {
                
            }
        })
        
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。