- Executor接口
- ExecutorService规定了Executor的生命周期 (待写)
- ThreadPoolExecutor是ExecutorService的实现类
线程池不建议使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
Executors 返回的线程池对象的弊端如下:
1)FixedThreadPool 和 SingleThreadPool:
允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。
2)CachedThreadPool 和 ScheduledThreadPool:
允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。-
对于延迟任务、周期任务使用ScheduledThreadPoolExecutor来代替Timer。
- Timer使用绝对时间而不是相对时间,在执行定时任务时只会创建一个线程。如果某个任务执行时间过长,会破坏其它TimerTask的定时精确性。
- Timer中的任何一个TimerTask抛出一个未检查异常,就会取消整个Timer,尚未被调度的TimerTask将不会被执行。
- ScheduledThreadPoolExecutor配合DelayQueue来代替Timer。
ThreadPoolExecutor构造函数
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
参数解释
corePoolSize:线程池基本大小,即没有任务执行时的大小,但是在创建ThreadPoolExecutor初期,线程并不会立即启动,而是等到有任务提交时才会启动,除非调用prestartAllCoreThreads
maximumPoolSize:线程池最大大小。表示可同时活动的线程数量的上限。
keepAliveTime:线程存活时间,如果某个线程的空闲时间超出了存活时间,那么被标记为可回收,但是,只有线程池的当前大小超过了基本大小这个线程才会被终止。
unit:时间单位,和keepAliveTime组成具体时间,比如keepAliveTime为10,unit= TimeUnit.SECONDS,就表示存活时间为10秒。
-
workQueue:工作队列,用于存放提交的任务,用于被线程执行。
- BlockingQueue的其中4个实现类:SynchronousQueue(同步队列),ArrayBlockingQueue(数组阻塞队列FIFO),LinkedBlockingQueue(链表阻塞队列FIFO),PriorityBlockingQueue(优先级阻塞队列)。后2个可以是有界也可以是无界,第二个是有界队列,第一个是好比容量只有一的阻塞队列。
- SynchronousQueue不是一个真正队列,而是一种在线程之间进行移交的机制,要将一个元素放入SynchronousQueue中(put方法),必须有另一个线程正在等待接受这个元素(take方法),如果没有线程在等待,那么就会阻塞。在线程池中如果当前线程池中的线程数量没有达到最大值,就创建新的线程,否则就根据饱和策略(handler)来执行。该队列适合消费者多的情况,针对线程池就是适合无界的线程池(maximumPoolSize为int最大值)
threadFactory:构造新线程的工厂,具体看:处理非正常的线程终止中使用ThreadFactory来定制Thread。
-
handler:饱和策略,当有界工作队列被填满后,并且没有可用的线程(线程池也是有界的),饱和策略发挥作用。如果某个任务被提交到一个已被关闭的Executor时(调用了shutdown方法),也会用到饱和策略。
- 四种策略。中止(Abort):该策略抛出未检查异常RejectedExecutionException,调用者可以捕获这个异常,然后根据需求处理;抛弃(Discard):抛弃任务;抛弃最旧的(Discard-Oldest):抛弃下一个将被执行的任务,然后重提提交新的任务,如果工作队列是优先队列(优先级)那么将抛弃优先级最高的任务;调用者运行(Caller-Runs):将任务转交给由调用execute的线程执行该任务,即主线程。
-
注:如果线程数量等于线程池基本大小值corePoolSize,那么只有当工作队列被填满后,ThreadPoolExecutor才会创建新线程来执行队列中的任务
- corePoolSize为0、maximumPoolSize为3,并且工作队列为有界队列,并且大小为5,那么只有每当5个任务都填满队列后,才会创建一个线程执行,最多只有3个线程同时作业。Executors.newCachedThreadPool()返回的ThreadPoolExecutor的corePoolSize为0,maximumPoolSize为MAX_VALUE,workQueue为SynchronousQueue。
-
下实现一种平缓的性能下降。
案例
- 1.创建固定大小线程池,有界队列,调用者运行饱和策略
- 没有定义饱和策略,但是通过使用Semaphore(信号量)来控制任务的提交速率,信号量大小设置为线程池大小加上可排队列任务的数量,从而控制正在执行的和等待执行的任务数量。
package net.jcip.examples;
import java.util.concurrent.*;
import net.jcip.annotations.*;
public class BoundedExecutor {
private final Executor exec;
private final Semaphore semaphore;
public BoundedExecutor(Executor exec, int bound) {
this.exec = exec;
this.semaphore = new Semaphore(bound);
}
public void submitTask(final Runnable command)
throws InterruptedException {
semaphore.acquire();
try {
exec.execute(new Runnable() {
public void run() {
try {
command.run();
} finally {
semaphore.release();
}
}
});
} catch (RejectedExecutionException e) {
semaphore.release();
}
}
}
扩展ThreadPoolExecutor
-
ThreadPoolExecutor提供了beforeExecute、afterExecute和terminated,这些方法可以用于扩展ThreadPoolExecutor的行为。
- 其中beforeExecute、afterExecute在每个任务线程中执行如下,runWorker会在在任务线程的run方法中执行。
public runWorker(Worker w){
......
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
....
}
- afterExecute在beforeExecute抛出RuntimeException,或者任务完成后带有一个Error都不会被调用。
- terminated在线程池关闭时调用,可以用来释放Executor分配的资源,或者执行发送通知,记录日志或手机finalize统计信息等操作。
案例
- 给线程池添加统计信息
- 测量任务的运行时间,记录已处理任务数和总的处理时间,并通过terminated来输入包含平均任务时间的日志消息。
public class MyAppThread extends Thread {
public static final String DEFAULT_NAME = "MyAppThread";
private static volatile boolean debugLifecycle = false;
private static final AtomicInteger created = new AtomicInteger();
private static final AtomicInteger alive = new AtomicInteger();
private static final Logger log = Logger.getAnonymousLogger();
public MyAppThread(Runnable r) {
this(r, DEFAULT_NAME);
}
public MyAppThread(Runnable runnable, String name) {
super(runnable, name + "-" + created.incrementAndGet());
setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
public void uncaughtException(Thread t,
Throwable e) {
log.log(Level.SEVERE,
"UNCAUGHT in thread " + t.getName(), e);
}
});
}
public void run() {
// Copy debug flag to ensure consistent value throughout.
boolean debug = debugLifecycle;
if (debug) log.log(Level.FINE, "Created " + getName());
try {
alive.incrementAndGet();
super.run();
} finally {
alive.decrementAndGet();
if (debug) log.log(Level.FINE, "Exiting " + getName());
}
}
public static int getThreadsCreated() {
return created.get();
}
public static int getThreadsAlive() {
return alive.get();
}
public static boolean getDebug() {
return debugLifecycle;
}
public static void setDebug(boolean b) {
debugLifecycle = b;
}
}
递归算法的并行化
- 如果在循环体中包含一些密集型计算,或者需要执行可能阻塞的I/O操作,那么只要每次迭代是独立的,都可以对其进行并行化。(计算密集型和I/O操作对Cpu利用率不同,所以线程池大小也不一样)
- 下面给出来了2个串行转并行的操作。
public abstract class TransformingSequential {
//1.将串行转并行
void processSequentially(List<Element> elements) {
for (Element e : elements)
process(e);
}
void processInParallel(Executor exec, List<Element> elements) {
for (final Element e : elements)
exec.execute(new Runnable() {
public void run() {
process(e);
}
});
}
public abstract void process(Element e);
//2.将串行递归转化为并行递归:树的深度优先遍历
public <T> void sequentialRecursive(List<Node<T>> nodes,
Collection<T> results) {
for (Node<T> n : nodes) {
results.add(n.compute());
sequentialRecursive(n.getChildren(), results);
}
}
public <T> void parallelRecursive(final Executor exec,
List<Node<T>> nodes,
final Collection<T> results) {
for (final Node<T> n : nodes) {
exec.execute(new Runnable() {
public void run() {
results.add(n.compute());
}
});
parallelRecursive(exec, n.getChildren(), results);
}
}
//等待通过并行方式的计算结果
public <T> Collection<T> getParallelResults(List<Node<T>> nodes)
throws InterruptedException {
ExecutorService exec = Executors.newCachedThreadPool();
Queue<T> resultQueue = new ConcurrentLinkedQueue<T>();
parallelRecursive(exec, nodes, resultQueue);
exec.shutdown();
exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
return resultQueue;
}
interface Element {
}
interface Node <T> {
T compute();
List<Node<T>> getChildren();
}
}