1.1 为什么要线程池
我们在执行大规模任务时,如安卓中的多图下载,网络请求,都少不了使用线程。而线程作为进程下面的计算基本单位,它必然会有一些进程的特点,如需要系统分配内存资源,所以有下面的说法:
each thread requires an allocation of stack memory whose default size ranges from 64 KB to 1 MB, depending on the OS.
可以看出,在一个任务对应一个线程的模式下,这样频繁创建线程可能面对的第一个问题是资源瓶颈,线程大量增加,出现OutOfMemoryError()的异常风险会增高。
为了避免频繁创建线程,我们可以想到一个解决方案: 一个线程对应多个任务,把线程缓存起来。
另外,线程的创建是有自己生命周期,创建好了不会马上运行,这对于一些网络请求要求速率快无疑有很大的影响。
根据上面的几个特点,我们可以根据CPU核数、JVM启动参数、Thread构造函数中请求栈道大小等因素,限制线程的数量,从而提高系统的吞吐率。
2.1 自己写一个线程池
在Java JDK中,提供了Excutors工具类定制了一些ThreadPoolExecutor,通过工厂方法可以得到它们。具体可以去看源码,这里我主要是通过手写线程池主要的对象来分析线程池管理任务的过程。
需求一 :设计可执行任务的接口
根据面向接口编程的原则,定义一个IExecutor 接口,它有一个方法execute(Runnable r)专门用来执行任务:
package executor;
/**
* Created by qiangzeng on 17/3/19.
*/
public interface IExecutor {
void execute(Runnable runnable);
}
该任务可以在当前线程中同步执行。如下面代码:
Executor mExecutorSync = new Executor() {
public void execute(Runnable command) {
command.run();
}
};
mExecutorSync.execute(new Runnable() {
public void run() {
System.out.println("我在主线程,同步执行");
}
});
也可以扔到新开的线程中异步执行。如下面代码:
Executor mExecutorAsync = new Executor() {
public void execute(Runnable command) {
new Thread(command).start();
}
};
mExecutorAsync.execute(new Runnable() {
public void run() {
System.out.println("我在新开的子线程异步执行");
}
});
至此,我们基本实现了需求一:管理者Executor执行一个任务Runnbale。在这里我们仍然是执行一个任务需创建一个线程,这还不够. 我们需要一个线程能调度很多任务。
需求二:创建线程
对于需求二,让有限的线程逐个去执行队列中任务,完成这个工作首先我们需要有个线程工厂类,创建线程:
为了统一构造线程池中的线程,代码如下。
/**
* Created by qiangzeng on 17/3/23.
*/
public abstract class MyThreadFactory {
public abstract Thread newThread(Runnable runnable);
}
我们可以定义一个默认线程工厂实现类:
public static MyThreadFactory defaultThreadFactory = new MyThreadFactory() {
@Override
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable);
thread.setDaemon(false);
thread.setPriority(Thread.NORM_PRIORITY);
return thread;
}
};
如上,我们给线程池的线程统一设置为非后台线程,优先级为一般。需要它时可以直接使用。
线程需要一个代理类,这样我们可以给线程优雅地做运行、中断操作,我们把它叫做Worker类:
/**
* 任务持有者,对任务进行执行,(拒绝,中止.请参考java源码)
*/
private class Worker implements Runnable {
private Runnable firstTask;
private Thread thread;
public Worker(Runnable firstTask) {
this.firstTask = firstTask;
this.thread = threadFactory.newThread(this);//利用线程工厂创建线程
}
public void run() {
runWork(this);
}
}
Work类,做些什么事呢? 它持有一个线程和默认的任务。如果线程池的数目还没用达到corePoolSize(线程限制数)我们就创建新Work,并添加到线程调度池中:
/**
* 线程池集合,用来保存线程工作者.
*/
private HashSet<Worker> workers = new HashSet<Worker>();
至于*runWork(this) *方法我们稍后会分析,到这里,我们的线程基本工作准备好了。下面我们需要分析下任务是怎么被线程调度。
需求三:利用线程池调度任务
首先我们需要一些原材料(属性)我们创建一个IExecutorImpl类,它是IExecutor的实现。
/**
* 线程池的大小
*/
private int corePoolSize;
/**
* 保存任务的队列
*/
private BlockingDeque<Runnable> workQueue;
/**
* 用来创建线程的工程类
*/
private MyThreadFactory threadFactory;
上面代码声明了基本属性:
- 1.线程池的线程数
- 2.创建线程的工厂类
- 3.阻塞任务队列
因为这些属性是线程池必备的。我们在构造函数中初始化这些属性
public IExecutorImpl(int corePoolSize, BlockingDeque<Runnable> workQueue, MyThreadFactory threadFactory) {
this.corePoolSize = corePoolSize;
this.workQueue = workQueue;
this.threadFactory = threadFactory;
}
从属性可以看出,我们要提交到线程池的任务可能会因为等待执行而放在阻塞任务队列中。至此我们可以开始正式写线程调度逻辑了:
1.当用户提交任务时,会执行execute().
2.如果线程池中的线程工作者小于线程限制大小则继续新增线程,并马上运行提交的任务.
3.否则,把任务放入阻塞队列等待执行.
/**
* 覆写IExecutor中的execute()
* @param command
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
if (getWorkCount() < corePoolSize)//
addWork(command);
else {
workQueue.offer(command);//否则,把任务放入阻塞队列等待执行
}
}
上面addWork()做的工作是:向线程池中添加线程,代码如下:
public void addWork(Runnable firstTask) {
if (getWorkCount() < corePoolSize) {
Worker worker = null;
try {
final ReentrantLock mainLock = this.mainLock;
worker = new Worker(firstTask);
final Thread t = worker.thread;
if (t != null) {
mainLock.lock();
workers.add(worker);
t.start();
}
} finally {
mainLock.unlock();
}
}
}
上面代码逻辑:
1.判断有没有超过线程池最大数量
2.在临界区中新建一个线程工作者
3.把线程工作者Work添加到线程池works里面,并启动线程,然后执行最新提交的任务。
这样,如果线程池达到了最大限制数,那么我们不再创建新线程(除非某个线程停止了)。来了新的任务,我们就放入任务队列。
那么我们思考一下,一个线程如何执行多个任务?
很简单,我们看看之前的runWork(this)方法:
public void runWork(Worker worker) {
Runnable task = worker.firstTask;
worker.firstTask = null;
while (task != null || (task = getTask()) != null) {
task.run();
task = null;
}
work.firstTask是创建新线程是添加的默认任务。 而这里主要逻辑是在一个while循环中不断的getTask()得到阻塞队列中的任务,执行run(),然后销毁为null.
我们看看getTask():
public Runnable getTask() {
Runnable runnable = null;
try {
runnable = workQueue.take();
return runnable;
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
return runnable;
}
}
很简单,就是从workQueue中take拿一个任务出来。因为是阻塞队列。如果队列为空则线程等待。知道被添加新的任务才会被唤醒。如此反复的调度任务。
3.1实例
为了更方便的使用线程池,我们会默认配置一些参数,所以有一个线程池工厂类MyExecutors用来创建线程池:
public class MyExecutors {
public static IExecutor newFixedThreadPool(int nThreads) {
return new IExecutorImpl(nThreads, new LinkedBlockingDeque<Runnable>(), MyExecutors.defaultThreadFactory);
}
我们可以正式使用这个线程池了:
功能:
一个线程, 模拟调度多个任务。
/**
* Created by qiangzeng on 17/3/23.
*/
public class App {
public static void main(String[] args) {
IExecutor iExecutor = MyExecutors.newFixedThreadPool(1);//默认只有一个线程, 模拟调度多个任务
iExecutor.execute(new Runnable() {
public void run() {
System.out.println("task1");
try {
System.out.println("等线程1执行firstTask,5秒...后");
System.out.println("线程1从任务队列里面依次取出任务,有序执行...");
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
for ( int i = 0; i < 10; i++) {
final int count = i;
iExecutor.execute(new Runnable() {
public void run() {
System.out.println("task"+ (count +2));
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
}
输出结果:
task1
等线程1执行firstTask,5秒...后
线程1从任务队列里面依次取出任务,有序执行...
task2
task3
task4
task5
task6
task7
task8
task9
task10
task11
总结
上面是本人对JAVA线程池简要分析,主要是用作对源码的知识笔记。线程池中的任务调度,除了从队列中拿任务执行,还有一个非常重要的是如何拒绝任务。因为我们这里只是通过限制线程数来防止资源消耗过多。但是任务没有任何限制,而且任务什么时候可以中断取消执行,则要在Work类中去处理了,源码是最好的教材,有空继续阅读。