ScheduledThreadPoolExecutor与ThreadPoolExecutor的使用还是有一些区别的。比如异常捕获和删除任务操作这两点上就有很大不同。这篇文章试图要说清楚的就是这两点。
1. 提前捕获并处理Runnable代码块里的异常
先看ThreadPoolExecutor执行任务的源码
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run(); //任务执行位置
} catch (RuntimeException x) {
thrown = x; throw x; //捕获并抛出异常,下同
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
可以看到任务执行的部分(task.run())均捕获了异常,同时向外抛出(throw x),所以一旦任务出错,外层没有捕获异常的话,程序会抛出异常。
再看看ScheduledThreadPoolExecutor里执行任务的源码
我们新建的Runnabe任务在执行的时候被封装成内部类ScheduledFutureTask,执行位置在ScheduledFutureTask里
public void run() {
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
super.run(); //非周期任务,调用父类的run()
else if (super.runAndReset()) {
setNextRunTime();
reExecutePeriodic(outerTask);
}
}
ScheduledFutureTask的父类为FutureTask,run( ) 和 runAndReset()在异常处理方面是一样的,只看run()的源码
public void run() {
if (state != NEW ||
!U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call(); //任务执行位置
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex); //传递异常
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
我们传给ScheduledThreadPool的Runnable封装之后会被Callable调用,任务在 result = c.call() 被执行,异常被捕获。
protected void setException(Throwable t) {
if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
outcome = t;
U.putOrderedInt(this, STATE, EXCEPTIONAL); // final state
finishCompletion();
}
}
异常被捕获并转为outcome,并没有抛出。外界调用者并不知道有异常发生。
如果熟悉FutureTask的话,可以知道我们可以在get()方法里捕获该异常,但get()会造成阻塞,无法用在定时任务上。
所以,正确使用ScheduledThreadPoolExecutor的第一点便是,在任务代码块提交到线程池之前做好异常捕获。
2. 使用RunnableScheduledFuture对象做remove( )操作
前面有提到,传给ScheduledThreadPool执行的任务会进行封装,源码如下
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay,
TimeUnit unit) {
if (callable == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<V> t = decorateTask(callable,
new ScheduledFutureTask<V>(callable,
triggerTime(delay, unit),
sequencer.getAndIncrement()));
delayedExecute(t);
return t;
}
然后加入到任务队列中去
private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown())
reject(task);
else {
super.getQueue().add(task);
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
所以此时队列中的任务对象应该是RunnableScheduledFuture的实例,所以做remove操作的应该是此对象。
RunnableScheduledFuture<Void> scheduledFuture = (RunnableScheduledFuture<Void>) scheduledThreadPool.scheduleWithFixedDelay(runnable, 0, 2, TimeUnit.SECONDS);
scheduledThreadPool.remove(scheduledFuture);