对于一个话题,他不是读一本书,而是读五本。 -- Mike Slade To Bill Gates
昂贵的资源
我们都知道线程是一种昂贵的计算机资源。与普通的对象相比,线程占用了额外的栈空间,并且在启动与销毁的时候也会产生调度开销。另一方面来讲,我们设置线程的数量常常是要考虑处理器的数量,线程执行的任务特性等诸多方面。单以线程执行任务特性来讲,根据Amdahl‘s定律。我们能最优化设置线程数量的规则如下:
- CPU密集型:这类任务主要消耗的是处理器的资源,为了避免处理器资源的浪费,我们设置线程数量为 CPU数量+1。
- I/O密集型:这种情况下,能用1个线程完成任务是最恰当的,因为多线程会引发额外的系统开销。当一个线程不能胜任的时候,因为等待I/O返回结果的时候是不占用处理器资源的,所以我们最理想的方式是 CPU数量×2。
使用线程池是一种合理应用线程的方式去优化资源。
线程池的原理
与数据库连接池等对象池不同的是,线程池本身作为一个对象,它在内部创建好了一批线程,等待任务对象提交给线程池去执行。我们可以把线程池比作一个公司,这一批线程就是它的员工,有新的订单过来就会暂时放在一个箱子里(Job Queue),当有的员工空闲的时候就会排队去箱子里拿订单去按照要求生产。如上图所示,线程池本质就是一个生产者消费者模型,客户端相当于生产者,线程池内部的缓存队列相当于传输通道,线程池中的线程相当于消费者。
在Java的世界中,java.util.concurrent.ThreadPoolExecutor
类就是一个线程池,我们可以调用submit()
方法提交任务。在Java中,能表示一个线程的只有Thread
类,除此之外的任何类都不能表示线程,Callable
以及Runnable
表示的是一个Task。以下给出了线程池的sumit
方法描述。它的含义就是向线程池提交任务。
/**
* Submits a value-returning task for execution and returns a
* Future representing the pending results of the task. The
* Future's {@code get} method will return the task's result upon
* successful completion.
*
* <p>
* If you would like to immediately block waiting
* for a task, you can use constructions of the form
* {@code result = exec.submit(aCallable).get();}
*
* <p>Note: The {@link Executors} class includes a set of methods
* that can convert some other common closure-like objects,
* for example, {@link java.security.PrivilegedAction} to
* {@link Callable} form so they can be submitted.
*
* @param task the task to submit
* @param <T> the type of the task's result
* @return a Future representing pending completion of the task
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
* @throws NullPointerException if the task is null
*/
<T> Future<T> submit(Callable<T> task);
线程池中的参数
我们从之下的构造器中可以清楚地看到如果你想手动定义一个线程池都需要配备哪些参数。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
-
corePoolSize
核心线程数怎么理解呢?接着之上的比喻来说,核心线程数就相当于一个公司的干部储备,这批人自公司建立起就一直呆在公司了。也就是说核心线程数是自线程池接收到任务之前就已经初始化好了的。当然,在创建线程池的时候,核心线程并不会被创建,而是等待任务都提交之后才会启动创建,除非先前调用了prestartAllCoreThread
。 -
maximumPoolSize
最大线程数指的是线程池允许的最大线程的数量。 -
keepAliveTime
存活时间,指的是除了核心线程之外的线程在没有任务执行的时候,所能存活的时间,超过这个时间的线程就会被停止掉。 -
unit
时间单元 -
workQueue
指的是任务存放的队列,当任务数量过多超过了最大线程数,此时的任务会暂存在工作队列,等待线程执行完再来队列中拿任务。 -
threadFactory
线程工厂,生产线程的工厂,当任务超过核心线程数,少于最大线程数,线程的创建工作由线程工厂来完成。 -
handler
当任务数量过多,以至于工作队列存放满了,之后再被submit的任务会被拒绝接受,并抛出一个异常。但是注意,由submit提交被拒绝的任务不会中断线程池。由execute
提交的任务再不做特殊处理的情况下,如果任务被拒绝,会中断线程池。
说完了使用构造器来创建线程池之后,我们可以看看使用Executors
类中的工具包能创建的线程池,我觉得大多数初学者为了方便都会去使用一下的方法去创建线程池。
拿其中一个方法来举例
/**
* Creates a thread pool that creates new threads as needed, but
* will reuse previously constructed threads when they are
* available. These pools will typically improve the performance
* of programs that execute many short-lived asynchronous tasks.
* Calls to {@code execute} will reuse previously constructed
* threads if available. If no existing thread is available, a new
* thread will be created and added to the pool. Threads that have
* not been used for sixty seconds are terminated and removed from
* the cache. Thus, a pool that remains idle for long enough will
* not consume any resources. Note that pools with similar
* properties but different details (for example, timeout parameters)
* may be created using {@link ThreadPoolExecutor} constructors.
*
* @return the newly created thread pool
*/
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
你会不幸的发现,他们并没有什么魔法,实现方式还是创建了ThreadPoolExecutor
对象,你可能会想,这样创建线程不是挺好的么?封装了方法,很好的展示了面向对象的思想。但事实并非如此。
如果你安装了阿里巴巴的p3c
插件你会注意到一件事情。
他会提醒你手动去创建一个线程池会更好,为什么这么说呢?他用以下例证来讲明了自己的观点。
图片的文字可能有点小,总之,在多线程这个反直觉的编程模式下,我们的一切行为都要尽可能的保证可控,不带应用场景的滥用线程池设计就是在耍流氓...我们可以依照本文开头所提交到的规则去创建线程池。
线程池的饱和处理
在《Java并发编程实战》中,这里说的异常处理被成为饱和策略
,在PoolThreadExecutor
中有如下子类,他们表示被拒绝任务的处理方式。
实现类 | 饱和策略 |
---|---|
AbortPolicy | 直接抛出异常 |
DiscardOldestPolicy | 丢弃workQueue中的旧任务,接纳新任务 |
CallerRunsPolicy | 在客户端线程中执行被丢弃的任务 |
DiscardPolicy | 丢弃被拒绝的任务 |
除此之外,我们也可以使用以下setRejectedExecutionHandler
方法去设置饱和策略。
RejectedExecutionHandler rejectedExecutionHandler = (r, executor) ->
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
JDK中默认的Handler为AbortPolicy
/**
* The default rejected execution handler
*/
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
线程池的监控
在ThreadPoolExecutor
类中,还支持很多用于线程池监控的方法。
我们可以调用这些方法,来确保线程池时刻处于健康的状态。此外线程池还支持
void beforeExecute(Thread t, Runnable r)
和void afterExecute(Runnable r, Throwable t)
两个钩子方法。也可以用作监控。
总结
还有一些线程池的问题,比方说,关于线程池死锁我们应该知道,同一个线程池只能执行相互独立的任务,有依赖的任务需要不同的线程池去执行。
另一方面,从小习惯角度上来讲,在对线程池异常进行处理的时候,最好能捕获所有异常,并且包装一个RuningTimeException
抛出出来。
相对于并发执行的任务,线程池为我们提供了一种优化且方便的执行手段。我们应当明白,框架工具再强大,也不能替我们完成所有事情,根据特定的任务制定特定的策略才是正确的打开方式。