多线程并发编程20-线程同步之CyclicBarrier

    前文介绍了使用CountDownLatch来实现线程间同步,但是CountDownLatch的计数器是一次性的,当计数器值减到0之后再调用await或countdown方法就会立刻返回。今天讲解的CyclicBarrier是一种可重置的线程间同步,当指定个数的线程全部到达了一个状态后再全部同时执行,并重置CyclicBarrier。

    下面通过一个代码示例介绍CountDownLatch的使用,下面的示例中启动两个线程A、B,并在A、B线程都完成各自的任务之后,对A、B线程完成的任务数进行统计。

任务处理类MyRunnable 

class MyRunnable implements Runnable {

    private CyclicBarrier cyclicBarrier = null;

    private int threadId;

    private int taskCount;

    private LinkedBlockingQueue<Integer> taskSummaryQueue = null;

    public MyRunnable(CyclicBarrier cyclicBarrier, int threadId, int taskCount, LinkedBlockingQueue<Integer> taskSummaryQueue) {

        this.cyclicBarrier = cyclicBarrier;

        this.threadId = threadId;

        this.taskCount = taskCount;

        this.taskSummaryQueue = taskSummaryQueue;

    }

    @Override

    public void run() {

        String threadName = Thread.currentThread().getName();

        if (cyclicBarrier != null) {

            try {

//执行任务

                for (int i = 0; i < taskCount; i++) {

                    Thread.sleep(1000);

                }

                System.out.println("thread name:" + threadName + " thread id:" + threadId + "到达屏障处");

//到达屏障处,阻塞直到到达屏障处的线程个数等于CyclicBarrier 中的parties值。

                int await = cyclicBarrier.await();

                System.out.println("thread name:" + threadName + "thread:" + threadId + " await:" + await);

            } catch (InterruptedException e) {

                System.out.println("thread name:" + threadName + " thread:" + threadId + " InterruptedException");

                e.printStackTrace();

            } catch (BrokenBarrierException e) {

                System.out.println("thread name:" + threadName + " thread:" + threadId + " BrokenBarrierException");

                e.printStackTrace();

            }

        }

    }

}

main函数

public static void main(String[] args) {

    ThreadGroup group = new ThreadGroup("myThreadGroup");

//创建一个多线程安全的队列,用来统计完成的任务数。

    LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>();

//创建回旋屏障CyclicBarrier ,并指定barrierAction,barrierAction会在最后一个到达屏障处的线程执行。

    CyclicBarrier cyclicBarrier = new CyclicBarrier(2, new Runnable() {

        @Override

        public void run() {

            String threadName = Thread.currentThread().getName();

            int finishedTask = 0;

//汇总完成任务数

            for (Integer integer : queue) {

                if (integer != null) {

                    finishedTask += integer;

                }

            }

            System.out.println("barrierAction in thread name:" + threadName + " finished task:" + finishedTask);

        }

    });

    Thread thread1 = new Thread(group,new MyRunnable(cyclicBarrier, 1,5,queue), "thread1");

    thread1.start();

    try {

        Thread.sleep(10);

    } catch (InterruptedException e) {

        e.printStackTrace();

    }

    Thread thread2 = new Thread(group, new MyRunnable(cyclicBarrier, 2,10,queue), "thread2");

    thread2.start();

    try {

        Thread.sleep(10);

    } catch (InterruptedException e) {

        e.printStackTrace();

    }

}

执行结果:

thread name:thread1 thread id:1到达屏障处

thread name:thread2 thread id:2到达屏障处

barrierAction in thread name:thread2 finished task:15

thread name:thread2thread:2 屏障之后

thread name:thread1thread:1 屏障之后

    下面对CyclicBarrier内部原理进行剖析。

CyclicBarrier内部的成员变量如下:

//独占锁,用来操作计数器。

private final ReentrantLock lock =new ReentrantLock();

//条件变量,阻塞到达屏障处的线程,直到计数器为0。

private final Condition trip =lock.newCondition();

//重置之后会将计数器值重置为该值。

private final int parties;

//当计数器值为0之后会触发该runnable。

private final Runnable barrierCommand;

//年代变量,当重置、计数器减到0或抛出异常,年代变量会重新创建进入下一个年代。

private Generation generation =new Generation();

//计数器,当线程调用awit到达屏障点时会进行减一。

private int count;

int await()

    调用await方法则表示调用线程到达屏障处,阻塞直到parties个线程到达屏障处。如果当前线程不是最后一个到达屏障处的线程则会进行阻塞,当有如下情况发生当前线程才会返回:

1.最后一个线程到达屏障处。

2.其他线程调用了当前线程的中断标志。

3.其他到达屏障处的线程被设置了中断标志。

4.其他到达屏障处阻塞的线程到了阻塞过期时间。

5.其他线程调用了CyclicBarrier的rest方法。

public int await() throws InterruptedException, BrokenBarrierException {

    try {

        return dowait(false, 0L);

    } catch (TimeoutException toe) {

        throw new Error(toe); // cannot happen

    }

}

int dowait(boolean timed, long nanos)

private int dowait(boolean timed, long nanos)

    throws InterruptedException, BrokenBarrierException,

          TimeoutException {

//(1)获取独占锁。

    final ReentrantLock lock = this.lock;

    lock.lock();

    try {

        final Generation g = generation;

        if (g.broken)

            throw new BrokenBarrierException();

        if (Thread.interrupted()) {

            breakBarrier();

            throw new InterruptedException();

        }

//(2)计数器减1。

        int index = --count;

//(3)如果计数器为0,则说明最后一个线程到达了屏障处,则调用barrierAction后唤醒之前到达屏障处阻塞的线程。

        if (index == 0) {  // tripped

            boolean ranAction = false;

            try {

//(4)调用barrierAction对应的runnable。

                final Runnable command = barrierCommand;

                if (command != null)

                    command.run();

                ranAction = true;

//(5)唤醒之前到达屏障处阻塞的线程,并重置计数器和年代。为什么先唤醒在重置呢?因为即使调用了条件变量的signalAll,但是当前线程还没有释放独占锁,其他阻塞的线程也不能被实际唤醒。所以重置操作放在signalAll之后没有问题。

                nextGeneration();

                return 0;

            } finally {

                if (!ranAction)

                    breakBarrier();

            }

        }

        // (6)循环直到最后一个线程到达屏障处、CyclicBarrier被broker、当前线程被中断或阻塞超时。

        for (;;) {

            try {

//(7)调用条件变量的await方法进行阻塞。

                if (!timed)

                    trip.await();

                else if (nanos > 0L)

                    nanos = trip.awaitNanos(nanos);

            } catch (InterruptedException ie) {

//(8)如果当前线程被其他线程设置了中断标志,则将CyclicBarrier设置为broker并唤醒其他在屏障点阻塞的线程,这些线程会抛出BrokenBarrierException异常。

                if (g == generation && ! g.broken) {

                    breakBarrier();

                    throw ie;

                } else {

                    // We're about to finish waiting even if we had not

                    // been interrupted, so this interrupt is deemed to

                    // "belong" to subsequent execution.

                    Thread.currentThread().interrupt();

                }

            }

            if (g.broken)

                throw new BrokenBarrierException();

//(9)如果年代不一致则正常返回。

            if (g != generation)

                return index;

//(10)当前线程阻塞超时了则抛出TimeoutException异常,将CyclicBarrier设置为broker并唤醒其他在屏障点阻塞的线程,这些线程会抛出BrokenBarrierException异常。

            if (timed && nanos <= 0L) {

                breakBarrier();

                throw new TimeoutException();

            }

        }

    } finally {

//(11)释放锁

        lock.unlock();

    }

}

void nextGeneration()

    唤醒所有在屏障点出阻塞的线程,并重置CyclicBarrier,只有在获取独占锁的情况下才能调用此方法。

private void nextGeneration() {

    // signal completion of last generation

    trip.signalAll();

    // set up next generation

    count = parties;

    generation = new Generation();

}

void breakBarrier() 

    设置CyclicBarrier为broken状态,重置计数器并唤醒阻塞在屏障点的线程,这些线程会抛出BrokenBarrierException异常。

private void breakBarrier() {

    generation.broken = true;

    count = parties;

    trip.signalAll();

}

void reset() 

    重置CyclicBarrier。

public void reset() {

    final ReentrantLock lock = this.lock;

    lock.lock();

    try {

        breakBarrier();  // break the current generation

        nextGeneration(); // start a new generation

    } finally {

        lock.unlock();

    }

}

    CyclicBarrier通过独占锁+条件变量+计数器+年代变量来实现可重复使用的线程间同步器。

    今天的分享就到这,有看不明白的地方一定是我写的不够清楚,所有欢迎提任何问题以及改善方法。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 220,367评论 6 512
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,959评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,750评论 0 357
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,226评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,252评论 6 397
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,975评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,592评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,497评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,027评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,147评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,274评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,953评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,623评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,143评论 0 23
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,260评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,607评论 3 375
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,271评论 2 358

推荐阅读更多精彩内容