多线程(四)协作篇之其他api-CountDownLatch、CyclicBarrier、Semaphore

多线程协作除了上一篇中讲到的简单的生产者消费者模型的几种实现,jdk还提供了一些其他api,实现线程间协作的模型:CountDownLatch用于倒计数栅栏模型,一个线程等待其他多个线程就绪后再继续执行;CyclicBarrier用于循环路障模型,可重置循环使用的倒计数栅栏;Semaphore用于信号量模型,释放一定总量的信号量,每个线程需要获取一个信号量才能执行,从而进行限流


CountDownLatch:

倒计数栅栏模型的使用场景

  • 场景一:计数设为1,启动多个线程await,等待主线程发送countdown信号后同步开始执行任务,场景类似跑步比赛,运动员都就绪等待发令枪响。可以用于简单的并发测试
  • 场景二:计数设为n,将一个任务划分为n个线程进行同时处理,统计线程await等待所有n个线程执行完毕countdown,计数归零后完成统计工作。
  • 场景三:结合前两者,设置两个CountDownLatch,一个计数为1的启动信号的latch,一个计数为n的完成信号的latch,一个大任务,用n个线程执行,每个线程都等待统筹线程将启动信号计数为1的latch计数countdown归零后执行任务。执行完成后每个线程countdown,将完成信号的n个计数的lantch点计数归零后统筹线程进行统计。
  • 下面的代码示例是演示场景二中CountDownLatch的使用
@Slf4j
public class CountDownLatchDemo {
    public static void main(String[] args) throws Exception{
        // 定义需要等待的倒计时个数
        CountDownLatch latch=new CountDownLatch(10);
        Random random=new Random();
        // 推荐的线程池的定义方法
        ExecutorService executor=new ThreadPoolExecutor(10, 15, 60L, TimeUnit.SECONDS,
                // 定义你的等待队列大小
                new LinkedBlockingDeque<>(50),
                // 定义你的线程生成方法
                new ThreadFactory() {
                    private AtomicInteger threadNum=new AtomicInteger(0);
                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r,"custom work thread "+threadNum.addAndGet(1));
                    }
                },
                (r,e)->{
                    // 定义你的拒绝策略
                    String message="Task " + r.toString() + " rejected from " + e.toString();
                    log.error(message);
                    throw new RejectedExecutionException(message);
                });
        for(int i=0;i<10;i++){
            int j=i;
            executor.execute(()->{
                try {
                    // 这里用于自己的业务实现,如果需要使用共享变量,注意使用线程安全的api或者同步锁
                    Thread.sleep(random.nextInt(2000));
                    log.debug("the {} is ready,i is {}",Thread.currentThread().getName(),j);
                    // 执行完即完成一个倒数计时
                    latch.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        executor.execute(()->{
            try {
                long t1=System.currentTimeMillis();
                log.debug("the thread {} is awaiting !",Thread.currentThread().getName());
                latch.await();
                log.debug("the thread {} is running ,waiting time is {} ms !",Thread.currentThread().getName(),System.currentTimeMillis()-t1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        long t1=System.currentTimeMillis();
        log.debug("main thread is waiting !");
        // 等齐了或者倒计时到后继续
        latch.await(2000L,TimeUnit.SECONDS);
        log.debug("main thread is end ,waiting time is {} ms !",System.currentTimeMillis()-t1);
        executor.shutdown();
    }
}

debug信息如下:
10:59:17.952 [main] DEBUG com.dz.demo.multiThread.CountDownLatchDemo - main thread is waiting !
10:59:18.091 [custom work thread 3] DEBUG com.dz.demo.multiThread.CountDownLatchDemo - the custom work thread 3 is ready,i is 2
10:59:18.094 [custom work thread 3] DEBUG com.dz.demo.multiThread.CountDownLatchDemo - the thread custom work thread 3 is awaiting !
10:59:18.179 [custom work thread 8] DEBUG com.dz.demo.multiThread.CountDownLatchDemo - the custom work thread 8 is ready,i is 7
10:59:18.282 [custom work thread 1] DEBUG com.dz.demo.multiThread.CountDownLatchDemo - the custom work thread 1 is ready,i is 0
10:59:18.599 [custom work thread 5] DEBUG com.dz.demo.multiThread.CountDownLatchDemo - the custom work thread 5 is ready,i is 4
10:59:19.096 [custom work thread 9] DEBUG com.dz.demo.multiThread.CountDownLatchDemo - the custom work thread 9 is ready,i is 8
10:59:19.246 [custom work thread 6] DEBUG com.dz.demo.multiThread.CountDownLatchDemo - the custom work thread 6 is ready,i is 5
10:59:19.540 [custom work thread 10] DEBUG com.dz.demo.multiThread.CountDownLatchDemo - the custom work thread 10 is ready,i is 9
10:59:19.574 [custom work thread 2] DEBUG com.dz.demo.multiThread.CountDownLatchDemo - the custom work thread 2 is ready,i is 1
10:59:19.670 [custom work thread 4] DEBUG com.dz.demo.multiThread.CountDownLatchDemo - the custom work thread 4 is ready,i is 3
10:59:19.941 [custom work thread 7] DEBUG com.dz.demo.multiThread.CountDownLatchDemo - the custom work thread 7 is ready,i is 6
10:59:19.941 [main] DEBUG com.dz.demo.multiThread.CountDownLatchDemo - main thread is end ,waiting time is 1990 ms !
10:59:19.941 [custom work thread 3] DEBUG com.dz.demo.multiThread.CountDownLatchDemo - the thread custom work thread 3 is running ,waiting time is 1847 ms !

CyclicBarrier:

循环路障模型的使用场景

  • 计数设为n的可循环使用的路障,用于多个线程调用await,n个线程都达到await之后路障计数归零,线程一起同步继续,同时该路障被重置为n,可以继续循环使用。路障还可以设置一个执行器,在路障计数归零时被触发。用于比如,将一个大任务化为m个子任务,用n个线程执行,当每执行完n个子任务后,触发一次统计任务,同时开启下一批n个子任务的执行;
  • 下面的代码示例即同时执行两个任务,每次两个都执行完再启动下一轮两个任务。
@Slf4j
public class CyclicBarrierDemo {
    public static void main(String[] args) {
        AtomicInteger num=new AtomicInteger(0);
        CyclicBarrier cyclicBarrier=new CyclicBarrier(2,()->{
            num.addAndGet(1);
            log.debug("cyclicBarrier arrived,both ready to run,time is {}",num.get());
        });
        // 演示demo可以用简单的新线程池
        ExecutorService executorService= Executors.newFixedThreadPool(2);
        executorService.execute(()->{
            int i=0;
            while (i<4) {
                try {
                    log.debug("job 1 begin to start!");
                    cyclicBarrier.await();
                    log.debug("cyclicBarrier arrived! job 1 is running to start!");
                    i++;
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
        });
        executorService.execute(()->{
            int i=0;
            while (i<4) {
                try {
                    log.debug("job 2 begin to start!");
                    cyclicBarrier.await();
                    log.debug("cyclicBarrier arrived! job 2 is running to start!");
                    i++;
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
        });

    }
}

debug信息如下:
11:56:29.438 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.CyclicBarrierDemo - job 1 begin to start!
11:56:29.438 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.CyclicBarrierDemo - job 2 begin to start!
11:56:29.440 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.CyclicBarrierDemo - cyclicBarrier arrived,both ready to run,time is 1
11:56:29.441 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.CyclicBarrierDemo - cyclicBarrier arrived! job 2 is running to start!
11:56:29.441 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.CyclicBarrierDemo - cyclicBarrier arrived! job 1 is running to start!
11:56:29.441 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.CyclicBarrierDemo - job 2 begin to start!
11:56:29.441 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.CyclicBarrierDemo - job 1 begin to start!
11:56:29.441 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.CyclicBarrierDemo - cyclicBarrier arrived,both ready to run,time is 2
11:56:29.441 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.CyclicBarrierDemo - cyclicBarrier arrived! job 1 is running to start!
11:56:29.441 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.CyclicBarrierDemo - cyclicBarrier arrived! job 2 is running to start!
11:56:29.441 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.CyclicBarrierDemo - job 1 begin to start!
11:56:29.441 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.CyclicBarrierDemo - job 2 begin to start!
11:56:29.441 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.CyclicBarrierDemo - cyclicBarrier arrived,both ready to run,time is 3
11:56:29.441 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.CyclicBarrierDemo - cyclicBarrier arrived! job 2 is running to start!
11:56:29.441 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.CyclicBarrierDemo - job 2 begin to start!
11:56:29.441 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.CyclicBarrierDemo - cyclicBarrier arrived! job 1 is running to start!
11:56:29.441 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.CyclicBarrierDemo - job 1 begin to start!
11:56:29.441 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.CyclicBarrierDemo - cyclicBarrier arrived,both ready to run,time is 4
11:56:29.441 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.CyclicBarrierDemo - cyclicBarrier arrived! job 1 is running to start!
11:56:29.441 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.CyclicBarrierDemo - cyclicBarrier arrived! job 2 is running to start!

Semaphore

信号量模型的使用场景

  • 计数为n的Semaphore,即持有n个许可,线程可应通过该信号量的acquire或者tryAcquire来获取许可,获取许可后才能使用或者获取资源,使用完后通过release释放许可。该模型的设计即是为了实现对物理或者逻辑资源的获取进行限流。同时最多n个线程持有该资源的许可。当计数设置为1,即有互斥锁的效果,相比其他锁,优势是可以在其他线程进行release,从而处理死锁。
  • 下面的代码示例对于一个限定的资源,使用Semaphore颁发许可,进行限流
@Slf4j
public class SemaphoreDemo {
    public static void main(String[] args) {
        ResourcePool resourcePool=new ResourcePool(5);
        ExecutorService executorService= Executors.newFixedThreadPool(20);
        for(int i=0;i<15;i++){
            executorService.execute(()->{
                String s=null;
                try {
                    s=resourcePool.getResource();
                    log.debug("{} 线程 获取资源 {}",Thread.currentThread().getName(),s);
                    Thread.sleep(1000l);
                } catch (Exception e) {
                    e.printStackTrace();
                }finally {
                    // 释放资源确保在finnaly中
                    if(StringUtils.hasText(s)){
                        resourcePool.releaseResource(s);
                    }
                }
            });
        }
        executorService.shutdown();
    }
}

@Slf4j
@Data
class ResourcePool{
    private Semaphore semaphore;
    private Queue<String> sourceQueue;
    public ResourcePool(Integer n){
        semaphore=new Semaphore(n);
        sourceQueue=new LinkedList<>();
        for(int i=0;i<n;i++){
            sourceQueue.add(String.valueOf(i));
        }
    }
    String getResource() throws Exception{
        if(semaphore.tryAcquire(10L, TimeUnit.SECONDS)){
            log.debug("成功获取信号量");
            return sourceQueue.poll();
        }else {
            return null;
        }
    }
    void releaseResource(String s){
        log.debug("释放资源 {}",s);
        semaphore.release();
        sourceQueue.add(s);
    }
}

打印如下:

13:20:20.970 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.ResourcePool - 成功获取信号量
13:20:20.970 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.ResourcePool - 成功获取信号量
13:20:20.970 [pool-1-thread-5] DEBUG com.dz.demo.multiThread.ResourcePool - 成功获取信号量
13:20:20.970 [pool-1-thread-4] DEBUG com.dz.demo.multiThread.ResourcePool - 成功获取信号量
13:20:20.970 [pool-1-thread-3] DEBUG com.dz.demo.multiThread.ResourcePool - 成功获取信号量
13:20:20.972 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.SemaphoreDemo - pool-1-thread-1 线程 获取资源 0
13:20:20.972 [pool-1-thread-4] DEBUG com.dz.demo.multiThread.SemaphoreDemo - pool-1-thread-4 线程 获取资源 3
13:20:20.972 [pool-1-thread-3] DEBUG com.dz.demo.multiThread.SemaphoreDemo - pool-1-thread-3 线程 获取资源 4
13:20:20.972 [pool-1-thread-5] DEBUG com.dz.demo.multiThread.SemaphoreDemo - pool-1-thread-5 线程 获取资源 2
13:20:20.972 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.SemaphoreDemo - pool-1-thread-2 线程 获取资源 1
13:20:21.976 [pool-1-thread-4] DEBUG com.dz.demo.multiThread.ResourcePool - 释放资源 3
13:20:21.976 [pool-1-thread-3] DEBUG com.dz.demo.multiThread.ResourcePool - 释放资源 4
13:20:21.976 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.ResourcePool - 释放资源 1
13:20:21.976 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.ResourcePool - 释放资源 0
13:20:21.976 [pool-1-thread-5] DEBUG com.dz.demo.multiThread.ResourcePool - 释放资源 2
13:20:21.976 [pool-1-thread-6] DEBUG com.dz.demo.multiThread.ResourcePool - 成功获取信号量
13:20:21.976 [pool-1-thread-7] DEBUG com.dz.demo.multiThread.ResourcePool - 成功获取信号量
13:20:21.977 [pool-1-thread-6] DEBUG com.dz.demo.multiThread.SemaphoreDemo - pool-1-thread-6 线程 获取资源 4
13:20:21.976 [pool-1-thread-8] DEBUG com.dz.demo.multiThread.ResourcePool - 成功获取信号量
13:20:21.977 [pool-1-thread-7] DEBUG com.dz.demo.multiThread.SemaphoreDemo - pool-1-thread-7 线程 获取资源 1
13:20:21.977 [pool-1-thread-8] DEBUG com.dz.demo.multiThread.SemaphoreDemo - pool-1-thread-8 线程 获取资源 0
13:20:21.977 [pool-1-thread-9] DEBUG com.dz.demo.multiThread.ResourcePool - 成功获取信号量
13:20:21.977 [pool-1-thread-10] DEBUG com.dz.demo.multiThread.ResourcePool - 成功获取信号量
13:20:21.977 [pool-1-thread-9] DEBUG com.dz.demo.multiThread.SemaphoreDemo - pool-1-thread-9 线程 获取资源 2
13:20:21.977 [pool-1-thread-10] DEBUG com.dz.demo.multiThread.SemaphoreDemo - pool-1-thread-10 线程 获取资源 null
13:20:22.979 [pool-1-thread-6] DEBUG com.dz.demo.multiThread.ResourcePool - 释放资源 4
13:20:22.979 [pool-1-thread-9] DEBUG com.dz.demo.multiThread.ResourcePool - 释放资源 2
13:20:22.979 [pool-1-thread-8] DEBUG com.dz.demo.multiThread.ResourcePool - 释放资源 0
13:20:22.979 [pool-1-thread-7] DEBUG com.dz.demo.multiThread.ResourcePool - 释放资源 1
13:20:22.979 [pool-1-thread-11] DEBUG com.dz.demo.multiThread.ResourcePool - 成功获取信号量
13:20:22.979 [pool-1-thread-11] DEBUG com.dz.demo.multiThread.SemaphoreDemo - pool-1-thread-11 线程 获取资源 4
13:20:22.979 [pool-1-thread-13] DEBUG com.dz.demo.multiThread.ResourcePool - 成功获取信号量
13:20:22.979 [pool-1-thread-12] DEBUG com.dz.demo.multiThread.ResourcePool - 成功获取信号量
13:20:22.979 [pool-1-thread-14] DEBUG com.dz.demo.multiThread.ResourcePool - 成功获取信号量
13:20:22.980 [pool-1-thread-13] DEBUG com.dz.demo.multiThread.SemaphoreDemo - pool-1-thread-13 线程 获取资源 2
13:20:22.980 [pool-1-thread-12] DEBUG com.dz.demo.multiThread.SemaphoreDemo - pool-1-thread-12 线程 获取资源 0
13:20:22.980 [pool-1-thread-14] DEBUG com.dz.demo.multiThread.SemaphoreDemo - pool-1-thread-14 线程 获取资源 1
13:20:23.980 [pool-1-thread-11] DEBUG com.dz.demo.multiThread.ResourcePool - 释放资源 4
13:20:23.980 [pool-1-thread-13] DEBUG com.dz.demo.multiThread.ResourcePool - 释放资源 2
13:20:23.980 [pool-1-thread-12] DEBUG com.dz.demo.multiThread.ResourcePool - 释放资源 0
13:20:23.981 [pool-1-thread-15] DEBUG com.dz.demo.multiThread.ResourcePool - 成功获取信号量
13:20:23.981 [pool-1-thread-15] DEBUG com.dz.demo.multiThread.SemaphoreDemo - pool-1-thread-15 线程 获取资源 4
13:20:23.985 [pool-1-thread-14] DEBUG com.dz.demo.multiThread.ResourcePool - 释放资源 1
13:20:24.981 [pool-1-thread-15] DEBUG com.dz.demo.multiThread.ResourcePool - 释放资源 4

本篇讲到了多线程协作的CountDownLatch、CyclicBarrier、Semaphore三个api的使用场景和使用用例,其他工具比如信号量的api在guava的RateLimiter也可以实现限流,使用guava的令牌桶思想,也可以用redis实现在分布式环境下的限流。本轮多线程篇将暂时写到这,将来有人咨询其他问题也可以继续更新多线程篇。下一篇将是关于redis的搭建、使用、集群和在分布式情况下的应用场景。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,036评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,046评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,411评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,622评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,661评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,521评论 1 304
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,288评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,200评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,644评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,837评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,953评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,673评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,281评论 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,889评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,011评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,119评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,901评论 2 355

推荐阅读更多精彩内容