为什么要中断
- Java中没有一种安全的抢占方法来停止线程,也没有安全的抢占方式停止任务,只有一些协作机制。
- 更好的支持任务的取消关闭
可以取消的任务
- 下面任务名称:在规定时间内搜索素数。
- 任务特点:任务最终会结束。
- PrimeGenerator持续地枚举素数,直到被取消。cancel放在finally中确保cancel最终会被调用。
- PrimeGenerator使用了一种简单的取消策略:客户端代码通过cancel来请求取消,PrimeGenerator在每次搜索素数前首先检查是否存在取消请求,如果存在则退出。
public class PrimeGenerator implements Runnable {
private static ExecutorService exec = Executors.newCachedThreadPool();
@GuardedBy("this") private final List<BigInteger> primes = new ArrayList<BigInteger>();
private volatile boolean cancelled;
public void run() {
BigInteger p = BigInteger.ONE;
while (!cancelled) {
p = p.nextProbablePrime();
synchronized (this) {
primes.add(p);
}
}
}
public void cancel() {
cancelled = true;
}
public synchronized List<BigInteger> get() {
return new ArrayList<BigInteger>(primes);
}
static List<BigInteger> aSecondOfPrimes() throws InterruptedException {
PrimeGenerator generator = new PrimeGenerator();
exec.execute(generator); //缺点:如果generator抛出未受检异常,该异常在此处无法扑捉到
try {
SECONDS.sleep(1);
} finally {
generator.cancel();
}
return generator.get();
}
永远不会结束的任务
- 下面任务名称:生产者-消费者模式,生产者生产素数,消费者消费素数,使用阻塞队列。
- 任务特点:存在无法结束情况。
- 同样通过cancelled来作为取消标志,但是run任务中存在阻塞方法put,如果生产者速度超过消费者速度,队列被填满,put方法阻塞,当put方法阻塞时,如果消费者希望取消生产者任务,那么消费者可以调用cancel来设置cancelled标志,但是此时生产者却永远不能检查这个标志,因为它无法从阻塞的put方法中恢复过来。
- 说明了自定义的取消机制无法与可阻塞的库函数实现良好的交互。
- 解决方法:使用中断而不是boolean标志来请求取消。(见下面代码)
public class BrokenPrimeProducer extends Thread{
private final BlockingQueue<BigInteger> queue;
private volatile boolean cancelled = false;
BrokenPrimeProducer(BlockingQueue<BigInteger> queue) {
this.queue = queue;
}
public void run() {
try {
BigInteger p = BigInteger.ONE;
while (!cancelled)
queue.put(p = p.nextProbablePrime());
} catch (InterruptedException consumed) {
}
}
public void cancel() {
cancelled = true;
}
void consumePrimes(){
BlockingQueue<BigInteger> primes = new LinkedBlockingQueue<BigInteger>(3);
BrokenPrimeProducer primeProducer = new BrokenPrimeProducer(primes);
primeProducer.start(); //缺点:如果generator抛出未受检异常,该异常在此处无法扑捉到
try{
while(needMorePrimes()){
consume(primes.take());
}
}finally {
primeProducer.cancel();;
}
}
}
Thread中的中断方法
每个线程都有一个boolean类型的中断状态,当中断线程时,这个线程的中断状态将被设置为true。线程可以查询中断状态根据需求来做响应处理。
interrupt :中断目标线程
isInterrupted:返回目标线程的中断状态
interrupted (静态方法):清除当前线程的中断状态,并返回它之前的值,这是清除中断状态的唯一方法。
中断不会真正中断一个正在运行的线程,只是发出中断请求,然后由线程在下一个合适的时刻中断自己。
阻塞方法如何处理中断
- 阻塞库方法,如Thread.sleep、Object.wait,阻塞队列的take、put方法等等,都会检查当前线程中断状态(即执行该代码的线程),并且在发现中断时提前返回。它们在响应中断时执行的操作包括:清除中断状态,抛出InterruptedException,表示阻塞操作由于中断而提前结束。
- 注:并不是所有的阻塞方法都会通过提前返回或抛出异常来响应中断请求(如Socket I/O)
- JVM并不能保证阻塞方法检测到中断的速度,但在实际情况中响应速度还非常快的。
如何处理阻塞方法抛出的InterruptedException异常
- 1.传递InterruptedException:只需把InterruptedException传递给调用方法的调用者。传递包括:根本不捕获该异常,通过throws抛。或者捕获异常,然后在执行某种清理工作后再次抛出这个异常。
- 2.恢复中断:有时候不能抛出InterruptedException,例如代码是Runnable的一部分,这时必须捕获异常,并通过调用当前线程上的interrupt方法恢复中断状态(阻塞方法处理中断会清除中断状态),这样调用栈中更高层代码将看到引发了一个中断。(只有调用isInterrupted才可以看到)。
public class TaskRunnable implements Runnable {
BlockingQueue<Task> queue;
public void run() {
try {
processTask(queue.take());
} catch (InterruptedException e) {
// 恢复中断状态
Thread.currentThread().interrupt();
}
}
void processTask(Task task) {
// Handle the task
}
interface Task {
}
}
- 3.屏蔽中断:只有一种情况才可以屏蔽中断,只有实现了线程中断策略的代码才可以屏蔽中断请求,在常规的任务和代码库中都不应该屏蔽中断请求。(见下面PrimeProducer类的代码)
- 4.其它方法,根据需求定。
- 下面代码的演示了不可以取消的任务,在发生中断时该如何响应中断情况,代码中有阻塞方法take,如果线程被中断就会使take抛出InterruptedException异常,但是任务不可以取消,所以发生中断后通过重试
public class NoncancelableTask {
public Task getNextTask(BlockingQueue<Task> queue) {
boolean interrupted = false;
try {
while (true) {
try {
return queue.take();
} catch (InterruptedException e) {
interrupted = true;
// fall through and retry
}
}
} finally {
if (interrupted)
Thread.currentThread().interrupt();
}
}
interface Task {
}
}
使用中断解决上面“永远不会结束的任务”
- 虽然PrimeProducer屏蔽了中断,这是因为它知道线程将要结束(通过while (!Thread.currentThread().isInterrupted())代码),即PrimeProducer实现了线程的中断策略,所以线程被中断后,虽然put方法会抛出中断异常,但是那是合理的,因为即使不抛出异常,while方法中通过检查也会发现线程被中断,从而结束线程。
public class PrimeProducer extends Thread {
private final BlockingQueue<BigInteger> queue;
PrimeProducer(BlockingQueue<BigInteger> queue) {
this.queue = queue;
}
public void run() {
try {
BigInteger p = BigInteger.ONE;
while (!Thread.currentThread().isInterrupted())
queue.put(p = p.nextProbablePrime());
} catch (InterruptedException consumed) {
/* 允许线程退出 */
}
}
public void cancel() {
interrupt();
}
}
使用Future来取消任务
- 针对前面的案例,如果在run中的任务发生了未检查异常,则该对于线程调用者来说该异常会被忽略掉,因为其运行在单独的线程中,如果想将异常抛出以便调用者来处理,则该使用Future来处理。
- Future的cancel方法:接收boolean类型参数mayInterrptIfRuning,表示任务是否能够接收中断,如果为true,并且任务当前正在某个线程中运行,那么这个线程能被中断。如果这个参数为false,那么“若任务还没启动,就不要运行”。
- 除非你清楚线程中断策略,否则不要中断线程,那么在什么情况下可以将cancel参数设为true?执行任务的线程由标准的Executor创建的,它实现了一种中断策略使得任务可以通过中断被取消。
public class TimedRun {
private static final ExecutorService taskExec = Executors.newCachedThreadPool();
public static void timedRun(Runnable r,
long timeout, TimeUnit unit)
throws InterruptedException {
Future<?> task = taskExec.submit(r);
try {
task.get(timeout, unit);
} catch (TimeoutException e) {
// 接下来任务将被取消,放在了finally处理
} catch (ExecutionException e) {
// 如果在任务中抛出异常,那么重新抛出该异常
throw launderThrowable(e.getCause());
} finally {
// Harmless if task already completed
task.cancel(true); // interrupt if running
}
}
public static RuntimeException launderThrowable(Throwable t) {
if (t instanceof RuntimeException)
return (RuntimeException) t;
else if (t instanceof Error)
throw (Error) t;
else
throw new IllegalStateException("Not unchecked", t);
}
}
中断策略
- 中断策略规定线程如何解释某个中断请求---当发现中断请求时,应该做哪些工作,哪些工作单元对于中断来说是原子性操作,以及以多快的速度来响应中断。