Java 线程池中worker
在java线程中,真正执行计算操作的内容是在一个worker类中。
Worker的主要代码:
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
......
总体来看,worker其实就是一个Runable,其也是需要构造成一个Thread对象,然后调用Thread start方法运行的。只不过在worker的run方法中是定一个了一个runWoker
的方法。这个方法的主要内容从 for 循环的不停的从task队列中获取对应的runable的task,然后同步调用这个task的run()方法。其实就是在某个线程中,不停的拿队列中的任务进行执行。
运行worker
可以看到构造方法内,有一个Thread对象,其使用了ThreadFactory构造了一个新的线程,并且线程的runable是worker本身。
this.thread = getThreadFactory().newThread(this);
所以需要执行worker的时候,只需要调用worker中的thread的start方法即可,并且可以调用thread的方法来控制worker的状态:
interrupt等。
worker和ThreadPool
在ThreadPool中是有一个workder集合的。通过这个集合,我们可以知道有多少worker线程在进行工作等,每一个worker都是各自进行工作,工作的内容就是不停的获取task,然后执行task即可。
worker中的异常处理
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
通过源代码可以看出,对应的Exception都是保存在thrownn中,在finally中交给了 afterExecute
进行了处理。
所以可以自己实现对应的afterExecute来进行处理系统内部发生的异常问题。