java concurrent 之 semaphore
java.util.concurrent.Semaphore类是一个计数信号量。 这意味着它有两个主要的方法:
- acquire()
- release()
计数信号量用给定数量的“许可证”初始化。 对于每个调用acquire()一个许可证是由调用线程执行的。 对于每个调用release(),许可证返回到信号量。 因此,最多N个线程可以通过没有任何release()调用的acquire()方法,其中N是信号量初始化的许可数。 许可证只是一个简单的柜台。 这里没什么好想的
一个计数信号量。从概念上讲,信号量维护了一个许可集。如有必要,在许可可用前会阻塞每一个 acquire(),然后再获取该许可。每个 release() 添加一个许可,从而可能释放一个正在阻塞的获取者。但是,不使用实际的许可对象,Semaphore 只对可用许可的号码进行计数,并采取相应的行动。
Semaphore 通常用于限制可以访问某些资源(物理或逻辑的)的线程数目。例如,下面的类使用信号量控制对内容池的访问:
使用场景 Semaphore Usage
由于信号量通常有两种用途:
- 保护一个关键部分不要一次输入超过N个线程。
- 在两个线程之间发送信号
保护关键部分
如果您使用信号量来保护关键部分,尝试进入关键部分的线程通常会首先尝试获取许可证,进入关键部分,然后再次释放许可证。 喜欢这个:
Semaphore semaphore = new Semaphore(1);
//critical section
semaphore.acquire();
...
semaphore.release();
线程之间发送信号
如果您使用信号量在线程之间发送信号,则通常会有一个线程调用acquire()方法,另一个线程调用release()方法。
如果没有可用的许可证,则获取()调用将阻塞,直到另一个线程释放许可证。 类似地,如果没有更多的许可证可以释放到这个信号量中,release()调用将被阻止。
因此可以协调线程。 例如,如果在Thread 1已经将一个对象插入共享列表中被调用,并且Thread2在从该列表中获取对象之前调用了release(),则基本上创建了一个阻塞队列。 信号量中可用的许可证数量将对应于阻塞队列可以容纳的最大元素数量。
没有保证线程从信号量获取许可的公平性。 也就是说,不能保证第一个调用acquire()的线程也是获取许可证的第一个线程。 如果第一个线程被阻塞等待许可证,则释放许可证的第二个线程检查许可证,实际上可以在线程1之前获得许可证。
如果要执行公平性,则Semaphore类有一个构造函数,它将布告器告知信号量是否应该执行公平性。 执行公平性会导致性能/并发惩罚,所以不要启用它,除非你需要它。
以下是如何在公平模式下创建信号量:
Semaphore semaphore = new Semaphore(1, true);
demo
package com.viashare.semaphore;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Semaphore;
/**
* Created by Jeffy on 16/01/11.
*/
public class SemaphoreMain {
//尝试改变我试试
private static final int count = 100;
public static void main(String[] args) {
//模拟最多10个线程能够访问
Semaphore semaphore = new Semaphore(10);
VisitedObj visitedObj = new VisitedObj(semaphore);
CyclicBarrier cyclicBarrier = new CyclicBarrier(count);
for (int i = 0; i < count; i++) {
new Thread(new ThreadTask(visitedObj, cyclicBarrier)).start();
}
}
static class VisitedObj {
private Semaphore semaphore;
public VisitedObj(Semaphore semaphore) {
this.semaphore = semaphore;
}
public void getProtectedIterm() throws InterruptedException {
//get acquire
semaphore.acquire();
System.err.println("i have executed……");
// semaphore.release();
}
}
static class ThreadTask implements Runnable {
private VisitedObj visitedObj;
private CyclicBarrier cyclicBarrier;
public ThreadTask(VisitedObj visitedObj, CyclicBarrier cyclicBarrier) {
this.visitedObj = visitedObj;
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
try {
visitedObj.getProtectedIterm();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}