Java多线程9 Semaphore实现信号灯

Java多线程目录

前言

Semaphore是计数信号量。Semaphore管理一系列许可证。每个acquire方法阻塞,直到有一个许可证可以获得然后拿走一个许可证;每个release方法增加一个许可证,这可能会释放一个阻塞的acquire方法。然而,其实并没有实际的许可证这个对象,Semaphore只是维持了一个可获得许可证的数量。
Semaphore可以维护当前访问自身的线程个数,并提供了同步机制。使用Semaphore可以控制同时访问资源的线程个数,例如,实现一个文件允许的并发访问数。

1 Semaphore的主要方法

Semaphore(int permits):构造方法,创建具有给定许可数的计数信号量并设置为非公平信号量。

Semaphore(int permits,boolean fair):构造方法,当fair等于true时,创建具有给定许可数的计数信号量并设置为公平信号量。

void acquire():当前线程尝试去阻塞的获取1个许可证。
此过程是阻塞的,它会一直等待许可证,直到发生以下任意一件事:

  • 当前线程获取了1个可用的许可证,则会停止等待,继续执行。
  • 当前线程被中断,则会抛出InterruptedException异常,并停止等待,继续执行。

void acquire(int n):从此信号量获取给定数目许可,在提供这些许可前一直将线程阻塞。
当前线程尝试去阻塞的获取多个许可证。

此过程是阻塞的,它会一直等待许可证,直到发生以下任意一件事:

  • 当前线程获取了n个可用的许可证,则会停止等待,继续执行。
  • 当前线程被中断,则会抛出InterruptedException异常,并停止等待,继续执行。

void release():释放一个许可,将其返回给信号量。

void release(int n):释放n个许可。

int availablePermits():当前可用的许可数。
void acquierUninterruptibly():当前线程尝试去阻塞的获取1个许可证(不可中断的)。

此过程是阻塞的,它会一直等待许可证,直到发生以下任意一件事:

  • 当前线程获取了1个可用的许可证,则会停止等待,继续执行。

void acquireUninterruptibly(permits):当前线程尝试去阻塞的获取多个许可证。

此过程是阻塞的,它会一直等待许可证,直到发生以下任意一件事:

当前线程获取了n个可用的许可证,则会停止等待,继续执行。
boolean tryAcquire()当前线程尝试去获取1个许可证。

此过程是非阻塞的,它只是在方法调用时进行一次尝试。

如果当前线程获取了1个可用的许可证,则会停止等待,继续执行,并返回true。

如果当前线程没有获得这个许可证,也会停止等待,继续执行,并返回false。

boolean tryAcquire(permits):当前线程尝试去获取多个许可证。

此过程是非阻塞的,它只是在方法调用时进行一次尝试。

如果当前线程获取了permits个可用的许可证,则会停止等待,继续执行,并返回true。

如果当前线程没有获得permits个许可证,也会停止等待,继续执行,并返回false。

boolean tryAcquire(timeout,TimeUnit):当前线程在限定时间内,阻塞的尝试去获取1个许可证。

此过程是阻塞的,它会一直等待许可证,直到发生以下任意一件事:

  • 当前线程获取了可用的许可证,则会停止等待,继续执行,并返回true。
  • 当前线程等待时间timeout超时,则会停止等待,继续执行,并返回false。
  • 当前线程在timeout时间内被中断,则会抛出InterruptedException一次,并停止等待,继续执行。

boolean tryAcquire(permits,timeout,TimeUnit):当前线程在限定时间内,阻塞的尝试去获取permits个许可证。

此过程是阻塞的,它会一直等待许可证,直到发生以下任意一件事:

  • 当前线程获取了可用的permits个许可证,则会停止等待,继续执行,并返回true。
  • 当前线程等待时间timeout超时,则会停止等待,继续执行,并返回false。
  • 当前线程在timeout时间内被中断,则会抛出InterruptedException一次,并停止等待,继续执行。

2 实例讲解

public class SemaphoreTest {

    private static final Semaphore semaphore = new Semaphore(3);

    public static void main(String[] args) {
        Executor executor = Executors.newCachedThreadPool();

        String[] name = {"Jack", "Pony", "Larry", "Martin", "James", "ZhangSan","Tree"};

        int[] age = {21,22,23,24,25,26,27};

        for(int i=0;i<7;i++)
        {
            Thread t1=new InformationThread(name[i],age[i]);
            executor.execute(t1);
        }
    }

    private static class InformationThread extends Thread {
        private final String name;
        private final int age;

        public InformationThread(String name, int age) {
            this.name = name;
            this.age = age;
        }


        @Override
        public void run() {
            try {
                semaphore.acquire();
                System.out.println(Thread.currentThread().getName()
                        + ":大家好,我是" + name + "我今年" + age +
                        "当前时间段为:" + System.currentTimeMillis());
                Thread.sleep(1000);
                System.out.println(name + "要准备释放许可证了,当前时间为:" + System.currentTimeMillis());
                System.out.println("当前可使用的许可数为:" + semaphore.availablePermits());
                System.out.println("是否有正在等待许可证的线程:" + semaphore.hasQueuedThreads());
                System.out.println("正在等待许可证的队列长度(线程数量):" + semaphore.getQueueLength());
                semaphore.release();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
    }
}

pool-1-thread-1:大家好,我是Jack我今年21当前时间段为:1543498535306
pool-1-thread-3:大家好,我是Larry我今年23当前时间段为:1543498535306
pool-1-thread-2:大家好,我是Pony我今年22当前时间段为:1543498535306
Pony要准备释放许可证了,当前时间为:1543498536310
Jack要准备释放许可证了,当前时间为:1543498536310
当前可使用的许可数为:0
Larry要准备释放许可证了,当前时间为:1543498536310
是否有正在等待许可证的线程:true
当前可使用的许可数为:0
是否有正在等待许可证的线程:true
正在等待许可证的队列长度(线程数量):4
正在等待许可证的队列长度(线程数量):4
当前可使用的许可数为:0
pool-1-thread-4:大家好,我是Martin我今年24当前时间段为:1543498536311
是否有正在等待许可证的线程:true
pool-1-thread-5:大家好,我是James我今年25当前时间段为:1543498536311
正在等待许可证的队列长度(线程数量):2
pool-1-thread-6:大家好,我是ZhangSan我今年26当前时间段为:1543498536312
James要准备释放许可证了,当前时间为:1543498537315
Martin要准备释放许可证了,当前时间为:1543498537315
当前可使用的许可数为:0
是否有正在等待许可证的线程:true
正在等待许可证的队列长度(线程数量):1
当前可使用的许可数为:0
是否有正在等待许可证的线程:false
pool-1-thread-7:大家好,我是Tree我今年27当前时间段为:1543498537316
正在等待许可证的队列长度(线程数量):0
ZhangSan要准备释放许可证了,当前时间为:1543498537317
当前可使用的许可数为:1
是否有正在等待许可证的线程:false
正在等待许可证的队列长度(线程数量):0
Tree要准备释放许可证了,当前时间为:1543498538319
当前可使用的许可数为:2
是否有正在等待许可证的线程:false
正在等待许可证的队列长度(线程数量):0

以上是非公平信号量,将建立Semaphore对象的语句改为如下语句:

private static final Semaphore semaphore=new Semaphore(3,true);
pool-1-thread-1:大家好,我是Jack我今年21当前时间段为:1543498810563
pool-1-thread-3:大家好,我是Larry我今年23当前时间段为:1543498810564
pool-1-thread-2:大家好,我是Pony我今年22当前时间段为:1543498810563
Jack要准备释放许可证了,当前时间为:1543498811564
当前可使用的许可数为:0
是否有正在等待许可证的线程:true
正在等待许可证的队列长度(线程数量):4
pool-1-thread-4:大家好,我是Martin我今年24当前时间段为:1543498811564
Larry要准备释放许可证了,当前时间为:1543498811568
当前可使用的许可数为:0
Pony要准备释放许可证了,当前时间为:1543498811568
是否有正在等待许可证的线程:true
当前可使用的许可数为:0
正在等待许可证的队列长度(线程数量):3
是否有正在等待许可证的线程:true
pool-1-thread-5:大家好,我是James我今年25当前时间段为:1543498811568
正在等待许可证的队列长度(线程数量):2
pool-1-thread-6:大家好,我是ZhangSan我今年26当前时间段为:1543498811568
Martin要准备释放许可证了,当前时间为:1543498812566
当前可使用的许可数为:0
是否有正在等待许可证的线程:true
正在等待许可证的队列长度(线程数量):1
pool-1-thread-7:大家好,我是Tree我今年27当前时间段为:1543498812566
James要准备释放许可证了,当前时间为:1543498812572
当前可使用的许可数为:0
是否有正在等待许可证的线程:false
正在等待许可证的队列长度(线程数量):0
ZhangSan要准备释放许可证了,当前时间为:1543498812572
当前可使用的许可数为:1
是否有正在等待许可证的线程:false
正在等待许可证的队列长度(线程数量):0
Tree要准备释放许可证了,当前时间为:1543498813568
当前可使用的许可数为:2
是否有正在等待许可证的线程:false
正在等待许可证的队列长度(线程数量):0
实现单例模式

将创建信号量对象语句修改如下:

private static final Semaphore semaphore=new Semaphore(1);

运行程序,结果如下:

pool-1-thread-1:大家好,我是Jack我今年21当前时间段为:1543499053898
Jack要准备释放许可证了,当前时间为:1543499054903
当前可使用的许可数为:0
是否有正在等待许可证的线程:true
正在等待许可证的队列长度(线程数量):6
pool-1-thread-2:大家好,我是Pony我今年22当前时间段为:1543499054904
Pony要准备释放许可证了,当前时间为:1543499055907
当前可使用的许可数为:0
是否有正在等待许可证的线程:true
正在等待许可证的队列长度(线程数量):5
pool-1-thread-3:大家好,我是Larry我今年23当前时间段为:1543499055907
Larry要准备释放许可证了,当前时间为:1543499056909
当前可使用的许可数为:0
是否有正在等待许可证的线程:true
正在等待许可证的队列长度(线程数量):4
pool-1-thread-4:大家好,我是Martin我今年24当前时间段为:1543499056909
Martin要准备释放许可证了,当前时间为:1543499057913
当前可使用的许可数为:0
是否有正在等待许可证的线程:true
正在等待许可证的队列长度(线程数量):3
pool-1-thread-5:大家好,我是James我今年25当前时间段为:1543499057913
James要准备释放许可证了,当前时间为:1543499058914
当前可使用的许可数为:0
是否有正在等待许可证的线程:true
正在等待许可证的队列长度(线程数量):2
pool-1-thread-6:大家好,我是ZhangSan我今年26当前时间段为:1543499058915
ZhangSan要准备释放许可证了,当前时间为:1543499059919
当前可使用的许可数为:0
是否有正在等待许可证的线程:true
正在等待许可证的队列长度(线程数量):1
pool-1-thread-7:大家好,我是Tree我今年27当前时间段为:1543499059919
Tree要准备释放许可证了,当前时间为:1543499060923
当前可使用的许可数为:0
是否有正在等待许可证的线程:false
正在等待许可证的队列长度(线程数量):0
    如上可知,如果将给定许可数设置为1,就如同一个单例模式,即单个停车位,只有一辆车进,然后这辆车出来后,下一辆车才能进。

3 源码解析

Semaphore有两种模式,公平模式和非公平模式。公平模式就是调用acquire的顺序就是获取许可证的顺序,遵循FIFO;而非公平模式是抢占式的,也就是有可能一个新的获取线程恰好在一个许可证释放时得到了这个许可证,而前面还有等待的线程。

构造方法

Semaphore有两个构造方法,如下:

    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }
    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }
获取许可

先从获取一个许可看起,并且先看非公平模式下的实现。首先看acquire方法,acquire方法有几个重载,但主要是下面这个方法

  public void acquire(int permits) throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireSharedInterruptibly(permits);
    }

从上面可以看到,调用了Sync的acquireSharedInterruptibly方法,该方法在父类AQS中,如下:

    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted()) //如果线程被中断了,抛出异常
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0) //获取许可失败,将线程加入到等待队列中
            doAcquireSharedInterruptibly(arg);
    }

AQS子类如果要使用共享模式的话,需要实现tryAcquireShared方法,下面看NonfairSync的该方法实现:

protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }

该方法调用了父类中的nonfairTyAcquireShared方法,如下:

final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                //获取剩余许可数量
                int available = getState();
                //计算给完这次许可数量后的个数
                int remaining = available - acquires;
                //如果许可不够或者可以将许可数量重置的话,返回
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

从上面可以看到,只有在许可不够时返回值才会小于0,其余返回的都是剩余许可数量,这也就解释了,一旦许可不够,后面的线程将会阻塞。看完了非公平的获取,再看下公平的获取,代码如下:

protected int tryAcquireShared(int acquires) {
            for (;;) {
                //如果前面有线程再等待,直接返回-1
                if (hasQueuedPredecessors())
                    return -1;
                //后面与非公平一样
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

从上面可以看到,FairSync与NonFairSync的区别就在于会首先判断当前队列中有没有线程在等待,如果有,就老老实实进入到等待队列;而不像NonfairSync一样首先试一把,说不定就恰好获得了一个许可,这样就可以插队了。
看完了获取许可后,再看一下释放许可。

释放许可

释放许可也有几个重载方法,但都会调用下面这个带参数的方法,

public void release(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.releaseShared(permits);
    }

releaseShared方法在AQS中,如下:

public final boolean releaseShared(int arg) {
        //如果改变许可数量成功
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

AQS子类实现共享模式的类需要实现tryReleaseShared类来判断是否释放成功,实现如下:

protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                //获取当前许可数量
                int current = getState();
                //计算回收后的数量
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                //CAS改变许可数量成功,返回true
                if (compareAndSetState(current, next))
                    return true;
            }
        }

从上面可以看到,一旦CAS改变许可数量成功,那么就会调用doReleaseShared()方法释放阻塞的线程。

减小许可数量

Semaphore还有减小许可数量的方法,该方法可以用于用于当资源用完不能再用时,这时就可以减小许可证。代码如下:

protected void reducePermits(int reduction) {
        if (reduction < 0) throw new IllegalArgumentException();
        sync.reducePermits(reduction);
    }

可以看到,委托给了Sync,Sync的reducePermits方法如下:

  final void reducePermits(int reductions) {
            for (;;) {
                //得到当前剩余许可数量
                int current = getState();
                //得到减完之后的许可数量
                int next = current - reductions;
                if (next > current) // underflow
                    throw new Error("Permit count underflow");
                //如果CAS改变成功
                if (compareAndSetState(current, next))
                    return;
            }
        }

从上面可以看到,就是CAS改变AQS中的state变量,因为该变量代表许可证的数量。

获取剩余许可数量

Semaphore还可以一次将剩余的许可数量全部取走,该方法是drain方法,如下:

public int drainPermits() {
        return sync.drainPermits();
    }

Sync的实现如下:

 final int drainPermits() {
            for (;;) {
                int current = getState();
                if (current == 0 || compareAndSetState(current, 0))
                    return current;
            }
        }

可以看到,就是CAS将许可数量置为0。

总结

Semaphore是信号量,用于管理一组资源。其内部是基于AQS的共享模式,AQS的状态表示许可证的数量,在许可证数量不够时,线程将会被挂起;而一旦有一个线程释放一个资源,那么就有可能重新唤醒等待队列中的线程继续执行。

特别感谢

Semaphore的工作原理及实例
深入理解Semaphore

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,271评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,275评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,151评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,550评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,553评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,559评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,924评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,580评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,826评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,578评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,661评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,363评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,940评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,926评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,156评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,872评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,391评论 2 342

推荐阅读更多精彩内容