目录:
Executor 框架简介
1.Executor
2.ExecutorService
3.Executors
4.Executor、ExecutorService、Executors对比
5.ThreadPoolExecutor参数详解
Executor 框架简介
在 Java 5 之后,并发编程引入了一堆新的启动、调度和管理线程的API。Executor 框架便是 Java 5 中引入的,其内部使用了线程池机制,它在 java.util.cocurrent 包下,通过该框架来控制线程的启动、执行和关闭,可以简化并发编程的操作。因此,在 Java 5之后,通过 Executor 来启动线程比使用 Thread 的 start 方法更好,除了更易管理,效率更好(用线程池实现,节约开销)外,还有关键的一点:有助于避免 this 逃逸问题——如果我们在构造器中启动一个线程,因为另一个任务可能会在构造器结束之前开始执行,此时可能会访问到初始化了一半的对象用 Executor 在构造器中。
Executor 框架包括:线程池,Executor,Executors,ExecutorService,CompletionService,Future,Callable 等。
因为创建和管理线程非常心累,并且操作系统通常对线程数有限制,所以建议使用线程池来并发执行任务,而不是每次请求进来时创建一个线程。使用线程池不仅可以提高应用的响应时间,还可以避免"java.lang.OutOfMemoryError: unable to create new native thread" 之类的错误。
1.Executor
Executor 是一个抽象层面的核心接口(大致代码如下)。
public interface Executor {
void execute(Runnable command);
}
2.ExecutorService
ExecutorService 接口 对 Executor 接口进行了扩展,提供了返回 Future 对象,终止,关闭线程池等方法。当调用 shutDown 方法时,线程池会停止接受新的任务,但会完成正在 pending 中的任务。
Future 对象提供了异步执行,这意味着无需等待任务执行的完成,只要提交需要执行的任务,然后在需要时检查 Future 是否已经有了结果,如果任务已经执行完成,就可以通过 Future.get() 方法获得执行结果。需要注意的是,Future.get() 方法是一个阻塞式的方法,如果调用时任务还没有完成,会等待直到任务执行结束。
通过 ExecutorService.submit() 方法返回的 Future 对象,还可以取消任务的执行。Future 提供了 cancel() 方法用来取消执行 pending 中的任务。
部分代码如下:
public interface ExecutorService extends Executor {
void shutdown();
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;
}
3.Executors
Executors 是一个工具类,类似于 Collections。提供工厂方法来创建不同类型的线程池,,分别为:
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* Created by lbc19 on 2018/6/4.
*/
public class ExecutorsPractice {
public static void main(String[] args) {
/**
Java通过Executors提供四种线程池,分别为:
newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
*/
/**
1)newCachedThreadPool
创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
线程池为无限大,当执行第二个任务时第一个任务已经完成,会复用执行第一个任务的线程,而不用每次新建线程。
*/
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 100; i++) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
executorService.execute(new Runnable() {
@Override
public void run() {
System.out.println("a");
}
});
}
/**
2)newFixedThreadPool
创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
因为线程池大小为3,每个任务输出index后sleep 10秒,所以每10秒打印3个数字。
定长线程池的大小最好根据系统资源进行设置。如Runtime.getRuntime().availableProcessors()
*/
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
for (int i = 0; i < 100; i++) {
fixedThreadPool.execute(new Runnable() {
@Override
public void run() {
System.out.println("a");
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
/**
3)newScheduledThreadPool
创建一个定长线程池,支持定时及周期性任务执行。
*/
//表示延迟3秒执行,只执行一次
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(10);
scheduledThreadPool.schedule(new Runnable() {
@Override
public void run() {
System.out.println("a" + new Date());
}
}, 3, TimeUnit.SECONDS);
//表示延迟1秒后每3秒执行一次(一直执行)
scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
System.out.println("a" + new Date());
}
}, 1, 3, TimeUnit.SECONDS);
//第一次延迟1s,之后都延迟3s(一直执行)
scheduledThreadPool.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
System.out.println("a" + new Date());
}
}, 1, 3, TimeUnit.SECONDS);
/**
4)newSingleThreadExecutor
创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
*/
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 100; i++) {
singleThreadExecutor.execute(new Runnable() {
@Override
public void run() {
System.out.println("a" + new Date());
}
});
}
}
}
部分源代码如下:发现他们用的都是ThreadPoolExecutor
public class Executors {
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
}
4.Executor、ExecutorService、Executors对比
Executor | ExecutorService | Executors |
---|---|---|
Executor 是 Java 线程池的核心接口,用来并发执行提交的任务; | ExecutorService 接口继承了 Executor 接口,是 Executor 的子接口,提供了异步执行和关闭线程池的方法; | Executors 是一个工具类,类似于 Collections。提供工厂方法来创建不同类型的线程池,比如 FixedThreadPool 或 CachedThreadPool。 |
Executor 接口定义了 execute()方法用来接收一个Runnable接口的对象 | ExecutorService 接口中的 submit()方法可以接受Runnable和Callable接口的对象 | |
Executor 中的 execute() 方法不返回任何结果 | ExecutorService 中的 submit()方法可以通过一个 Future 对象返回运算结果。1.通过 **Future.get() **方法获得执行结果,Future.get() 方法是一个阻塞式的方法; | |
不能取消任务 | Future 提供了 cancel() 方法用来取消执行 pending 中的任务; | |
没有提供和关闭线程池有关的方法 | ExecutorService 还提供用来控制线程池的方法。比如:调用 shutDown()方法终止线程池。 |
5.ThreadPoolExecutor参数详解
- corePoolSize:线程池核心线程数大小
- maximumPoolSize:线程池线程数最大值,达到最大值后线程池不会再增加线程执行任务
- keepAliveTime:线程池中超过corePoolSize数目的空闲线程最大存活时间,allowCoreThreadTimeOut为
true的核心线程有效时间 - unit:时间单位
- workQueue:阻塞任务队列,用于保存任务以及为工作线程提供待执行的任务
- threadFactory:线程工厂,线程生成器
- handler:当提交任务数超过maxmumPoolSize+workQueue之和时,任务会交给RejectedExecutionHandler来处理;
当一个新的任务提交到线程池之后,线程池是如何处理的:
1、线程池判断核心线程池里的线程是否都在执行任务。如果不是,则创建一个新的工作线程来执行任务。如果核心线程池里的线程都在执行任务,则执行第二步。
2、线程池判断工作队列是否已经满。如果工作队列没有满,则将新提交的任务存储在这个工作队列里进行等待。如果工作队列满了,则执行第三步
3、线程池判断线程池的线程是否都处于工作状态。如果没有,则创建一个新的工作线程来执行任务。如果已经满了,则交给饱和策略来处理这个任务
ThreadPoolExecutor内部有实现4个拒绝策略,默认为AbortPolicy策略:
- CallerRunsPolicy:由调用execute方法提交任务的线程来执行这个任务
- AbortPolicy:抛出异常RejectedExecutionException拒绝提交任务
- DiscardPolicy:直接抛弃任务,不做任何处理
- DiscardOldestPolicy:去除任务队列中的第一个任务,重新提交
创建完线程池后,可通过submit或execute方法提交任务。
如果使用 Executors 的工厂方法创建的线程池,那么饱和策略都是采用默认的 AbortPolicy。所以如果我们想当线程池已满的情况,使用调用者的线程来运行任务,就要自己创建线程池,指定想要的饱和策略,而不是使用 Executors 了。
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
感谢网友的分享:
http://wiki.jikexueyuan.com/project/java-concurrency/executor.html
https://blog.csdn.net/solrChina/article/details/78296253
https://www.jianshu.com/p/d621da64fae0?utm_campaign=maleskine&utm_content=note&utm_medium=seo_notes&utm_source=recommendation