一、Semphore介绍
Semphore从英文单词上看,意思为信号量,在Java中该类的意思也如此,它的作用可以限制线程并发数量,如果不限制线程并发的数量,导致CPU频繁切换上下文,消耗大量资源,导致系统运行效率越来越低。引入Semphore有很大的必要性。
二、Semphore的使用
Semphore重要的方法法可以归结为以下四大类:
a、声明许可的数量,表示有多少个线程可以访问
public Semaphore(int permits);
//fair 参数代表线程获取信号量的公平性
public Semaphore(int permits, boolean fair);
b、获取信号量
//获取1个信号量,会阻塞当前线程
public void acquire();
//获取指定数量的信号量,会阻塞当前线程
public void acquire(int permits)
//尝试获取一个信号量,如果获取失败就立即返回,不会阻塞
public boolean tryAcquire();
//在规定时间尝试获取一个信号量,如果获取失败就立即返回
public boolean tryAcquire(long timeout, TimeUnit unit);
public boolean tryAcquire(int permits) ;
public booleantryAcquire(int permits, long timeout, TimeUnit unit);
//获取信号量,获取失败进入等待队列,不允许被中断
public void acquireUninterruptibly(int permits)
public void acquireUninterruptibly()
c、释放信号量
//释放1个信号量
public void release()
//释放指定信号量
public void release(int permits)
d、其他方法
//返回Semphore对象可用的信号量
public int availablePermits()
//返回Semphore对象可用的信号量,并将可用个数置为0
public int drainPermits()
//判断是否有线程在等待信号量
public final boolean hasQueuedThreads()
//得到等待任务队列的长度,即等待信号量线程的个数
public final int getQueueLength()
三、通过Semphore实现生产者消费者模式
生产者-消费者模式是并发,多线程中经典的设计模式,它分离生产者、消费者的工作加入了缓冲区,这样不经降低耦合性还支持并发。
生产者-消费者模式好处有以下三点:
1、生产者的工作就是在缓冲区中放数据,而消费者的工作就是从缓冲区中取数据,这样设计就大大降低了耦合性;
2、生产者/消费者可以用多线程实现,它们可以以并发的方式放/取数据;
3、如果消费者取数据的速度很慢,生产者可以一直生产数据,不用等待消费者取完数据后再生产。
接下来我实现生产者-消费者模式:
1、实现缓冲区,缓冲区中包含了生产数据的方法和取数据的方法,代码注释比较全
public class buffer{
//存放商品的地方 默认大小为10
volatile private static Object[] ary = new Object[10];
//生产者信号量 允许10个生产者
volatile private Semaphore proSemaphore = new Semaphore(10);
//消费者信号量 允许10个消费者者
volatile private Semaphore consumerSemaphore = new Semaphore(10);
//互斥锁
volatile private ReentrantLock lock = new ReentrantLock();
//生产者信号
volatile private Condition proCondition = lock.newCondition();
//消费者信号
volatile private Condition consumerCondition = lock.newCondition();
/**
* 判断消费者是否可以从buffer拿走商品
* @return
*/
public boolean judgeEmpty() {
boolean empty = true;
for (int i = 0; i < ary.length; i++) {
if(ary[i] != null){
empty =false;
break;
}
}
return empty;
}
/**
* 判断生产者可以往buffer放置商品
* @return
*/
public int judgeFull() {
int full = -1;
for (int i = 0; i < ary.length; i++) {
if(ary[i] == null){
full =i;
break;
}
}
return full;
}
public void production() throws InterruptedException {
//允许10个生产者线程生产
proSemaphore.acquire();
lock.lock();
while(judgeFull() == -1) {
System.out.println(">>>>>>>>>>>>>>>>>>>>>buffer已经满了 无法生产 等待者为:"+Thread.currentThread().getName());
proCondition.await();
}
//开始生产
int index = judgeFull();
if(ary[index] == null) {
ary[index] = "数据"+index;
System.out.println("生产者:"+ Thread.currentThread().getName() +" 开始生产了 ,商品信息为:"+ary[index]);
}
consumerCondition.signalAll();
lock.unlock();
proSemaphore.release();
}
public void consumer() throws InterruptedException {
//允许10个消费者线程消费
consumerSemaphore.acquire();
lock.lock();
while(judgeEmpty()) {
System.out.println(">>>>>>>>>>>>>>>>>>>>buffer已经空了 无法消费了 消费者为:"+Thread.currentThread().getName());
consumerCondition.await();
}
//开始消费
for (int i = 0; i < ary.length; i++) {
if(ary[i] != null) {
System.out.println("消费者:"+ Thread.currentThread().getName() +" 消费了 ,商品信息为:"+ary[i]);
ary[i] = null;
break;
}
}
proCondition.signalAll();
lock.unlock();
consumerSemaphore.release();
}
2、实现消费者线程、生产者线程
/**
* 消费者线程
* @author yuw
*
*/
class RunC implements Runnable{
private Buffer buffer;
public RunC(Buffer buffer) {
this.buffer = buffer;
}
@Override
public void run() {
try {
buffer.consumer();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
/**
* 生产者线程
* @author yuw
*
*/
class RunPro implements Runnable{
private Buffer buffer;
public RunPro(Buffer buffer) {
this.buffer = buffer;
}
@Override
public void run() {
try {
buffer.production();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
3、万事俱备,只差测试类
Buffer buffer = new Buffer();
//消费者线程 40个
Thread[] consumerThreads = new Thread[1];
for (int i = 0; i < consumerThreads.length; i++) {
consumerThreads[i] = new Thread(new RunC(buffer));
consumerThreads[i].setName("消费者"+ (i>9?i:0+i));
}
//生产者线程 20个
Thread[] proThreads = new Thread[1];
for (int i = 0; i < proThreads.length; i++) {
proThreads[i] = new Thread(new RunPro(buffer));
proThreads[i].setName("生产者"+ (i>9?i:0+i));
}
for (int i = 0; i < proThreads.length; i++) {
proThreads[i].start();
consumerThreads[i].start();
}