[怀旧并发08]分析同步工具Semaphore和CyclicBarrier的实现原理

Java并发编程源码分析系列:

前两篇分别通过ReentrantLock和CountDownLatch分析了AQS的独占功能和共享功能。除CountDownLatch之外,还有Semaphore和CyclicBarrier,前者类似CountDownLatch使用AQS实现,后者使用ReentrantLock实现,本文分析一下。

信号量Semaphore

直接用个小例子描述信号量的用法:

final Semaphore semaphore = new Semaphore(2);
for (int studentIndex = 0; studentIndex < 5; studentIndex++) {
    final int finalI = studentIndex;
    new Thread(() -> {
        try {
            semaphore.acquire(); //如果没有许可,阻塞
            System.out.println("student " + finalI + " read book");
            Thread.sleep(new Random().nextInt(10000) + 1000);
            System.out.println("student " + finalI + " finish read book");
            semaphore.release(); //释放许可
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }).start();
}

5名学生阅读2本书,因为书的数量是固定的,所以只有2名学生可以持有书阅读,其他人需要等别人看完再看。程序执行结果如下:

student 0 read book
student 1 read book
student 1 finish read book
student 2 read book
student 0 finish read book
student 3 read book
student 2 finish read book
student 4 read book
student 3 finish read book
student 4 finish read book

信号量控制同时访问某个特定资源的操作数量,或者执行某个指定操作的数量。

信号量有许可的概念,初始许可数量在构造Semaphore时确定。许可还有剩余时,通过acquire获取许可;操作完成后,通过release释放许可。如果没有许可,acquire将阻塞直到有许可。

Semaphore使用起来非常简单,下面来看源码。

Semaphore构造函数
public Semaphore(int permits) {
     sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

和ReentrantLock类似,Semaphore也分为公平和非公平两种。permits是许可数量,最终传递给state,此时state的含义就是许可的数量。

获取许可
public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

Semaphore的acquire调用的是acquireSharedInterruptibly,和CountDownLatch的await一样,差异的地方是tryAcquireShared方法是由子类实现。

protected int tryAcquireShared(int acquires) {
    for (;;) {
        if (hasQueuedPredecessors())
            return -1;
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

公平信号量和非公平信号量分别调用tryAcquireShared和nonfairTryAcquireShared,目的都是将state减一,以此尝试获取许可。不同之处是公平信号量多调用了hasQueuedPredecessors,判断是否有比自己先的线程在等待;非公平信号量就不管排队,先试了再说。

释放许可

对于release操作,还是和CountDownloadLatch一样调用releaseShared。到这里,应该对AQS的使用非常了解,直接来看tryReleaseShared。

protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        int current = getState();
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        if (compareAndSetState(current, next))
            return true;
    }
}

release是acquire的反操作,将许可加一。

总结下来,Semaphore和CountDownLatch都是依赖AQS实现,两者只有少少不同。

栅栏CyclicBarrier

final CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> System.out.println("all threads through barrier"));

for (int i = 0; i < 5; i++) {
    final int finalI = i + 1;
    new Thread(() -> {
        System.out.println("thread " + finalI + " is started");
        Random random = new Random();
        try {
            Thread.sleep(random.nextInt(10000) + 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("thread " + finalI + " has been completed");

        try {
            cyclicBarrier.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }

        System.out.println("thread " + finalI + " to continue");
    }).start();
}

CyclicBarrier设立一个barrier,只有当所有线程到达barrier的位置,才能继续执行下去。打个简单比喻,只有当所有人到达指定地方集中,才能开车出发。

thread 1 is started
thread 4 is started
thread 3 is started
thread 5 is started
thread 2 is started
thread 2 has been completed
thread 3 has been completed
thread 5 has been completed
thread 4 has been completed
thread 1 has been completed
all threads through barrier
CyclicBarrier的构造函数
public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}
  • parties表示barrier开启需要到达的线程数量;
  • count表示等待到达barrier的线程数量;
  • barrierCommand表示开启barrier后执行的操作;

CyclicBarrier的构造函数设置了参数,没有什么特别,下面来看核心方法await。await分为限时和不限时两个版本,最终都调用了dowait。在开始研究dowait代码之前,先要了解几点。

线程什么时候通过Barrier?

当三种条件之一发生时,线程才能继续执行:

  • 当parties个线程到达barrier;
  • 当前线程被中断,抛出InterruptedException;
  • 超时,抛出BrokenBarrierException。
显式条件Condition

dowait的实现使用了ReentrantLock和Condition。ReentrantLock前文介绍过,它是synchronized的高级版本。Object的wait、notify、notifyAll大家都知道,它和synchronized打包实现“条件不满足时,线程等待;条件满足时,等待该条件的线程被唤醒”的功能。Condition是它的高级版本,和ReentrantLock一起可以更加灵活精细地实现这种功能。Condition不是本文的重点,后续会开新篇讲解,现在先当成wait和notify理解。

Generation对象
private Generation generation = new Generation();

private static class Generation {
    boolean broken = false;
}

private void nextGeneration() {
   // signal completion of last generation
   trip.signalAll();
   // set up next generation
   count = parties;
   generation = new Generation();
}

Generation是一代的意思,唯一记录了barrier是否broken。看CyclicBarrier的名字也知道,它是可重复使用的,每次使用CyclicBarrier,本次所有线程同属于一代,即同一个Generation。当parties个线程到达barrier时,需要调用nextGeneration更新换代。

private void breakBarrier() {
    generation.broken = true;
    count = parties;
    trip.signalAll();
}

barrier被broken后,调用breakBarrier方法,将generation.broken设置为true,并使用signalAll通知所有等待的线程。

dowait

现在正式看dowait的代码,代码有点长,但有了前面的铺垫,看起来就简单多了。

private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        final Generation g = generation;
        //1
        if (g.broken)
            throw new BrokenBarrierException();
        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }
        //2
        int index = --count;
        if (index == 0) {  // tripped
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();
                ranAction = true;
                nextGeneration();
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }
        //3
        for (;;) {
            try {
                if (!timed)
                    trip.await();
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    Thread.currentThread().interrupt();
                }
            }

            if (g.broken)
                throw new BrokenBarrierException();

            if (g != generation)
                return index;

            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}

dowait一开始就获取ReentrantLock并锁定,然后在标记1,判断Generation是否broken,再判断线程是否被中断。

标记2,将等待的线程数量减1,如果count正好为零,说明parties个线程到达barrier了。执行预定的Runnable任务后,更新换代,准备下一次使用。

标记3的for循环里,通过标记timed,根据是否限时,线程调用await或者awaitNanos进入等待。其他代码很好理解,就不多说了。

后记

理解AQS之后,研究Semaphore易如反掌,CyclicBarrier也没什么难的,只有一个新东西Condition,下一篇研究。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,558评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,002评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,036评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,024评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,144评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,255评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,295评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,068评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,478评论 1 305
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,789评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,965评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,649评论 4 336
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,267评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,982评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,223评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,800评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,847评论 2 351

推荐阅读更多精彩内容