CyclicBarrier

1、CyclicBarrier使用场景:

先来描述一下它的使用场景:有若干个线程,比如说有五个线程,需要它们都到达了某一个点之后才能开始一起执行,也就是说假如其中只有四个线程到达了这个点,还差一个线程没到达,此时这四个线程都会进入等待状态,直到第五个线程也到达了这个点之后,这五个线程才开始一起进行执行状态,是不是这个场景的描述跟CountDownLatch很类似的,下面用一个简单的示例图来感受一下它们两者的区别:

CountDownLatch使用场景图
image.png
CyclicBarrier使用场景图
image.png

所有子线程都已经到达屏障之后,此时屏障就会消失,所有子线程继续执行,若存子线程尚未到达屏障,其他到达了屏障的线程都会进行等待

2、官方文档说明

它是一个同步的工具,能够允许一组线程去互相等待直到都到达了屏障,CyclicBarrier对于涉及到固定大小的线程是非常有用的,线程们必须相互等待。该屏障称之为循环屏障,是因为当等待屏障的线程被释放之后,该屏障能循环使用

3、关于CyclicBarrier的底层执行流程总结:

  • 1、初始化CyclicBarrier中的各种成员变量,包括parties、count以及Runnable(可选);

  • 2、当调用await()方法时,底层会先检查计数器是否已经归零,如果是的话,那么就首先执行可选的Runnable,接下来开始下一个generation;(注意:这里只是调用Runnable的run()方法,并不是调用start()方法开启另一个线程)

  • 3、在下一个分代中,将会重置count值为parties,并且创建新的Generation实例;

  • 4、同时会调用Condition的singalAll方法,唤醒所有在屏障前面等待的线程,让其开始继续执行;(注意:当有可选的Runnable时,是执行完run()方法中的汇总操作,其他线程才会继续执行)

  • 5、如果计数器没有归零,那么当前的调用线程将会通过Condition的await方法,在屏障前进行等待;

  • 6、以上所有执行流程均在lock锁的控制范围内,不会出现并发情况。

  • 7、在下一个分代时,该屏障又可以继续使用,例如计数器是3,线程1,线程2和线程3冲破了当前屏障后,下一个分代的屏障可以去给线程4,线程5和线程6使用,也可以又给线程1,线程2和线程3使用(自己总结的)

4、典型事例

1、当没有可选的Runnable时

当所有线程到达屏障时,不需要进行汇总,最后一个线程到达时,屏障消除,所有线程继续执行

image.png

package com.concurrency2;

import java.util.Random;
import java.util.concurrent.CyclicBarrier;

public class MyTest1 {
    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
        for(int i = 0;i < 3;i ++) {
            new Thread(() -> {
                try {
                    Thread.sleep((long)(Math.random() * 2000));

                    int randomInt = new Random().nextInt(500);
                    System.out.println("hello " + randomInt);

                    cyclicBarrier.await();

                    System.out.println("world " + randomInt);

                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}

输出

hello 30
hello 471
hello 343
world 343
world 471
world 30
2、当有可选的Runnable时

当所有线程到达屏障时,需要进行汇总操作,等汇总操作进行完,屏障消除,所有线程继续执行

image.png

package com.concurrency2;

import java.util.Random;
import java.util.concurrent.CyclicBarrier;

public class MyTest1 {
    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
            System.out.println("汇总1 ...");

            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            System.out.println("汇总2 ...");
        });
       //for(int u = 0, u < 2;u ++)//开两次屏障使用
        for(int i = 0;i < 3;i ++) {
            new Thread(() -> {
                try {
                    Thread.sleep((long)(Math.random() * 2000));

                    int randomInt = new Random().nextInt(500);
                    System.out.println("hello " + randomInt);

                    cyclicBarrier.await();

                    System.out.println("world " + randomInt);

                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}

输出

hello 4
hello 229
hello 73
汇总1 ...
汇总2 ...
world 73
world 229
world 4

5、CyclicBarrier源代码分析

前面讲过CountDownLatch是基于AQS实现的;而CyclicBarrier是基于ReentrantLock重入锁实现的,当然ReentrantLock也是基于AQS实现的,非要说CyclicBarrier也是基于AQS实现的也不为过。

1、重要成员变量
    / /可以理解为初始化时 需要阻塞的任务个数
    private final int parties;
    / /剩余需要等待的任务个数,初始值为parties,直到为0时依次唤醒所有被阻塞的任务线程。
    private int count;
 
    / /每次对“栅栏”的主要成员变量进行变更操作,都应该加锁
    private final ReentrantLock lock = new ReentrantLock();
    / /用于阻塞和唤醒任务线程
   private final Condition trip = lock.newCondition();
 
    / /在所有线程被唤醒前,需要执行的一个Runable对应的run方法
    private final Runnable barrierCommand;
    / /用于表示“栅栏”当前的状态
    private Generation generation = new Generation();
2、构造方法

CyclicBarrier有两个重载的构造方法,一个是不带Runnable参数,另一个带有Runnable参数。本质上都会调用带Runnable参数的构造方法进行实例化,这里只贴出带Runnable参数的构造方法实现:

public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;    / /为了实现复用,进行备份
        this.count = parties;   / /初始化,待阻塞的任务总数
        this.barrierCommand = barrierAction;   / /初始化
    }
3、核心方法
await()方法有两层含义:

1、先检查前面是否已经有count个线程了,如果没有线程则会进入等待状态
2、当检测到屏障已经有count个线程了,则所有线程会冲出屏障继续执行(如果有Runnable参数的构造方法先执行汇总方法)

int index = --count;操作很明显不是原子性的,如果在多线程中不加lock肯定会出问题

private void nextGeneration() {
        // signal completion of last generation
        trip.signalAll();
        // set up next generation
        count = parties;
        generation = new Generation();
    }

public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }
 
private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock(); //加锁
        try {
            final Generation g = generation;
 
            if (g.broken)
                throw new BrokenBarrierException();
            //有一个线程线程被中断,整个CyclicBarrier将不可用
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
 
            int index = --count; //待等待的任务数减1
            if (index == 0) {  // 如果待等待的任务数减至0,依次唤醒所有线程
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();//唤醒前先执行Runnable对象的run方法
                    ranAction = true;
                    nextGeneration();//重置整个CyclicBarrier,方便下次重用
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }
 
            //如果待等待的任务数大于0,进行线程阻塞,直到count为0时被唤醒
            for (;;) {
                try {
                    if (!timed)
                        trip.await();//阻塞当前线程
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);//延时阻塞当前线程
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }
 
                if (g.broken)//异常唤醒
                    throw new BrokenBarrierException();
 
                if (g != generation)//正常被唤醒,generation会被新建
                    return index;
 
                if (timed && nanos <= 0L) {//延迟阻塞时间到后唤醒
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容