Executors

I. newFixedThreadPool / newCachedThreadPool / newSingleThreadExecutor

The basic thread pools provided by the JDK's Executors class:

/**
 * Creates a thread pool that reuses a fixed number of threads
 * operating off a shared unbounded queue.  At any point, at most
 * {@code nThreads} threads will be active processing tasks.
 * If additional tasks are submitted when all threads are active,
 * they will wait in the queue until a thread is available.
 * If any thread terminates due to a failure during execution
 * prior to shutdown, a new one will take its place if needed to
 * execute subsequent tasks.  The threads in the pool will exist
 * until it is explicitly {@link ExecutorService#shutdown shutdown}.
 *
 * @param nThreads the number of threads in the pool
 * @return the newly created thread pool
 * @throws IllegalArgumentException if {@code nThreads <= 0}
 */
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,  
                                  0L, TimeUnit.MILLISECONDS,  
                                  new LinkedBlockingQueue<Runnable>());  
}

/**
 * Creates a thread pool that reuses a fixed number of threads
 * operating off a shared unbounded queue, using the provided
 * ThreadFactory to create new threads when needed.  At any point,
 * at most {@code nThreads} threads will be active processing
 * tasks.  If additional tasks are submitted when all threads are
 * active, they will wait in the queue until a thread is
 * available.  If any thread terminates due to a failure during
 * execution prior to shutdown, a new one will take its place if
 * needed to execute subsequent tasks.  The threads in the pool will
 * exist until it is explicitly {@link ExecutorService#shutdown
 * shutdown}.
 *
 * @param nThreads the number of threads in the pool
 * @param threadFactory the factory to use when creating new threads
 * @return the newly created thread pool
 * @throws NullPointerException if threadFactory is null
 * @throws IllegalArgumentException if {@code nThreads <= 0}
 */
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads,  
                                  0L, TimeUnit.MILLISECONDS,  
                                  new LinkedBlockingQueue<Runnable>(),  
                                  threadFactory);  
}

/**
 * Creates a thread pool that creates new threads as needed, but
 * will reuse previously constructed threads when they are
 * available.  These pools will typically improve the performance
 * of programs that execute many short-lived asynchronous tasks.
 * Calls to {@code execute} will reuse previously constructed
 * threads if available. If no existing thread is available, a new
 * thread will be created and added to the pool. Threads that have
 * not been used for sixty seconds are terminated and removed from
 * the cache. Thus, a pool that remains idle for long enough will
 * not consume any resources. Note that pools with similar
 * properties but different details (for example, timeout parameters)
 * may be created using {@link ThreadPoolExecutor} constructors.
 *
 * @return the newly created thread pool
 */
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,  
                                  60L, TimeUnit.SECONDS,  
                                  new SynchronousQueue<Runnable>());  
}  
  
/**
 * Creates a thread pool that creates new threads as needed, but
 * will reuse previously constructed threads when they are
 * available, and uses the provided
 * ThreadFactory to create new threads when needed.
 *
 * @param threadFactory the factory to use when creating new threads
 * @return the newly created thread pool
 * @throws NullPointerException if threadFactory is null
 */
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,  
                                  60L, TimeUnit.SECONDS,  
                                  new SynchronousQueue<Runnable>(),  
                                  threadFactory);  
}

/**
 * Creates an Executor that uses a single worker thread operating
 * off an unbounded queue. (Note however that if this single
 * thread terminates due to a failure during execution prior to
 * shutdown, a new one will take its place if needed to execute
 * subsequent tasks.)  Tasks are guaranteed to execute
 * sequentially, and no more than one task will be active at any
 * given time. Unlike the otherwise equivalent
 * {@code newFixedThreadPool(1)} the returned executor is
 * guaranteed not to be reconfigurable to use additional threads.
 *
 * @return the newly created single-threaded Executor
 */
public static ExecutorService newSingleThreadExecutor() {
    return newSingleThreadExecutor(defaultThreadFactory());  
}  
  
/**
 * Creates an Executor that uses a single worker thread operating
 * off an unbounded queue, and uses the provided ThreadFactory to
 * create a new thread when needed. Unlike the otherwise
 * equivalent {@code newFixedThreadPool(1, threadFactory)} the
 * returned executor is guaranteed not to be reconfigurable to use
 * additional threads.
 *
 * @param threadFactory the factory to use when creating new threads
 * @return the newly created single-threaded Executor
 * @throws NullPointerException if threadFactory is null
 */
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
    return new AutoShutdownDelegatedExecutorService  
        (new ThreadPoolExecutor(1, 1,  
                                0L, TimeUnit.MILLISECONDS,  
                                new LinkedBlockingQueue<Runnable>(),  
                                threadFactory));  
}


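A Java thread pool handles a submitted task in this order: core threads -> work queue -> non-core threads (up to maximumPoolSize) -> rejection policy.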
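To make the four stages visible, here is a minimal sketch that walks one task through each of them. The sizing (1 core thread, at most 2 threads, a queue of capacity 1) and the class name PoolWorkflowDemo are hypothetical and chosen only so that every stage is hit exactly once; none of the factory methods above uses a bounded queue like this.

```java
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolWorkflowDemo {

    /** Returns {pool size after task 1, queue size after task 2, pool size after task 3, 1 if task 4 was rejected}. */
    static int[] run() throws InterruptedException {
        // Hypothetical sizing: 1 core thread, at most 2 threads, queue capacity 1.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.AbortPolicy());
        CountDownLatch release = new CountDownLatch(1);
        // Every task blocks until released, so no thread becomes idle during the demo.
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int[] observed = new int[4];
        try {
            pool.execute(blocker);                 // stage 1: a core thread is started
            observed[0] = pool.getPoolSize();
            pool.execute(blocker);                 // stage 2: core thread busy, task is queued
            observed[1] = pool.getQueue().size();
            pool.execute(blocker);                 // stage 3: queue full, a non-core thread is started
            observed[2] = pool.getPoolSize();
            try {
                pool.execute(blocker);             // stage 4: queue full and max threads reached
            } catch (RejectedExecutionException e) {
                observed[3] = 1;                   // AbortPolicy rejected the task
            }
        } finally {
            release.countDown();
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
        return observed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(Arrays.toString(run())); // prints [1, 1, 2, 1]
    }
}
```

With an unbounded queue, stage 2 never fails, so stages 3 and 4 are never reached while the pool is running.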

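1. newFixedThreadPool and newSingleThreadExecutor use an unbounded queue, so a submitted task never gets past the work-queue stage: no non-core threads are created (corePoolSize already equals maximumPoolSize), and while the pool is running the rejection policy is never invoked. The queue simply keeps growing.
2. newCachedThreadPool has no core threads, and its SynchronousQueue has no capacity: a task can be handed over only when an idle thread is already waiting to take it. Otherwise a new thread is created, and because the maximum is Integer.MAX_VALUE, the rejection policy is effectively never invoked either.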
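The two points above can be observed directly. The sketch below builds two ThreadPoolExecutor instances with the same constructor arguments that newFixedThreadPool(2) and newCachedThreadPool() pass in the source shown earlier, then submits tasks that all block. The class name FactoryPoolsDemo and the task count of 10 are illustrative assumptions.

```java
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class FactoryPoolsDemo {

    /** Returns {fixed pool size, fixed queue size, cached pool size, cached queue size}. */
    static int[] run(int tasks) throws InterruptedException {
        // Same constructor arguments as newFixedThreadPool(2).
        ThreadPoolExecutor fixed = new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        // Same constructor arguments as newCachedThreadPool().
        ThreadPoolExecutor cached = new ThreadPoolExecutor(
                0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        // Tasks block until released, so every thread stays busy while we measure.
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            for (int i = 0; i < tasks; i++) {
                fixed.execute(blocker);   // beyond 2 threads, tasks pile up in the queue
                cached.execute(blocker);  // no idle thread to hand off to, so a new thread each time
            }
            return new int[] {
                fixed.getPoolSize(), fixed.getQueue().size(),
                cached.getPoolSize(), cached.getQueue().size()
            };
        } finally {
            release.countDown();
            fixed.shutdown();
            cached.shutdown();
            fixed.awaitTermination(5, TimeUnit.SECONDS);
            cached.awaitTermination(5, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(Arrays.toString(run(10))); // prints [2, 8, 10, 0]
    }
}
```

The fixed pool absorbs the backlog in memory (queue size grows), while the cached pool absorbs it in threads (pool size grows). Under sustained load either one can exhaust resources, which is why a bounded queue plus an explicit rejection policy is often preferred.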

II. newScheduledThreadPool

When a JDK ScheduledThreadPool runs scheduled tasks, the timing is actually controlled by the take() method of DelayedWorkQueue:

public RunnableScheduledFuture<?> take() throws InterruptedException {  
    final ReentrantLock lock = this.lock;  
    lock.lockInterruptibly();  
    try {  
        for (;;) {  
            RunnableScheduledFuture<?> first = queue[0];  
            if (first == null)  
                available.await();  
            else {  
                long delay = first.getDelay(NANOSECONDS);  
                if (delay <= 0L)  
                    return finishPoll(first);  
                first = null; // don't retain ref while waiting  
                if (leader != null)  
                    available.await();  
                else {  
                    Thread thisThread = Thread.currentThread();  
                    leader = thisThread;  
                    try {  
                        available.awaitNanos(delay);  
                    } finally {  
                        if (leader == thisThread)  
                            leader = null;  
                    }  
                }  
            }  
        }  
    } finally {  
        if (leader == null && queue[0] != null)  
            available.signal();  
        lock.unlock();  
    }  
}

take() blocks the calling thread for as long as the earliest task still has to wait. Here, first is the ScheduledFutureTask that is due to run soonest:

private class ScheduledFutureTask<V>  
        extends FutureTask<V> implements RunnableScheduledFuture<V> {
        ...
        public long getDelay(TimeUnit unit) {  
            return unit.convert(time - System.nanoTime(), NANOSECONDS);  
        }
        ...
}

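The interesting part is that as soon as a scheduled task is submitted to the ScheduledThreadPool, a core thread picks it up as the head of the queue inside take() and then blocks. The task itself stays in the queue while the thread waits; only when the delay has elapsed does finishPoll remove it, and the thread then runs it.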
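A small experiment can illustrate this behaviour. The sketch below schedules one task on a single-threaded ScheduledThreadPoolExecutor and checks the queue while the worker is still waiting. The class name DelayedTakeDemo and the 500 ms delay are hypothetical; the check assumes the main thread reaches getQueue() well before the delay expires.

```java
import java.util.Arrays;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class DelayedTakeDemo {

    /** Returns {queue size while waiting, elapsed ms when the task ran, queue size afterwards}. */
    static long[] run(long delayMs) throws Exception {
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
        try {
            final long start = System.nanoTime();
            // The task reports how long after submission it actually started running.
            ScheduledFuture<Long> future = pool.schedule(
                    () -> System.nanoTime() - start, delayMs, TimeUnit.MILLISECONDS);

            // The worker is blocked in take(), but the task is still in the queue.
            int queuedWhileWaiting = pool.getQueue().size();

            long elapsedMs = TimeUnit.NANOSECONDS.toMillis(future.get());
            int queuedAfterRun = pool.getQueue().size();
            return new long[] {queuedWhileWaiting, elapsedMs, queuedAfterRun};
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        // Expect 1 queued task while waiting, an elapsed time of at least 500 ms, then 0 queued.
        System.out.println(Arrays.toString(run(500)));
    }
}
```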

The scheduled thread pools provided by JDK Executors:

/**
 * Creates a single-threaded executor that can schedule commands
 * to run after a given delay, or to execute periodically.
 * (Note however that if this single
 * thread terminates due to a failure during execution prior to
 * shutdown, a new one will take its place if needed to execute
 * subsequent tasks.)  Tasks are guaranteed to execute
 * sequentially, and no more than one task will be active at any
 * given time. Unlike the otherwise equivalent
 * {@code newScheduledThreadPool(1)} the returned executor is
 * guaranteed not to be reconfigurable to use additional threads.
 *
 * @return the newly created scheduled executor
 */
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
    return new DelegatedScheduledExecutorService  
        (new ScheduledThreadPoolExecutor(1));  
}  
  
/**
 * Creates a single-threaded executor that can schedule commands
 * to run after a given delay, or to execute periodically.  (Note
 * however that if this single thread terminates due to a failure
 * during execution prior to shutdown, a new one will take its
 * place if needed to execute subsequent tasks.)  Tasks are
 * guaranteed to execute sequentially, and no more than one task
 * will be active at any given time. Unlike the otherwise
 * equivalent {@code newScheduledThreadPool(1, threadFactory)}
 * the returned executor is guaranteed not to be reconfigurable to
 * use additional threads.
 *
 * @param threadFactory the factory to use when creating new threads
 * @return the newly created scheduled executor
 * @throws NullPointerException if threadFactory is null
 */
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
    return new DelegatedScheduledExecutorService  
        (new ScheduledThreadPoolExecutor(1, threadFactory));  
}  
  
/**
 * Creates a thread pool that can schedule commands to run after a
 * given delay, or to execute periodically.
 * @param corePoolSize the number of threads to keep in the pool,
 * even if they are idle
 * @return the newly created scheduled thread pool
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 */
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);  
}  
  
/**
 * Creates a thread pool that can schedule commands to run after a
 * given delay, or to execute periodically.
 * @param corePoolSize the number of threads to keep in the pool,
 * even if they are idle
 * @param threadFactory the factory to use when the executor
 * creates a new thread
 * @return the newly created scheduled thread pool
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 * @throws NullPointerException if threadFactory is null
 */
public static ScheduledExecutorService newScheduledThreadPool(
        int corePoolSize, ThreadFactory threadFactory) {  
    return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);  
}

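All of these pools run tasks on core threads only. Their drawback is similar to that of newFixedThreadPool: the work queue is again unbounded (DelayedWorkQueue is a heap-based, unbounded priority queue), so pending tasks can pile up without limit, no non-core threads are created, and the rejection policy is never triggered.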
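As a rough illustration of that unbounded growth, the sketch below schedules many far-future tasks on a pool with two core threads and then reads the pool and queue sizes. The class name ScheduledBacklogDemo, the core size of 2, and the one-hour delay are all hypothetical values chosen for the demonstration.

```java
import java.util.Arrays;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ScheduledBacklogDemo {

    /** Returns {pool size, queue size, number of never-run tasks returned by shutdownNow}. */
    static int[] run(int tasks) {
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(2);
        for (int i = 0; i < tasks; i++) {
            // Far-future no-op tasks: they are only queued, never run during the demo.
            pool.schedule(() -> { }, 1, TimeUnit.HOURS);
        }
        int poolSize = pool.getPoolSize();      // capped at corePoolSize
        int queued = pool.getQueue().size();    // grows with every submission
        // shutdownNow() drains the queue and returns the tasks that never ran.
        int drained = pool.shutdownNow().size();
        return new int[] {poolSize, queued, drained};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run(1000))); // prints [2, 1000, 1000]
    }
}
```

No submission is rejected and no thread beyond the core size is started, however many tasks are waiting.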

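III. newWorkStealingPool

newWorkStealingPool is built on ForkJoinPool, which works quite differently from a traditional thread pool. Each ForkJoinWorkerThread keeps its own task queue; once a worker starts running a task, the subtasks it forks can be stolen by idle workers, so the work spreads across the whole ForkJoinPool.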
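The following is a minimal sketch of how a task spreads across such a pool, not a description of ForkJoinPool internals. It sums an array with a RecursiveTask that keeps splitting itself. SumTask, WorkStealingDemo, the threshold of 1,000 elements, and the parallelism of 4 are all hypothetical. The root task is started from a Callable so that it runs on a pool worker without casting the returned ExecutorService.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RecursiveTask;

public class WorkStealingDemo {

    /** Splits the range in half until it is small enough to sum directly. */
    static final class SumTask extends RecursiveTask<Long> {
        private static final int THRESHOLD = 1_000; // hypothetical cut-off
        private final long[] data;
        private final int from;
        private final int to;

        SumTask(long[] data, int from, int to) {
            this.data = data;
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            if (to - from <= THRESHOLD) {
                long sum = 0;
                for (int i = from; i < to; i++) {
                    sum += data[i];
                }
                return sum;
            }
            int mid = (from + to) >>> 1;
            SumTask left = new SumTask(data, from, mid);
            SumTask right = new SumTask(data, mid, to);
            left.fork();                            // queued on this worker; idle workers may steal it
            return right.compute() + left.join();   // work on the right half, then collect the left
        }
    }

    static long parallelSum(long[] data) throws Exception {
        ExecutorService pool = Executors.newWorkStealingPool(4);
        try {
            // Running the root task inside a pool worker lets fork() use that pool.
            Callable<Long> job = () -> new SumTask(data, 0, data.length).invoke();
            return pool.submit(job).get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        long[] data = new long[100_000];
        for (int i = 0; i < data.length; i++) {
            data[i] = i + 1;
        }
        System.out.println(parallelSum(data)); // prints 5000050000
    }
}
```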
