实际开发中需要开启异步线程时,我们都会使用ThreadPoolExecutor类。但是我们一般不会直接通过Executors来随便定义出一个线程池,而是抽象一个utils单例的保存线程池对象供项目使用,根据业务不同,可能有多个。
private static ThreadPoolExecutor exec = new ThreadPoolExecutor(10, 30,
300, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10000),
Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());
这是一个线程池的配置,各个参数含义如下:
1、corePoolSize, 核心线程数,建议和cpu的核心数差不多,当有任务提交,检测当前线程池内的线程数小于corePoolSize的话,新建线程执行任务,而不会开始复用,会新建,直到达到corePoolSize。线程池内的线程数大于等于corePoolSize时,将任务放入workQueue等待。
2、maximumPoolSize,允许线程池内最大线程数,当队列满了之后,如果线程池内的线程数小于maximumPoolSize新建线程,如果大于等于执行拒绝策略。
如果maximumPoolSize是30,corePoolSize是10,当队列满了后只能再开20个线程。
3、keepAliveTime,线程池空闲时,线程存活的时间。
4、workQueue,上面提到的线程数超过corePoolSize存放任务的地方。
5、threadFactory,线程工厂,可以自己重写一下,为每个线程赋予一个名字,便于排查问题。
6、handler,拒绝策略,分4种,AbortPolicy直接抛出异常、DiscardPolicy悄悄抛弃不执行、CallerRunsPolicy(调用者运行):该策略既不会抛弃任务也不会抛出异常,而是将这个任务退回给调用者,从而降低新任务的流量;、DiscardOldestPolicy(抛弃最旧的)
调用时我们会封装个方法调用到exec.execute方法
public void execute(Runnable task) {
exec.execute(task);
}
提交后大致的流程:
1、通过调用execute方法,调用ThreadPoolExecutor的addWork方法,Work类是对需要执行的task的封装,task保存在Work的firstTask变量中。execute方法中会做出将任务放在那里的判断,如果进队列,会将任务入队,并且addWord的task参数为null。
2、addWork方法中,会初始化一个Work对象,构造方法中将任务传入,赋值给firstTask变量,并从ThreadFactory中新建一个线程参数是this赋值给Work的thread变量。
3、然后后面调用Work中的thread变量的start方法,这样调用到了Work的run方法。
4、work的run方法调用runWorker方法传入this参数,方法内拿出work的firstTask赋值给task变量,如果为null,从队列里面拿,然后调用task的run方法执行任务。
注意到在runwork中,task的run方法之前需要获得锁,为了防止线程池shutdown等操作中断任务,所以加锁。并且是不可重入锁。Work类通过继承AbstractQueuedSynchronizer实现的不可重入锁。(可重入锁的意思是在同一线程,再次获取相同锁资源不用获取直接用,主要目的防止死锁,synchronize和ReentrantLock都是可重入锁。)
这里使用不可重入锁的目的是防止beforeExecute等自定义方法调用setCorePoolSize方法进一步调用到interruptIdleWorkers里面的trylock获取锁成功导致自己将自己中断
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
t.interrupt();会将自己中断掉。