- 简单的Semaphore实现
public class Semaphore {
private boolean signal = false;
public synchronized void take() {
this.signal = true;
this.notify();
}
public synchronized void release() throws InterruptedException{
while(!this.signal) wait();
this.signal = false;
}
}
- 使用Semaphore来发出信号
Semaphore semaphore = new Semaphore();
SendingThread sender = new SendingThread(semaphore);
ReceivingThread receiver = new ReceivingThread(semaphore);
receiver.start();
sender.start();
public class SendingThread {
Semaphore semaphore = null;
public SendingThread(Semaphore semaphore){
this.semaphore = semaphore;
}
public void run(){
while(true){
//do something, then signal
this.semaphore.take();
}
}
}
public class RecevingThread {
Semaphore semaphore = null;
public ReceivingThread(Semaphore semaphore){
this.semaphore = semaphore;
}
public void run(){
while(true){
this.semaphore.release();
//receive signal, then do something...
}
}
}
- 可计数的Semaphore
public class CountingSemaphore {
private int signals = 0;
public synchronized void take() {
this.signals++;
this.notify();
}
public synchronized void release() throws InterruptedException{
while(this.signals == 0) wait();
this.signals--;
}
}
- 有上限的Semaphore
public class BoundedSemaphore {
private int signals = 0;
private int bound = 0;
public BoundedSemaphore(int upperBound){
this.bound = upperBound;
}
public synchronized void take() throws InterruptedException{
while(this.signals == bound) wait();
this.signals++;
this.notify();
}
public synchronized void release() throws InterruptedException{
while(this.signals == 0) wait();
this.signals--;
this.notify();
}
}
- 把Semaphore当锁来使用
BoundedSemaphore semaphore = new BoundedSemaphore(1);
...
semaphore.take();
try{
//critical section
} finally {
semaphore.release();
}
例子:
public class SemaphoreTest {
public static void main(String[] args) {
ExecutorService service= Executors.newCachedThreadPool();
final Semaphore sp=new Semaphore(3);
for (int i=0;i<10;i++){
Runnable runnable=new Runnable() {
@Override
public void run() {
try {
sp.acquire();
}catch (Exception e){
e.printStackTrace();
}
System.out.println("线程"+Thread.currentThread().getName()+"进入,当前已有"+(3-sp.availablePermits()));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//System.out.println("线程"+Thread.currentThread().getName()+"离开,当前已有"+(3-sp.availablePermits()));
sp.release();
System.out.println("线程"+Thread.currentThread().getName()+"离开,当前已有"+(3-sp.availablePermits()));
}
};
service.execute(runnable);
}
}
}
输出结果:
线程pool-1-thread-1进入,当前已有1
线程pool-1-thread-2进入,当前已有2
线程pool-1-thread-3进入,当前已有3
线程pool-1-thread-2离开,当前已有1
线程pool-1-thread-6进入,当前已有3
线程pool-1-thread-3离开,当前已有2
线程pool-1-thread-1离开,当前已有3
线程pool-1-thread-5进入,当前已有3
线程pool-1-thread-4进入,当前已有2