有这样一种业务场景,从远端服务下载文件到本地,每个文件都10M+比较大,定时任务串行下载效率低下,所以考虑固定数量的线程并行去下载,这样可以控制对机器的资源消耗,同时提高下载吞吐量,对于这样的场景就需要控制线程同时执行时的数量。实现方式其实有多种,本人提供以下两种方式:
- 利用信号量(Semaphore ),通过acquire和release方法控制同时访问的线程个数。
public static void main(String[] args) {
ExecutorService pool = Executors.newCachedThreadPool();
// 创建给定个数的公平模式的信号量
final Semaphore sp = new Semaphore(3, true);
for (int i = 0; i < 10; i++) {
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
// 获取許可
sp.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
/* 业务逻辑处理 .... */
// 释放許可
sp.release();
}
};
pool.execute(runnable);
}
}
- 自定义线程池,重写拒绝策略,提交任务至任务队列由非阻塞的offer方法改成阻塞的put方法,任务队列满了之后,后续提交的任务就会阻塞,这样就能够有效控制执行的线程数。
/**
* 创建固定大小的阻塞式的线程池, 并且指定工作队列大小
* @param poolSize 线程个数
* @param queueSize 队列大小
* @return
*/
public static ExecutorService newBlockingThreadPool(int poolSize, int queueSize) {
if (poolSize < 2) {
poolSize = DEFAULT_POOL_SIZE;
}
if (queueSize < poolSize) {
queueSize = poolSize;
}
ExecutorService es = new ThreadPoolExecutor(poolSize, poolSize, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(queueSize), new RejectedExecutionHandler() {
/**
* 自定义拒绝策略, 当工作队列满时, 生产者调用put阻塞
*/
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (! executor.isShutdown()) {
try {
executor.getQueue().put(r);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("Create blocking thread pool error !", e);
}
}
}
});
return es;
}