1、线程池:提供了一个线队列,队列中保存着所有等待状态的线程。避免了频繁的创建与销毁所造成的额外开销,提高了响应的速度。
2、线程池的体系结构:
java.util.concurrent.Executor:负责线程的使用与调度的根接口【Executor->执行器执行任务的一个接口】
|---(**)ExecutorService 子接口: 线程池的主要接口
|--- ThreadPoolExecutor 线程池的实现类
|--- ScheduledExecutorService 子接口:负责线程的调度
|--- ScheduledThreadPoolExecutor : 继承了ThreadPoolExecutor,实现了ScheduledExecutorService接口,
所以该实现类即具备了线程池的功能,又有线程调度的功能。
3、工具类:Executors
ExecutorService newFixedThreadPool(): 创建固定大小的线程池;
ExecutorService newCachedThreadPool():缓存线程池,线程池的数量不固定,可以根据需求自动的更改数量;
ExecutorService newSingleThreadExecutor():创建单个线程池。线程池中只有一个线程;
ScheduledExecutorService newScheduledThreadPool():创建固定大小的线程,可以延迟或定时的执行任务
newWorkStealingPool:底层是用ForkJoinPool实现的【工作窃取】
ForkJoinPool
4、工作窃取算法
(1)、工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。
(2)、那么为什么需要使用工作窃取算法呢?
假如我们需要做一个比较大的任务,我们可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,于是把这些子任务分别放到不同的队列里,
并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应,比如A线程负责处理A队列里的任务。但是有的线程会先把自己队列里的任务干完,
而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。
而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,
而窃取任务的线程永远从双端队列的尾部拿任务执行。
(3)、工作窃取算法的优点是充分利用线程进行并行计算,并减少了线程间的竞争,
其缺点是在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且消耗了更多的系统资源,比如创建多个线程和多个双端队列。
5、案例:
★ newWorkStealingPool
public class WorkStealingPoolDemo {
public static void main(String[] args) throws IOException {
ExecutorService service = Executors.newWorkStealingPool();
System.out.println(Runtime.getRuntime().availableProcessors()); //输出电脑的CPU核数,创建核数多个线程
service.execute(new R(1000));
service.execute(new R(2000));
service.execute(new R(2000));
service.execute(new R(2000)); //daemon
service.execute(new R(2000));
//由于产生的是精灵线程(守护线程、后台线程),主线程不阻塞的话,看不到输出,所以要添加System.in.read();
System.in.read();
}
static class R implements Runnable {
int time;
R(int t) {
this.time = t;
}
@Override
public void run() {
try {
TimeUnit.MILLISECONDS.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(time + " " + Thread.currentThread().getName());
}
}
}
★ ForkJoinPool
(1)、Java7 提供了ForkJoinPool来支持将一个任务拆分成多个“小任务”并行计算,再把多个“小任务”的结果合并成总的计算结果。
(2)、ForkJoinPool是ExecutorService的实现类,因此是一种特殊的线程池。
(3)、使用方法:创建了ForkJoinPool实例之后,就可以调用ForkJoinPool的submit(ForkJoinTask<T> task) 或invoke(ForkJoinTask<T> task)
或execute(ForkJoinTask<T> task)方法来执行指定任务了。
(4)、其中ForkJoinTask代表一个可以并行、合并的任务。ForkJoinTask是一个抽象类,它还有两个抽象子类:RecusiveAction和RecusiveTask。
其中RecusiveTask代表有返回值的任务,而RecusiveAction代表没有返回值的任务。
public class ForkJoinPoolDemo {
static int[] nums = new int[1000000];
static final int MAX_NUM = 50000;
static Random r = new Random();
static {
for(int i=0; i<nums.length; i++) {
nums[i] = r.nextInt(100);
}
System.out.println(Arrays.stream(nums).sum()); //stream api
}
/*
static class AddTask extends RecursiveAction { //继承RecursiveAction,没有返回值
int start, end;
AddTask(int s, int e) {
start = s;
end = e;
}
@Override
protected void compute() {
if(end-start <= MAX_NUM) {
long sum = 0L;
for(int i=start; i<end; i++) sum += nums[i];
System.out.println("from:" + start + " to:" + end + " = " + sum);
} else {
int middle = start + (end-start)/2;
AddTask subTask1 = new AddTask(start, middle);
AddTask subTask2 = new AddTask(middle, end);
subTask1.fork();
subTask2.fork();
}
}
}
*/
static class AddTask extends RecursiveTask<Long> { //继承RecursiveTask,有返回值,RecursiveTask带有泛型,泛型即为返回值的类型
int start, end;
AddTask(int s, int e) {
start = s;
end = e;
}
@Override
protected Long compute() {
if(end-start <= MAX_NUM) {
long sum = 0L;
for(int i=start; i<end; i++) sum += nums[i];
return sum;
}
int middle = start + (end-start)/2;
AddTask subTask1 = new AddTask(start, middle);
AddTask subTask2 = new AddTask(middle, end);
subTask1.fork();
subTask2.fork();
return subTask1.join() + subTask2.join();
}
}
public static void main(String[] args) throws IOException {
ForkJoinPool fjp = new ForkJoinPool();
AddTask task = new AddTask(0, nums.length);
fjp.execute(task);
long result = task.join(); //join有阻塞,所以不需要写System.in.read();
System.out.println(result);
//System.in.read();
}
}