1.介绍
这篇文章将介绍一下java中的线程池-一开始我们先看一下java标准库中的实现,之后再看一下Google的Guava库中的实现。
2.线程池
在java中,线程会被映射到系统级别的线程也就是操作系统的资源。如果你不可控地创建线程,那么你就有可能很快地耗尽这些资源。线程间的上下文切换也是由操作系统来完成的-为了模拟并发。一个过于简单化的观点是:你生产的线程越多,每一个线程在处理实际工作时,花费的时间越少。
多线程模式可以帮助我们在多线程程序中有效地节省资源并且在一定的限制下确保并发性。 在你使用线程池时,你会以并行任务的形式编写你的并发代码并且把他们提交给一个线程池实例运行。这个实例控制着多个可复用的线程。
多线程模式允许你控制应用程序所创建的线程数量,它们的生命周期,以及调度这些任务的执行并且把进来的线程保存到一个队列中。
3.java中线程池
3.1 Executors, Executor 以及ExecutorService
Executor 类为你包含了多个用于创建预配置的线程池实例的方法,使用它你无需进行任何自定义的调整。Executor和ExecutorService接口是用于和不同线程池实现一起工作的。通常,你应该把你的代码和线程池的实现相隔离,并且在你的应用程序中使用这些接口。
Executor接口有一个单独的execute()方法,这个方法可以用于提交那些需要运行的Runnable实例。
这里有一个简单的例子,讲述了如何使用Executor API来获取一个由单个线程池支持的Executor实例 以及一个用于次序地执行任务的无界队列。这里,我们执行了一个单任务,这个单任务就是简单地在屏幕上打印"Hello World",该任务会作为一个lambda表达式被提交给ExecutorService。
Executor executor = Executors.newSingleThreadExecutor();
executor.execute(() -> System.out.println("Hello World"));
ExecutorService 接口包含了大量用于控制任务处理以及管理服务结束的方法。使用这个接口,你可以提交要执行的任务并且通过使用返回的Future实例 ,你可以控制这些任务的执行。在下面的例子中,我们创建了一个ExecutorService,提交了一个任务,之后使用其返回的Future对象的get()方法会导致等待直到提交的任务完成并且有返回值返回:
ExecutorService executorService = Executors.newFixedThreadPool(10);
Future future = executorService.submit(() -> "Hello World");
// some operations
String result = future.get();
当然,在现实生活中的场景下,通常你并不想立即调用future.get()方法,而是推迟到在你真正需要这个返回值的时候再调用它。
另外,submit方法也有两种重载形式,要么接收一个Runnable接口或者接收一个Callable接口,这俩个接口都是很实用的接口,它们都能作为lambda表达式被传递过去。
Runnable接口的单一方法不会抛出异常也不会返回值。而Callable接口可能就更加方便了,因为它允许抛出异常并且返回值。最后-为了能让编译器 推断出这是一个Callable 类型,只需简单地从lambda表达式中返回一个值即可。
更多使用ExecutorService接口和future的例子,请查看http://www.baeldung.com/java-executor-service-tutorial。
3.2 ThreadPoolExecutor
ThreadPoolExecutor是一个可扩展的线程池实现,它提供了许多参数和用于调整的钩子(with lots of parameters and hooks for fine-tuning)
我们接下来主要讨论的配置参数是: corePoolSize, maximumPoolSize 以及keepAliveTime。
线程池会由一定固定数量的核心线程组成,这些线程会一直保存在线程池中。其他额外的线程会在需要时被生产出来,并且在不需要时被终结掉。该corePoolSize参数就是核心线程的数量,这些线程会在实例化并且一直保存在线程池中。如果所有的核心线程都繁忙,同时又有更多的任务被提交上来,那么,该线程池就会扩大到maximumPoolSize数量。keepAliveTime参数(保活时间)是一个时间间隔,它用于控制那些额外创建的线程在空闲状态下能够存活时间间隔。
这些参数覆盖了广泛的用例,但是 大多数典型的配置都被预定义在Executor的静态方法中。
例如,newFixedThreadPool方法会创建一个ThreadPoolExecutor,该线程池的corePoolSize和maximumPoolSize参数值是相等的,并且keepAliveTime为0,这也就意味着:该线程池中的线程数量总是固定不变的:
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
executor.submit(() -> {
Thread.sleep(1000);
returnnull;
});
executor.submit(() -> {
Thread.sleep(1000);
returnnull;
});
executor.submit(() -> {
Thread.sleep(1000);
returnnull;
});
assertEquals(2, executor.getPoolSize());
assertEquals(1, executor.getQueue().size());
在上面的例子中,我们用2个固定数量的线程实例化了一个ThreadPoolExecutor,这意味着,如果同时运行的任务数小于等于2的话,它们能立马被运行。否则的话,它们中的一些任务会被放入到一个队列中等待运行(put into a queue to wait for their turn)。
我们创建了三个Callable任务,并且让它们睡眠1000毫秒从而模拟一个繁重的任务。头俩个任务会被立即执行,但是第三个将不得不在队列中等待,我们可以在提交这些任务之后,调用getPoolSize()和getQueueSize()方法来验证。
另一个预配置的ThreadPoolExecutor可以使用Executor.newCachedThreadPool()方法来创建,该方法不会接收线程数量,corePoolSize被设置为0,并且maximumPoolSize被设置为Integer.MAX_VALUE,keepAliveTime被设置为60s。
这些参数值意味着:该CachedThreadPool线程池可以无限地增长以容纳提交任意数量的任务。但是当那些线程不再需要时,如果它们在60s内不活跃的话,就会被处理掉。
一个典型的用例(use case)就是 当在你的应用中存在许多短期存活的任务时,CachedThreadPool线程池将会是最佳的选择。
ThreadPoolExecutor executor =
(ThreadPoolExecutor) Executors.newCachedThreadPool();
executor.submit(() -> {
Thread.sleep(1000);
returnnull;
});
executor.submit(() -> {
Thread.sleep(1000);
returnnull;
});
executor.submit(() -> {
Thread.sleep(1000);
returnnull;
});
assertEquals(3, executor.getPoolSize());
assertEquals(0, executor.getQueue().size());
上面例子中的队列大小总会是0,因为它在内部使用的是一个SynchronousQueue实例,在Synchronous实例中,成对的insert和remove操作总是同时发生,因此,这个队列中不会包含任何东西。
Executors.newSingleThreadExecutor() 会创建一个典型的只包含单个线程的ThreadPoolExecutor,这个单线程的executor对于创建事件线程来说是非常理想的。
它的corePoolSize以及maximumPoolSize参数大小都等于1,并且keepAliveTime为0。上面案例中的任务会被顺序地执行,因此,在这个任务结束之后,这个标识的值将会是2.
AtomicInteger counter = newAtomicInteger();
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() -> {
counter.set(1);
});
executor.submit(() -> {
counter.compareAndSet(1, 2);
});
另外,这个ThreadPoolExecutor用了一个互斥的包装所修饰,因此,在它创建之后,就不能重新配置了。注意:这也就是为什么我们不能把它转成一个ThreadPoolExecutor的原因。
3.3 ScheduledThreadPooolExecutor
ScheduledThreadPooolExecutor 继承了ThreadPoolExecutor类,同时实现了ScheduledExecutorService 接口的好几个方法:
.schedule方法允许在一个指定的延迟之后,再执行一个任务。
.scheduleAtFixedRate方法则允许在一个指定的初始延迟之后再执行任务,并且之后再一定的周期内循环执行该任务。所以这个执行频率是固定的fixed。
.scheduleWithFixedDelay 方法类似于scheduleAtFixedRate,因为, 它会重复执行给定的任务,但是给定的延迟指的是:上个任务执行结束和下个任务开始执行之间的间隔;执行频率因执行一个给定任务所花费的时间的不同而不同。
该Executors.newScheduledThreadPool() 方法主要是用于创建一个调度线程池执行器ScheduledThreadPoolExecutor,该ScheduledThreadPoolExecutor带有给定corePoolSize大小,无限的maximumPoolSize以及keepAliveTime的值为0.这里有一个如何在500毫秒之后调度一个任务执行的案例:
ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
executor.schedule(() -> {
System.out.println("Hello World");
}, 500, TimeUnit.MILLISECONDS);
下面的代码展示了如何在500毫秒的延迟之后,执行一个任务,并且之后每隔100毫秒重复执行一次。在调度完该任务之后,我们会一直等待直到它用CountDownLatch锁倒计时三次。然后,使用Future.cancel()方法取消它:
CountDownLatch lock = newCountDownLatch(3);
ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
ScheduledFuture future = executor.scheduleAtFixedRate(() -> {
System.out.println("Hello World");
lock.countDown();
}, 500, 100, TimeUnit.MILLISECONDS);
lock.await(1000, TimeUnit.MILLISECONDS);
future.cancel(true);
3.4 ForkJoinPool
ForkJoinPool是java7中fork/join框架的核心部分,它解决 在递归算法中创建多任务的常见问题。使用一个简单的ThreadPoolExecutor,你将会把线程迅速地耗完,因为每一个任务或子任务都需要一个它自己的线程来运行。
在fork/join框架中,任一个任务都能创建一定数量的子任务,并且使用join方法等待他们完成。 fork/join框架的好处是:它不会为每一个任务或子任务创建一个新的线程而是去实现Work Stealing 算法。该框架在《带你进入java中的Fork/join框架》文章中进行了全面的介绍。
我们来看一个简单的例子,使用ForkJoinPool遍历一个树节点并且计算所有叶子值得总和。这里是一个树的简单实现,它由一个节点,一个int值,以及一组子节点构成。
static class TreeNode { int value; Set children;
TreeNode(int value, TreeNode... children) {
this.value = value;
this.children = Sets.newHashSet(children);
}
}
现在如果我们想并发地计算树上的所有值,我们需要实现RecursiveTask接口,每一个任务接收它自己的节点并且把它的值和它的所有孩子值的总和相加。
要计算孩子值得总和,该任务实现需要做如下事情:
.游历子节点集合
.在游历过程中做映射,为每一个元素新建一个CountingTask
.fork一个CountingTask来执行每一个子任务。
.在每一个forked任务上调用join方法来收集处理结果。
.使用Collectors.summingInt 集合来总计结果。
public static class CountingTask extends RecursiveTask {
private final TreeNode node;
public CountingTask(TreeNode node) {
this.node = node;
}
@Override
protected Integer compute() {
return node.value + node.children.stream()
.map(childNode -> new CountingTask(childNode).fork())
.collect(Collectors.summingInt(ForkJoinTask::join));
}
}
在一个实际的树上运行该计算的代码非常简单:
reeNode tree = new TreeNode(5,
new TreeNode(3), new TreeNode(2,
new TreeNode(2), new TreeNode(8)));
ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
int sum = forkJoinPool.invoke(new CountingTask(tree));
4.Guava中的线程池实现
Guava是一个流行的Google库。它有许多实用的并发类,包括 ExecutorService的几个handy实现。这些实现都不能直接实例化或继承,因此,创建它们实例的唯一入口点就是:MoreExecutor类。
4.1 添加Guava的maven依赖
把下面的maven依赖添加到你的pom文件中,这样你的项目中就包含了Guava库啦。
4.2 DirectExecutor和DirectExecutorService
有时候,你想在当前线程或者一个线程池中执行任务,取决于一些条件。你可能更倾向于使用单一的Executor接口,然后仅仅切换一下实现。虽然 实现一个Executor或ExecutorService来让当前线程执行任务并不难,但它仍然要我们写一些模板化的代码。
幸运地是,Guava为我们提供了预定义的实例。
下面有一个例子,这个例子表明了在同一个线程中执行任务。虽然该任务睡眠了500毫秒,它会阻塞当前线程 但当执行完成之后,结果便是立即可用的。
Executor executor = MoreExecutors.directExecutor();
AtomicBoolean executed = newAtomicBoolean();
executor.execute(() -> {
try{
Thread.sleep(500);
} catch(InterruptedException e) {
e.printStackTrace();
}
executed.set(true);
});
assertTrue(executed.get());
directExecutor()方法返回的实际是一个静态单例对象,所以,使用这个方法在对象创建时不会造成任何花销。
相比较于 MoreExecutors.newDirectExecutorService(),你可能更喜欢这个方法,因为newDirectExecutorService,会在每一次调用时,都创建一个完整的对象。
4.3 退出Executor Service
另一个常见的问题是,当线程池还在执行它的任务时,你把虚拟机关闭了。即使有一个恰当的取消机制,也无法保证任务可以完美地表现,并且在executor service关闭时,能够停止它们的工作。这有可能导致在任务的执行过程中,JVM不定期地挂起。
为了解决这个问题,Guava 引入了一系列退出executor service的机制。它们都是基于守护线程来实现的,这些守护线程是伴随着JVM一起终止的。这些service使用Runtime.getRuntime().addShutdownHook()方法增加了一个关闭钩子。这样,在放弃挂起任务之前,就可以在一定时间内防止VM虚拟机终止(terminate)。
在下面的例子中,我们提交了一个包含无限循环的任务,但是我们使用了一个现存的executor service,这个executor service会在VM结束时,等待100毫秒。如果没有这个exitingExecutorService 的话,这个任务可能会导致虚拟机无限期地挂起。
ThreadPoolExecutor executor =(ThreadPoolExecutor) Executors.newFixedThreadPool(5);
ExecutorService executorService =MoreExecutors.getExitingExecutorService(executor, 100, TimeUnit.MILLISECONDS);
executorService.submit(() -> {
while(true) {
}
});
4.4 Listening Decorators
Listening Decorators允许你包装ExecutorService并且在任务提交时,接收一个ListenableFuture实例而不是一个简单得Future实例。该ListenableFuture接口继承了Future接口,并且有一个额外的方法addListener.这个方法允许添加一个监听器,这个监听器会在future完成之后被调用。
你很少想直接使用ListenableFuture.addListener() ,但是对于 大多数Future 功能类来说,它确实必须的。例如,借助于Futures.allAsList()方法你可以在单个ListenableFuture中组合多个ListenableFuture实例:
ExecutorService executorService = Executors.newCachedThreadPool();
ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(executorService);
ListenableFuture future1 = listeningExecutorService.submit(() -> "Hello");
ListenableFuture future2 = listeningExecutorService.submit(() -> "World");
String greeting = Futures.allAsList(future1, future2).get()
.stream()
.collect(Collectors.joining(" "));
assertEquals("Hello World", greeting);
5.总结
在本篇文章中,我们已经讨论了线程池模式和它在java标准库以及Google的Guava库中的实现。本篇文章的源代码都在github上:代码地址