线程池中的一个线程异常了会被怎么处理?
抛异常出来并打印在控制台上(只对了一半,根据提交方式的不同(execute和 submit))
其他线程任务不受影响
异常线程会被回收
下面进行验证:
1.抛异常出来并打印在控制台上?
熟悉Executors线程池(本文线程池都是指Executors)都知道 有两种提交线程的方式execute和submit方式,下面将以这两种提交方式来验证。
public class ExecutorsTest {
public static void main(String[] args) throws Exception {
ThreadPoolExecutor pool = new ThreadPoolExecutor(3, 5
, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<>(4), SmallTool.getCustomThreadFactory()
, (r, executor) -> SmallTool.printTimeAndThread(" 正在放弃任务:" + r + " , 所属executor : " + executor));
pool.execute(() -> sayHi("execute"));
Thread.sleep(1000);
pool.submit(() -> sayHi("submit"));
}
public static void sayHi(String name) {
String printStr = "thread-name:" + Thread.currentThread().getName() + ",执行方式:" + name;
System.out.println(printStr);
throw new RuntimeException(printStr + " error!!!");
}
}
结果:
thread-name:pool-llc-thread-1,执行方式:execute
Exception in thread "pool-llc-thread-1" java.lang.RuntimeException: thread-name:pool-llc-thread-1,执行方式:execute error!!!
at com.orion.base.ExecutorsTest.sayHi(ExecutorsTest.java:23)
at com.orion.base.ExecutorsTest.lambda$main$1(ExecutorsTest.java:15)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
thread-name:pool-llc-thread-3,执行方式:submit
从运行结果可见:
execute执行方式抛出异常显示在控制台了。
submit执行方式并没有显示。
众所周知submit底层其实也是调用的execute,因此它也有异常只是处理方法不一样,它们的区别是:
1、execute没有返回值。可以执行任务,但无法判断任务是否成功完成。——实现Runnable接口
2、submit返回一个future。可以用这个future来判断任务是否成功完成。——实现Callable接口
submit的话,我们可以在其返回的future中拿到结果。稍微修改下代码,将其异常打印出来即可。
public class ExecutorsTest {
public static void main(String[] args) {
ThreadPoolExecutor pool = new ThreadPoolExecutor(3, 5
, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<>(4), SmallTool.getCustomThreadFactory()
, (r, executor) -> SmallTool.printTimeAndThread(" 正在放弃任务:" + r + " , 所属executor : " + executor));
pool.execute(() -> sayHi("execute"));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Future future = pool.submit(() -> sayHi("submit"));
try {
future.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
System.out.println("execution Exception : " + e.getMessage());
}
}
public static void sayHi(String name) {
String printStr = "thread-name:" + Thread.currentThread().getName() + ",执行方式:" + name;
System.out.println(printStr);
throw new RuntimeException(printStr + " error!!!");
}
}
运行结果:
thread-name:pool-llc-thread-1,执行方式:execute
Exception in thread "pool-llc-thread-1" java.lang.RuntimeException: thread-name:pool-llc-thread-1,执行方式:execute error!!!
at com.orion.base.ExecutorsTest.sayHi(ExecutorsTest.java:33)
at com.orion.base.ExecutorsTest.lambda$main$1(ExecutorsTest.java:13)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
thread-name:pool-llc-thread-3,执行方式:submit
execution Exception : java.lang.RuntimeException: thread-name:pool-llc-thread-3,执行方式:submit error!!!
造成这种区别,只能去查看源码到底是怎么回事。
execute
根据代码直接点进来,找到在java.util.concurrent.ThreadPoolExecutor#runWorker中抛出了运行异常:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
可以见到,它抛出了异常,最终还是会去到java.lang.ThreadGroup#uncaughtException进行了异常处理:
public void uncaughtException(Thread t, Throwable e) {
if (parent != null) {
parent.uncaughtException(t, e);
} else {
Thread.UncaughtExceptionHandler ueh =
Thread.getDefaultUncaughtExceptionHandler();
if (ueh != null) {
ueh.uncaughtException(t, e);
} else if (!(e instanceof ThreadDeath)) {
System.err.print("Exception in thread \""
+ t.getName() + "\" ");
e.printStackTrace(System.err);
}
}
}
因为没有自定义UncaughtExceptionHandler ,所以使用默认的,
System.err.print("Exception in thread \""
+ t.getName() + "\" ");
可见上面打印的方式和这里是一致的。
异常处理有多种方式,详情可见Java线程池「异常处理」正确姿势:有病就得治。
submit
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
可见submit底层也是调用execute,但是它会先包装成一个futureTask,它有自己的run方法
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
可以看到,catch那里有个setException,进去看看,debug一下:
protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
get()方法可见会将outcome的包装成一个ExecutionException再扔出来,就验证来上面打印的是execution Exception。
2. 其他线程任务不受影响?
public class ExecutorsTest {
public static void main(String[] args) {
ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1
, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<>(16), SmallTool.getCustomThreadFactory());
for (int i = 0; i < 10; i++) {
String s = (i == 3) ? "executeException" : "execute";
pool.execute(() -> sayHi(s));
}
}
public static void sayHi(String name) {
if ("executeException".equals(name)) {
throw new RuntimeException(Thread.currentThread().getName() + " -- execute exception");
} else {
System.out.println(Thread.currentThread().getName() + " -- execute normal");
}
}
}
运行结果:
pool-llc-thread-1 -- execute normal
pool-llc-thread-1 -- execute normal
pool-llc-thread-1 -- execute normal
pool-llc-thread-2 -- execute normal
pool-llc-thread-2 -- execute normal
pool-llc-thread-2 -- execute normal
pool-llc-thread-2 -- execute normal
pool-llc-thread-2 -- execute normal
pool-llc-thread-2 -- execute normal
Exception in thread "pool-llc-thread-1" java.lang.RuntimeException: pool-llc-thread-1 -- execute exception
at com.orion.base.ExecutorsTest.sayHi(ExecutorsTest.java:23)
at com.orion.base.ExecutorsTest.lambda$main$0(ExecutorsTest.java:17)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
由上可见,当有一个线程发生异常时,其他线程是不受影响的。
异常线程会被回收?
但是线程标号已经超过maxPoolSize,默认threadFactory的标号中使用atomicInteger来递增的。为什么会出现该情况,其实总归还是在 ThreadPoolExecutor#runWorker()方法中的processWorkerExit(w, completedAbruptly) 中。
查看代码:
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
可见它是先从workers中删除掉,再addWorker,addWorker就会创建线程,线程标号就会增加。