ForkJoin

TODO
持续补充中...

前言

以下内容都是基于JDK1.8介绍

什么是ForkJoin(FJ)

ForkJoinPool 是 JDK7 引入的,由 Doug Lea 编写的高性能线程池。

核心
  • 分治算法(Divide-and-Conquer):
      把任务递归的拆分为各个子任务,这样可以更好的利用系统资源,尽可能的使用所有可用的计算能力来提升应用性能。

image.png

图片来源:https://www.jianshu.com/p/32a15ef2f1bf

  • work-stealing(工作窃取)算法:
      线程池内的所有工作线程都尝试找到并执行已经提交的任务,或者是被其他活动任务创建的子任务(如果不存在就阻塞等待)。这种特性使得 ForkJoinPool 在运行多个可以产生子任务的任务,或者是提交的许多小任务时效率更高。

以下翻译自 A Java Fork/Join Framework (Doug Lea)

fork/join框架的核心在于它的轻量级调度机制。FJTask采用了Cilk的work - stealing调度器中首创的基本策略 :

  • 每个工作线程在自己的线程中维护可运行的任务调度队列。
  • 队列被维护为双端队列(deques),支持LIFO,push、pop操作,以及FIFO的poll/take
  • 工作线程(FJThread)运行的任务中生成子任务被push自己的deques上。
  • 工作线程默认情况下以LIFO的模式处理自己的deque,通过pop获取任务。
  • 当工作线程没有本地任务要运行时,它会尝试,从随机选择的其他队列那里,使用FIFO(最早的优先)规则偷取一个任务,。
  • 当一个工作线程遇到一个join操作时,它处理其他任务(如果可用),直到目标任务完成。否则,所有任务都会在没有阻塞的情况下运行完成。 TODO
  • 当一个工作线程没有任务并且偷取不到任何任务时,对于其他类型,它会back off(通过yield、sleep和/或优先级调节),并稍后再次尝试,除非所有
    的worker都空闲,在这种情况下,所有的worker都block,直到从顶层调用另一个任务。
image.png

图片来源:A Java Fork/Join Framework (Doug Lea)

核心类介绍

  • ForkJoinPool 这是线程池的核心类,也是提供方法供我们使用的入口类。基本上forkJoin的核心操作及线程池的管理方法都由这个类提供。
image.png
  • ForkJoinPool.WorkQueue 这是ForkJoinPool类的内部类。也是线程池核心的组成部分。ForkJoinPool线程池将由WorkQueue数组组成。为了进一步提高性能,与ThreadPoolExecutor不一样的是,这没有采用外部传入的任务队列,而是作者自己实现了一个阻塞队列。奇数位是带工作线程的存放fork出的子任务的队列,偶数队列存放的是外部提交的任务。

  • ForkJoinWorkerThread 线程池中运行的thread也是作者重新定义的。这个类的目的是在于将外部的各种形式的task都转换为统一的ForkJoinTask格式。

  • ForkJoinTask 这是ForkJoinPool支持运行的task抽象类,我们一般使用其子类如RecursiveTask(有返回值)或者RecursiveAction(无返回值)。

https://blog.51cto.com/u_14014612/6031659

Fork-join任务运行机制

image.png

workequeues为什么区分奇偶slot

外部提交任务放在pool.workequeues偶数slot
内部提交任务放在pool.workequeues奇数slot
在我看来是一般外部提交的任务初始化,即递归起始点,会是一个大任务(相对于fork之后的小任务),FJWorkerThread会把fork之后的小任务放在自己队列的top(即工作线程自己的队列都是相对于外部提交任务而言的小任务),在FJWorkerThread启动之后是通过scan即窃取其他队列的任务,通过poll即使用FIFO的方式窃取base位,即“大任务”开始,相当于“广度优先”,而FJWorker是通过LIFO方式pop本地队列,类似与一种“深度优先”的方式,两者结合有助于发挥多线程优势。

work-steal的优势

通过

 List<Future<?>> list = new ArrayList<>();
        long start = System.currentTimeMillis();
        for (int j = 1; j <= 2; j++) {
            int i = j;
            Future<?> submit = threadPool.submit(() -> {
                System.out.println(Thread.currentThread().getName() + ", level1 task " + i);
                Future innerTask = threadPool.submit(() ->
                        System.out.println(Thread.currentThread().getName() + ", level2 task" + i));
                try {
                    innerTask.get();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
            list.add(submit);
        }
        Future[] outerTasks = list.toArray(new Future[0]);
        System.out.println("waiting...");
        try {
            for (Future outerTask : outerTasks) {
                outerTask.get();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("done");
        System.out.println("cost:" + (System.currentTimeMillis() - start));

分别使用

//会死锁
 testThreadPool(new ThreadPoolExecutor(2,2,10,TimeUnit.DAYS,new ArrayBlockingQueue<>(20)));
testThreadPool(Executors.newFixedThreadPool(2));
//下面两个一样,都是使用ForkJoin进行工作窃取
testThreadPool(new ForkJoinPool(2));
testThreadPool(Executors.newWorkStealingPool(2));
  • 首先提交的两个任务把线程池中的两个线程都占满了,而它们又分别提交了子任务,并等待子任务完成才退出
    子任务在工作队列中等待线程池中释放出空闲线程来执行,这是不可能的,所以两边互相等待,死锁了
  • ForkJoinPool 与普通线程池的主要区别是它实现了工作窃取算法。明显的内部区别是:
    • 普通线程池所有线程共享一个工作队列,有空闲线程时工作队列中的任务才能得到执行
    • ForkJoinPool 中的每个线程有自己独立的工作队列,每个工作线程运行中产生新的任务,放在队尾,某个工作线程会尝试窃取别个工作线程队列中的任务,从队列头部窃取,遇到 join() 时,如前面的 future.get(),如果 join 的任务尚未完成,则可先处理其他任务,这就是 ForkJoinPool 不会像普通线程池那样被死锁的秘诀。
      https://blog.csdn.net/weixin_42593549/article/details/114613629

FJ执行图示

public class AddNumTask extends RecursiveTask<Integer> {

    private List<Integer> ints;

    public AddNumTask(List<Integer> ints) {
        this.ints = ints;
    }


    @Override
    protected Integer compute() {
        if (ints.size() <= 2) {
            int sum = 0;
            for (int in : ints) {
                sum += in;
            }
            return sum;
        } else {
            AddNumTask task1 = new AddNumTask(ints.subList(0, ints.size() / 2));
            AddNumTask task2 = new AddNumTask(ints.subList(ints.size() / 2, ints.size()));
            invokeAll(task1,task2);
            Integer join = task1.join();
            join += task2.join();
            return join;
        }
    }
}

public class ForkJoinMain {
    public static void main(String[] args) {

        List<Integer> ints = new ArrayList<>();
        for (int i = 0; i <= 10; i++) {
            ints.add(i);
        }
        ForkJoinPool pool = ForkJoinPool.commonPool();
        ForkJoinTask<Integer> submit = pool.submit(new AddNumTask(ints));
        try {
            Integer integer = submit.get();
            System.out.println(integer);
        } catch (Exception e) {
            e.printStackTrace();
        }
}
  • compute方法执行对半切为两个小task,t1(0-5),t2(6-10)后,使用invokeAll(t1,t2)的执行流程。

下去简要描述了在并发数2的情况下的FJ的运行情况

image.png
  • 首先在 pool.submit(new AddNumTask(ints))之后,把任务0-10放在了外部提交队列,create了worker1
  • worker1获取到任务0-10,执行compute,切分为t1(0-5),t2(6-10)后调用invokeAll

 public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) {
        int s1, s2;
        t2.fork();
        if ((s1 = t1.doInvoke() & DONE_MASK) != NORMAL)
            t1.reportException(s1);
        if ((s2 = t2.doJoin() & DONE_MASK) != NORMAL)
            t2.reportException(s2);
    }
  • 首先t2.fork,把t2放到当前worker(FJWorkerThread)的top
  • t1.doInvoke直接执行,递归处理compute方法

FJ与普通线程池效果对别

public class Test {
    private static ExecutorService executorService = new ThreadPoolExecutor(20, 20, 120, TimeUnit.SECONDS, new LinkedBlockingQueue<>(2000), new ThreadPoolExecutor.CallerRunsPolicy());



    public static void main(String[] args) {
        int count = 0;
        int nums = 100;
        //System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "0");
        ForkJoinPool.commonPool();
        for(int i = 0;i < nums;i++){
            long start = System.currentTimeMillis();
            testFJ(3000000);
//        testThreadPool(3000000);
            count += (System.currentTimeMillis() - start);
        }
        System.out.println(nums + "次,平均耗时:" + (count / nums) + "毫秒");
        Scanner scanner = new Scanner(System.in);
        scanner.nextLine();
    }
    //测试forkjoin
    private static void testFJ(int size){
        List<Integer> ints = new ArrayList<>();
        for (int i = 0; i < size; i++) {
            ints.add(i);
        }
        ForkJoinPool pool = ForkJoinPool.commonPool();
        ForkJoinTask<Long> submit = pool.submit(new InvokeInterfaceTask(ints));
        try {
            Long aLong = submit.get();
            System.out.println("count:" + aLong);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    //测试线程池
    private static void testThreadPool(int size){
        CountDownLatch latch = new CountDownLatch(size);
        AtomicLong count = new AtomicLong();
        for(int i = 0; i< size;i++){
            executorService.execute(()->{
                try{
                    TestTools.testMethod();
                }finally{
                    latch.countDown();
                }
            });
        }

        try {
            latch.await();
            // System.out.println("count:" + count);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

public class TestTools {
    public static long testMethod(){
//        try {
//            Thread.sleep(1);
//        } catch (InterruptedException e) {
//            e.printStackTrace();
//        }
        return 1L<<12 & 0xfff | 1 + 1;
    }

}


public class InvokeInterfaceTask extends RecursiveTask<Long> {
    private List<Integer> ints;
    public InvokeInterfaceTask(List<Integer> ints) {
        this.ints = ints;
    }

    @Override
    protected Long compute() {
//        System.out.println(Thread.currentThread().getName() + "," + ints.get(0) + "," + ints.get(ints.size() - 1));
        if (ints.size() <= 1) {
            long sum = 0;
            sum = TestTools.testMethod(enterpriseDictService);
            return sum;
        } else {
            InvokeInterfaceTask task1 = new InvokeInterfaceTask(ints.subList(0, ints.size() / 2));
            InvokeInterfaceTask task2 = new InvokeInterfaceTask(ints.subList(ints.size() / 2, ints.size()));
//            System.out.println(Thread.currentThread().getName() + "---invokeAll begin");
            invokeAll(task1, task2);
//            System.out.println(Thread.currentThread().getName() + "---invokeAll end");
            Long join = task1.join();
//            System.out.println(Thread.currentThread().getName() + "---task1.join");
            join += task2.join();
//            System.out.println(Thread.currentThread().getName() + "---task2.join");
            return join;
        }
    }
}

上述代码是测试在size数目(最细任务,因为FJ的任务肯定大于size,ThreadPool的任务=size)时,运行nums次,求平均耗时。
本机FJ的并发数为7
如下,random 10表示testMethod中使用了new Random.nextInt(10),随机休眠,模拟接口耗时

image.png
  • 1.把ThreadPool的core和max线程数设置为7,队列长度2000时,运行结果二者耗时基本相等
  • 2.把ThreadPool的core和max线程数设置为7,队列长度3000000时,二者并发线程数相等的情况下,FJ的耗时明显低于ThreadPool,即便增加ThreadPool的线程数,队列长度,耗时依然没有明显降低甚至增加。

总结

  • 1.FJ对于“很多”的小任务处理优势是明显的。
  • 2.FJ对于递归处理,执行结果进行join的支持友好,不必需要额外代码进行汇总结果,也无需特别注意并发问题
  • 3.FJ是对于一般线程池的补充,对于多线程情况多提供了一种解决方案的选择

普通线程池使用问题

http://javakk.com/188.html

参考

Java并发编程——ForkJoinPool之外部提交及worker执行过程
https://blog.51cto.com/u_14014612/6031659
JUC源码分析-线程池篇(五):ForkJoinPool - 2
https://www.jianshu.com/p/6a14d0b54b8d

扩展阅读

CAS原理解析
https://blog.csdn.net/yyqhwr/article/details/106965444
volatile
https://www.cnblogs.com/dolphin0520/p/3920373.html
wait/sleep
https://blog.csdn.net/qq_45731021/article/details/116502429
Java线程池实现原理及其在美团业务中的实践
https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html

ThreadLocalRandom.getProbe() 线程的探针哈希值

线程的探针哈希值说明

使用 ThreadLocalRandom.getProbe() 得到线程的探针哈希值,探针哈希值和 map 里使用的哈希值的区别是,当线程发生数组元素争用后,可以改变线程的探针哈希值,让线程去使用另一个数组元素,而 map 中 key 对象的哈希值,由于有定位 value 的需求,所以它是一定不能变的。

private static final long PROBE
        = U.objectFieldOffset(Thread.class, "threadLocalRandomProbe");
//实际是获取的 Thread中的 threadLocalRandomProbe 属性
//可以看出这个属性是静态的,即类级别的,所有对象共用
static final int getProbe() {
        return U.getInt(Thread.currentThread(), PROBE);
    }

//更新线程探针哈希值
static final int advanceProbe(int probe) {
        probe ^= probe << 13;   // xorshift
        probe ^= probe >>> 17;
        probe ^= probe << 5;
        U.putInt(Thread.currentThread(), PROBE, probe);
        return probe;
    }

守护线程

在Java中有两类线程:用户线程 (User Thread)、守护线程 (Daemon Thread)。
所谓守护 线程,是指在程序运行的时候在后台提供一种通用服务的线程,比如垃圾回收线程就是一个很称职的守护者,并且这种线程并不属于程序中不可或缺的部分。因此,当所有的非守护线程结束时,程序也就终止了,同时会杀死进程中的所有守护线程。反过来说,只要任何非守护线程还在运行,程序就不会终止。
用户线程和守护线程两者几乎没有区别,唯一的不同之处就在于虚拟机的离开:如果用户线程已经全部退出运行了,只剩下守护线程存在了,虚拟机也就退出了。 因为没有了被守护者,守护线程也就没有工作可做了,也就没有继续运行程序的必要了。
将线程转换为守护线程可以通过调用Thread对象的setDaemon(true)方法来实现。在使用守护线程时需要注意一下几点:
(1) thread.setDaemon(true)必须在thread.start()之前设置,否则会跑出一个IllegalThreadStateException异常。你不能把正在运行的常规线程设置为守护线程。
(2) 在Daemon线程中产生的新线程也是Daemon的。
(3) 守护线程应该永远不去访问固有资源,如文件、数据库,因为它会在任何时候甚至在一个操作的中间发生中断。

>> 是 有符号的 右移 操作符。
符号为正,高位插入 0
符号为负,高位插入 1
>>> 是 无符号的 右移 操作符。
不管符号为啥,高位插入0

负数左右移
https://blog.csdn.net/qq_41675265/article/details/126002069

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容