1 实现通过线程池执行任务
public class MyTask implements Runnable {
@Override
public void run() {
System.out.println(System.currentTimeMillis() + ": Thread Id: " + Thread.currentThread().getId());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
MyTask task = new MyTask();
// corePoolSize, maxPoolSize, keepAliveTime, unit
// workQueue, threadFactory, handler
ThreadPoolExecutor pool = new ThreadPoolExecutor(5, 20, 0L
, TimeUnit.MILLISECONDS
, new LinkedBlockingDeque<>(1024)
, Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
for (int i = 0; i < 5; i++) {
pool.execute(task);
// pool.submit(task);
}
pool.shutdown();
}
}
创建了一个ThreadPoolExecutor,参数中使用了默认的线程创建工厂DefaultThreadFactory
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
线程池中默认线程名称前缀为"pool-" + poolNumber.getAndIncrement() + "-thread-";
ThreadPoolExecutor构造方法中,将参数赋值到对象成员中
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
2 execute方法提交任务进行处理
我们知道,当工作线程数小于核心线程数,则会创建新线程执行任务,当工作线程数大于corePoolSize,则会将任务丢到workQueue中,当workQueue中存满任务时,则会执行拒绝策略。execute方法中是任务执行的流程:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 工作线程数量小于核心线程数
if (workerCountOf(c) < corePoolSize) {
// 线程池添加任务成功,则返回
if (addWorker(command, true))
return;
// 线程池添加任务失败,再次获取线程数量
c = ctl.get();
}
// 工作线程数大于核心线程数
// 线程池处于RUNNING状态,并且往任务队列添加任务成功
if (isRunning(c) && workQueue.offer(command)) {
// 再次对线程池状态检查
int recheck = ctl.get();
// 线程池状态不是RUNNING ,并且从任务队列删除任务成功
if (! isRunning(recheck) && remove(command))
// 拒绝任务
reject(command);
// 工作线程数量为0,则线程池处于TIDYING状态
else if (workerCountOf(recheck) == 0)
// 添加一个null任务
addWorker(null, false);
}
// 线程池不在RUNNING状态
// workQueue.offer为false,并且往任务队列添加任务失败,表示任务队列已满
// (addWorker失败,表示线程池数量达到maxPoolSize)
// false表示工作线程数不和corePoolSize比较,而和maxPoolSize比较
else if (!addWorker(command, false))
// 执行拒绝策略
reject(command);
}
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
上述代码是ctl的定义:
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
可以看到 RUNNING 表示 workQueue 中限制的线程数目,值为(2^29)-1大概为500 million
而线程池有四种状态:
RUNNING:接收新任务,处理任务队列中的任务
SHUTDOWN:不接受新任务,但是会消费任务队列中的任务
STOP:不接受新任务,不消费任务队列中的任务,并且会interrupt运行中的任务
TIDYING:所有任务终结,workCount数量为0,并且会去调用terminated()方法
TERMINATED:terminated()方法执行完成
四种状态的变化在注释中也已经给出
* RUNNING -> SHUTDOWN
* On invocation of shutdown(), perhaps implicitly in finalize()
* (RUNNING or SHUTDOWN) -> STOP
* On invocation of shutdownNow()
* SHUTDOWN -> TIDYING
* When both queue and pool are empty
* STOP -> TIDYING
* When pool is empty
* TIDYING -> TERMINATED
* When the terminated() hook method has completed
3 addWorker往线程池添加任务
addWorker方法往线程池中添加任务,其实现如下:
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 计算当前工作线程数
int wc = workerCountOf(c);
// 通过core参数判断工作线程数是否大于corePoolSize或者maximumPoolSize
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// CAS添加工作线程数
if (compareAndIncrementWorkerCount(c))
// 成功则跳出循环
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 通过firstTask封装一个工作对象
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 往workers集合中添加work对象
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
// 添加成功标记
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 启动线程
t.start();
// 标记任务运行状态
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
其中new Worker封装了firstTask任务,构造方法如下
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
// firstTask为提交进来的command,即main方法中提交给线程池的任务
this.firstTask = firstTask;
// 通过线程工厂创建线程池,传入this,即worker
this.thread = getThreadFactory().newThread(this);
}
newThread方法如下:
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
设置为非守护线程,newThread(this)传入的对象为this,为Worker对象,所以,在addWorker方法中的t.start()方法启动时,执行的是Worker的run方法。
addWorker方法主要将任务command对象封装成一个worker,添加到线程池中的HashSet<Worker>中,并且启动该任务,成功则返回true,失败返回false。
4 runWorker方法执行work
Worker的run方法中,执行如下代码:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
// Worker中封装的firstTask为demo main方法中提交的任务
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 执行firstTask或者从阻塞队列中获取任务,执行
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 执行task
task.run();
// 任务出现异常,则抛出异常并停止运行线程池
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
Worker对象里创建出来thread数量可以达到corePoolSize个,并且都不会被关闭。这些thread执行的任务为从任务队列中获取添加进来的command,并且执行(即runWorker方法做的事情)。
5 demo总结
再回到execute方法中进行debug,我们可以发现,在当前demo中
前四个MyTask任务提交之后是被addWorker处理的,也就是创建了新的线程。
而后面的5个新任务则是被塞进了workQueue中进行缓存。
当执行到main方法中的 pool.shutdown()这里时,看一下线程调用堆栈
其中运行着5个创建的线程,满足corePoolSize的数量,执行完pool.shutdown(),线程池中的线程会自动从任务队列中取出任务,进行执行,满足前文我们的分析。