ForkJoin在JDK1.7诞生,为了在大数据量下并行执行任务,提高效率。
把大任务拆分成小任务
Fork Join
ForkJoin特点:工作窃取
这里面维护的都是双端队列
B线程执行完四个任务不会等待,他会窃取A的剩余的任务执行,提高效率
Fork Join 工作窃取
ForkJoin的使用
1 .ForkJoinPool 执行
ForkJoinPool
2 .计算任务 ForkJoinPool.execute(ForkJoinTask<?> task)
execute(ForkJoinTask<?> task)
3.看看这个ForkJoinTask!
ForkJoinTask
4.我们要用到
RecursiveTask
看一下文档的例子需要继承RecursiveTask
/*
public abstract class RecursiveTask<V>
extends ForkJoinTask<V>
递归结果ForkJoinTask 。
对于一个典型的例子,这里是一个任务计算斐波纳契数字:
*/
class Fibonacci extends RecursiveTask<Integer>
{
final int n;
Fibonacci(int n)
{
this.n = n;
}
protected Integer compute()
{
if (n <= 1) return n;
Fibonacci f1 = new Fibonacci(n - 1);
f1.fork();
Fibonacci f2 = new Fibonacci(n - 2);
return f2.compute() + f1.join();
}
}
/*
然而,除了计算斐波纳契函数的一种愚蠢的方法(有一个简单的快速线性算法,您将在实践中使用),这很可能表现不佳,因为最小的子任务太小而不能被分解。 相反,正如几乎所有fork / join应用程序的情况一样, 您可以选择一些最小粒度大小(例如,在此为10),您始终依次解决而不是细分。
*/
写一个计算任务 extends RecursiveTask 重写compute() 方法
public class ForkJoinDemo extends RecursiveTask<Long>
{
private Long start;
private Long critical = 10000L; //默认临界值 10000L
private Long end;
public ForkJoinDemo(Long start , Long end)
{
this.start = start;
this.end = end;
}
//计算方法
@Override
protected Long compute()
{
if ((end-start)>critical)
{
//当 ecd-start>critical 大于临界值 时我们要分支合并计算
long middle = (start + end) / 2; //中间值
ForkJoinDemo task1 = new ForkJoinDemo(start, middle);
task1.fork();//拆分任务把任务压入线程队列
ForkJoinDemo task2 = new ForkJoinDemo(middle+1,end);
task2.fork();//拆分任务把任务压入线程队列
//获取结果
long result= task1.join() + task2.join();
return result;
}else {
//小于临界值 用普通操作
long sum = 0L;
for (long i = start; I <= end; i++)
{
sum+=i;
}
return sum;
}
}
}
我们用 for循环 ForkJoin Stream流 分别测试一下0加到10亿看看效率
public class Test
{
public static void main(String[] args) throws ExecutionException, InterruptedException
{
//fori();
//forkJoin();
//stream();
}
// fori 操作
public static void fori()
{
long sum = 0L;
long start = System.currentTimeMillis();
for (long i = 1; i <=10_0000_0000 ; i++)
{
sum += i;
}
long end = System.currentTimeMillis();
System.out.println("sum = "+sum+ "用时:"+(end-start) +"毫秒");
}
//ForkJoin 效率可以更高 调节临界值
public static void forkJoin() throws ExecutionException, InterruptedException
{
long start = System.currentTimeMillis();
//创建 ForkJoinPool
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask<Long> task = new ForkJoinDemo(0L, 10_0000_0000L);
//执行任务,没返回值
// forkJoinPool.execute(task);
//提交任务,有返回值 返回task
ForkJoinTask<Long> submit = forkJoinPool.submit(task);
long sum = submit.get();
long end = System.currentTimeMillis();
System.out.println("sum = "+sum+"时间:"+(end-start)+"毫秒");
}
//Stream并行流
public static void stream()
{
long start = System.currentTimeMillis();
// 注意这个range方法 计算时不会加第一位和最后一位数 (start+end)
//LongStream.range(start,end);
//要用这个 (start+end]
long sum = LongStream.rangeClosed(0L, 10_0000_0000L).parallel().reduce(0, Long::sum);//并行求和
long end = System.currentTimeMillis();
System.out.println("sum = "+sum+ "时间:"+(end-start)+"毫秒");
}
}
运行结果:
fori: sum = 500000000500000000用时:429毫秒
forkJoin: sum = 500000000500000000时间:311毫秒
stream: sum = 500000000500000000时间:197毫秒