12.Fork_Join框架

  1. 用来做什么
    ForkJoinPool是ExecutorService(线程池服务)接口的实现,它专为可以递归分解成小块的工作而设计。
    for/join框架将任务分配给线程池中的工作线程,充分利用多处理器的优势,提高程序性能。
    使用fork/join框架的第一步是编写执行一部分工作的代码。类似的伪代码如下:
如果(当前工作部分足够小)
    直接做这项工作
其他
    把当前工作分成两部分
    调用这两个部分并等待结果

将此代码包装在ForkJoinTask子类中,通常RecursiveTask(可以返回结果)或者RecursiveAction(不可以返回结果)

ForkJoinTask是RecursiveAction与RecursiveTask的父类, ForkJoinTask中使用了模板模式进行设计
,将ForkJoinTask的执行相关的代码进行隐藏,通过提供抽象类暴露用户的实际业务处理。

  1. 意图梳理
    关键点:分解任务fork出新任务,汇集join任务执行结果


    1.png

ForkJoin是由JDK1.7后提供多线并发处理框架。ForkJoin的框架的基本思想是分而治之。什么是分而治之?分而治之就是将一个复杂的计算,按照设定的阈值进行分解成多个计算,然后将各个计算结果进行汇总。相应的ForkJoin将复杂的计算当做一个任务。而分解的多个计算则是当做一个子任务。

  1. 工作窃取


    2.png

说明:所谓工作窃取区别于传统线程池,是因为,虽然也是多线程工作,但是线程池是自己线程干自己的事情,干完了就休息,但是ForkJoin的工作窃取是当自己线程任务队列为空之后,则取其他任务队列取任务帮助完成,所以更加充分的利用CPU,性能更高。

  1. 实现思路
  • 每个worker线程都维护一个任务队列,即ForkJoinWorkerThread的任务队列
  • 任务队列是双向队列,这样可以同时实现LIFO和FIFO
  • 子任务会被加入到原先任务所在Worker线程的任务队列
  • Worker线程用LIFO的方法取出任务,后进队列的任务先取出来(子任务总是后加入队列,但是需要先执行)
  • 当任务队列为空,会随机从其他的worker的队列中拿走一个任务执行(工作窃取:steal work)
  • 如果一个worker线程遇到了join操作,而这适合正在处理其他任务,会等到这个任务结束。否则直接返回
  • 如果一个worker线程窃取任务失败,它会用yield或者sleep之类的方法休息一会,在尝试(如果所有线程都是空闲状态,即没有任务运行,那么该县城也会进入阻塞状态等待新任务的到来)
  • 重要:forkjoin不做具体任务拆分也不知道怎么拆分只是提供了功能,真实逻辑都在compute内部,自己实现
  1. 适用
  • 使用尽可能少的线程池-在大多数情况下,最好的决定是为每个应用程序或系统使用一个线程池如果不需要特定的调整,则使用默认的公共线程池
  • 使用合理的阙将ForkJoinTask拆分为子任务
  • 避免在ForkJoinTask中出现任何阻塞
  • 适合数据处理、结果汇总、统计等场景
    • java8实例:java.util.Arrays类用于其parallelSort()方法
  • 适合于内存操作,数据计算等,但是明显不适合文件操作,网络操作
  1. 基本使用
    使用ForkJoin框架,需要创建一个ForkJoin的任务,而ForkJoinTask是一个抽象类,我们不需要去继承ForkJoinTask进行使用。因为ForkJoin框架为我们提供了RecursiveAction和RecursiveTask。我们只需要继承ForkJoin为我们提供的抽象类的其中一个并且实现compute方法。
private static class SumTask extends RecursiveTask<Integer> {

        private  int threshold ;
        private static final int segmentation = 10;

        private int[] src;

        private int fromIndex;
        private int toIndex;

        public SumTask(int formIndex,int toIndex,int[] src){
            this.fromIndex = formIndex;
            this.toIndex = toIndex;
            this.src = src;
            this.threshold = src.length/segmentation;
        }

        @Override
        protected Integer compute() {
            //核心就是该方法

            //可知forkjoin只提供具体抽象,但是实际任务怎么拆分还是
            //看具体用户怎么书写此处的代码

            //此处大意就是if内部的就是具体执行,else则是继续拆分任务,不做具体执行,实际拆分之后最终都会进入if内部
            if((toIndex - fromIndex)<threshold ){
                int count = 0;
                System.out.println(" from index = "+fromIndex
                        +" toIndex="+toIndex);
                for(int i = fromIndex;i<=toIndex;i++){
                  count+=src[i];
                }
                return count;
            }else{
                int mid = (fromIndex+toIndex)/2;
                SumTask left =  new SumTask(fromIndex,mid,src);
                SumTask right = new SumTask(mid+1,toIndex,src);
                invokeAll(left,right);
                return left.join()+right.join();
            }
        }
    }

使用ForkJoinPool进行执行
task要通过ForkJoinPool来执行,分割的子任务也会添加到当前工作线程的双端队列中,
进入队列的头部。当一个工作线程中没有任务时,会从其他工作线程的队列尾部获取一个任务(工作窃取)。

 public static void main(String[] args) {
            int[]  array = MakeArray.createIntArray();
        ForkJoinPool forkJoinPool= new ForkJoinPool();
        SumTask sumTask  = new SumTask(0,array.length-1,array);

        long start = System.currentTimeMillis();

        forkJoinPool.invoke(sumTask);
        System.out.println("The count is "+sumTask.join()
                +" spend time:"+(System.currentTimeMillis()-start)+"ms");

    }
  1. Future
    Future表示异步计算的结果,提供了用于检查计算是否完成、等待计算完成以及获取结果的方法

Future的类图结构


3.png

如上图可知,ForkJoin框架以及Future都是属于Future的子类或者抽象类继承类

  • Callable
    和runable一样的业务定义,但是本质上是有区别的:有返回值,可抛异常,同时call是运行在run里面的
public class CallDemo {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService service= Executors.newCachedThreadPool();

        Callable<String> callable=new Callable<String>() {
            @Override
            public String call() throws Exception {
                //实际是运行在runable的run方法里面,所以也是多线程
                return null;
            }
        };
        //方式一:通过线程池-其实还是多线程运行,会发现内部就是FutureTask
//        service.submit(callable);

        //方式二:通过FutureTask包装callable
        FutureTask<String> task=new FutureTask<String>(callable);
        new Thread(task).start();
       String a= task.get();//通过这种方式获取callable内部call的返回值
//        System.out.println(a);null

    }
}

之前如果想获取异步结果返回值,则需要手动通过countDownLatch实现,但是有了FutureTask则可以很方便的获取

  1. FutureTask应用

如上面的例子,FutureTask是为了更加简单的获取异步返回信息,并提供多线程或者线程池的操作,从而进行多个异步操作同时进行,而结果只取最长的任务那个,同时提供get获取返回值


4.png

总的执行时间,取决于执行最慢的逻辑
逻辑之间无依赖关系,可同时执行,则可以应用多线程技术进行优化

  1. 自定义实现简单版本的FutureTask
public class TonyFutureTask<T> implements Runnable, Future {
    Callable<T> callable;//业务逻辑在callable里面
    T result=null;
    volatile String state="NEW";//task执行状态
    LinkedBlockingQueue<Thread> waiters=new LinkedBlockingQueue();

    public TonyFutureTask(Callable<T> callable) {
        this.callable=callable;
    }
    @Override
    public void run() {
        try {
            //从这个里可知,call确实是在run方法内部执行的
            result=callable.call();
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            state="END";
        }
        //唤醒等待者
        Thread waiter=waiters.poll();
        while (waiter != null) {
            LockSupport.unpark(waiter);
            //继续取出队列中的等待者
            waiter=waiters.poll();
        }
    }
    @Override
    public T get() {
        if ("END".equals(state)) {
            return result;
        }
        waiters.offer(Thread.currentThread());//加入到等待队列,线程不继续往下执行了
        while (!"END".equals(state)) {
            //阻塞
            LockSupport.park();
        }
        return result;
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        return false;
    }
    @Override
    public boolean isCancelled() {
        return false;
    }
    @Override
    public boolean isDone() {
        return false;
    }
    @Override
    public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return null;
    }
}

上面使用FutureTask的例子都可以使用这个自定义的task从而达到相同目的,实际上jdk内部的FutureTask也是这么实现的,只不过更具体更细节

  1. 线程安全级别
  • 不可变的--这个类的实例是不可变的。这样的例子有String,Long,BigInterger
  • 无条件的线程安全-- 这个类的实例是不可变的,但是这个类有足够的内部同步。例子:Random,ConcurrentHashMap,一般队列都是线程安全的,否则先进先出根本无法保障
  • 有条件的线程安全-- 除了有些方法为进行安全的并发使用而需要外部同步之外,这种线程安全级别与无条件安全相同。例子包含:Collections.synchronized包装返回的集合,它们的迭代器是要求外部同步的。
  • 非线程安全--这个类的实例是可变的。为了并发使用他们,客户必须利用自己选择的外部同步包围每个方法调用。例如:ArrayList
  • 线程对立的--这个类不能被安全的被多个线程使用,即使所有的方法调用都被外围同步包围。
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,542评论 6 504
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,822评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,912评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,449评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,500评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,370评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,193评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,074评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,505评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,722评论 3 335
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,841评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,569评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,168评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,783评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,918评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,962评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,781评论 2 354

推荐阅读更多精彩内容

  • From:Java并发编程的艺术 目录BiBi - 并发编程 -0- 开篇BiBi - 并发编程 -1- 挑战Bi...
    奋飞的蜗牛ing阅读 390评论 0 0
  • 在这篇文章中,将覆盖如下内容: 什么是Fork/Join框架 工作窃取算法 Fork/Join框架的设计 Recu...
    打铁大师阅读 742评论 0 2
  • 1、核心思想 Fork/Join框架是Java 7提供的一个用于并行执行任务的框架, 核心思想就是把大任务分割成若...
    冰河winner阅读 532评论 0 0
  • 文/馨 小时候,一直期待着自己长大后的样子,渴望体验长大后的感觉。可当我们渐渐地长大之后,却开始很想逃避。我们开始...
    婷蒽蒽阅读 255评论 0 2
  • 爷爷奶奶那个年代 没有现在的饭菜 遭受饥寒迫害 劳动从不缺钙 十六七岁的年纪留下大人回忆 父亲渐渐老去我要将他代替...
    木君94阅读 385评论 1 3