回归正题,继续看源码
(1)构造函数部分
public SmartExecutor() {
initThreadPool();
}
public SmartExecutor(int coreSize, int queueSize) {
this.coreSize = coreSize;
this.queueSize = queueSize;
initThreadPool();
}
protected synchronized void initThreadPool() {
if (threadPool == null) {
threadPool = createDefaultThreadPool();
}
}
public static ThreadPoolExecutor createDefaultThreadPool() {
// 控制最多4个keep在pool中
int corePoolSize = Math.min(4, CPU_CORE);
return new ThreadPoolExecutor(
corePoolSize,
Integer.MAX_VALUE,
DEFAULT_CACHE_SECOND,
TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactory() {
static final String NAME = "lite-";
AtomicInteger IDS = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, NAME+IDS.getAndIncrement());
}
},
new ThreadPoolExecutor.DiscardPolicy());
}
需要注意的地方是initThreadPool()方法加了一个对象锁,为了防止在不同线程同时调用该方法。但是如果在不同线程创建不同的SmartExecutor对象会如何呢?这个锁岂不是就不起作用了?
再次阅读了下关于java多线程的知识,得到的解释是:java 方法本身是线程安全的。但是有一个问题在于,在方法中有没有全局变量(类静态变量、对象实例变量),如果有全局变量,在多线程调用的时候要多加注意进行同步处理。
所以个人认为,这里使用synchronized是为了保护静态变量threadPool。而如果在不同的线程创建不同的对象,是有可能发生线程不安全的。当然,在一般的使用场景下,这种可能性非常小。
(2)任务封装
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
在AbstractExecutorService里看到过的代码,分别:
- 为指定的Runnable和value构造一个FutureTask,value表示默认被返回的Future。
- 为指定的Callable创建一个FutureTask。
(3)提交
/*
* 提交Runnable任务
*/
public Future<?> submit(Runnable task) {
// 通过newTaskFor方法构造RunnableFuture,默认的返回值是null
RunnableFuture<Object> ftask = newTaskFor(task, null);
// 调用具体实现的execute方法
execute(ftask);
return ftask;
}
/*
* 提交Runnable任务
*/
public <T> Future<T> submit(Runnable task, T result) {
// 通过newTaskFor方法构造RunnableFuture,默认的返回值是result
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
/*
* 提交Callable任务
*/
public <T> Future<T> submit(Callable<T> task) {
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
同样来自于AbstractExecutorService
(4)execute方法
@Override
public void execute(final Runnable command) {
if (command == null) {
return;
}
WrappedRunnable scheduler = new WrappedRunnable() {
@Override
public Runnable getRealRunnable() {
return command;
}
public Runnable realRunnable;
@Override
public void run() {
try {
command.run();
} finally {
scheduleNext(this);
}
}
};
boolean callerRun = false;
synchronized (lock) {
if (runningList.size() < coreSize) {
runningList.add(scheduler);
threadPool.execute(scheduler);
} else if (waitingList.size() < queueSize) {
waitingList.addLast(scheduler);
} else {
switch (overloadPolicy) {
case DiscardNewTaskInQueue:
waitingList.pollLast();
waitingList.addLast(scheduler);
break;
case DiscardOldTaskInQueue:
waitingList.pollFirst();
waitingList.addLast(scheduler);
break;
case CallerRuns:
callerRun = true;
break;
case DiscardCurrentTask:
break;
case ThrowExecption:
throw new RuntimeException("Task rejected from lite smart executor. " + command.toString());
default:
break;
}
}
//printThreadPoolInfo();
}
if (callerRun) {
command.run();
}
}
核心代码。
- 这里WrappedRunnable继承Runnable接口,在execute里用匿名内部类的方式生成了scheduler对象。
- 下面进行判断,当运行链表未满时,加入运行链表,同时执行任务;当等待链表未满时,加入等待链表;如都不满足,则根据过载策略,来选择处理方式。这里可以发现,CallerRuns表示在调用的线程中执行任务,执行后并不会进行其他操作。
- 非CallerRuns策略执行的任务在完成后会调用scheduleNext(WrappedRunnable scheduler)方法,将scheduler从运行队列中remove。然后从等待队列中根据排序策略选择下一个执行的任务,加入运行队列,并执行。
- 所有有关全局变量的操作都加上了同步锁,保证线程安全
OK,这篇源码到这里就全部结束了。