CyclicBarrier 原理解析

一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用。因为该 barrier 在释放等待线程后可以重用,所以称它为循环 的 barrier。

CyclicBarrier 支持一个可选的 Runnable 命令,在一组线程中的最后一个线程到达之后(但在释放所有线程之前),该命令只在每个屏障点运行一次。若在继续所有参与线程之前更新共享状态,此屏障操作 很有用。

栅栏类似于闭锁,它能阻塞一组线程直到某个事件的发生。栅栏与闭锁的关键区别在于,所有的线程必须同时到达栅栏位置,才能继续执行。闭锁用于等待事件,而栅栏用于等待其他线程。

CyclicBarrier可以使一定数量的线程反复地在栅栏位置处汇集。当线程到达栅栏位置时将调用await方法,这个方法将阻塞直到所有线程都到达栅栏位置。如果所有线程都到达栅栏位置,那么栅栏将打开,此时所有的线程都将被释放,而栅栏将被重置以便下次使用。

image.png

原理如下

使用一个 ReentrantLock 来获取一个 Condition, 用于存储将需要再屏障点到达之前阻塞的线程。当公共屏障点被触发时,则唤醒队列中所有线程。

CyclicBarrier 构造函数

由代码可以知道,初始化的时候需要指定需要阻塞的线程数量,同时也可以指定一个在唤醒等待队列之前执行的线程。

public CyclicBarrier(int parties) {
    this(parties, null);
}

public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) 
        throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}

await方法

调用await方法的线程告诉CyclicBarrier自己已经到达同步点,然后当前线程被阻塞。直到parties个参与线程调用了await方法,CyclicBarrier同样提供带超时时间的await和不带超时时间的await方法:

public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe);
    }
}

await 本身没什么逻辑,仅仅是调用 dowait 方法。

private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
    final ReentrantLock lock = this.lock;
    // 抢锁
    lock.lock();
    try {
        // 获取 Generation
        final Generation g = generation;

        // 如果这代损坏了,抛出异常
        if (g.broken)
            throw new BrokenBarrierException();

        // 当前线程被中断过,
        if (Thread.interrupted()) {
            // 将损坏状态设置为true
            // 并通知其他阻塞在此栅栏上的线程
            breakBarrier();
            throw new InterruptedException();
        }

        // 线程数减 1
        int index = --count;
        // index = 0 表示公共屏障点被触发
        if (index == 0) {
            boolean ranAction = false;
            try {
                // 执行栅栏任务
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();
                // 开始执行
                ranAction = true;
                // 唤醒之前等待的线程
                nextGeneration();
                return 0;
            } finally {
                // 如果执行栅栏任务的时候失败了,就将损坏状态设置为true
                if (!ranAction)
                    breakBarrier();
            }
        }

        // loop until tripped, broken, interrupted, or timed out
        for (;;) {
            try {
                // 没有时间限制,则无期限等待
                if (!timed)
                    trip.await();
                // 有时间限制,则等待一定时间
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                // 当前代没有损坏
                if (g == generation && ! g.broken) {
                    // 让栅栏失效
                    breakBarrier();
                    throw ie;
                } else {
                   // 上面条件不满足,说明这个线程不是这代的
                   // 就不会影响当前这代栅栏的执行,所以,就打个中断标记 Thread.currentThread().interrupt();
                }
            }

            // 当有任何一个线程中断了,就会调用breakBarrier方法
            // 就会唤醒其他的线程,其他线程醒来后,也要抛出异常
            if (g.broken)
                throw new BrokenBarrierException();

            // g != generation表示正常换代了,返回当前线程所在栅栏的下标
            // 如果 g == generation,说明还没有换代,那为什么会醒了?
            // 因为一个线程可以使用多个栅栏,当别的栅栏唤醒了这个线程,就会走到这里,所以需要判断是否是当前代。
            // 正是因为这个原因,才需要generation来保证正确。
            if (g != generation)
                return index;

            // 如果有时间限制,且时间小于等于0,销毁栅栏并抛出异常
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}

CyclicBarrier和CountDownLatch的区别

CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重置,可以使用多次,所以CyclicBarrier能够处理更为复杂的场景;

CyclicBarrier还提供了一些其他有用的方法,比如getNumberWaiting()方法可以获得CyclicBarrier阻塞的线程数量,isBroken()方法用来了解阻塞的线程是否被中断;

CountDownLatch允许一个或多个线程等待一组事件的产生,而CyclicBarrier用于等待其他线程运行到栅栏位置。

用例

public class CyclicBarrierTest {
    public static void main(String[] args) {
        CyclicBarrierTest test = new CyclicBarrierTest();
        test.test();
    }

    private void test(){
        CyclicBarrier barrier = new CyclicBarrier(5, () -> {
            System.out.println("都闪开,我要开始装逼了!!");
            try {
                TimeUnit.SECONDS.sleep(5);
            }catch (Exception e){
                e.printStackTrace();
            }
        });
        for (int i = 0; i < 5; i++){
            Worker worker = new Worker(i, barrier);
            Thread t = new Thread(worker);
            t.start();
            try {
                System.out.println(worker.toString());
                TimeUnit.SECONDS.sleep(1);
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }

    private class Worker implements Runnable{
        private int idx;

        CyclicBarrier barrier;

        public Worker(int idx, CyclicBarrier barrier) {
            this.idx = idx;
            this.barrier = barrier;
        }

        @Override
        public String toString() {
            return "Worker [" + idx + "] 准备就绪!!!";
        }

        @Override
        public void run() {
            try {
                barrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }

            for (int i = 5; i > 0; i--){
                try {
                    System.out.println(System.currentTimeMillis() + ", 线程=" + idx + ", 批次=" + i);
                    TimeUnit.SECONDS.sleep(1);
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        }
    }
}

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。