一、newFixedThreadPool/newCachedThreadPool/newSingleThreadExecutor
JDK Executors提供的基本线程池:
/**
* Creates a thread pool that reuses a fixed number of threads * operating off a shared unbounded queue. At any point, at most * {@code nThreads} threads will be active processing tasks. * If additional tasks are submitted when all threads are active, * they will wait in the queue until a thread is available. * If any thread terminates due to a failure during execution * prior to shutdown, a new one will take its place if needed to * execute subsequent tasks. The threads in the pool will exist * until it is explicitly {@link ExecutorService#shutdown shutdown}.
* * @param nThreads the number of threads in the pool
* @return the newly created thread pool * @throws IllegalArgumentException if {@code nThreads <= 0}
*/public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
/**
* Creates a thread pool that reuses a fixed number of threads * operating off a shared unbounded queue, using the provided * ThreadFactory to create new threads when needed. At any point, * at most {@code nThreads} threads will be active processing * tasks. If additional tasks are submitted when all threads are * active, they will wait in the queue until a thread is * available. If any thread terminates due to a failure during * execution prior to shutdown, a new one will take its place if * needed to execute subsequent tasks. The threads in the pool will * exist until it is explicitly {@link ExecutorService#shutdown
* shutdown}. * * @param nThreads the number of threads in the pool
* @param threadFactory the factory to use when creating new threads
* @return the newly created thread pool * @throws NullPointerException if threadFactory is null
* @throws IllegalArgumentException if {@code nThreads <= 0}
*/public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
/**
* Creates a thread pool that creates new threads as needed, but * will reuse previously constructed threads when they are * available. These pools will typically improve the performance * of programs that execute many short-lived asynchronous tasks. * Calls to {@code execute} will reuse previously constructed * threads if available. If no existing thread is available, a new * thread will be created and added to the pool. Threads that have * not been used for sixty seconds are terminated and removed from * the cache. Thus, a pool that remains idle for long enough will * not consume any resources. Note that pools with similar * properties but different details (for example, timeout parameters) * may be created using {@link ThreadPoolExecutor} constructors.
* * @return the newly created thread pool */public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
/**
* Creates a thread pool that creates new threads as needed, but * will reuse previously constructed threads when they are * available, and uses the provided * ThreadFactory to create new threads when needed. * * @param threadFactory the factory to use when creating new threads
* @return the newly created thread pool * @throws NullPointerException if threadFactory is null
*/public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
/**
* Creates an Executor that uses a single worker thread operating * off an unbounded queue. (Note however that if this single * thread terminates due to a failure during execution prior to * shutdown, a new one will take its place if needed to execute * subsequent tasks.) Tasks are guaranteed to execute * sequentially, and no more than one task will be active at any * given time. Unlike the otherwise equivalent * {@code newFixedThreadPool(1)} the returned executor is * guaranteed not to be reconfigurable to use additional threads. * * @return the newly created single-threaded Executor */public static ExecutorService newSingleThreadExecutor() {
return newSingleThreadExecutor(defaultThreadFactory());
}
/**
* Creates an Executor that uses a single worker thread operating * off an unbounded queue, and uses the provided ThreadFactory to * create a new thread when needed. Unlike the otherwise * equivalent {@code newFixedThreadPool(1, threadFactory)} the * returned executor is guaranteed not to be reconfigurable to use * additional threads. * * @param threadFactory the factory to use when creating new threads
* @return the newly created single-threaded Executor * @throws NullPointerException if threadFactory is null
*/public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new AutoShutdownDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
java线程池的工作流程是:核心线程 -> 工作队列 -> 工作线程 -> 拒绝策略
1. newFixedThreadPool和newSingleThreadExecutor使用无界队列,所以流程只会走到工作队列,无法创建工作线程,无法执行拒绝策略
2. newCachedThreadPool则是没有核心线程,结合SynchronousQueue要求每个put对应一个take,所以会不断创建工作线程,由于没有最大线程限制,所以同样无法执行拒绝策略
二、newScheduledThreadPool
JDK的ScheduledThreadPool在执行调度任务时实际是靠DelayedWorkQueue的take()方法实现的时间控制:
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0L)
return finishPoll(first);
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
take()方法会阻塞线程最近的task执行所需等待的时间,这里的frist是最近要执行的ScheduledFutureTask:
private class ScheduledFutureTask<V>
extends FutureTask<V> implements RunnableScheduledFuture<V> {
...
public long getDelay(TimeUnit unit) {
return unit.convert(time - System.nanoTime(), NANOSECONDS);
}
...
}
有意思的地方是调度任务提交的ScheduledThreadPool之后立马就被核心线程取出了,然后线程就进入了阻塞状态,一直阻塞到延迟时间结束才开始执行任务。
JDK Executors中提供的newScheduledThreadPool:
/**
* Creates a single-threaded executor that can schedule commands * to run after a given delay, or to execute periodically. * (Note however that if this single * thread terminates due to a failure during execution prior to * shutdown, a new one will take its place if needed to execute * subsequent tasks.) Tasks are guaranteed to execute * sequentially, and no more than one task will be active at any * given time. Unlike the otherwise equivalent * {@code newScheduledThreadPool(1)} the returned executor is * guaranteed not to be reconfigurable to use additional threads. * * @return the newly created scheduled executor */public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
/**
* Creates a single-threaded executor that can schedule commands * to run after a given delay, or to execute periodically. (Note * however that if this single thread terminates due to a failure * during execution prior to shutdown, a new one will take its * place if needed to execute subsequent tasks.) Tasks are * guaranteed to execute sequentially, and no more than one task * will be active at any given time. Unlike the otherwise * equivalent {@code newScheduledThreadPool(1, threadFactory)} * the returned executor is guaranteed not to be reconfigurable to * use additional threads. * * @param threadFactory the factory to use when creating new threads
* @return the newly created scheduled executor * @throws NullPointerException if threadFactory is null
*/public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1, threadFactory));
}
/**
* Creates a thread pool that can schedule commands to run after a * given delay, or to execute periodically. * @param corePoolSize the number of threads to keep in the pool,
* even if they are idle * @return the newly created scheduled thread pool * @throws IllegalArgumentException if {@code corePoolSize < 0}
*/public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
/**
* Creates a thread pool that can schedule commands to run after a * given delay, or to execute periodically. * @param corePoolSize the number of threads to keep in the pool,
* even if they are idle * @param threadFactory the factory to use when the executor
* creates a new thread * @return the newly created scheduled thread pool * @throws IllegalArgumentException if {@code corePoolSize < 0}
* @throws NullPointerException if threadFactory is null
*/public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
都是依靠核心线程执行的线程池,缺点与newFixedThreadPool类似,同样是无界工作队列(DelayedWorkQueue基于堆的优先级无界队列),Task会不断增长,不会创建工作线程,不会触发拒绝策略执行。
三、newWorkStealingPool
workStealingPool利用ForkJoinPool,原理跟传统线程池很不同。任意ForkJoinTaskThread获取到任务开始执行后任务会被传播到整个ForkJoinPool。