关于线程池介绍,我不在此赘叙,请参考https://www.jianshu.com/p/ade771d2c9c0
线程池中queue一般设置大小默认是Integer.MAX_VALUE,如果设置了大小,就必须实现一个丢弃策略,而默认的丢弃策略居然是抛异常。
/**
* A handler for rejected tasks that throws a
* {@code RejectedExecutionException}.
*/
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }
/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
当任务量超大,内存被撑满造成宕机,会导致所有的任务都丢失了。当然,可以使用MQ来解决类似的问题。在此我们只讨论使用线程池本身来解决。
那能不能人为控制队列大小,当队列达到该值,就不再往线程池队列里提交任务呢?以下采用ReentrantLock可重入锁机制来实现
/**
* Created on 2018/1/22 16:29
* <p>
* Description: [测试控制线程池队列大小]
* <p>
* Company: [xxxx]
*
* @author [aichiyu]
*/
public class TestLockPool {
private int maxSize = 100 ;
private final ReentrantLock lock = new ReentrantLock();
private List<Condition> list = new LinkedList<>();
private ThreadPoolExecutor executor =new ThreadPoolExecutor(20, 100,
60L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
public void init(){
scheduledExecutorService.scheduleAtFixedRate(()->{
int queueSize = executor.getQueue().size();
//每秒检查一次,当队列中任务被执行完就解锁一批任务,继续往队列中加
if( queueSize < maxSize * 0.8 && list.size() > 0 ){
System.out.println("unlock !!~~");
lock.lock();
int i = 0 ;
Iterator<Condition> iterator = list.iterator();
while (i < maxSize-queueSize && iterator.hasNext()){
iterator.next().signal();
iterator.remove();
i++;
}
System.out.println("signal over!!~~,num="+(i));
lock.unlock();
}
},1,1, TimeUnit.SECONDS);
}
private void consume(){
try {
//当队列大小超过限制,阻塞当前线程,等待队列空闲
if(executor.getQueue().size() >= maxSize ){
System.out.println(Thread.currentThread()+" wait !!~"+"pool queue size = "+executor.getQueue().size());
lock.lock();
Condition condition = lock.newCondition();
list.add(condition);
condition.await();
System.out.println(Thread.currentThread()+"wait over!~~");
lock.unlock();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
executor.submit(()->{
System.out.println(Thread.currentThread()+" execute !!~~"+"pool queue size = "+executor.getQueue().size());
try {
//模拟任务阻塞
Thread.sleep(2500L);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
public static void main(String[] args) {
TestLockPool testLock = new TestLockPool();
testLock.init();
ExecutorService service = Executors.newFixedThreadPool(10);
for (int i = 0; i < 200; i++) {
service.submit(()->testLock.consume());
}
System.out.println("main over!~");
}
}