Executor
执行已提交的Runnable任务的对象接口。这个接口提供了一种将任务提交和任务运行机制(包括线程使用、调度等的详细信息)分离的方法。
public interface Executor {
/**
* 在将来的某些时候执行给定任务。根据Executor的实现判定是在一个新线程还是线程池还是调用线
* 程中执行。
*/
void execute(Runnable command);
}
通常用Executor来代替显示的创建Thread。如要在单独的线程执行某项任务时,使用
public void test() {
Executor executor = new ThreadPerTaskExecutor();
executor.execute(new RunnableTask1());
}
class ThreadPerTaskExecutor implements Executor {
public void execute(Runnable r) {
new Thread(r).start();
}
}
而不是
new Thread(new RunnableTask()).start()
很多Executor的实现都对任务的执行方式和时间强加的限制,如下面定义了一个顺序执行任务的Executor:
class SerialExecutor implements Executor {
final Queue<Runnable> tasks = new ArrayDeque<>();
final Executor executor;
Runnable active;
SerialExecutor(Executor executor) {
this.executor = executor;
}
public synchronized void execute(final Runnable r) {
tasks.add(() -> {
try {
r.run();
} finally {
scheduleNext();
}
});
if (active == null) {
scheduleNext();
}
}
protected synchronized void scheduleNext() {
if ((active = tasks.poll()) != null) {
executor.execute(active);
}
}
}
java.util.concurrent包中提供的Executor的实现ExecutorService是一个更广泛的被使用的接口。ThreadPoolExecutor提供一个可扩展的线程池实现。Executors类为这些Executor提供了方便使用的工厂方法。
ExecutorService
- 一个Executor;
- 提供shutdown()方法终止Executor的执行;
- 提供submit()方法返回一个Future来追踪异步任务的执行进度;
- 提供invokeAll()方法返回一个Future列表来追踪多个异步任务的执行进度。
如果我们现在创建了一个异步执行的网络任务:
class NetworkService implements Runnable {
private final ServerSocket serverSocket;
private final ExecutorService pool;
public NetworkService(int port, int poolSize)
throws IOException {
serverSocket = new ServerSocket(port);
pool = Executors.newFixedThreadPool(poolSize);
}
public void run() { // run the service
try {
for (; ; ) {
pool.execute(new Handler(serverSocket.accept()));
}
} catch (IOException ex) {
pool.shutdown();
}
}
}
class Handler implements Runnable {
private final Socket socket;
Handler(Socket socket) {
this.socket = socket;
}
public void run() {
// read and service request on socket
}
}
可以调用shutdown()方法终止它的执行:
void shutdownAndAwaitTermination(ExecutorService pool) {
pool.shutdown(); // Disable new tasks from being submitted
try {
// Wait a while for existing tasks to terminate
if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
pool.shutdownNow(); // Cancel currently executing tasks
// Wait a while for tasks to respond to being cancelled
if (!pool.awaitTermination(60, TimeUnit.SECONDS))
System.err.println("Pool did not terminate");
}
} catch (InterruptedException ie) {
// (Re-)Cancel if current thread also interrupted
pool.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}
}
ScheduledExecutorService
- 一个ExecutorService;
- 其scheduleXXX()方法创建并执行ScheduledFuture;
- 该ScheduledFuture可以在给定的延迟后变为启动状态,并开始执行任务;
- 该ScheduledFuture可以定期执行任务;
- 通过这个ScheduledFuture可以get任务执行结果,可以cancel这个任务。
如需要创建一个在一小时内每隔10s就响一次的报警器:
class BeeperControl {
private final ScheduledExecutorService scheduler =
Executors.newScheduledThreadPool(1);
public void beepForAnHour() {
final Runnable beeper = () -> System.out.println("beep");
final ScheduledFuture<?> beeperHandle =
scheduler.scheduleAtFixedRate(beeper, 10, 10, SECONDS);
scheduler.schedule(() -> {
beeperHandle.cancel(true);
}, 60 * 60, SECONDS);
}
}
AbstractExecutorService
- 一个ExecutorService;
- 其submit、invokeAny、invokeAll方法在执行任务前,把任务封装成RunnableFuture,方便跟踪执行进度。
/**
* Returns a {@code RunnableFuture} for the given runnable and default
* value.
*
* @param runnable the runnable task being wrapped
* @param value the default value for the returned future
* @param <T> the type of the given value
* @return a {@code RunnableFuture} which, when run, will run the
* underlying runnable and which, as a {@code Future}, will yield
* the given value as its result and provide for cancellation of
* the underlying task
* @since 1.6
*/
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
ExecutorService的实现
ThreadPoolExecutor
- 一个AbstractExecutorService;
- 使用线程池中的某个线程执行提交的任务,该线程池通常由Executors中的工厂方法创建;
- FIFO;
- 其execute的设计思想为:
if(threadCount < 池中需要在空闲状态也要保持的线程数){
new thread;
把当前任务做为新建线程的第一个任务执行;
} else if(任务放入等待队列){
在队列中等待在运行的线程将它取出并执行;
} else if(threadCount < 池中最多可拥有的线程数){ //此时意味着等待队列已满,放入失败
new thread;
把当前任务做为新建线程的第一个任务执行;
} else {
拒绝该任务的执行,如抛出异常;
}
-
线程池中维持corePoolSize数量的线程数,如果有线程在线程池shutdown之前因为执行失败而中断,则在执行后续任务时会新建一个替代它的线程。
具体原理如下:
各个ctl之间的关系如下:
所以shutdown()代表停止接收新任务但仍需执行现有任务,shutdownNow()代表停止接收新任务也不再执行现有任务。
ThreadPoolExecutor在实现时用到了BlockQueue和AbstracQueuedSynchronizer。
可以看到Lock和BlockQueue是非常实用的。
Lock替代了synchronized,功能更强大使用也更简便,如下:
class BoundedBuffer {
final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
final Object[] items = new Object[100];
int putptr, takeptr, count;
public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (count == items.length)
notFull.await();
items[putptr] = x;
if (++putptr == items.length) putptr = 0;
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}
public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0)
notEmpty.await();
Object x = items[takeptr];
if (++takeptr == items.length) takeptr = 0;
--count;
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
}
BlockingQueue非常适用于生产-消费者模型,如下:
class Producer implements Runnable {
private final BlockingQueue queue;
Producer(BlockingQueue q) { queue = q; }
public void run() {
try {
while (true) { queue.put(produce()); }
} catch (InterruptedException ex) { ... handle ...}
}
Object produce() { ... }
}
class Consumer implements Runnable {
private final BlockingQueue queue;
Consumer(BlockingQueue q) {
queue = q;
}
public void run() {
try {
while (true) {
consume(queue.take());
}
} catch (InterruptedException ex) { ...handle ...}
}
void consume(Object x) { ...}
}
class BlockingQueueTest {
void main() {
BlockingQueue q = new SomeQueueImplementation();
LockTest.Producer p = new LockTest.Producer(q);
Consumer c1 = new Consumer(q);
Consumer c2 = new Consumer(q);
new Thread(p).start();
new Thread(c1).start();
new Thread(c2).start();
}
}
ScheduledThreadPoolExecutor
- 一个可以执行延迟任务和周期性任务的ThreadPoolExecutor;
- 如果提交的任务在运行前被取消,则该任务会在延迟结束后被取消;
- 如果多个任务有相同的执行时间,采用FIFO;
因为功能不同,其实现和ThreadPoolExecutor也不一样:
- maximumPoolSize == corePoolSize;且等待任务队列大小没有限制为Integer.MAX_VALUE;
- 设置了shutdown策略来决定哪些任务在线程池进入SHUTDOWN状态后不需要再执行;
- 采用DelayedWorkQueue而非LinkedBlockingQueue,加快取消速度(时间复杂度从O(n)降至O(log n));
- 在插入队列时将所有任务封装成ScheduledFutureTask给DelayedWorkQueue使用;
ForkJoinPool
- 继承AbstractExecutorService;
- 将所有提交的或执行的任务封装成ForkJoinTask执行;
- 需要将任务分解成足够小的子任务执行,思路如下:
解决(问题):
if 问题足够小:
直接解决问题 (顺序算法)
else:
for 部份 in 细分(问题)
fork 子任务来解决(部份)
join 在前面的循环中生成的所有子任务
return 合并的结果
- 每个Worker都有其自己的本地任务队列;
-
每个Worker都可以去其它Worker的任务队列中窃取任务执行;
- workQueues保存了WorkQueue列表,其中奇数索引代表内部任务(unshared queues,本地任务列表),偶数索引代表外部任务(shared queues,窃取的任务列表),因此workQueues的大小始终为2的幂次方。
其具体实现原理如下:
Executors
提供java.util.concurrent包中的Executor、ExecutorService、ScheduledExecutorService、ThreadFactory的工厂方法。
newFixedThreadPool()
创建一个指定corePoolSize和maximumPoolSize都为nThreads的ThreadPoolExecutor。
适用于负载较重的并行运算。
newWorkStealingPool()
创建ForkJoinPool,指定处理还没joined的forked任务的规则为FIFO(上面介绍中的ForkJoinPool默认采用LIFO)。
适用于“分而治之”递归运算计算密集的运算。
newScheduledThreadPool()
创建一个指定corePoolSize的ScheduledThreadPoolExecutor。
适用于需要定期或周期性执行的运算。
newSingleThreadExecutor()
创建一个仅仅只能拥有一个执行线程的ThreadPoolExecutor。
适用于串行运算。
newCachedThreadPool()
创建一个线程数量不限的ThreadPoolExecutor(corePoolSize=0, maximumPoolSize=Integer.MAX_VALUE)。
适用于负载较轻的数量繁多的短期运算。
newSingleThreadScheduledExecutor()
创建一个仅仅只能拥有一个执行线程的ScheduledThreadPoolExecutor。
适用于串行的需要定期或周期性执行的运算。
MoreExecutors
提供java.util.concurrent包中的Executor、ExecutorService、ThreadFactory的工厂方法。
newSequentialExecutor(Executor delegate)
delegate:实际执行任务的底层Executor
创建一个实际由delegate执行任务的SequentialExecutor。
SequentialExecutor:由delegate委拖任务执行程序实际执行任务的采用FIFO规则串行执行任务的Executor。
适用于需要按FIFO规则执行的串行运算。
listeningDecorator(ExecutorService delegate)
delegate:实际执行任务的底层Executor
创建一个实际由delegate执行任务的ListeningExecutorService。
ListeningExecutorService:可以在任务完成后再执行给定任务的ExecutorService。
适用于需要将运算链接在一起的场景。
应用
创建一个在非UI线程执行并行任务的Executor
@Provides
@Annotations.NonUiParallel
@Singleton
static ExecutorService provideNonUiThreadPool() {
return Executors.newFixedThreadPool(
5,
runnable -> {
Log.i("DialerExecutorModule.newThread", "creating low priority thread");
Thread thread = new Thread(runnable, "DialerExecutors-LowPriority");
// Java thread priority 4 corresponds to Process.THREAD_PRIORITY_BACKGROUND (10)
thread.setPriority(4);
return thread;
});
}
创建一个在非UI线程执行串行任务的Executor
static ScheduledExecutorService provideNonUiSerialExecutorService() {
return Executors.newSingleThreadScheduledExecutor(
runnable -> {
Log.i("NonUiTaskBuilder.newThread", "creating serial thread");
Thread thread = new Thread(runnable, "DialerExecutors-LowPriority-Serial");
// Java thread priority 4 corresponds to Process.THREAD_PRIORITY_BACKGROUND (10)
thread.setPriority(4);
return thread;
});
}
创建一个在UI线程执行延迟或周期性任务的Executor
static ScheduledExecutorService provideUiSerialExecutorService() {
return Executors.newSingleThreadScheduledExecutor(
runnable -> {
Log.i("DialerExecutorModule.newThread", "creating serial thread");
Thread thread = new Thread(runnable, "DialerExecutors-HighPriority-Serial");
// Java thread priority 5 corresponds to Process.THREAD_PRIORITY_DEFAULT (0)
thread.setPriority(5);
return thread;
});
}
创建一个在UI线程执行的可以注册listener的轻量级Executor
@Provides
@Annotations.UiParallel
@Singleton
static ExecutorService provideUiThreadPool() {
return (ExecutorService) AsyncTask.THREAD_POOL_EXECUTOR;
}
static ListeningExecutorService provideLightweightExecutor(@Annotations.UiParallel ExecutorService delegate) {
return MoreExecutors.listeningDecorator(delegate);
}
创建一个在非UI线程异步执行运算的可以注册listener的Executore
static ListeningExecutorService provideBackgroundExecutor(
@Annotations.NonUiParallel ExecutorService delegate) {
return MoreExecutors.listeningDecorator(delegate);
}
总结
Executor框架把任务的提交和执行解耦,可以简化并发编程。线程能过线程的复用,可以降低资源消耗,提高响应速度等。