要使任务和线程安全、快速、可靠的停止下来并不是一件容易的事情。java没有提供任何机制来安全的终止线程。但它提供了中断(Interruption),这是一种协作机制能够使一个线程终止另一个线程的当前工作。还可以借助容器来实现线程的终止。
1.任务取消
1.1通过volatile类型的域来保存取消状态
一般run()方法执行完,线程就会正常结束,然而,常常有些线程是伺服线程。它们需要长时间的运行,只有在外部某些条件满足的情况下,才能关闭这些线程。使用一个变量来控制循环,例如:最直接的方法就是设一个boolean类型的标志,并通过设置这个标志为true或false来控制while循环是否退出,代码示例:
一个仅运行一秒的素数生成器
* 线程的取消与关闭 通过cancel方法将设置标志 并且主循环会检车这个标志,
* 为了使这个过程可靠的工作 标志cancelled必须为 volatile
package com.cocurrent.demo;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
import static java.util.concurrent.TimeUnit.SECONDS;
/**
* @Author: nanJunYu
* @Description: 一个仅运行一秒的素数生成器
* 线程的取消与关闭 通过cancel方法将设置标志 并且主循环会检车这个标志,
* 为了使这个过程可靠的工作 标志cancelled必须为 volatile
* @Date: Create in 2018/9/11 15:21
*/
public class CancelVolatile implements Runnable {
private final List<BigInteger> pri = new ArrayList<BigInteger>();
private volatile boolean cancelled;
public void run() {
BigInteger p = BigInteger.ONE;
while (!cancelled) {
p = p.nextProbablePrime();
synchronized (this) {
pri.add(p);
}
}
}
private void cancel() {
cancelled = true;
}
private synchronized List<BigInteger> get() {
return new ArrayList<BigInteger>(pri);
}
public static void main(String[] args) {
CancelVolatile cancelVolatile = new CancelVolatile();
new Thread(cancelVolatile).start();
try {
SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
cancelVolatile.cancel();
}
System.out.println(cancelVolatile.get());
}
}
CancelVolatile中的取消机制最终会使得搜索的素数任务退出,但是退出过程中需要花费一定的时间,然而,如果任务调度了一些阻塞方法,(BlockingQueue.put)那么可能产生一个问题——任务可能永远不会检查取消标示,永远不会结束。为取保线程能退出,我们通常使用中断,当我们调用interrupt,并不意味着立即停止目标线程正在运行的线程,而只是传递了一个请求中断的信息,它会在线程下一个合适的的时刻中断自己。wait、sleep、join、将严格处理这种请求,当他们收到一个中断请求,或饿着开始执行时发现中断状态时,将抛出异常。
使用静态的interrupted时应该小心,因为它会清除当前线程的中断状态,如果返回true,除非你你想屏蔽这个中断,否则必须对它进行处理,抛出异常或者再次调用interrupt来恢复中断状态。
try {
...
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
通常,我们用中断来取消任务比检查标记更好,是最合适的取消任务方式,我们看一个更加健壮的获得素数的类。
public class PrimeProducer extends Thread{
private final BlockingQueue<BigInteger> queue;
public PrimeProducer(BlockingQueue<BigInteger> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
BigInteger p = BigInteger.ONE;
while(!Thread.currentThread().isInterrupted()) //①用线程的状态来检查
queue.put(p = p.nextProbablePrime());
} catch (InterruptedException e) {
//中断将线程退出
}
}
public void cancel() { interrupt(); }
}
1.2通过 future 的cancel取消线程
package com.cocurrent.demo;
import java.util.concurrent.*;
/**
* @Author: nanJunYu
* @Description:通过 future 的cancel取消线程
* @Date: Create in 2018/9/12 16:41
*/
public class FutureCancel {
public static void timeRun(Runnable runnable, Long timeout, TimeUnit timeUnit) throws InterruptedException {
ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<?> future = executorService.submit(runnable);
try {
future.get(timeout,timeUnit);
} catch (ExecutionException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}finally {
future.cancel(true);
}
}
}
1.3使用interrupt()方法中断当前线程
在程序中,我们是不能随便中断一个线程的,因为这是极其不安全的操作,我们无法知道这个线程正运行在什么状态,它可能持有某把锁,强行中断可能导致锁不能释放的问题;或者线程可能在操作数据库,强行中断导致数据不一致混乱的问题。正因此,JAVA里将Thread的stop方法设置为过时,以禁止大家使用。
一个线程什么时候可以退出呢?当然只有线程自己才能知道。
所以我们这里要说的Thread的interrrupt方法,本质不是用来中断一个线程。是将线程设置一个中断状态。
当我们调用线程的interrupt方法,它有两个作用:
1、如果此线程处于阻塞状态(比如调用了wait方法,io等待),则会立马退出阻塞,并抛出InterruptedException异常,线程就可以通过捕获InterruptedException来做一定的处理,然后让线程退出。
2、如果此线程正处于运行之中,则线程不受任何影响,继续运行,仅仅是线程的中断标记被设置为true。所以线程要在适当的位置通过调用isInterrupted方法来查看自己是否被中断,并做退出操作。
package com.cocurrent.demo;
/**
* @Author: nanJunYu
* @Description:线程的中断方法测试
* @Date: Create in 2018/9/11 17:16
*/
public class InterruptedDemo {
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(new Work());
thread.start();
Thread.sleep(1000);
thread.interrupt();
System.out.println("Main thread stopped.");
}
public static class Work implements Runnable {
public void run() {
System.out.println("我在做一些事情。。。");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread curr = Thread.currentThread();
//再次调用interrupt方法中断自己,将中断状态设置为“中断”
curr.interrupt();
System.out.println("Worker IsInterrupted: " + curr.isInterrupted());
System.out.println("Worker IsInterrupted: " + curr.isInterrupted());
System.out.println("Static Call: " + Thread.interrupted());//clear status
System.out.println("---------After Interrupt Status Cleared----------");
System.out.println("Static Call: " + Thread.interrupted());
System.out.println("Worker IsInterrupted: " + curr.isInterrupted());
System.out.println("Worker IsInterrupted: " + curr.isInterrupted());
}
System.out.println("Worker stopped.");
}
}
}
1.4使用shutdown()和shutdownNow()
1.4.1 shutdown()
将线程池状态置为SHUTDOWN,并不会立即停止:
停止接收外部submit的任务
内部正在跑的任务和队列里等待的任务,会执行完
等到第二步完成后,才真正停止
shutdown方法源码
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
1.4.2 shutdownNow()
将线程池状态置为STOP。企图立即停止,事实上不一定:
跟shutdown()一样,先停止接收外部提交的任务
忽略队列里等待的任务
尝试将正在跑的任务interrupt中断
返回未执行的任务列表
shutdownNow源码
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
从源码可以很清晰的看出两者的区别,shutdown使用了以后会置状态为SHUTDOWN,而shutdownNow为STOP。此外,shutdownNow会返回任务列表。
1.5使用stop方法终止线程
程序中可以直接使用thread.stop()来强行终止线程,但是stop方法是很危险的,就象突然关闭计算机电源,而不是按正常程序关机一样,可能会产生不可预料的结果,不安全主要是:thread.stop()调用之后,创建子线程的线程就会抛出ThreadDeatherror的错误,并且会释放子线程所持有的所有锁。一般任何进行加锁的代码块,都是为了保护数据的一致性,如果在调用thread.stop()后导致了该线程所持有的所有锁的突然释放(不可控制),那么被保护数据就有可能呈现不一致性,其他线程在使用这些被破坏的数据时,有可能导致一些很奇怪的应用程序错误。因此,并不推荐使用stop方法来终止线程
2.停止基于线程的服务
2.1关闭ExecutorService
ExecutorService提供两个关闭的方法,shutdown 和shutdownNow。shutdownNow首先关闭当前执行线程,然后返回未执行的任务,而shutdown则等待执行完成。两种方法差别在于安全性和响应性。
直接看例子了,这没什么好说的。
package com.cocurrent.demo;
import java.util.concurrent.*;
/**
* @Author: nanJunYu
* @Description:
* @Date: Create in 2018/9/12 17:42
*/
public class ExecutorServiceCancelB {
public static class MyCallable implements Callable {
private int flag = 0;
public MyCallable(int flag) {
this.flag = flag;
}
public String call() throws Exception {
if (this.flag == 0) {
return "flag = 0";
}
if (this.flag == 1) {
try {
while (true) {
System.out.println("looping.");
Thread.sleep(2000);
}
} catch (InterruptedException e) {
System.out.println("Interrupted");
}
return "false";
} else {
throw new Exception("Bad flag value!");
}
}
}
public static void main(String[] args) {
// 定义3个Callable类型的任务
MyCallable task1 = new MyCallable(0);
MyCallable task2 = new MyCallable(1);
MyCallable task3 = new MyCallable(2);
// 创建一个执行任务的服务
ExecutorService es = Executors.newFixedThreadPool(3);
try {
// 提交并执行任务,任务启动时返回了一个Future对象,
// 如果想得到任务执行的结果或者是异常可对这个Future对象进行操作
Future future1 = es.submit(task1);
// 获得第一个任务的结果,如果调用get方法,当前线程会等待任务执行完毕后才往下执行
System.out.println("task1: " + future1.get());
Future future2 = es.submit(task2);
shutdownAndAwaitTermination(es);
// 等待5秒后,再停止第二个任务。因为第二个任务进行的是无限循环
Thread.sleep(5000);
System.out.println("task2 cancel: " + future2.cancel(true));
// 获取第三个任务的输出,因为执行第三个任务会引起异常
// 所以下面的语句将引起异常的抛出
Future future3 = es.submit(task3);
System.out.println("task3: " + future3.get());
} catch (Exception e) {
System.out.println(e.toString());
}
// 停止任务执行服务
//es.shutdown();
}
private static void shutdownAndAwaitTermination(ExecutorService pool) {
pool.shutdown(); // Disable new tasks from being submitted
try {
// Wait a while for existing tasks to terminate
if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
pool.shutdownNow(); // Cancel currently executing tasks
// Wait a while for tasks to respond to being cancelled
if (!pool.awaitTermination(10, TimeUnit.SECONDS))
System.err.println("Pool did not terminate");
}
} catch (InterruptedException ie) {
// (Re-)Cancel if current thread also interrupted
pool.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}
}
}
2.2毒丸对象
另一种关闭生产者-消费者服务的方式就是使用“毒丸”对象,其实就是指往对象里面放一个标志对象,当得到这个对象就立即停止,这就需要在执行方法里面判断,消费者读到毒丸后就不会再执行,同样生产者提交毒丸后,就不能再提交任务。
只有生产者和消费者都已知的情况下,才可以使用“毒丸”,当生产者和消费者和数量较大时,方法变的难以使用。
直接看书中的例子,这个不是很常用,粘过来,以备用吧。
public class IndexingService {
private static final File POISON = new File("");
private final IndexerThread consumer = new IndexerThread();
private final CrawlerThread producer = new CrawlerThread();
private final BlockingQueue<File> queue;
private final FileFilter fileFilter;
private final File root;
class CrawlerThread extends Thread {
class IndexerThread extends Thread {
public void start() {
producer.start();
consumer.start();
}
/* Listing 7.18 */
} /* Listing 7.19 */
}
public void stop() {
producer.interrupt();
}
public void awaitTermination() throws InterruptedException {
consumer.join();
}
}
3.3只执行一次的服务
如果某个方法需要处理一批任务,并且当所有任务都处理完成后才返回,那么可以通过一个私有的Executor来简化服务的生命周期,其中该Executor的生命周期由这个方法来控制。
下面程序checkMail方法能在多台主机上并行地检查新邮件,它创建一个私有的Executor,并向每台主机提交一个任务,当所有邮件任务都执行完成后,关闭并结束。
boolean checkMail(Set<String> hosts, long timeout, TimeUnit unit)
throws InterruptedException {
ExecutorService exec = Executors.newCachedThreadPool();
final AtomicBoolean hasNewMail = new AtomicBoolean(false);
try {
for (final String host : hosts) {
exec.execute(new Runnable() {
@Override
public void run() {
if (check(host))
hasNewMail.set(true);
}
});
}
} finally {
exec.shutdown();
exec.awaitTermination(timeout, unit);
}
return hasNewMail.get();
}
3.JVM关闭钩子
3.1关闭钩子
关闭钩子本质上是一个线程(也称为Hook线程),用来监听JVM的关闭。通过使用Runtime的addShutdownHook(Thread hook)可以向JVM注册一个关闭钩子。Hook线程在JVM 正常关闭才会执行,在强制关闭时不会执行。
对于一个JVM中注册的多个关闭钩子它们将会并发执行,所以JVM并不能保证它的执行顺行。当所有的Hook线程执行完毕后,如果此时runFinalizersOnExit为true,那么JVM将先运行终结器,然后停止。Hook线程会延迟JVM的关闭时间,这就要求在编写钩子过程中必须要尽可能的减少Hook线程的执行时间。另外由于多个钩子是并发执行的,那么很可能因为代码不当导致出现竞态条件或死锁等问题,为了避免该问题,强烈建议在一个钩子中执行一系列操作。
明白了其原理之后,也需要知道其使用场景:
1、内存管理
在某些情况下,我们需要根据当前内存的使用情况,人为的调用System.gc()来尝试回收堆内存中失效的对象。此时就可以用到Runtime中的totalMemory()、freeMemory()等方法。示例如下:
/**
* @Author: nanJunYu
* @Description:1、内存管理 在某些情况下,我们需要根据当前内存的使用情况,人为的调用System.gc()来尝试回收堆内存中失效的对象。
* 此时就可以用到Runtime中的totalMemory()、freeMemory()等方法。示例如下:
* @Date: Create in 2018/9/12 20:48
* @params:
* @return:
*/
public static void autoClean() {
Runtime runtime = Runtime.getRuntime();
if ((runtime.totalMemory() - runtime.freeMemory()) / (double) runtime.maxMemory() > 0.90) {
System.out.println("执行清理工作");
} else {
System.out.println(runtime.freeMemory());
}
}
2、执行命令
class Test {
public static void main(String args[]){
Runtime r = Runtime.getRuntime();
Process p = null;
try{
p = r.exec("notepad");
} catch (Exception e) {
}
}
注意:通过exec()方式执行命令时,该命令在单独的进程(Process)中。
3、通过Hook实现临时文件清理
实例:
/**
* @Author: nanJunYu
* @Description:通过Hook实现临时文件清理
* @Date: Create in 2018/9/12 20:54
* @params:
* @return:
*/
public void clearTemporaryFile() {
Runtime runtime = Runtime.getRuntime();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
public void run() {
System.out.println("auto clean temporary file");
}
}));
}
4. spring bean DisposableBean 跟随bean对象的生命周期销毁。
这里说的了DisposableBean做一下引申和 spring bean InitializingBean。
对于bean实现了InitalizingBean接口,重写了afterPropertiesSet(),此方法运行在所有的bean被实例化之后。
DisaposableBean接口,重写destroy方法,此方法是执行在bean释放之后,也就是spring容器销毁之后。
这里介绍的是一个redis缓存的示例,比如在spring容器启动后,需要把省市区数据加载到缓存,并且定期对数据进行更新,这个时候我们就可以结合InitalizingBean和DisaposableBean,还有定时器ScheduledThreadPoolExecutor、多线程一起来实现。
首先,spring容器启动后,加载实例化所有bean对象后,就会执行afterPropertiesSet(),创建线程,线程中取得数据,缓存到redis,设置ScheduledThreadPoolExecutor,并设置好定时器的时间间隔。然后在afterPropertiesSet启动线程run方法,执行定时器即可。
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import com.ycst.craft4j.system.util.DateUtil;
import net.sf.ehcache.util.NamedThreadFactory;
@Component
public class BasicDataCacheLoader implements InitializingBean,DisposableBean{
private ScheduledThreadPoolExecutor executor = null;
protected static final int CORE_SIZE = Runtime.getRuntime().availableProcessors() * 2;
@Autowired
private StringRedisTemplate redisTemplate;
@Override
public void destroy() throws Exception {
executor.shutdown();
}
@Override
public void afterPropertiesSet() throws Exception {
executor = new ScheduledThreadPoolExecutor(CORE_SIZE, new NamedThreadFactory("static-info-loader"));
RefreshCache refreshCache = new RefreshCache();
refreshCache.run();
executor.scheduleWithFixedDelay(refreshCache, 10, 10, TimeUnit.SECONDS);
}
private class RefreshCache implements Runnable{
@Override
public void run() {
//获取省市区数据,这里随意写一些无关逻辑
redisTemplate.opsForValue().set("time", DateUtil.getDateTime());
}
}
public String getCache(){
return redisTemplate.opsForValue().get("time");
}
}
5.容器销毁时关闭
shutdown.sh后,无法关掉tomcat进程。
ps -ef | grep tomcat
返回tomcat进程仍然存在。经过调查发现是因为在Web应用中启动了线程池,shutdown只会关闭web线程(默认监听端口8080),关闭线程(默认监听8005);对于通过线程池(包括Executors创建的ExecutorService),只能是手动清除。使用tomcat容器的好处就在于能够包住应用的生命周期,比如解决这个问题就可以通过实现ServletContextListener里面的contextDestroyed方法,来进行手动删除。
多线程(线程池)的关闭是几个阶段的,RUNNING->SHUTDOWN->STOPT->IDYING->TERMINATED,线程池对象提供的isShutdown()对应的就是SHUTDOWN阶段,isTerminated()对应的就是TERMINATED阶段,前者只要调用了shutdown,返回就是true,后者只有所有的线程池中的任务结束了(中断处理并退出)才会返回true。
这意味着,所有的多线程runnable/callable的实现,如果是while(true)模式的,要对于中断异常尽心捕获,并且进行退出。
while(true){
try{
... ...
catch (InterruptedException e) {
logger.warn("ResponseQueueHandler接收到中断请求,退出程序");
return ;
}catch (Throwable e) {
... ...
}
}
很多时候我们处理异常,都会捕获并且记录下来完事,避免某个处理影响后续处理;但是对于中断异常就要退出处理,只有关闭才会向一个线程发起中断请求(如果有其他实现场景也要触发中断另说),所以只要中断,就退出。
中断处理的机制,基于我们上面的讨论,如下:
public static void shutdownThread(ExecutorService service, String serviceName) {
service.shutdown();
try {
if (!service.isTerminated()) {
logger.debug("直接关闭失败{}", serviceName);
service.awaitTermination(3, TimeUnit.SECONDS);
if (service.isTerminated()) {
logger.debug("成功关闭{}", serviceName);
} else {
logger.debug("{}关闭失败,直接shutdownNow!", serviceName);
if (service.shutdownNow().size() > 0) {
logger.debug("最终{}没有关闭成功", serviceName);
} else {
logger.debug("shutdownNow终于成功关闭{}", serviceName);
}
}
} else {
logger.debug("直接成功关闭{}", serviceName);
}
} catch (InterruptedException e) {
logger.warn("接收到中断请求,{}停止操作", serviceName);
}
}
如果是故障恢复机制的需求,可以在关闭后指定多长时间内再次启动线程。如果多次重启线程失败,做出必要的提醒或者更长时间后进行重启。具体可以针对实际需求设定。
我的应用在实现的时候,FixedScheduleThread,还是newCachableThread都是单独来搞的,现在回想一下,其实应该让他们都来自于同一个ExecutorService,这样,在contextDestroyed里面其实只要关闭一个executorService即可,现在是要关闭一堆。
对于线程池的关闭(上面的代码是共通关闭线程的方式),需要额外添加一段处理:
for (Future<String> future : this.runTaskMap.values()) {
future.cancel(true);
}
将线程池中多有得Future都取消,这个过程其实就是中断线程的过程,逐个线程中断后,再对ExecutorService进行shutdown()操作即可。
kill -9 tomcat_PID将会暴力关闭应该用,kill tomcat_PID将会优雅关闭线程,和调用线程池的shutdown效果一样。
最后,其实可以有一种更加简单粗暴的方式,那就是在contextDestroyed里面调用exit(0)。刚开始使用的时候,担心会不会把整台机器的JVM都干掉。但是如果搞懂了JVM的原理,其实每一个运行的JAVA程序都有自己的JVM,并不会互相干扰。另外,如果你想要在exit(0)之前做些什么处理,可以为JVM添加钩子,来进行诸如关闭线程的处理,详细前面3中已经介绍。