java源码 - CyclicBarrier

开篇

  • CyclicBarrier是一个同步工具类,它允许一组线程互相等待,直到到达某个公共屏障点。与CountDownLatch不同的是该barrier在释放等待线程后可以重用,所以称它为循环(Cyclic)的屏障(Barrier)。

  • CyclicBarrier支持一个可选的Runnable命令,在一组线程中的最后一个线程到达之后(但在释放所有线程之前),该命令只在每个屏障点运行一次。若在继续所有参与线程之前更新共享状态,此屏障操作很有用。

  • CyclicBarrier的内部实现逻辑基于ReentrantLock实现,可以理解为ReentrantLock的上层应用者,通过ReentrantLock的Condtion实现线程的休眠和唤醒。


CyclicBarrier用法demo

public class Test {
    public static void main(String[] args) {
        int N = 4;
        CyclicBarrier barrier  = new CyclicBarrier(N);
        for(int i=0;i<N;i++)
            new Writer(barrier).start();
    }
    static class Writer extends Thread{
        private CyclicBarrier cyclicBarrier;
        public Writer(CyclicBarrier cyclicBarrier) {
            this.cyclicBarrier = cyclicBarrier;
        }
 
        @Override
        public void run() {
            System.out.println("线程"+Thread.currentThread().getName()+"正在写入数据...");
            try {
                Thread.sleep(5000);      //以睡眠来模拟写入数据操作
                System.out.println("线程"+Thread.currentThread().getName()+"写入数据完毕,等待其他线程写入完毕");
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }catch(BrokenBarrierException e){
                e.printStackTrace();
            }
            System.out.println("所有线程写入完毕,继续处理其他任务...");
        }
    }
}
线程Thread-0正在写入数据...
线程Thread-3正在写入数据...
线程Thread-2正在写入数据...
线程Thread-1正在写入数据...
线程Thread-2写入数据完毕,等待其他线程写入完毕
线程Thread-0写入数据完毕,等待其他线程写入完毕
线程Thread-3写入数据完毕,等待其他线程写入完毕
线程Thread-1写入数据完毕,等待其他线程写入完毕
所有线程写入完毕,继续处理其他任务...
所有线程写入完毕,继续处理其他任务...
所有线程写入完毕,继续处理其他任务...
所有线程写入完毕,继续处理其他任务...


CyclicBarrier类定义

  • parties记录一共等待执行个数,count记录依然等待执行的个数。
  • barrierCommand记录所有待执行的完成后由最后一个线程执行的完成的命令。
  • Generation的代的概念来实现CyclicBarrier的复用。
  • 构造函数负责初始化parties、count、barrierCommand的核心变量。
public class CyclicBarrier {
    // 代的类定义
    private static class Generation {
        boolean broken = false;
    }

    // 内部通过ReentrantLock实现线程安全的等待
    private final ReentrantLock lock = new ReentrantLock();
    // 内部通过Lock的condition实现所有waiter的信号通知
    private final Condition trip = lock.newCondition();
    // 所有等待执行的个数
    private final int parties;
    // 所有等待线程都完成任务后由最后一个线程执行的命令
    private final Runnable barrierCommand;
   
    // 通过代的概念实现复用
    private Generation generation = new Generation();

    // 还在等待的个数
    private int count;

    // 核心构造函数
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

    public CyclicBarrier(int parties) {
        this(parties, null);
    }


CyclicBarrier工作原理

  • CyclicBarrier通过ReentrantLock来保证线程休眠和唤醒的通信。
  • 在执行过程中会对等待计数进行减一操作,值不为0当前线程进入休眠等待其他线程唤醒
  • 在执行过程中会对等待计数进行减一操作,值为0当前线程直接执行barrierCommand并且通过nextGeneration方法唤醒其他等待线程
  • 线程的休眠和唤醒都是基于ReentrantLock来实现的
    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }


    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        // 通过lock来保证线程安全
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final Generation g = generation;
            // 判断generation过期的情况
            if (g.broken)
                throw new BrokenBarrierException();
            // 判断线程中断情况
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }

            // 递减待执行的个数计数
            int index = --count;
            // 所有待执行任务完成后执行barrierCommand命令
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    // barrierCommand命令不为null的时候执行该命令
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    // 已经执行了barrierCommand
                    ranAction = true;
                    // 重置generation用以复用并且唤醒所有等待的线程
                    //       private void nextGeneration() {
                    //           trip.signalAll();
                    //           count = parties;
                    //           generation = new Generation();
                    //        }
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }

            // 如果count的值不为0,那么当前线程就开始进入等待
            // 外层通过lock占用锁,内层通过wait()进入休眠并释放锁
            for (;;) {
                try {
                    if (!timed)
                        // private final Condition trip = lock.newCondition();
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }
                
                 // 各种后置处理逻辑
                if (g.broken)
                    throw new BrokenBarrierException();

                if (g != generation)
                    return index;

                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }


参考文章

Java并发编程:CountDownLatch、CyclicBarrier和Semaphore

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容