多线程之Semaphore (限流Java 版)

概念

计数信号量。从概念上讲,信号量维护一组许可证。

举一个例子

某银行分店只有三个窗口,所以同一时间最多只有三个人办理业务,其它人只能等待。可以把办理业务的人比作成线程,三个窗口就相当于三个许可证。此时来了4个人,先到的三个领到人许可证然后办理业务,第四个人呢只有等待,等待其中一个先办好业务释放许可证之后,然后再办理业务。

简单的用法

调用aquire方法是阻塞的直到有一个许可可用然后返回。每次调用release方法就会增加一个许可,隐式地释放一个阻塞获取者(调用aquire方法阻塞的线程)。当然没有所谓实际的许可对象Semaphore仅仅是维护了一个数字而已,然后执行相应的减加操作而已。

一个demo?

public class SemaphoreDemo {

    public static final int THREAD_SIZE = 10;

    public static void runSomething() throws InterruptedException {
        //模拟处理什么事
        Thread.sleep(1000);
        System.out.println(String.format("current threadId is %d,current time is %d",
                Thread.currentThread().getId(), System.currentTimeMillis() / 1000));
    }

    public static void main(String[] args) throws InterruptedException {
        //创建一个包含4个许可证的信号量实例
        Semaphore semaphoreDemo = new Semaphore(4);
        for (int i = 0; i < THREAD_SIZE; i++) {
            //获取许可
            Thread demoThread = new Thread(() -> {
                try {
                    //获取许可
                    semaphoreDemo.acquire();
                    //操作资源
                    runSomething();
                } catch (InterruptedException e) {
                    //抛出InterruptedException 会将该线程的中断标志设置为false
                    Thread.currentThread().interrupt();
                } finally {
                    semaphoreDemo.release();
                }
            });
            //开启demo线程
            demoThread.start();

        }

    }
}

👆程序大家有认真看嘛,给大家一分钟的时间看下。。。。。。大家发现有没有啥问题?考虑一下?
👆程序在调用aquire方法时会阻塞住,如果此时该线程被中断了finally 还执行release 方法.So,we can optimize it !
Semaphore 类提供了一个好用的方法
tryAcquire 方法,调用该方法在调用的时间如果有许可的话则会获取到许可并返回true,但是如果当前没有许可的话则会立即返回false
整改之后的代码如下:

/**
 * @author 梁自强
 * @date 2019.09.20
 */
public class SemaphoreAdvancedDemo {

    public static void main(String[] args) {
        //新建一个拥有4个许可的信号量
        Semaphore semaphoreAdvance = new Semaphore(4);
        for (int i = 0; i < THREAD_SIZE; i++) {
            Thread demoThread = new Thread(() -> {
                boolean isAcquire = false;
                try {
                    //tryAcquire 如果 没有许可会立即返回false,否则会通过CAS 去修改被volatile修饰的许可总数即state
                    while (!(isAcquire = semaphoreAdvance.tryAcquire())) {
                        Thread.sleep(100);
                    }
                    runSomething();
                } catch (InterruptedException e) {
                    System.out.println(String.format("threadId:%s interrupt", Thread.currentThread().getId()));
                    Thread.currentThread().interrupt();
                } finally {
                    if (isAcquire) {
                        semaphoreAdvance.release();
                        System.out.println(String.format("current threadId:%s released a permit", Thread.currentThread().getId()));
                    }
                }
            });
            demoThread.start();
            //随机调用interrupt 方法模仿实际被中断
            if (ThreadLocalRandom.current().nextInt(THREAD_SIZE) > THREAD_SIZE / 2) {
                demoThread.interrupt();
            }
        }
    }
}

上面有个while 循环感觉很不爽有木有?我们再来优化一下?
下面在看一个神奇的方法,再也不害怕中断了
acquireUninterruptibly() :从信号量实例获取许可,直到有一个许可可用否则一直阻塞。若阻塞中的线程被调用了interrupt方法,该线程会一直等待,当获取到许可返回时中断状态会被设置为true
测试方法:

 public static void main(String[] ar) throws InterruptedException {
        
        /**
         *
         * {@link Semaphore#acquireUninterruptibly} 方法 获取许可,如有许可则返回,
         * 若么有阻塞,在阻塞的过程中线程调用中断方法也不会影响线程的等待获取许可,但是在返回时该线程的中断状态会被设置为true
         * 测试步骤:
         * 1.创建一个拥有一个许可的信号量实例
         * 2.在主线程中acquire一个许可
         * 3.创建一个线程a去获取许可
         * 4.调用a.interrupt方法
         * 5.主线程释放许可
         */
        Semaphore semaphore = new Semaphore(1);
        semaphore.acquire();
        Thread testThread = new Thread(()->{
            semaphore.acquireUninterruptibly();
            //测试线程是否是中断状态
            if (Thread.currentThread().isInterrupted()) {
                System.out.println("pass");
            }else {
                System.out.println("get error");
            }
        });
        //启动测试线程
        testThread.start();
        //中断测试线程
        testThread.interrupt();
        //释放许可
        semaphore.release();
    }
结果:
pass

下面来使用下这个 acquireUninterruptibly 方法来改造一下我们的增加版demo,下面我们来看下,做出一些调整使用了策略模式修改了我们上面的类,使其易于扩展,因为需要传到Thread里执行,这里我就偷个懒,直接使用java 的Runnable 为策略基类,实现了两个子类实现run 接口,类图关系如下:

Semaphore 使用策略图

并且在方法调用上还特别设计了一下,在调用的时候 传的是Supplier 实现类对象,相当于函数式调用大家不仅学习了多线程的知识,还学习如何使用java 8 的新姿势,此时不点个赞,有点对不住我这个博主φ(>ω<*) 。
代码如下:

public class SemaphoreAdvancedDemo {

    public static void main(String[] args)  {
        //新建一个拥有4个许可的信号量
        Semaphore semaphoreAdvance = new Semaphore(4);
//        testSemaphore(() -> new Advance(semaphoreAdvance));

        testSemaphore(()->new Final(semaphoreAdvance));
    }

    private static void testSemaphore(Supplier<Runnable> supplier) {

        for (int i = 0; i < THREAD_SIZE; i++) {
            Thread demoThread = new Thread(supplier.get());

            demoThread.start();
            //随机调用interrupt 方法模仿实际被中断
            if (ThreadLocalRandom.current().nextInt(THREAD_SIZE) > THREAD_SIZE / 2) {
                demoThread.interrupt();
            }
        }
    }
    static class Advance implements Runnable {

         Advance(Semaphore semaphoreAdvance) {
            this.semaphoreAdvance = semaphoreAdvance;
        }

        private Semaphore semaphoreAdvance;

        @Override
        public void run() {
            boolean isAcquire = false;
            try {
                //tryAcquire 如果 没有许可会立即返回false,否则会通过CAS 去修改被volatile修饰的许可总数即state
                while (!(isAcquire = semaphoreAdvance.tryAcquire())) {
                    Thread.sleep(100);
                }
                runSomething();
            } catch (InterruptedException e) {
                System.out.println(String.format("threadId:%s interrupt", Thread.currentThread().getId()));
                Thread.currentThread().interrupt();
            } finally {
                if (isAcquire) {
                    semaphoreAdvance.release();
                    System.out.println(String.format("current threadId:%s released a permit", Thread.currentThread().getId()));
                }
            }
        }
    }

    static class Final implements Runnable {

         Final(Semaphore semaphoreFinal) {
            this.semaphoreFinal = semaphoreFinal;
        }
        private Semaphore semaphoreFinal;

        @Override
        public void run() {
            try {
                semaphoreFinal.acquireUninterruptibly();
                if (Thread.currentThread().isInterrupted()) {
                    System.out.println(String.format("[final] %s have interrupt", Thread.currentThread().getName()));
                    throw new InterruptedException();
                }
                runSomething();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                semaphoreFinal.release();
                System.out.println(String.format("%s released a permit", Thread.currentThread().getName()));
            }
        }
    }
}

使用场景

通例:某类资源同时限定n 个线程访问

实际使用场景

  • 信号量可以用来限制一次数据库连接的数量
  • 可以实现java 版的限流工具
  • 信号量持有一个许可证的时侯可以当做同步资源来使用,不过使用需要小心因为信号量即使一个线程没有获取许可证,也可以释放许可证,这就是和排它锁的区别,但是如果你使用得当,它还可以解决线上死锁的问题(大家可以思考下怎么设计在评论区讨论)
    最后实现一个信号量版的令牌桶算法
    令牌桶示意图

    上源码
/**
 * 限流:令牌桶算法
 */
public class RateLimiterOnSemaphore {
    private static final int DEFAULT_REQUEST_PER_SECOND = 200;
    private static final int SECOND_MILLIONS = 1000;
    private ScheduledExecutorService schedule = Executors.newScheduledThreadPool(1);
    private final Semaphore tokenContainer;

    private final int requestPerSecond;

    public RateLimiterOnSemaphore() {
        this(DEFAULT_REQUEST_PER_SECOND);
    }

    public RateLimiterOnSemaphore(int requestPerSecond) {
        this.requestPerSecond = requestPerSecond;
        tokenContainer = new Semaphore(requestPerSecond);
        //定时任务往container 匀速的存放token
        //计算 定时任务执行的间隔时间
        long period = SECOND_MILLIONS / requestPerSecond;

        schedule.scheduleAtFixedRate(this::putToken, 0, period, TimeUnit.MILLISECONDS);
    }

    public static RateLimiterOnSemaphore create(int tokensPerSecond) {
        return new RateLimiterOnSemaphore(tokensPerSecond);
    }

    /**
     * 获取token
     */
    public void acquire() {
        tokenContainer.acquireUninterruptibly();
    }

    /**
     * 尝试获取token
     *
     * @return true 如果获取成功 否则返回false
     */
    public boolean tryAcquire() {
        return tokenContainer.tryAcquire();
    }

    /**
     * 往容器里存放token
     */
    private void putToken() {
        //判断是否达到每秒上限
        if (tokenContainer.availablePermits() < requestPerSecond) {
            tokenContainer.release();
        }
    }
}

小结
这篇主要讲信号量的用法,下一篇讲jdk 是如何实现的。请大家点赞关注下吧,动动你的小拇指是对我最大的帮助。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,723评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,003评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,512评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,825评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,874评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,841评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,812评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,582评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,033评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,309评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,450评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,158评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,789评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,409评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,609评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,440评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,357评论 2 352