Java多线程-并发工具类Semaphore详解

简介
Semaphore是一种同步辅助工具,翻译过来就是信号量,用来实现流量控制,它可以控制同一时间内对资源的访问次数.
无论是Synchroniezd还是ReentrantLock,一次都只允许一个线程访问一个资源,但是Semaphore可以指定多个线程同时访问某一个资源.
Semaphore有一个构造函数,可以传入一个int型整数n,表示某段代码最多只有n个线程可以访问,如果超出了n,那么请等待,等到某个线程执行完毕这段代码块,下一个线程再进入。
信号量上定义两种操作:

acquire(获取):当一个线程调用acquire操作时,它要么成功获取到信号量(信号量减1),要么一直等下去,直到有线程释放信号量,或超时,Semaphore内部会维护一个等待队列用于存储这些被暂停的线程.
release(释放)实际上会将信号量的值+1,然后唤醒相应Sepmaphore实例的等待队列中的一个任意等待线程.

应用场景
信号量主要用于两个目的:

用于多个共享资源的互斥使用.
用于并发线程数的控制.

例子
以下的例子:5个线程抢3个车位,同时最多只有3个线程能抢到车位,等其他线程释放信号量后,才能抢到车位.
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3);

    for (int i = 0; i < 5; i++) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    semaphore.acquire();//申请资源
                    System.out.println(Thread.currentThread().getName()+"抢到车位");
                    ThreadUtil.sleep(RandomUtil.randomInt(1000,5000));
                    System.out.println(Thread.currentThread().getName()+"归还车位");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    //释放资源
                    semaphore.release();
                }

            }
        },"线程"+i).start();
    }
}

复制代码注意事项

Semaphore.acquire()和Semaphore.release()总是配对使用的,这点需要由应用代码自身保证.
Semaphore.release()调用应该放在finally块中,已避免应用代码出现异常的情况下,当前线程所获得的信号量无法返还.
如果Semaphore构造器中的参数permits值设置为1,所创建的Semaphore相当于一个互斥锁.与其他互斥锁不同的是,这种互斥锁允许一个线程释放另外一个线程所持有的锁.因为一个线程可以在未执行过Semaphore.acquire()的情况下执行相应的Semaphore.release().
默认情况下,Semaphore采用的是非公平性调度策略.

原理
abstract static class Sync extends AbstractQueuedSynchronizer {
//省略
}
复制代码Semaphore内部使用Sync类,Sync又是继承AbstractQueuedSynchronizer,所以Sync底层还是使用AQS实现的.Sync有两个实现类NonfairSync和FairSync,用来指定获取信号量时是否采用公平策略.
初始化方法
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}

public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

Sync(int permits) {
setState(permits);
}
复制代码如上所示,Semaphore默认采用非公平策略,如果需要使用公平策略则可以使用带两个参数的构造函数来构造Semaphore对象。
参数permits被传递给AQS的state值,用来表示当前持有的信号量个数.
void acquire()方法
当前线程调用该方法的目的是希望获取一个信号量资源。
如果当前信号量个数大于0,则当前信号量的计数会减1,然后该方法直接返回。否则如果当前信号量个数等0,则当前线程会被放入AQS的阻塞队列。当其他线程调用了当前线程的interrupt()方法中断了当前线程时,则当前线程会抛出InterruptedException异常返回。
//Semaphore方法
public void acquire() throws InterruptedException {
//传递参数为1,说明要获取1个信号量资源
sync.acquireSharedInterruptibly(1);
}

//AQS的方法
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
//(1)如果线程被中断,则抛出中断异常
if (Thread.interrupted())
throw new InterruptedException();
//(2)否则调用Sync子类方法尝试获取,这里根据构造函数确定使用公平策略
if (tryAcquireShared(arg) < 0)
//如果获取失败则放入阻塞队列.然后再次尝试,如果使用则调用park方法挂起当前线程
doAcquireSharedInterruptibly(arg);
}
复制代码由如上代码可知,acquire()在内部调用了Sync的acquireSharedlnterruptibly方法,后者会对中断进行响应(如果当前线程被中断,则抛出中断异常)。尝试获取信号量资源的AQS的方法 tryAcquireShared是由Sync的子类实现的,所以这里分别从两 方面来讨论。
先讨论非公平策略NonfairSync类的tryAcquireShared方法,代码如下:
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}

final int nonfairTryAcquireShared(int acquires) {
for (;;) {
//获取当前信号量值
int available = getState();
//计算当前剩余值
int remaining = available - acquires;
//如果当前剩余值小于0或则CAS设置成功则返回
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
复制代码如上代码先获取当前信号量值(available),然后减去需要获取的值(acquires),得到剩余的信号量个数(remaining),如果剩余值小于0则说明当前信号量个数满足不了需求,那么直接返回负数,这时当前线程会被放入AQS的阻塞队列而被挂起。如果剩余值大于0,则使用CAS操作设置当前信号量值为剩余值,然后返回剩余值。
另外,由于NonFairSync是非公平获取的,也就是说先调用aquire方法获取信号量的线程不一定比后来者先获取到信号量。
考虑下面场景,如果线程A先调用了aquire()方法获取信号量,但是当前信号量个数为0,那么线程A会被放入AQS的阻塞队列 。过一段时间后线程C调用了release()方法释放了一个信号量,如果当前没有其他线程获取信号量,那么线程A就会被激活,然后获取该信号量,但是假如线程C释放信号量后,线程C调用了aquire方法,那么线程C就会和线程A去竞争这个信号量资源 。 如果采用非公平策略,由nonfairTryAcquireShared的代码可知,线程C完全可以在线程A被激活前,或者激活后先于线程 A获取到该信号量,也就是在这种模式下阻塞线程和当前请求的线程是竞争关系,而不遵循先来先得的策略。
下面看公平性的FairSync类是如何保证公平性的。
protected int tryAcquireShared(int acquires) {
for (;;) {
//查询是否当前线程节点的前驱节点也在等待获取该资源,有的话直接返回
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
复制代码可见公平性还是靠hasQueuedPredecessors这个函数来保证的。所以Semaphore的公平策略是看当前线程节点的前驱节点是否也在等待获取该资源,如果是则自己放弃获取的权限,然后当前线程会被放入AQS阻塞队列,否则就去获取。
void acquire(int permits)方法
该方法与acquire()方法不同,后者只需要获取一个信号量值, 而前者则获取permits个。
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
复制代码void acquireUninterruptibly()方法
该方法与acquire()类似,不同之处在于该方法对中断不响应,也就是当当前线程调用了 acquireUninterruptibly获取资源时(包含被阻塞后),其他线程调用了当前线程的interrupt() 方法设置了当前线程的中断标志,此时当前线程并不会抛出IllegalArgumentException异常而返回。
public void acquireUninterruptibly(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireShared(permits);
}
复制代码void release()方法
该方法的作用是把当前Semaphore对象的信号量值增加1,如果当前有线程因为调用aquire方法被阻塞而被放入了AQS的阻塞 队列,则会根据公平策略选择一个信号量个数能被满足的线程进行激活, 激活的线程会尝试获取刚增加的信号量.
public void release() {
//(1)arg=1
sync.releaseShared(1);
}

public final boolean releaseShared(int arg) {
//(2)尝试释放资源
if (tryReleaseShared(arg)) {
//(3)资源释放成功则调用park方法唤醒AQS队列里面最先挂起的线程
doReleaseShared();
return true;
}
return false;
}

protected final boolean tryReleaseShared(int releases) {
for (;;) {
//获取当前信号量值
int current = getState();
//将当前信号量值增加releases,这里为增加1
int next = current + releases;
//移除处理
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
//使用CAS保证更新信号量值的原子性
if (compareAndSetState(current, next))
return true;
}
}
复制代码由代码release()->sync.releaseShared(1),可知,release方法每次只会对信号量值增加1,tryReleaseShared方法是无限循环,使用CAS保证了release方法对信号量递增1的原子性操作.tryReleaseShared方法增加信号量值成功后会执行代码(3),即调用AQS的方法来激活因为调用acquire方法而被阻塞的线程。
void release(int permits)方法
该方法与不带参数的release方法的不同之处在于,前者每次调用会在信号量值原来的基础上增加 permits,而后者每次增加l。
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
复制代码另外可以看到,这里的sync.releaseShared是共享方法,这说明该信号量是线程共享的,信号量没有和固定线程绑定,多个线程可以同时使用CAS去更新信号量的值而不会被阻塞。

链接:https://juejin.im/post/5e204a2a518825367841d40d

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,470评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,393评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,577评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,176评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,189评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,155评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,041评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,903评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,319评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,539评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,703评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,417评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,013评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,664评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,818评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,711评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,601评论 2 353