bug描述
今天有个bug需要我处理,业务逻辑是这样的:
搜索框中的文本,对应一次请求。但是之前的代码逻辑是这样的,在子线程需要执行时进行判断是不是网络请求的结果可以从缓存中获取,如果不行则进行开启线程请求网络。用线程池管理,Executors.newSingleThreadExecutor(),乍眼看没什么问题,但是问题却很大,因为访问网络到结果放入缓存中这个过程是耗时的,但是其他部分的刷新可能导致请求了很多次,也就是执行了很多次submit。所以就相当于缓存的判断是无效的,这锅怎么解?下面我说下我的几次解锅挣扎。
bug解决
1. 将缓存判断放入子线程的run中。
这样解锅的理由是:对于单个线程来说,缓存的判断和网络请求是同步的。看上去好像完美解决问题,但是问题依然存在,因为这次是Executors.newSingleThreadExecutor()导致的,下面分析代码:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
/*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
参数2,意味着如果短期执行两次submit就会缓存一个线程在请求队列中.所以导致的结果是每次极大程度上发起两次请求。
那么问题来了,可不可以将maximumPoolSize设置成0呢?那我们继续看代码:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
当maximumPoolSize<=0时候抛出参数不合法异常,所以行不通,这个时候我在DiskLruCache中找到了答案,看代码
public final class DiskLruCache implements Closeable {
/** This cache uses a single background thread to evict entries. */
final ThreadPoolExecutor executorService =
new ThreadPoolExecutor(0, 1, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
...
}
把corePoolSize设置成0。我试了试,确实可以达到目的,为什么?我们过一遍流程
ThreadPoolExecutor.java
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//当工作的线程个数小于我们设置的corePoolSize时候添加任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//执行这里
else if (!addWorker(command, false))
reject(command);
}
private boolean addWorker(Runnable firstTask, boolean core) {
//外部循环
retry:
for (;;) {
int c = ctl.get();//获取当前工作线程数量,数量为{c-(-536870912)}
int rs = runStateOf(c);//若c>=0时,该值才为0,否则该值一直为-536870912
/*
*由上面的一些线程池状态常量值可知,running<shutdown<stop<tidying<terminated
*若rs>=shutdown,则表明线程池处于stop、tidying、terminated三种状态的一种
*若rs>=shutdown成立,则进行后面判断,
*1、线程池处于shutdown状态
* 1.1、firstTask不为null,则返回false,也即是线程池已经处于shutdown状态,还要添加新的线程,被直接驳回(拒绝)
* 1.2、firstTask为null
* 1.2.1、此时意味着线程池状态为shutdown状态,且first为null,若阻塞队列为空,则返回false
*2、线程处于大于shutdown的状态,则直接返回false
*/
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
/*
*进入内循环以下两种情况会跳出该内循环,否则一直会循环
*1、当工作线程数量超过一定阈值,会直接返回false
*2、添加工作线程成功,即ctl的值进行了加一
*/
for (;;) {
int wc = workerCountOf(c);//获取工作线程的数量
//当线程数量>=536870911或者>=corePoolSize或maximumPoolSize的时候,则返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))//使用unsafe的cas操作对ctl.get()的值进行加一
break retry;//跳出这个外循环
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)//当此时的线程池状态和之前的状态不等时
continue retry;//继续内循环
}
}
//若进行到了此步操作,则表明工作线程数量加了1
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;//该w.thread为worker内部新创建的thread
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();//开启锁
try {
//获取锁后,再次获取线程池的状态
int rs = runStateOf(ctl.get());
/*
*1、当线程池的状态处于shutdown以上状态,则直接释放锁,不启动线程,且执行addWorkerFailed方法
执行该方法的作用是使工作线程数量-1
*/
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // 创建的线程处于活跃状态,即被启动了,抛出异常
throw new IllegalThreadStateException();
workers.add(w);//workers是一个set集合
int s = workers.size();
if (s > largestPoolSize)//largestPoolSize默认为0,作用是记录set集合中的线程数量
largestPoolSize = s;
workerAdded = true;//改变该值,为了启动线程,且返回一个addWorker执行成功的状态
}
} finally {
mainLock.unlock();//释放锁
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
因为在addWorker代码中有这么一句wc >= (core ? corePoolSize : maximumPoolSize))
成立则返回false,表明core为false时会以maximumPoolSize来当做corePoolSize比较,按照我们写的那种是返回false,则执行reject(command);
final void reject(Runnable command) {
//handler是我们最初传递进来的
//private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
handler.rejectedExecution(command, this);
}
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
首先第一个请求过来了,执行线程池,发现没有线程可用,所以创建了一个线程,这个线程执行网络请求,但是这个时候紧跟着第二次submit过来了,发现这个线程池只能容得下一个线程,并且这个线程正在执行,所以不能创建新的线程进行网络请求。此时就达到了请求一次的目的。我们提交代码的时候需要审批,大牛看到了这种写法很奇怪说是Daemon的,但是我想了想守护线程和这个没关系啊,所以问他,他说DiskLruCache需要不死线程,我这个线程池会凋亡。这个观点我暂时还不理解,但是大牛又是权威,我不敢问希望那天可以理解。有的童鞋说用AsyncTask,我之前看过AsyncTask源码但是很多细节没有注意,这个时候有点懵逼他们说的同步和并行在AsyncTask中的使用。所以我继续看AsyncTask源码,下面就是解释AsyncTask源码的过程:
2.利用AsyncTask解决
AsyncTask.java
public static final Executor SERIAL_EXECUTOR = new SerialExecutor();
private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR;
@MainThread
public static void execute(Runnable runnable) {
sDefaultExecutor.execute(runnable);
}
AsyncTask.SerialExecutor.java
private static class SerialExecutor implements Executor {
final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
Runnable mActive;
public synchronized void execute(final Runnable r) {
mTasks.offer(new Runnable() {
public void run() {
try {
r.run();
} finally {
scheduleNext();
}
}
});
if (mActive == null) {
scheduleNext();
}
}
protected synchronized void scheduleNext() {
if ((mActive = mTasks.poll()) != null) {
THREAD_POOL_EXECUTOR.execute(mActive);
}
}
}
private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));
private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
private static final int KEEP_ALIVE_SECONDS = 30;
/**
* An {@link Executor} that can be used to execute tasks in parallel.
*/
public static final Executor THREAD_POOL_EXECUTOR;
static {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
sPoolWorkQueue, sThreadFactory);
threadPoolExecutor.allowCoreThreadTimeOut(true);
THREAD_POOL_EXECUTOR = threadPoolExecutor;
}
SerialExecutor是个内部类,他将Runnable放入一个队列中,然后对mActive变量进行判断,如果这个变量为空则执行队列中的请求,首先获取队列中的一个请求赋值给mActive此时mActive不为空,并且此段代码上锁不担心多线程问题,继而执行threadPoolExecutor对应的线程池执行execute,此线程池的定义代码如上图。
结论
这样也可以解决上面说的bug,原因是串行,虽然submit了多次也给mTasks中添加了多次runable,但是第一个请求执行完成之后才能执行第二个,此时第二个请求run内部已经做了缓存的判断,所以能达到效果。
那么我们分析一下使用哪一种方法更好呢?我觉得使用第一种,因为创建线程的个数为一,不浪费资源。但是如果使用AsyncTask针对这个小bug的话可能会消耗更多资源。
但是第一种最好捕获异常或者更换策略别问我为什么,这种事情小声点哈哈,我觉得换成DiscardPolicy最好,啥也不执行