02 线程的并发工具类

1 Fork-Join

1.1 Fork/Join 体现了“分而治之”

什么是分而治之?
规模为N的问题,N<阈值,直接解决,N>阈值,将N分解为K个小规模子问题,子问题互相对立,与原问题形式相同,将子问题的解合并得到原问题的解


image.png

1.2 工作秘取(WorkStealing)

image.png

有A、B两个线程,A线程任务已经做完,而B线程任务还没有做完,这时A线程可以从B线程所属的任务中偷取任务来做,这种工作方式可提升资源利用率,提高性能。

1.3 Fork/Join实战

1.3.1 使用的范式

image.png

其中RecursiveTask、RecursiveAction、ForkJoinTask都为抽象类,我们自己的任务类要根据业务需要继承并实现这些类,并必须实现compute()方法
那么这些类有什么不同呢?

  • ForkJoinTask:实现接口 public abstract class ForkJoinTask<V> implements Future<V>, Serializable
  • RecursiveTask : 任务有返回值
    继承public abstract class RecursiveTask<V> extends ForkJoinTask<V>
  • RecursiveAction:任务没有返回值
    继承 public abstract class RecursiveAction extends ForkJoinTask<Void>

compute()方法逻辑

1.3.2 实战案例

1.3.2.1 Fork/Join的同步用法同时演示返回结果值:统计整形数组中所有元素的和

任务类

    private static class SumTask extends RecursiveTask<Integer>{

        private final static int THRESHOLD = MakeArray.ARRAY_LENGTH/10;
        private int[] src; //表示我们要实际统计的数组
        private int fromIndex;//开始统计的下标
        private int toIndex;//统计到哪里结束的下标

        public SumTask(int[] src, int fromIndex, int toIndex) {
            this.src = src;
            this.fromIndex = fromIndex;
            this.toIndex = toIndex;
        }

        @Override
        protected Integer compute() {
            if(toIndex-fromIndex < THRESHOLD) {//若计算的范围小于指定阈值,则直接进行业务累加
                int count = 0;
                for(int i=fromIndex;i<=toIndex;i++) {
                    //SleepTools.ms(1);
                    count = count + src[i];
                }
                return count;
            }else {//若计算的范围大于指定阈值,则将该范围拆分,并分别分配任务,采用递归的方式继续判断求和
                //fromIndex....mid....toIndex
                //1...................70....100
                int mid = (fromIndex+toIndex)/2;
                SumTask left = new SumTask(src,fromIndex,mid);
                SumTask right = new SumTask(src,mid+1,toIndex);
                invokeAll(left,right);
                return left.join()+right.join();
            }
        }
    }

调用类

ForkJoinPool pool = new ForkJoinPool();
        int[] src = MakeArray.makeArray();

        SumTask innerFind = new SumTask(src,0,src.length-1);

        long start = System.currentTimeMillis();

        pool.invoke(innerFind);//同步调用
        System.out.println("Task is Running.....");

        System.out.println("The count is "+innerFind.join()
                +" spend time:"+(System.currentTimeMillis()-start)+"ms");

1.3.2.2 Fork/Join的异步用法同时演示不要求返回值:遍历指定目录(含子目录)寻找指定类型文件

任务类

public class FindDirsFiles extends RecursiveAction{

    private File path;//当前任务需要搜寻的目录

    public FindDirsFiles(File path) {
        this.path = path;
    }

    @Override
    protected void compute() {
        
        List<FindDirsFiles> subTasks = new ArrayList<>();
        
        File[] files = path.listFiles();
        if(files!=null) {
            for(File file:files) {
                if(file.isDirectory()) {//如果是文件夹,则继续调用任务,递归处理
                    subTasks.add(new FindDirsFiles(file));
                }else {//如果是文件,则直接进行业务逻辑
                    //遇到文件,检查
                    if(file.getAbsolutePath().endsWith("txt")) {
                        System.out.println("文件:"+file.getAbsolutePath());
                    }
                }
            }
            if(!subTasks.isEmpty()) {//如果当前任务中包含子任务,则调用子任务join()方法,等待子任务结束当前任务方可继续执行
                for(FindDirsFiles subTask:invokeAll(subTasks)) {
                    subTask.join();//等待子任务执行完成
                }
            }
        }


        
    }
}

调用类

  public static void main(String [] args){
        try {
            // 用一个 ForkJoinPool 实例调度总任务
            ForkJoinPool pool = new ForkJoinPool();
            FindDirsFiles task = new FindDirsFiles(new File("F:/"));

            pool.execute(task);//异步调用

            System.out.println("Task is Running......");
            Thread.sleep(1);
            int otherWork = 0;
            for(int i=0;i<100;i++){
                otherWork = otherWork+i;
            }
            System.out.println("Main Thread done sth......,otherWork="+otherWork);
            task.join();//阻塞的方法,保证主线程等待我们的异步任务线程,防止主线程提前结束
            System.out.println("Task end");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

2 常用的并发工具类

2.1 CountDownLatch

CountDownLatch是一个非常实用的多线程控制工具类。常用的就下面几个方法:

  • CountDownLatch(int count) //实例化一个倒计数器,count指定计数个数
  • countDown() // 计数减一
  • await() //等待,当计数减到0时,所有线程并行执行

2.1.1 作用

是一组线程等待其他的线程完成工作以后在执行,加强版join

2.1.2 应用场景

2.1.3 实战

public class CountDownLatchDemo implements Runnable{

    static final CountDownLatch latch = new CountDownLatch(10);//构造方法传入计数器的值
    static final CountDownLatchDemo demo = new CountDownLatchDemo();

    @Override
    public void run() {
        // 模拟业务逻辑,业务执行完调用latch.countDown();
        try {
            Thread.sleep(new Random().nextInt(10) * 1000);
            System.out.println("check complete");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            //计数减一
            //放在finally避免任务执行过程出现异常,导致countDown()不能被执行
            latch.countDown();
        }
    }


    public static void main(String[] args) throws InterruptedException {
        ExecutorService exec = Executors.newFixedThreadPool(10);//创建固定大小的线程池
        for (int i=0; i<10; i++){
            exec.submit(demo);
        }

        // 等待10个业务扣减完毕
        latch.await();

        // 主线程继续执行
        System.out.println("扣减完毕,主线程可继续执行");
        // 关闭线程池
        exec.shutdown();
    }
}

2.2 CyclicBarrier

2.2.1 作用

让一组线程达到某个屏障,被阻塞,一直到组内最后一个线程达到屏障时,屏障开放,所有被阻塞的线程会继续运行
CyclicBarrier(可重用屏障/栅栏) 类似于 [CountDownLatch](倒计数闭锁),它能阻塞一组线程直到某个事件的发生。
* 与闭锁的关键区别在于,所有的线程必须同时到达屏障位置,才能继续执行。
* 闭锁用于等待事件,而屏障用于等待其他线程。
* CyclicBarrier 可以使一定数量的线程反复地在屏障位置处汇集。当线程到达屏障位置时将调用 await() 方法,这个方法将阻塞直到所有线程都到达屏障位置。如果所有线程都到达屏障位置,那么屏障将打开,此时所有的线程都将被释放,而屏障将被重置以便下次使用。
CyclicBarrier 是 JDK 1.5 的 java.util.concurrent 并发包中提供的一个并发工具类。
* 所谓 Cyclic 即循环的意思,所谓 Barrier 即屏障的意思。
* CyclicBarrier 是一个同步辅助类,它允许一组线程相互等待直到所有线程都到达一个公共的屏障点。
* 在程序中有固定数量的线程,这些线程有时候必须等待彼此,这种情况下,使用 CyclicBarrier 很有帮助。
* 这个屏障之所以用循环修饰,是因为在所有的线程释放彼此之后,这个屏障是可以 重新使用 的。

注:CyclicBarrier在本文只做简单理解,有一些借鉴自这位博主的文章,CyclicBarrier他的这篇文章讲解比较全面https://www.jianshu.com/p/9262361a1200

2.2.2 应用场景

CyclicBarrier 常用于多线程分组计算。
比如一个大型的任务,常常需要分配好多子任务去执行,只有当所有子任务都执行完成时候,才能执行主任务,这时候,就可以选择 CyclicBarrier。

2.2.3 CyclicBarrier 方法说明

CyclicBarrier(parties) 方法

  • 初始化相互等待的线程数量的构造方法。

CyclicBarrier(parties,Runnable barrierAction) 方法

  • 初始化相互等待的线程数量以及屏障线程的构造方法。
  • 屏障线程的运行时机:等待的线程数量 =parties 之后,CyclicBarrier 打开屏障之前。

例如在分组计算中,每个线程负责一部分计算,最终这些线程计算结束之后,交由屏障线程进行汇总计算。

getParties 方法

  • 获取 CyclicBarrier 打开屏障的线程数量。

getNumberWaiting 方法

  • 获取正在 CyclicBarrier 上等待的线程数量。

await 方法

在 CyclicBarrier 上进行阻塞等待,直到发生以下情形之一。

  • 在 CyclicBarrier 上等待的线程数量达到 parties,则所有线程被释放,继续执行。
  • 当前线程被中断,则抛出 InterruptedException 异常,并停止等待,继续执行。
  • 其他等待的线程被中断,则当前线程抛出 BrokenBarrierException 异常,并停止等待,继续执行。
  • 其他等待的线程超时,则当前线程抛出 BrokenBarrierException 异常,并停止等待,继续执行。
  • 其他线程调用 CyclicBarrier.reset() 方法,则当前线程抛出 BrokenBarrierException 异常,并停止等待,继续执行。
  • 线程调用 await() 表示自己已经到达栅栏。
  • BrokenBarrierException 表示栅栏已经被破坏,破坏的原因可能是其中一个线程
  • await() 时被中断或者超时。

await(timeout,TimeUnit) 方法

在 CyclicBarrier 上进行限时的阻塞等待,直到发生以下情形之一。

  • 在 CyclicBarrier 上等待的线程数量达到 parties,则所有线程被释放,继续执行。
  • 当前线程被中断,则抛出 InterruptedException 异常,并停止等待,继续执行。
  • 当前线程等待超时,则抛出 TimeoutException 异常,并停止等待,继续执行。
  • 其他等待的线程被中断,则当前线程抛出 BrokenBarrierException 异常,并停止等待,继续执行。
  • 其他等待的线程超时,则当前线程抛出 BrokenBarrierException 异常,并停止等待,继续执行。
  • 其他线程调用 CyclicBarrier.reset() 方法,则当前线程抛出 BrokenBarrierException 异常,并停止等待,继续执行。

isBroken 方法

获取是否破损标志位 broken 的值,此值有以下几种情况。

  • CyclicBarrier 初始化时,broken=false,表示屏障未破损。
  • 如果正在等待的线程被中断,则 broken=true,表示屏障破损。
  • 如果正在等待的线程超时,则 broken=true,表示屏障破损。
  • 如果有线程调用 CyclicBarrier.reset() 方法,则 broken=false,表示屏障回到未破损状态。

reset 方法

使 CyclicBarrier 回归初始状态,它做了两件事。

  • 如果有正在等待的线程,则会抛出 BrokenBarrierException 异常,且这些线程停止等待,继续执行。
    将是否破损标志位 broken 置为 false。

2.2.3 实战

public class CyclicBarrierDemo {
    // 自定义工作线程
    private static class Cyclic extends Thread {
        private CyclicBarrier cyclicBarrier;
        
        public Cyclic(CyclicBarrier cyclicBarrier) {
            this.cyclicBarrier = cyclicBarrier;
        }
        
        @Override
        public void run() {
            super.run();
            
            try {
                
                cyclicBarrier.await();//调用等待方法,等待其他线程到达屏障
                System.out.println(Thread.currentThread().getName() + "等待完毕,开始执行");
                //  模拟业务
                Thread.sleep(5000);
                System.out.println(Thread.currentThread().getName() + "执行完毕");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
 
    public static void main(String[] args) {
        int threadCount = 3;
        CyclicBarrier cyclicBarrier = new CyclicBarrier(threadCount);
        
        for (int i = 0; i < threadCount; i++) {
            System.out.println("创建工作线程" + i);
            Cyclic cyclic = new Cyclic(cyclicBarrier);
            cyclic.start();
        }
    }
}

2.3 CountDownLatch和CyclicBarrier辨析

  • countdownlatch放行由第三者控制,CyclicBarrier放行由一组线程本身控制
  • countdownlatch放行条件》=线程数,CyclicBarrier放行条件=线程数
  • CountDownLatch 是一个线程(或者多个),等待另外 N 个线程完成某个事情之后才能执行;CyclicBarrier 是 N 个线程相互等待,任何一个线程完成之前,所有的线程都必须等待。
  • CountDownLatch 的计数器只能使用一次。而 CyclicBarrier 的计数器可以使用 reset() 方法重置;
  • CyclicBarrier 能处理更为复杂的业务场景,比如如果计算发生错误,可以重置计数器,并让线程们重新执行一次。
  • CountDownLatch 采用减计数方式;CyclicBarrier 采用加计数方式。

2.4 Semaphore

以一个停车场是运作为例。为了简单起见,假设停车场只有三个车位,一开始三个车位都是空的。这时如果同时来了五辆车,看门人允许其中三辆不受阻碍的进入,然后放下车拦,剩下的车则必须在入口等待,此后来的车也都不得不在入口处等待。这时,有一辆车离开停车场,看门人得知后,打开车拦,放入一辆,如果又离开两辆,则又可以放入两辆,如此往复。这个停车系统中,每辆车就好比一个线程,看门人就好比一个信号量,看门人限制了可以活动的线程。假如里面依然是三个车位,但是看门人改变了规则,要求每次只能停两辆车,那么一开始进入两辆车,后面得等到有车离开才能有车进入,但是得保证最多停两辆车。对于Semaphore类而言,就如同一个看门人,限制了可活动的线程数。
Semaphore主要用于控制当前活动线程数目,就如同停车场系统一般,而Semaphore则相当于看守的人,用于控制总共允许停车的停车位的个数,而对于每辆车来说就如同一个线程,线程需要通过acquire()方法获取许可,而release()释放许可。如果许可数达到最大活动数,那么调用acquire()之后,便进入等待队列,等待已获得许可的线程释放许可,从而使得多线程能够合理的运行。

2.4.1 作用

Semaphore主要方法:

Semaphore(int permits):
构造方法,创建具有给定许可数的计数信号量并设置为非公平信号量。

Semaphore(int permits,boolean fair):
构造方法,当fair等于true时,创建具有给定许可数的计数信号量并设置为公平信号量。

void acquire():
从此信号量获取一个许可前线程将一直阻塞。相当于一辆车占了一个车位。

void acquire(int n):
从此信号量获取给定数目许可,在提供这些许可前一直将线程阻塞。比如n=2,就相当于一辆车占了两个车位。

void release():
释放一个许可,将其返回给信号量。就如同车开走返回一个车位。

void release(int n):
释放n个许可。

int availablePermits():
当前可用的许可数。

getQueueLength():
等待线程队列长度

2.4.2 应用场景

2.4.3 实战

public class SemaphoreDemo {
    private static final Semaphore semaphore=new Semaphore(3);
    private static final ThreadPoolExecutor threadPool=new ThreadPoolExecutor(5,10,60,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>());
    
    private static class InformationThread extends Thread{
        private final String name;
        private final int age;
        public InformationThread(String name,int age)
        {
            this.name=name;
            this.age=age;
        }
        
        public void run()
        {
            try
            {
                semaphore.acquire();
                System.out.println(Thread.currentThread().getName()+":大家好,我是"+name+"我今年"+age+"岁当前时间为:"+System.currentTimeMillis());
                Thread.sleep(1000);
                System.out.println(name+"要准备释放许可证了,当前时间为:"+System.currentTimeMillis());
                System.out.println("当前可使用的许可数为:"+semaphore.availablePermits());
                semaphore.release();
                
            }
            catch(InterruptedException e)
            {
                e.printStackTrace();
            }
        }
    }
    public static void main(String[] args)
    {
        String[] name= {"李明","王五","张杰","王强","赵二","李四","张三"};
        int[] age= {26,27,33,45,19,23,41};
        for(int i=0;i<7;i++)
        {
            Thread t1=new InformationThread(name[i],age[i]);
            threadPool.execute(t1);
        }
    }
 

2.5 Exchange

2.5.1 作用

Exchanger 是 JDK 1.5 开始提供的一个用于两个工作线程之间交换数据的封装工具类,简单说就是一个线程在完成一定的事务后想与另一个线程交换数据,则第一个先拿出数据的线程会一直等待第二个线程,直到第二个线程拿着数据到来时才能彼此交换对应数据。其定义为 Exchanger<V> 泛型类型,其中 V 表示可交换的数据类型,对外提供的接口很简单,具体如下:

Exchanger():无参构造方法。
V exchange(V v):等待另一个线程到达此交换点(除非当前线程被中断),然后将给定的对象传送给该线程,并接收该线程的对象。
V exchange(V v, long timeout, TimeUnit unit):等待另一个线程到达此交换点(除非当前线程被中断或超出了指定的等待时间),然后将给定的对象传送给该线程,并接收该线程的对象。

可以看出,当一个线程到达 exchange 调用点时,如果其他线程此前已经调用了此方法,则其他线程会被调度唤醒并与之进行对象交换,然后各自返回;如果其他线程还没到达交换点,则当前线程会被挂起,直至其他线程到达才会完成交换并正常返回,或者当前线程被中断或超时返回。

2.5.2 应用场景

2.5.3 实战

public class UseExchange {
    private static final Exchanger<String> exchange 
        = new Exchanger<String>();

    public static void main(String[] args) {
        System.out.println("ssssssssssss");
        //第一个线程
        new Thread(new Runnable() {
            @Override
            public void run() {
                Set<String> setA = new HashSet<String>();//存放数据的容器
                String dataA = "111";
                try {
                    /*添加数据
                     * set.add(.....)
                     * */
                    System.out.println("数据交换前:"+dataA);
                    dataA = exchange.exchange(dataA);//交换data
                    System.out.println("数据交换前:"+dataA);
                    /*处理交换后的数据*/
                } catch (InterruptedException e) {
                }
            }
        }).start();

      //第二个线程
        new Thread(new Runnable() {
            @Override
            public void run() {
                Set<String> setB = new HashSet<String>();//存放数据的容器
                String dataB = "222";
                try {
                    /*添加数据
                     * set.add(.....)
                     * set.add(.....)
                     * */
                    System.out.println("数据交换前:"+dataB);
                    dataB = exchange.exchange(dataB);//交换data
                    System.out.println("数据交换前:"+dataB);
                    /*处理交换后的数据*/
                } catch (InterruptedException e) {
                }
            }
        }).start();

    }
}

2.6 Callable、Future和FutureTask

2.6.1 继承结构图

image.png

2.6.2 Future接口常用方法

V get();阻塞方法,获取Callable的返回值
V get(long,Timeunit);设定超时时间和超时操作
isDone(),结束,正常还是异常结束,或者自己取消,返回true;
isCancelled() 任务完成前被取消,返回true;
cancel(boolean):

  • 任务还没开始,返回false
  • 任务已经启动,cancel(true),中断正在运行的任务,中断成功,返回true,cancel(false),不会去中断已经运行的任务
  • 任务已经结束,返回false

2.6.3 实战案例

public class UseFuture {
    
    /*实现Callable接口,允许有返回值*/
    private static class UseCallable implements Callable<Integer>{

        private int sum;
        @Override
        public Integer call() throws Exception {
            System.out.println("Callable子线程开始计算");
            Thread.sleep(2000);
            for(int i=0;i<5000;i++) {
                sum = sum+i;
            }
            System.out.println("Callable子线程计算完成,结果="+sum);
            return sum;
        }

    }
    
    public static void main(String[] args) 
            throws InterruptedException, ExecutionException {
        
        UseCallable useCallable = new UseCallable();//创建自定义的Callable
        FutureTask<Integer> futureTask = new FutureTask<Integer>(useCallable);//新建FutureTask
        new Thread(futureTask).start();//将FutureTask通过线程启动
        Random r = new Random();
        SleepTools.second(1);
        if(r.nextBoolean()) {//随机决定是获得结果还是终止任务
            System.out.println("Get UseCallable result = "+futureTask.get());//阻塞获取Callable中call()方法的返回值
        }else {
            System.out.println("中断计算");
            futureTask.cancel(true);
        }
        
    }

}

2.6.4 应用场景

包含图片和文字的文档的处理:图片(云上),可以用future去取图片,主线程继续解析文字。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,684评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,143评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,214评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,788评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,796评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,665评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,027评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,679评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,346评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,664评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,766评论 1 331
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,412评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,015评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,974评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,073评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,501评论 2 343

推荐阅读更多精彩内容