8.1在任务与执行策略之间的印象耦合
1.依赖性任务
2.使用线程封闭机制的任务
3.对响应时间敏感的任务
4.使用ThreadLocal的任务
线程池
8.1.1线程饥饿死锁
程序清单8-1 在单线程Executor中任务发生死锁(不要这么做)
/**
* 线程饥饿死锁(Thread Starvation Deadlock)
* RenderPageTask本身还没执行完呢,是不会让出唯一的一个线程给header和footer,
* 因为RenderPageTask依赖于header和footer的执行结果。
*/
public class ThreadDeadLock {
ExecutorService exec = Executors.newSingleThreadExecutor(); //单线程的线程池
public class RenderPageTask implements Callable<String> { //提交Callable
public String call() throws Exception {
Future<String> header,footer;
//RenderPageTask的Callable任务将另外2个任务提交到了同一个Executor中,并且等待这个被提交任务的结果
header = exec.submit(new LoadFileTask("header.html")); //Callable中嵌套的线程启用
footer = exec.submit(new LoadFileTask("footer.html")); //Callable中嵌套的线程启用
String page = renderBody();
//将发生死锁 —— 由于任务在等待子任务的结果
return header.get() + page + footer.get();
}
}
}
8.1.2 运行时间较长的任务
8.1.3 设置线程池的大小
int N_CPU = Runtime.getRuntime().availableProcessors();
计算密集型:N个CPU的处理器的系统上,线程池大小通常为N+1 最优
8.3 配置ThreadPoolExecutor
ThreadPoolExecutor源码解析
@Data //可以通过getter、setter定制线程池
public class ThreadPoolExecutor extends AbstractExecutorService {
private final BlockingQueue<Runnable> workQueue; //getter、setter
private final HashSet<Worker> workers = new HashSet<Worker>();//getter、setter
private final Condition termination = mainLock.newCondition();//getter、setter
private int largestPoolSize; //getter、setter
private long completedTaskCount; //getter、setter
//线程工厂,线程池里的线程都是通过这个工厂创建的
private volatile ThreadFactory threadFactory; //getter、setter
//饱和策略,默认是“中止策略”:AbortPolicy
private volatile RejectedExecutionHandler handler; //getter、setter
private static final RejectedExecutionHandler defaultHandler =new AbortPolicy();
private volatile long keepAliveTime; //getter、setter
private volatile boolean allowCoreThreadTimeOut; //getter、setter
private volatile int corePoolSize; //getter、setter
private volatile int maximumPoolSize; //getter、setter
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters and default thread factory and rejected execution handler.
* It may be more convenient to use one of the {@link Executors} factory
* methods instead of this general purpose constructor.
* 线程池中保持的线程数,即便线程是闲置的,除非allowCoreThreadTimeOut被设置
* 只有在工作队列满了的情况下,才会创建超出这个数量的线程
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* 可同时活动的线程数量上限
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* 如果某个线程的空闲时间超过了keepAliveTime,那么将被标记为可回收的,
* 并且当线程池的大小超过了基本大小时,这个空闲的线程将被终止。
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* 时间单位
* @param unit the time unit for the {@code keepAliveTime} argument
* //阻塞式工作队列
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue} is null
*/
/**构造方法1,全自定义*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) { }
/**构造方法2 调用了构造方法1,使用默认【线程工厂】,自定义的饱和策略*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);}
/**构造方法3 调用构造方法2,使用默认的【饱和策略】和默认的【线程工厂】*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
//8.4 拓展ThreadPoolExecutor
//自定义➕日志如果此处抛出一个异常,那么任务不会被调用,afterExecute也不会被调用
protected void beforeExecute(Thread t, Runnable r) { }
//无论是从run中返回还是抛出一个异常返回,afterExecute都会被调用,(如果带有一个Error则不调用)
protected void afterExecute(Runnable r, Throwable t) { }
//线程池完成关闭操作时调用此方法,也就是在所有任务都已经完成并且所有工作者线程都已经关闭后
//可以用来释放Executor在其生命周期里分配的各种资源,此外还可以执行发生通知、
//记录日志或收集finalize统计信息等操作
protected void terminated() { }
/**
* 当队列被填满后,新提交的任务无法保存到队列中等待执行时,抛弃策略开始起作用
*/
/**
* 调用者-运行策略,实现了一种调节机制,改策略不会抛弃任务,也不会抛出异常
* 而是将某些任务回退到调用者,从而降低新任务流量。他不会再线程池的某个线程中执行新提交的任务,
* 而是在一个调用了Executor.execute()的线程中执行该任务。
*
*/
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
/**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run(); //没有在线程池中启用线程,run()方法直接调用,在ThreadPoolExecutor的调用者线程中执行
}
}
//
}
//终止饱和策略,抛出异常,让用户自己处理, 默认的
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
/** 参数信息同上
* Always throws RejectedExecutionException.
* @throws RejectedExecutionException always
*/
throw new RejectedExecutionException("Task " + r.toString() +" rejected from " +e.toString());
}
//抛弃策略,啥也不干,discarding task r.
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
/**参数信息同上
* Does nothing, which has the effect of discarding task r.
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {} //Does noting
}
/**
* 如果工作队列是一个优先队列,“抛弃最旧的策略”将导致抛弃优先级最高的任务,
* 因此不要将优先队列和DiscardOldestPolicy放在一起使用
*/
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
/**参数信息同上*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();//抛弃下一个将被执行的任务,
e.execute(r);//尝试在线程池中重新提交新的任务
}
}
}
}
程序清单8-5 线程工程接口
package java.util.concurrent;
public interface ThreadFactory {
Thread newThread(Runnable r);
}
程序清单8-6 自定义线程工厂
import java.util.concurrent.ThreadFactory;
public class MyThreadFactory implements ThreadFactory {
private final String poolName;
public MyThreadFactory(String poolName) {
this.poolName = poolName;
}
@Override
public Thread newThread(Runnable r) {
return new MyThread(r,poolName);
}
}
//自定义线程
class MyThread extends Thread {
MyThread(Runnable r,String name) { }
}
Executors工具类中定义的静态线程工厂类:
DefaultThreadFactory 和 PrivilegedThreadFactory
Executors.defaultThreadFactory(); //获取默认的线程工厂类实例创建线程
Executors.privilegedThreadFactory() ;//获取优先级的线程工厂类实例创建线程
详见下方的源码解析:
Executors源码解析
public class Executors {
/**
* 默认线程池的有界LinkedBlockingQueue,
* 基本大小(corePoolSize)和最大大小(maxinumPoolSize)同为参数大小
* keepAliveTime=0,代表不会超时,永远都在
*/
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool(parallelism, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true);
}
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true);
}
/**
* 默认线程池的有界LinkedBlockingQueue,
* 基本大小(corePoolSize)和最大大小(maxinumPoolSize)同为参数大小
* keepAliveTime=0,代表不会超时,永远都存活
*/
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(), threadFactory);
}
/**
* 本类中自定义的FinalizableDelegatedExecutorService封装ThreadPoolExecutor,
* 返回类型为ExecutorService,只暴露ExecutorService的接口,因此不能对它配置
*
* 基本线程数量和最大线程数量都是1,keepAliveTime=0,线程永远不超时,不会被回收
*/
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService(
new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
}
/**
* 本类中自定义的FinalizableDelegatedExecutorService封装ThreadPoolExecutor,
* 返回类型为ExecutorService,只暴露ExecutorService的接口,因此不能对它配置
*
* 基本线程数量和最大线程数量都是1,keepAliveTime=0,线程永远不超时,不会被回收
*/
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(), threadFactory));
}
/**
* 基本大小设置为0,最大大小是最大的合法整数,并将超时时间设置为1分钟。
* 所以这种方法创建出来的线程池可以被无限扩展,并且当需求降低时可以自动收缩。
* @return
*/
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}
/**
* 基本大小设置为0,最大大小是最大的合法整数,并将超时时间设置为1分钟。
* 所以这种方法创建出来的线程池可以被无限扩展,并且当需求降低时可以自动收缩。
* @return
*/
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
threadFactory);
}
/**------------带有日程安排的线程池的创建----------*/
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1));
}
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
return new DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1, threadFactory));
}
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
public static ExecutorService unconfigurableExecutorService(ExecutorService executor) {
if (executor == null)
throw new NullPointerException();
return new DelegatedExecutorService(executor);
}
public static ScheduledExecutorService unconfigurableScheduledExecutorService(ScheduledExecutorService executor) {
if (executor == null)
throw new NullPointerException();
return new DelegatedScheduledExecutorService(executor);
}
/**------------带有日程安排的线程池的创建----------*/
/**获取默认的线程工厂*/
public static ThreadFactory defaultThreadFactory() {
return new DefaultThreadFactory();
}
/**获取具有 优先级的线程工厂*/
public static ThreadFactory privilegedThreadFactory() {
return new PrivilegedThreadFactory();
}
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
public static Callable<Object> callable(Runnable task) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<Object>(task, null);
}
public static Callable<Object> callable(final PrivilegedAction<?> action) {
if (action == null)
throw new NullPointerException();
return new Callable<Object>() {
public Object call() { return action.run(); }};
}
public static Callable<Object> callable(final PrivilegedExceptionAction<?> action) {
if (action == null)
throw new NullPointerException();
return new Callable<Object>() {
public Object call() throws Exception { return action.run(); }};
}
public static <T> Callable<T> privilegedCallable(Callable<T> callable) {
if (callable == null)
throw new NullPointerException();
return new PrivilegedCallable<T>(callable);
}
public static <T> Callable<T> privilegedCallableUsingCurrentClassLoader(Callable<T> callable) {
if (callable == null)
throw new NullPointerException();
return new PrivilegedCallableUsingCurrentClassLoader<T>(callable);
}
/**-----Callable适配器-----*/
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) { //传递进来Runnable接口逻辑,在本Callable中调用
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
/**
* A callable that runs under established access control settings
* 按照已有的配置逻辑优先级适配
*/
static final class PrivilegedCallable<T> implements Callable<T> {
private final Callable<T> task;
private final AccessControlContext acc;
PrivilegedCallable(Callable<T> task) { //传递进来的是一个Callable逻辑
this.task = task;
this.acc = AccessController.getContext();
}
public T call() throws Exception {
try {
return AccessController.doPrivileged(
new PrivilegedExceptionAction<T>() {
public T run() throws Exception {
return task.call();
}
}, acc);
} catch (PrivilegedActionException e) {
throw e.getException();
}
}
}
/**
* A callable that runs under established access control settings and
* current ClassLoader
*/
static final class PrivilegedCallableUsingCurrentClassLoader<T> implements Callable<T> {
//......
}
/**
* The default thread factory
*/
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix; //线程名称前缀
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);//非守护线程
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY); //设置为默认的优先级别:5
return t;
}
}
/**
* 优先级的线程工厂类实现,继承了默认的线程工厂类
*/
static class PrivilegedThreadFactory extends DefaultThreadFactory {
private final AccessControlContext acc;
private final ClassLoader ccl;
PrivilegedThreadFactory() {
super();
SecurityManager sm = System.getSecurityManager();
if (sm != null) {
// Calls to getContextClassLoader from this class
// never trigger a security check, but we check
// whether our callers have this permission anyways.
sm.checkPermission(SecurityConstants.GET_CLASSLOADER_PERMISSION);
// Fail fast
sm.checkPermission(new RuntimePermission("setContextClassLoader"));
}
this.acc = AccessController.getContext();
this.ccl = Thread.currentThread().getContextClassLoader();
}
public Thread newThread(final Runnable r) {
return super.newThread(new Runnable() {
public void run() {
AccessController.doPrivileged(new PrivilegedAction<Void>() {
public Void run() {
Thread.currentThread().setContextClassLoader(ccl);
r.run();
return null;
}
}, acc);
}
});
}
}
/**Delegated:英[ˈdelɪɡeɪtɪd] 选派,授权
* A wrapper class that exposes only the ExecutorService methods
* of an ExecutorService implementation.
*/
static class DelegatedExecutorService extends AbstractExecutorService {
private final ExecutorService e;
DelegatedExecutorService(ExecutorService executor) { e = executor; }
public void execute(Runnable command) { e.execute(command); }
public void shutdown() { e.shutdown(); }
public List<Runnable> shutdownNow() { return e.shutdownNow(); }
public boolean isShutdown() { return e.isShutdown(); }
public boolean isTerminated() { return e.isTerminated(); }
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
return e.awaitTermination(timeout, unit);
}
public Future<?> submit(Runnable task) {
return e.submit(task);
}
public <T> Future<T> submit(Callable<T> task) {
return e.submit(task);
}
public <T> Future<T> submit(Runnable task, T result) {
return e.submit(task, result);
}
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
return e.invokeAll(tasks);
}
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
return e.invokeAll(tasks, timeout, unit);
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
return e.invokeAny(tasks);
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return e.invokeAny(tasks, timeout, unit);
}
}
static class FinalizableDelegatedExecutorService
extends DelegatedExecutorService {
FinalizableDelegatedExecutorService(ExecutorService executor) {
super(executor);
}
protected void finalize() {
super.shutdown();
}
}
/**
* A wrapper class that exposes only the ScheduledExecutorService
* methods of a ScheduledExecutorService implementation.
*/
static class DelegatedScheduledExecutorService
extends DelegatedExecutorService
implements ScheduledExecutorService {
private final ScheduledExecutorService e;
DelegatedScheduledExecutorService(ScheduledExecutorService executor) {
super(executor);
e = executor;
}
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
return e.schedule(command, delay, unit);
}
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
return e.schedule(callable, delay, unit);
}
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
return e.scheduleAtFixedRate(command, initialDelay, period, unit);
}
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
return e.scheduleWithFixedDelay(command, initialDelay, delay, unit);
}
}
/** Cannot instantiate. */
private Executors() {}
}
对于非常大的或者无界的线程池,可以通过使用SynchronousQueue来避免任务排队,以 及直接将任务从生产者移交给工作者线程.SynchronousQueue不是一个真正的队列,而是一 种在线程之间进行移交的机制.要将一个元素放入SynchronousQueue中,必须有另一个线程 正在等待接受这个元素。如果没有线程正在等待,并且线程池的当前大小小于最大值,那么 ThreadPoolExecutor将创建一个新的线程,否则根据饱和策略,这个任务将被拒绝。使用直接移将更高效,因为任务会直接移交给执行它的线程,而不是被首先放在队列中,然后由工作 者线程从队列中提取该任务•只有当线程池是无界的或者可以拒绝任务时,SynchronousQueue 才有实际价值•在ncwCachedThreadPool工厂方法中就使用了 SynchronousQueue。
Listing 8.17. Resultbearing Latch Used by ConcurrentPuzzleSolver.
@ThreadSafe
public class ValueLatch<T> {
@GuardedBy("this") private T value = null;
private final CountDownLatch done = new CountDownLatch(1);
public boolean isSet() {
return (done.getCount() == 0);
}
public synchronized void setValue(T newValue) {
if (!isSet()) {
value = newValue;
done.countDown();
}
}
public T getValue() throws InterruptedException {
done.await();
synchronized (this) {
return value;
}
}
}