阻塞队列:
阻塞队列底层是由ReentrantLock和condition实现的
1.JUC提供的阻塞队列
2.阻塞队列的操作方法
3.使用案例
1.JUC提供的阻塞队列
在 Java8 中提供了 7 个阻塞队列
ArrayBlockingQueue 数组实现的有界阻塞队列, 此队列按照先进先出(FIFO)的原则对元素进行排序。
LinkedBlockingQueue 链表实现的有界阻塞队列, 此队列的默认和最大长度为Integer.MAX_VALUE。此队列按照先进先出的原则对元素进行
排序
PriorityBlockingQueue 支持优先级排序的无界阻塞队列, 默认情况下元素采取自然顺序升序排列。也可以自定义类实现 compareTo()方法来指定元素排序规则,或者初始化 PriorityBlockingQueue 时,指定构造参数Comparator 来对元素进行排序。
DelayQueue 优先级队列实现的无界阻塞队列
SynchronousQueue 不存储元素的阻塞队列, 每一个 put 操作必须等待一个 take 操作,否则不能继续添加元素。
LinkedTransferQueue 链表实现的无界阻塞队列
LinkedBlockingDeque 链表实现的双向阻塞队列
2.阻塞队列的操作方法
在阻塞队列中,提供了四种处理方式
- 插入操作
add(e) :添加元素到队列中,如果队列满了,继续插入
元素会报错,IllegalStateException。
offer(e) : 添加元素到队列,同时会返回元素是否插入
成功的状态,如果成功则返回 true
put(e) :当阻塞队列满了以后,生产者继续通过 put
添加元素,队列会一直阻塞生产者线程,知道队列可用
offer(e,time,unit) :当阻塞队列满了以后继续添加元素,
生产者线程会被阻塞指定时间,如果超时,则线程直接
退出 - 移除操作
remove():如果元素移除成功,则返回 true,如果移除失败(队列为空),则会抛出异常。
poll(): 当队列中存在元素,则从队列中取出一个元素,
如果队列为空,则直接返回 null
take():基于阻塞的方式获取队列中的元素,如果队列为
空,则 take 方法会一直阻塞,直到队列中有新的数据可
以消费
poll(time,unit):带超时机制的获取数据,如果队列为空,
则会等待指定的时间再去获取元素返回
3.使用案例
注册成功后增加积分
假如我们模拟一个场景,就是用户注册的时候,在注册成
功以后发放积分。这个场景在一般来说,我们会这么去实
现
image.png
但是实际上,我们需要考虑两个问题
- 性能,在注册这个环节里面,假如添加用户需要花费 1 秒
钟,增加积分需要花费 1 秒钟,那么整个注册结果的返
回就可能需要大于 2 秒,虽然影响不是很大,但是在量
比较大的时候,我们也需要做一些优化 - 耦合,添加用户和增加积分,可以认为是两个领域,也
就是说,增加积分并不是注册必须要具备的功能,但是
一旦增加积分这个逻辑出现异常,就会导致注册失败。
这种耦合在程序设计的时候是一定要规避的
因此我们可以通过异步的方式来实现
改造之前的代码逻辑
public class UserService {
public boolean register(){
User user=new User();
user.setName("Mic");
addUser(user);
sendPoints(user);
return true;
}
public static void main(String[] args) {
new UserService().register();
}
private void addUser(User user){
System.out.println("添加用户:"+user);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private void sendPoints(User user){
System.out.println(" 发 送 积 分 给 指 定 用
户:"+user);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
改造之后的逻辑
public class UserService {
private final ExecutorService single =
Executors.newSingleThreadExecutor();
private volatile boolean isRunning = true;
ArrayBlockingQueue arrayBlockingQueue=new
ArrayBlockingQueue(10);
{
init();
}
public void init(){
single.execute(()->{
while(isRunning){
try {
User
user=(User)arrayBlockingQueue.take();//阻塞的方
式获取队列中的数据
sendPoints(user);
} catch (InterruptedException e)
{
e.printStackTrace();
}
}
});
}
public boolean register(){
User user=new User();
user.setName("Mic");
addUser(user);
arrayBlockingQueue.add(user);//添加到异步
队列
return true;
}
public static void main(String[] args) {
new UserService().register();
}
private void addUser(User user){
System.out.println("添加用户:"+user);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private void sendPoints(User user){
System.out.println(" 发 送 积 分 给 指 定 用
户:"+user);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
优化以后,整个流程就变成了这样
image.png
例2:
下面是使用阻塞队列实现的生产者-消费者模式:
public class Test {
private int queueSize = 10;
private ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(queueSize);
public static void main(String[] args) {
Test test = new Test();
Producer producer = test.new Producer();
Consumer consumer = test.new Consumer();
producer.start();
consumer.start();
}
class Consumer extends Thread{
@Override
public void run() {
consume();
}
private void consume() {
while(true){
try {
queue.take();
System.out.println("从队列取走一个元素,队列剩余"+queue.size()+"个元素");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class Producer extends Thread{
@Override
public void run() {
produce();
}
private void produce() {
while(true){
try {
queue.put(1);
System.out.println("向队列取中插入一个元素,队列剩余空间:"+(queueSize-queue.size()));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}