并发十二:CountDownLatch、CyclicBarrier、Semaphore实现分析

J.U.C中提供了三个同步工具CountDownLatch、CyclicBarrier、Semaphore,都是共享锁的特殊应用,用来进行线程的任务协调。

CountDownLatch

一个小栗子:

public class CountDownLatchTest {
    public static void main(String[] args) {
        final CountDownLatch latch = new CountDownLatch(2);
        new Thread() {
            public void run() {
                try {
                    System.out.println(Thread.currentThread().getName() + ":二级表生成");
                    Thread.sleep(10000);
                    latch.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            };
        }.start();

        new Thread() {
            public void run() {
                try {
                    System.out.println(Thread.currentThread().getName() + ":二级表生成");
                    Thread.sleep(10000);
                    latch.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            };
        }.start();

        new Thread() {
            public void run() {
                try {
                    System.out.println("等待二级表生成完成");
                    latch.await();
                    System.out.println(Thread.currentThread().getName() + ":汇总统计");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            };
        }.start();
    }
}

输出:"Thread-0:二级表生成、Thread-1:二级表生成、等待二级表生成完成",然后开始等待,直到Thread-0、Thread-1执行完成,然后"Thread-2:汇总统计"。

CountDownLatch是一个倒计时式的计数器,允许线程等待其他N个线程先执行完毕,再开始执行。

CountDownLatch基于AQS,是一个共享锁,await()使当前线程阻塞等待,countDown()计数器递减。

// CountDownLatch aqs源码:
‘’private static final class Sync 
                extends AbstractQueuedSynchronizer {
    Sync(int count) {
        setState(count);
    }
    int getCount() {
        return getState();
    }
    //加锁
    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }
    //解锁
    protected boolean tryReleaseShared(int releases) {
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            int nextc = c-1;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
}

count是倒计时的初始数。

await()调用tryAcquireShared(1)方法获取锁,根据共享锁的实现返回值小于0时线程会被阻塞等待,也就是只有当state==0,才能成功获取锁。

countDown()调用tryReleaseShared(1)方法进行解锁,当state值为0时,共享锁才算完全释放,会唤醒队列里等待的线程。

CountDownLatch没有复位操作,当state的值为0时再调用await()就不会阻塞线程了,所以CountDownLatch只能使用一次。

CyclicBarrier

一个小栗子:

public class CyclicBarrierTest {
    public static void main(String[] args) {
        final CyclicBarrier barrier = new CyclicBarrier(3);
        new Thread() {
            public void run() {
                try {
                    System.out.println(Thread.currentThread().getName() + ":计算完成");
                    barrier.await();
                    System.out.println(Thread.currentThread().getName() + ":入库");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            };
        }.start();

        new Thread() {
            public void run() {
                try {
                    System.out.println(Thread.currentThread().getName() + ":计算完成");
                    barrier.await();
                    System.out.println(Thread.currentThread().getName() + ":入库");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            };
        }.start();

        try {
            Thread.sleep(10000);// 等待
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
        
        new Thread() {
            public void run() {
                try {
                    System.out.println(Thread.currentThread().getName() + ":计算完成");
                    barrier.await();
                    System.out.println(Thread.currentThread().getName() + ":入库");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            };
        }.start();
    }
}

输出结果:"Thread-0:计算完成、Thread-1:计算完成",等待,直到"Thread-2:计算完成",然后"Thread-0:入库、Thread-1:入库、Thread-2:入库"。

CyclicBarrier是可循环的同步屏障,将N个线程进行阻塞,直到阻塞线程的数量到达屏障点时屏障被打破,这N个线程才会继续执行。

CyclicBarrier使用一个重入锁实现,初始化时传入屏障点parties,即要阻塞的线程数量,还可以传入一个Runnable的实现barrierAction,它会在屏障打破时执行。在屏障未打破前调用await()方法的线程都会被阻塞。

public class CyclicBarrier {
    /** 屏障状态 */
    private static class Generation {
        boolean broken = false;
    }
    /** 重入锁 */
    private final ReentrantLock lock = new ReentrantLock();
    /** condition */
    private final Condition trip = lock.newCondition();
    /** 屏障点 */
    private final int parties;
    /** 到达屏障点执行的类 */
    private final Runnable barrierCommand;
    /** 当前状态 */
    private Generation generation = new Generation();
    /** 计数 */
    private int count;
    ... ...
}

阻塞放行流程:

private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException, TimeoutException {
    final ReentrantLock lock = this.lock;
    lock.lock();//加锁
    try {
        //当前状态
        final Generation g = generation;
        if (g.broken)//被打破 s1
            throw new BrokenBarrierException();
        //中断
        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }
        //计数
        int index = --count;
        //到达屏障点 s3
        if (index == 0) { 
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                //先执行barrierCommand
                if (command != null)
                    command.run();
                ranAction = true;
                //唤醒所以阻塞线程
                //重新实例化generation
                //复位操作
                nextGeneration();
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }
        for (;;) {// s2
            try {
                if (!timed)
                    trip.await();
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && !g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    Thread.currentThread().interrupt();
                }
            }
            if (g.broken)
                throw new BrokenBarrierException();
            if (g != generation) {
                System.out.println("退出");
            return index;
            }
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}

s1: g.broken==true说明屏障被打破,这时再调用await()会抛出异常。

如果线程中断则打破屏障并抛出异常,

计数器count递减并赋给index,index==0进入s2,否则进入s3

s2:index==0说明被拦截的线程数量已经达到屏障点,如果barrierAction不为空,则直接调用run方法,让其先执行。

nextGeneration()会唤醒所有在trip上等待的线程、重新赋值count为初始值parties,new 一个Generation赋给generation,这样一来CyclicBarrier就恢复如初了,可以被重新使用,返回0。

s3: count>0,说明还没有到达屏障点,进入for(;;)循环体,会让线程在条件队列trip上等待,直到屏障被打破。屏障打破时会重新赋值generation,被唤醒的线程会在(g != generation)这个点正常退出循环。

CyclicBarrier屏障正常打破后进行了复位操作,所以CyclicBarrier可以重复使用。

Semaphore

一个小栗子:

public class SemaphoreTest {
    private static final int tokenCount = 3;
    public static void main(String[] args) {
        final Semaphore tokens = new Semaphore(tokenCount); // 令牌发放者
        for(int i=0;i<10;i++)
            new Request(tokens).start();
    }

    static class Request extends Thread {
        private Semaphore tokens;

        public Request(Semaphore tokens) {
            this.tokens = tokens;
        }
        @Override
        public void run() {
            try {
                tokens.acquire();// 申请访问令牌
                System.out.println(Thread.currentThread().getName()+":访问资源...");
                Thread.sleep(3000);
                tokens.release();// 访问完毕归还令牌
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

输出: "Thread-0:访问资源、Thread-1:访问资源、Thread-2:访问资源",等待,然后"Thread-3:访问资源、Thread-5:访问资源、Thread-4:访问资源",不同的电脑顺序可能不一样,能发现都是3个组的访问。

Semaphore信号,可以限定访问受限资源的线程数量,用来协调访问资源的线程数量,使其处在一个恒定的值。
网络应用中为了保护服务器不被流量洪峰冲夸,会进行限流,限流会使用令牌桶算法,Semaphore就可以实现令牌桶:访问线程先拿到令牌才能访问,访问完后把令牌归还到桶中以便供其他线程使用,就保证了访问资源的线程数量和令牌数量一至。

Semaphore是一个共享锁,内部代码布局和ReentrantLock类似,支持公平性设置,如果设置为公平锁,能够使等待最久的线程先获取信号,默认为非公平性的。

public class Semaphore {
    /** 同步器实例 */
    private final Sync sync;
    /** 父类同步器*/
    abstract static class Sync extends AbstractQueuedSynchronizer {... ...}
    /** 非公平同步器*/
    static final class NonfairSync extends Sync {}
    /** 公平同步器*/
    static final class FairSync extends Sync {}
    /** 构造*/
    public MySemaphore(int permits) {
        sync = new NonfairSync(permits);
    }
    /** 构造*/
    public MySemaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }
    ... ... 
}

加锁解锁流程和前面的写的共享锁一致。
不同的是信号获取的逻辑中没有对重入的处理,一个线程可以多次获取信号,每次获取都会让总量减1。
信号的释放时没有对信号的总量进行控制,比如初始的信号是5个,已经获取了5个,释放来了7个,这时可用的信号是7个,也就是说在释放的可以对信号数量进行扩容,如果在使用中需要保持信号数量恒定,一定要保证acquire和release成对出现。

小结

  1. CountDownLatch和CyclicBarrier都是以计数器的的形式来协调线程同步的,一个显著的区别是CyclicBarrier可重用,CountDownLatch是一次性的。
  2. CountDownLatch和CyclicBarrier还有一个语义层面上的区别是,Count DownLatch是线程等待另外N个线程执行完毕。CyclicBarrier是N个线程相互等待,直到都执行毕。
  3. CountDownLatch强调依赖,CyclicBarrier强调协作。典型的应用场景就是大任务拆解为小任务,然后合并计算结果,比如多线程下载大文件,多个下载线程将自己分配的文件段下载完毕后,合并线程才开始进行文件合并操作。
  4. Semaphore用来限定访问受限资源的线程数量,典型的应用场景是流量控制,比如并发操作数据库,数据库连接池只有10个,必须保证只能有10个线程去获取连接,否则会报错。
  5. Semaphore可以进行简单的服务端限流,比如一个RPC服务器只能支撑200QPS,就可以用Semaphore去限制请求RPC的线程数量。当然对于复杂的服务端限流还得使用更高效令牌桶(Token Bucket)或者漏桶(Leaky Bucket)算法。

码字不易,转载请保留原文连接https://www.jianshu.com/p/9e0ecc8b1358

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,445评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,889评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,047评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,760评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,745评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,638评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,011评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,669评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,923评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,655评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,740评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,406评论 4 320
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,995评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,961评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,023评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,483评论 2 342

推荐阅读更多精彩内容