一,参数
1,corePoolSize:核心线程数量,线程池可以保留的活跃线程的数量。
2,maximumPoolSize:线程池能运行的最大线程数。
3,keepAliveTime:线程数量大于核心线程数时,没有任务执行的线程能存活的时间。
4,TimeUnit :上面参数的时间单位。
5,BlockingQueue<Runnable> :任务队列,存储 Runnable类型的任务。当线程数量等于核心线程数量,而且所有线程正在执行任务时,新接收的任务就会存储在此队列中。
6,ThreadFactory :线程池创建新线程时用到的工厂类。
7,RejectedExecutionHandler:拒绝策略,当任务队列满了,而且线程数量达到最大线程数时。新到达的线程的处理策略。有忽略策略,调用线程池线程执行策略,淘汰任务队列最早任务策略,拒绝并报错策略等。
二,存储结构
1,任务队列,BlockingQueue<Runnable>, Runnable类型的任务存储在队列中。
2,线程池,HashSet<Worker>,worker.Thread()可以获取工作线程,线程池是一个hashset。
三,运行过程
1,创建一个线程池,在还没有任务提交的时候,默认线程池里面是没有线程的。当然,你也可以调用prestartCoreThread方法,来预先创建一个核心线程。
2,线程池里还没有线程或者线程池里存活的线程数小于核心线程数corePoolSize时,这时对于一个新提交的任务,线程池会创建一个线程去处理提交的任务。当线程池里面存活的线程数小于等于核心线程数corePoolSize时,线程池里面的线程会一直存活着,就算空闲时间超过了keepAliveTime,线程也不会被销毁,而是一直阻塞在那里一直等待任务队列的任务来执行。
3,当线程池里面存活的线程数已经等于corePoolSize了,这是对于一个新提交的任务,会被放进任务队列workQueue排队等待执行。而之前创建的线程并不会被销毁,而是不断的去拿阻塞队列里面的任务,当任务队列为空时,线程会阻塞,直到有任务被放进任务队列,线程拿到任务后继续执行,执行完了过后会继续去拿任务。这也是为什么线程池队列要是用阻塞队列。
4,当线程池里面存活的线程数已经等于corePoolSize了,并且任务队列也满了,这里假设maximumPoolSize>corePoolSize(如果等于的话,就直接拒绝了),这时如果再来新的任务,线程池就会继续创建新的线程来处理新的任务,知道线程数达到maximumPoolSize,就不会再创建了。这些新创建的线程执行完了当前任务过后(在finnally代码块中执行完成任务后的操作,去任务队列拿任务),在任务队列里面还有任务的时候也不会销毁,而是去任务队列拿任务出来执行。在当前线程数大于corePoolSize过后,线程执行完当前任务,会有一个判断当前线程是否需要销毁的逻辑:如果能从任务队列中拿到任务,那么继续执行,如果拿任务时阻塞(说明队列中没有任务),那超过keepAliveTime时间就直接返回null并且销毁当前线程,直到线程池里面的线程数等于corePoolSize之后才不会进行线程销毁。
5,如果当前的线程数达到了maximumPoolSize,并且任务队列也满了,这种情况下还有新的任务过来,那就直接采用拒绝的处理器进行处理。默认的处理器逻辑是抛出一个RejectedExecutionException异常。你也就可以指定其他的处理器,或者自定义一个拒绝处理器来实现拒绝逻辑的处理(比如讲这些任务存储起来)。JDK提供了四种拒绝策略处理类:AbortPolicy(抛出一个异常,默认的),DiscardPolicy(直接丢弃任务),DiscardOldestPolicy(丢弃队列里最老的任务,将当前这个任务继续提交给线程池),CallerRunsPolicy(交给线程池调用所在的线程进行处理)。
四,主要接口方法
1. Executor接口
主要是用来执行提交的任务。下面是接口定义:
public interface Executor {
void execute(Runnable command);
}
后面说的线程池会实现这个接口,并且会使用这个方法来提交一个任务。
2. ExecutorService接口
ExecutorService接口是Executor接口的一个子接口,它在Executor接口的基础上增加了一些方法,用来支持对任务的终止管理以及对异步任务的支持。
public interface ExecutorService extends Executor {
void shutdown();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
3. AbstractExecutorService 抽象类
AbstractExecutorService实现了ExecutorService,并基于模板方法模式对一些方法给出了实现。是我们接下来要提到的线程池类ThreadPoolExecutor的直接父类。代码贴出来有点多,这里就不贴了。
4. ThreadPoolExecutor类
ThreadPoolExecutor通常就是我们所说的线程池类,Java的线程池就是用过这个类进行创建的。下面分析的线程池的运行原理,也是基于这个类来进行分析的。
5. ScheduledExecutorService接口
ScheduledExecutorService接口是ExecutorService子接口,定义了线程池基于任务调度的一些方法。
public interface ScheduledExecutorService extends ExecutorService {
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay, long period,TimeUnit unit);
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
}
可以看到,上面定义了延时周期调度,固定频率周期调度,返回任务结果的任务调度等方法。
6. ScheduledThreadPoolExecutor类
ScheduledThreadPoolExecutor继承了ThreadPoolExecutor类,并且实现了ScheduledExecutorService接口,对任务调度的功能进行了实现。
7. Executors类
Executors可以认为是线程池的工厂类,里面提供了静态方法对线程池进行创建。
下面列出常用的几种线程池创建方法:
//固定线程大小的线程池
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
//单个线程线程池
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
//无上限线程线程池
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
//基于任务调度的线程池(还有其他类型的任务调度线程池,这里不列举了)
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
以上就是Java线程池相关的API类。