Fork/Join框架
fork/join是java7提供的并行执行框架,其主要作用是把大的任务分隔成小任务,最终汇总每个小任务的结果。
1.任务分割:首先Fork/Join框架需要把大的任务分割成足够小的子任务,如果子任务比较大的话还要对子任务进行继续分割
2.执行任务并合并结果:分割的子任务分别放到双端队列里,然后几个启动线程分别从双端队列里获取任务执行。子任务执行完的结果都放在另外一个队列里,启动一个线程从队列里取数据,然后合并这些数据。
3.RecursiveAction:用于没有返回结果的任务,RecursiveTask:用于有返回结果的任务,实现的分隔任务需要继承这两个类
例子(并行计算合并结果)
public class ForkJoinDemo {
public static void main(String[] args){
ForkJoinPool pool = new ForkJoinPool();
BuService buService=new BuService();
BusTask task=new BusTask(1,4,2,buService);
Future<List<String>> result = pool.submit(task);
try {
System.out.println(result.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
class BusTask extends RecursiveTask<List<String>> {
private int avg;
private int start;
private int end;
private BuService buService;
public BusTask(int start,int end,int avg,BuService buService){
this.start = start;
this.end = end;
this.avg=avg;
this.buService=buService;
}
@Override
protected List<String> compute() {
List<String> list=new ArrayList<>();
//如果任务足够小就计算
System.out.println("start:"+start+" end:"+end);
boolean canCompute = (end - start) <=avg;
if(canCompute){//这个地方可以做计算,也可以做其他的业务
for(int i=start;i<=end;i++){
list.add(buService.getBusData(i));
}
}else{
//任务拆分
int middle = (start + end) / avg;
BusTask left = new BusTask(start,middle,avg,buService);
BusTask right = new BusTask(middle+1,end,avg,buService);
//执行子任务
left.fork();
right.fork();
//获取子任务结果
List<String> leftBuServices=left.join();
List<String> rightBuServices=right.join();
list.addAll(leftBuServices);
list.addAll(rightBuServices);
}
return list;
}
}
class BuService{
public String getBusData(int i){
String r= Math.random()+"";
System.out.println("i:"+i+" random:"+r);
return r;
}
}