Java多线程(6)-- 分工之线程池

       使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题:

  如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间。

  那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务?在Java中可以通过线程池来达到这样的效果。

合理利用线程池能够带来三个好处。第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。第二:提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。第三:提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

线程池继承关系图:

ThreadPoolExecutor、AbstractExecutorService、ExecutorService和Executor几个之间的关系:

  Executor是一个顶层接口,在它里面只声明了一个方法execute(Runnable),返回值为void,参数为Runnable类型,从字面意思可以理解,就是用来执行传进去的任务的;

  然后ExecutorService接口继承了Executor接口,并声明了一些方法:submit、invokeAll、invokeAny以及shutDown等,也提供了更加全面的提交任务机制,如返回Future而不是void的submit方法。注意,这个方法的输入是Callable,它解决了Runnable无法返回结果的困扰;

  抽象类AbstractExecutorService实现了ExecutorService接口,基本实现了ExecutorService中声明的所有方法;

  然后ThreadPoolExecutor继承了类AbstractExecutorService。

       Java标准库提供了几种基础实现,比如ThreadPoolExecutor、ScheduledThreadPoolExecutor、ForkJoinPool。这些线程池的设计特点在于其高度的可调节性和灵活性,以尽量满足复杂多变的实际应用场景。

        Executors则从简化使用的角度,为我们提供了各种方便的静态工厂方法。


java.uitl.concurrent.ThreadPoolExecutor类是线程池中最核心的一个类,因此如果要透彻地了解Java中的线程池,必须先了解这个类。

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,long keepAliveTime, TimeUnit unit, BlockingQueueworkQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler);

参数的含义:

corePoolSize:线程池的基本大小。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;

maximumPoolSize:线程池中允许的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是如果使用了无界的任务队列这个参数就没什么效果。注意还有一个largestPoolSize,记录了曾经出现的最大线程个数。因为setMaximumPoolSize()可以改变最大线程数。

poolSize:线程池中当前线程的数量。

keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;

workQueue(任务队列):用于保存等待执行的任务的阻塞队列。可以选择以下几个阻塞队列:

(1)ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。

(2)LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。其实也是有界队列,但是不设置大小时就时Integer.MAX_VALUE。静态工厂方法Executors.newFixedThreadPool()和Executors.newSingleThreadExecutor()使用了这个队列。

(3)SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。

(4)PriorityBlockingQueue:一个具有优先级得无界阻塞队列

handler(任务拒绝策略):

当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略,通常有以下四种策略:

ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。

ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。

ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)

ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务


那么poolSizecorePoolSizemaximumPoolSize三者的关系是如何的呢?

有界队列:

当新提交一个任务时:

(1)如果poolSize < corePoolSize,新增加一个线程处理新的任务。

(2)如果poolSize = corePoolSize,新任务会被放入阻塞队列等待。

(3)如果阻塞队列的容量达到上限,且这时poolSize < maximumPoolSize,新增线程来处理任务。

(4)如果阻塞队列满了,且poolSize = maximumPoolSize,那么线程池已经达到极限,会根据饱和策略RejectedExecutionHandler拒绝新的任务。

(5)如果线程池中的线程数量大于 corePoolSize时,且某线程空闲时间超过keepAliveTime,线程将被终止,直至线程池中的线程数目不大于corePoolSize;如果允许为核心池中的线程设置存活时间,那么核心池中的线程空闲时间超过keepAliveTime,线程也会被终止。

无界队列

与有界队列相比,除非系统资源耗尽,否则无界的任务队列不存在任务入队失败的情况。当有新的任务到来,系统的线程数小于corePoolSize时,则新建线程执行任务。当达到corePoolSize后,就不会继续增加,若后续仍有新的任务加入,而没有空闲的线程资源,则任务直接进入队列等待。若任务创建和处理的速度差异很大,无界队列会保持快速增长,直到耗尽系统内存。

https://blog.csdn.net/kusedexingfu/article/details/72491864


几个工厂方法:

不过在java doc中,并不提倡我们直接使用ThreadPoolExecutor,而是使用Executors类中提供的几个静态方法来创建线程池:

Executors.newCachedThreadPool();        //创建一个线程池,容量大小为         

                                                                    Integer.MAX_VALUE

Executors.newSingleThreadExecutor();   //创建容量为1的线程池

Executors.newFixedThreadPool(intnThreads);    //创建固定容量大小的线程池

Executors.newSingleThreadScheduledExecutor()和newScheduledThreadPool(int corePoolSize)  //创建的是一个ScheduledExecutorService,可以进行定时或周期性的工作调度,区别在于单一工作线程还是多个工作线程。

Executors.newWorkStealingPool(intparallelism)    //这是一个经常被人忽略的线程池,Java8才加入这个创建方法,其内部会构建ForkJoinPool,利用Work-Stealing算法,并行的处理任务,不保证处理顺序。

实现细节:

public static ExecutorService newFixedThreadPool(int nThreads) {

    return newThreadPoolExecutor(nThreads, nThreads,

                                 0L, TimeUnit.MILLISECONDS,

                                 new LinkedBlockingQueue());

}

public static ExecutorService newSingleThreadExecutor() {

    return newFinalizableDelegatedExecutorService

        (newThreadPoolExecutor(1, 1,

                               0L, TimeUnit.MILLISECONDS,

                               new LinkedBlockingQueue()));

}

public static ExecutorService newCachedThreadPool() {

    return newThreadPoolExecutor(0, Integer.MAX_VALUE,

                                 60L, TimeUnit.SECONDS,

                                 new SynchronousQueue());

}


从它们的具体实现来看,它们实际上也是调用了ThreadPoolExecutor,只不过参数都已配置好了。

newFixedThreadPool创建的线程池corePoolSize和maximumPoolSize值是相等的,它使用的LinkedBlockingQueue;

newSingleThreadExecutor将corePoolSize和maximumPoolSize都设置为1,也使用的LinkedBlockingQueue;

newCachedThreadPool将corePoolSize设置为0,将maximumPoolSize设置为Integer.MAX_VALUE,使用的SynchronousQueue,也就是说来了任务就创建线程运行,当线程空闲超过60秒,就销毁线程。


向线程池提交任务:

我们可以使用execute提交的任务,但是execute方法没有返回值,所以无法判断任务知否被线程池执行成功。通过以下代码可知execute方法输入的任务是一个Runnable类的实例。

threadsPool.execute(new Runnable() {

    @Override

    public void run() {

    // TODO Auto-generatedmethod stub

    }

});

我们也可以使用submit 方法来提交任务,它会返回一个future,那么我们可以通过这个future来判断任务是否执行成功,通过future的get方法来获取返回值,

get方法会阻塞住直到任务完成,而使用get(long timeout, TimeUnit unit)方法则会阻塞一段时间后立即返回,这时有可能任务没有执行完。

try {

    Object s = future.get();

    } catch(InterruptedException e) {

    //处理中断异常

    } catch(ExecutionException e) {

    //处理无法执行任务异常

    } finally {

    //关闭线程池

    executor.shutdown();

}


线程池的关闭:

ThreadPoolExecutor提供了两个方法,用于线程池的关闭,分别是shutdown()和shutdownNow(),其中:

shutdown():不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务。

shutdownNow():立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务。


线程池的更多细节可参考:https://www.cnblogs.com/dolphin0520/p/3932921.html

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。