一、队列的概念
定义 | 特点 | |
---|---|---|
队列 | 一端删除(头)另一端添加(尾) | First In First Out |
双端队列 | 两端都可以删除、添加 | |
优先级队列 | 优先级高者先出队 | |
延时队列 | 根据延时时间确定优先级 | |
并发非阻塞队列 | 队列空或满时不阻塞 | |
并发阻塞队列 | 队列空时删除阻塞、队列满时添加阻塞 |
二、实现双端队列
定义接口
public interface Deque<E> {
/**
* 队首添加元素
* @param e
* @return
*/
boolean offerFirst(E e);
/**
* 队尾添加元素
* @param e
* @return
*/
boolean offerLast(E e);
/**
* 队首移除元素并返回
* @return
*/
E pollFirst();
/**
* 队尾移除元素并返回
* @return
*/
E pollLast();
/**
* 获取队首元素
* @return
*/
E peekFirst();
/**
* 获取队尾元素
* @return
*/
E peekLast();
/**
* 判断队列是否为空
* @return
*/
boolean isEmpty();
/**
* 判断队列是否已满
* @return
*/
boolean isFull();
}
2.1 基于数组实现:
package com.hcx.algorithm.queue;
import java.util.Iterator;
/**
* @Title: CircularArrayDeque.java
* @Package com.hcx.algorithm.queue
* @Description: 基于循环数组实现双端队列
* tail指向的位置不存储元素,每次都指向待存储的位置
* head操作:先往前走一个,然后设置元素
* tail操作:先设置值,然后往后走一个
* @Author: hongcaixia
* @Date: 2025/1/12 15:04
* @Version V1.0
*/
public class CircularArrayDeque<E> implements Deque<E>,Iterable<E> {
E[] array;
int head;
int tail;
public CircularArrayDeque(int capacity) {
array = (E[]) new Object[capacity + 1];
}
@Override
public boolean offerFirst(E e) {
if (isFull()) {
return false;
}
// head指针先往前走一个位置,然后设置值
head = head - 1;
if (head < 0) {
head = array.length - 1;
}
array[head] = e;
return true;
}
@Override
public boolean offerLast(E e) {
if (isFull()) {
return false;
}
array[tail] = e;
tail = tail + 1;
if (tail >= array.length) {
tail = 0;
}
return true;
}
@Override
public E pollFirst() {
if (isEmpty()) {
return null;
}
E e = array[head];
// help gc
array[head] = null;
head = head + 1;
if (head >= array.length) {
head = 0;
}
return e;
}
@Override
public E pollLast() {
if (isEmpty()) {
return null;
}
tail = tail - 1;
if (tail < 0) {
tail = array.length - 1;
}
E e = array[tail];
// help gc
array[tail] = null;
return e;
}
@Override
public E peekFirst() {
if (isEmpty()) {
return null;
}
return array[head];
}
@Override
public E peekLast() {
if (isEmpty()) {
return null;
}
tail = tail - 1;
if (tail < 0) {
tail = array.length - 1;
}
return array[tail];
}
@Override
public boolean isEmpty() {
return head == tail;
}
@Override
public boolean isFull() {
if (tail > head) {
return tail - head == array.length - 1;
} else if (tail < head) {
return head - tail == 1;
} else {
return false;
}
}
@Override
public Iterator<E> iterator() {
return new Iterator<E>() {
int pointer = head;
@Override
public boolean hasNext() {
return pointer != tail;
}
@Override
public E next() {
E e = array[pointer];
pointer = pointer + 1;
if (pointer >= array.length) {
pointer = 0;
}
return e;
}
@Override
public void remove() {
}
};
}
}
2.2 基于链表实现
package com.hcx.algorithm.queue;
/**
* @Title: CircularLinkedListQueue.java
* @Package com.hcx.algorithm.queue
* @Description: Leetcode622.设计循环队列
* @Author: hongcaixia
* @Date: 2025/1/11 14:57
* @Version V1.0
*/
public class CircularLinkedListQueue {
private static class Node {
int value;
Node next;
Node(int value, Node next) {
this.value = value;
this.next = next;
}
}
private final Node head = new Node(-1, null);
private Node tail = head;
private int size = 0;
private int capacity = 0;
{
tail.next = head;
}
public CircularLinkedListQueue(int capacity) {
this.capacity = capacity;
}
public boolean enQueue(int value) {
if(isFull()) {
return false;
}
Node added = new Node(value, head);
tail.next = added;
tail = added;
size++;
return true;
}
public boolean deQueue() {
if(isEmpty()) {
return false;
}
Node first = head.next;
head.next = first.next;
if (first == tail) {
tail = head;
}
size--;
return true;
}
public int Front() {
if(isEmpty()) {
return -1;
}
return head.next.value;
}
public int Rear() {
if(isEmpty()) {
return -1;
}
return tail.value;
}
public boolean isEmpty() {
return head == tail;
}
public boolean isFull() {
return size == capacity;
}
}
三、实现阻塞队列
大部分场景要求分离生产者(向队列放入)和消费者(从队列拿出)两个角色、它们由不同的线程来承担,前面的实现没有考虑线程安全问题。
队列为空,前面的实现里会返回 null,如果就是要拿到一个元素只能不断循环尝试。
队列满,前面的实现里会返回 false,如果就是要放入一个元素只能不断循环尝试。
实现方案:
- 用锁保证线程安全
- 用条件变量让等待非空线程与等待不满线程进入等待状态,而不是不断循环尝试,让 CPU 空转。
1.阻塞队列接口
package com.hcx.algorithm.queue;
/**
* @Title: BlockingQueue.java
* @Package com.hcx.algorithm.queue
* @Description: 阻塞队列接口
* @Author: hongcaixia
* @Date: 2025/1/13 15:16
* @Version V1.0
*/
public interface BlockingQueue<E> {
/**
* 阻塞入队元素
* 当队列满时,阻塞,直到队列不满时,把元素入队
*
* @param e
* @throws InterruptedException
*/
void offer(E e) throws InterruptedException;
/**
* 超时阻塞入队元素
* 当队列满时,阻塞,直到队列不满时,把元素入队,如果中间超时时间达到,则放弃入队,返回false
*
* @param e
* @param timeout
* @return
* @throws InterruptedException
*/
void offer(E e, long timeout) throws InterruptedException;
/**
* 元素出队
*
* @return
* @throws InterruptedException
*/
E poll() throws InterruptedException;
/**
* 判断队列是否为空
*
* @return
*/
boolean isEmpty();
/**
* 判断队列是否已满
*
* @return
*/
boolean isFull();
}
2.单锁实现
package com.hcx.algorithm.queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* @Title: SingleLockBlockingQueue.java
* @Package com.hcx.algorithm.queue
* @Description: 单锁实现阻塞队列
* @Author: hongcaixia
* @Date: 2025/1/13 15:23
* @Version V1.0
*/
public class SingleLockBlockingQueue<E> implements BlockingQueue<E> {
E[] array;
// 队列大小
int size = 0;
// 头指针
int head = 0;
// 尾指针
int tail = 0;
// 保证线程安全操作队列时需要加锁
ReentrantLock lock = new ReentrantLock();
// 当队列满时,线程先进入等待队列阻塞(此时会释放锁),等到有元素出队之后,再被其他线程唤醒,然后重新执行元素入队操作。
Condition tailWait = lock.newCondition();
// 当队列空时,线程需要进入等待队列(此时会释放锁),需要等到有元素入队之后,再被其他线程唤醒,然后执行元素出队操作。
Condition headWait = lock.newCondition();
public SingleLockBlockingQueue(int capacity) {
array = (E[]) new Object[capacity];
}
@Override
public void offer(E e) throws InterruptedException {
lock.lockInterruptibly();
try {
// tailWait中的线程被唤醒之后,会与其他线程一起争抢锁,如果抢不到,那么就还需要再次进入等待队列,所以需要使用while循环判断,确保队列不是满的才往下执行
// 唤醒后应该重新检查条件
while (isFull()) {
tailWait.await();
}
array[tail] = e;
tail++;
if (tail == array.length) {
tail = 0;
}
size++;
// 此时入队成功 唤醒在headWait中阻塞的线程(headWait中的线程是因为队列没有元素所以阻塞,此时队列中成功入队了一个元素,唤醒线程去取队列的元素)
headWait.signal();
} finally {
lock.unlock();
}
}
@Override
public void offer(E e, long timeout) throws InterruptedException {
lock.lockInterruptibly();
try {
long time = TimeUnit.MILLISECONDS.toNanos(timeout);
// 唤醒后应该重新检查条件
while (isFull()) {
if (time <= 0) {
return;
}
// 重新更新等待时间
time = tailWait.awaitNanos(time);
}
array[tail] = e;
tail++;
if (tail == array.length) {
tail = 0;
}
size++;
// 此时入队成功 唤醒在headWait中阻塞的线程(headWait中的线程是因为队列没有元素所以阻塞,此时队列中成功入队了一个元素,唤醒线程去取队列的元素)
headWait.signal();
} finally {
lock.unlock();
}
}
@Override
public E poll() throws InterruptedException {
lock.lockInterruptibly();
try {
// 唤醒后应该重新检查条件
while (isEmpty()) {
headWait.await();
}
E e = array[head];
array[head] = null;
head++;
if (head == array.length) {
head = 0;
}
size--;
// 此时队列中已经有元素出队,那么队列中已经有空间可以再入队了,唤醒在入队时因为没空间阻塞的等待队列tailWait
tailWait.signal();
return e;
} finally {
lock.unlock();
}
}
@Override
public boolean isEmpty() {
return size == 0;
}
@Override
public boolean isFull() {
return size == array.length;
}
}
2.1 lock.lockInterruptibly();
加锁的过程中在阻塞时可以打断(如果是lock方法就不会被打断,会一直等下去),提前唤醒,不需要在阻塞下去了,但此时也不一定能获取到锁,除非持锁线程已经解锁了。
2.2 队列满时的操作
让offer线程进入阻塞状态。不继续执行添加操作。
使用lock的条件变量存储阻塞的offer线程。
tailWait.await();
将当前offer线程加入tailWait集合中,当前线程阻塞住。
await()方法会释放锁。
2.3 队列不满时的操作
唤醒阻塞的offer线程,让阻塞的线程继续往后执行,完成入队操作。
tailWait.signal();
调用signal()方法之前需要先获取锁。
2.4 处理虚假唤醒
// 唤醒后应该重新检查条件
while (isFull()) {
if (time <= 0) {
return;
}
// 重新更新等待时间
time = tailWait.awaitNanos(time);
}
这里判断队列是否是满的要使用while,不能只使用if:
当多线程并发的情况下,队列只有一个空位时,唤醒了阻塞的线程,可能这个时候恰好又有一个线程也要操作入队,恰好比他这个刚被唤醒的线程抢先一步拿到了锁,发现队列没有满,就添加元素,队列又满了,执行完释放锁;此时对于刚被唤醒的线程来说,再拿到了锁,继续执行,那么就会覆盖掉队列的元素了,所以需要再判断一次队列是否已满。
这里注意被唤醒的阻塞线程,也是要重新抢锁的(await方法会释放锁的)没抢到,就还是被阻塞着。
2.5 处理带超时时间的虚假唤醒
long time = TimeUnit.MILLISECONDS.toNanos(timeout);
// 唤醒后应该重新检查条件
while (isFull()) {
if (time <= 0) {
return;
}
// 重新更新等待时间 返回的就是剩余等待时间。
time = tailWait.awaitNanos(time);
}
假设offer线程超时等待的时间为5s,时间过去了2s时,poll线程取走了一个元素,唤醒了在等待的offer线程;此时又来了一个offer线程,抢到了锁,又把队列放满了,那么被唤醒的线程此次为虚假唤醒,但是不能再继续等待5s了,已经等待了2s,只需要再等待3s;所以需要重新更新等待时间。
3.双锁实现
针对单锁的实现,出队和入队都使用了一把锁,实际出队和入队是不冲突的,所以为了提高性能,这里出队使用一把锁,入队使用另一把锁。针对共享变量size的操作,使用原子类来保证安全。
- 入队使用tailLock锁住队尾,阻塞时加入tailWait等待
- 出队使用headLock锁住队头,阻塞时加入headWait等待
入队offer逻辑
- 1.加tailLock锁
- 2.如果队列满,进入tailWait等待
- 3.队列不满,执行入队操作,同时唤醒在headWait等待的出队线程(因为有元素入队了,就代表可以有元素出队,之间因为没有元素而阻塞的等待出队的线程就可以唤醒来执行了)
出队poll逻辑
- 1.加headLock锁
- 2.如果队列空,进入headWait等待;
- 3.队列不空,执行出队操作,同时唤醒在tailWait等待的入队线程(因为有元素出队了,就代表有位置给元素入队,之间因为没有位置而阻塞的等待入队的线程就可以唤醒来执行了)
signal和await使用
- 必须配合锁来使用,调用前加锁,调用后解锁
- 必须配对使用:
- headWait调用await和signal方法,必须加的是headLock锁。(headWait是由headLock创建的)
- tailWait调用await和signal方法,必须加的是tailLock锁。(tailWait是由tailLock创建的)
基础版:
/**
* @Title: SingleLockBlockingQueue.java
* @Package com.hcx.algorithm.queue
* @Description: 双锁实现阻塞队列
* 一把锁保护 tail,一把锁保护 head,提升性能
* @Author: hongcaixia
* @Date: 2025/1/13 15:23
* @Version V1.0
*/
public class DoubleLockBlockingQueue<E> implements BlockingQueue<E> {
E[] array;
// 队列大小
AtomicInteger size = new AtomicInteger(0);
// 头指针
int head = 0;
// 尾指针
int tail = 0;
// 出队锁
ReentrantLock headLock = new ReentrantLock();
// 入队锁
ReentrantLock tailLock = new ReentrantLock();
// 当队列满时,线程先进入等待队列阻塞(此时会释放锁),等到有元素出队之后,再被其他线程唤醒,然后重新执行元素入队操作。
Condition tailWait = tailLock.newCondition();
// 当队列空时,线程需要进入等待队列(此时会释放锁),需要等到有元素入队之后,再被其他线程唤醒,然后执行元素出队操作。
Condition headWait = headLock.newCondition();
public DoubleLockBlockingQueue(int capacity) {
array = (E[]) new Object[capacity];
}
@Override
public void offer(E e) throws InterruptedException {
tailLock.lockInterruptibly();
try {
// tailWait中的线程被唤醒之后,会与其他线程一起争抢锁,如果抢不到,那么就还需要再次进入等待队列,所以需要使用while循环判断,确保队列不是满的才往下执行
// 唤醒后应该重新检查条件
while (isFull()) {
tailWait.await();
}
array[tail] = e;
tail++;
if (tail == array.length) {
tail = 0;
}
size.getAndIncrement();
// 队列从空到非空,唤醒等待出队的poll线程
headLock.lock();
try {
headWait.signal();
}finally {
headLock.unlock();
}
} finally {
tailLock.unlock();
}
}
@Override
public E poll() throws InterruptedException {
E e;
headLock.lockInterruptibly();
try {
// 队列空等待
while (isEmpty()) {
headWait.await();
}
// 不空则出队
e = array[head];
if (++head == array.length) {
head = 0;
}
// 修改 size
size.getAndDecrement();
// 队列从满到不满,唤醒等待入队的offer线程
tailLock.lock();
try {
tailWait.signal();
} finally {
tailLock.unlock();
}
} finally {
headLock.unlock();
}
return e;
}
@Override
public boolean isEmpty() {
return size.get() == 0;
}
@Override
public boolean isFull() {
return size.get() == array.length;
}
}
上述代码,两把锁嵌套使用,非常容易出现死锁:
- poll线程获取了head锁,准备去获取tail锁
- offer线程获取了tail锁,准备去获取head锁
两个线程各自持有着一把锁,都准备去获取对方的锁,谁也不让谁,就出现了死锁。
改进:不嵌套获取,把唤醒线程的操作放在上一个锁解锁之后:
@Override
public void offer(E e) throws InterruptedException {
tailLock.lockInterruptibly();
try {
// tailWait中的线程被唤醒之后,会与其他线程一起争抢锁,如果抢不到,那么就还需要再次进入等待队列,所以需要使用while循环判断,确保队列不是满的才往下执行
// 唤醒后应该重新检查条件
while (isFull()) {
tailWait.await();
}
array[tail] = e;
tail++;
if (tail == array.length) {
tail = 0;
}
size.getAndIncrement();
} finally {
tailLock.unlock();
}
// 队列从空到非空,唤醒等待出队的poll线程
headLock.lock();
try {
headWait.signal();
} finally {
headLock.unlock();
}
}
@Override
public E poll() throws InterruptedException {
E e;
headLock.lockInterruptibly();
try {
// 队列空等待
while (isEmpty()) {
headWait.await();
}
// 不空则出队
e = array[head];
if (++head == array.length) {
head = 0;
}
// 修改 size
size.getAndDecrement();
} finally {
headLock.unlock();
}
// 队列从满到不满,唤醒等待入队的offer线程
tailLock.lock();
try {
tailWait.signal();
} finally {
tailLock.unlock();
}
return e;
}
优化版:使用级联通知完成唤醒操作
因为在操作offer和poll时,不仅要拿到当前要操作的锁,还要拿对方的锁来唤醒对方。所以效率降低了。
针对offer线程来说,入队用的是tailLock锁,唤醒用的是headLock锁,要尽可能减少去获取headLock锁的次数。
场景:
刚开始队列为空:
来了3个poll线程操作,因为队列没有元素,所以都进入了headWait中等待。
来了3个offer线程,分别往队伍加入了3个元素,现在的代码,对于每一个offer线程,他都会去获取锁执行唤醒操作;为了减少offer线程取获取headLock锁,只让其中一个offer线程去获取锁执行唤醒操作,而对于headWait中剩下2个poll线程,则由已经被offer线程唤醒的那一个poll线程来唤醒。(condition里的结构是一个链表结构)
具体实现:
offer操作的headWait的唤醒逻辑:队列空时在headWait等待出队的tail线程
- 针对offer线程:只让刚入队第一个元素的offer线程(此时队列大小size为0)去执行唤醒操作,其他唤醒操作交给被唤醒的poll线程来实现级联唤醒。
- 针对poll线程:当headWait中的线程被唤醒之后,判断队列中是不是还有多的元素,如果有超过1个,那么就自己来唤醒在headWait中等待出队的线程。
poll操作的tailWait的唤醒逻辑:队列满时在tailWait中等待入队的offer线程
- 针对poll线程:取走了一个元素,那么队列就有空位了,可以唤醒在tailWait中等待的线程。不是所有的poll都执行,只在队列刚从满变成不满的时候(取走元素之前队列size和数组长度一样)由poll来唤醒,其他唤醒交给被唤醒的offer线程来实现级联唤醒。
- 针对offer线程:当tailWait中的offer线程被唤醒之后,判断队列是不是还有多的空位,如果空位超过1个,那么就自己来唤醒在tailWait中等待入队的线程。
完整代码:
package com.hcx.algorithm.queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* @Title: SingleLockBlockingQueue.java
* @Package com.hcx.algorithm.queue
* @Description: 双锁实现阻塞队列
* 一把锁保护 tail,一把锁保护 head,提升性能
* 在唤醒的时候进行了优化,只有当队列处于临界条件的时候才由另一个线程唤醒,否则自己唤醒(级联通知)
* @Author: hongcaixia
* @Date: 2025/1/13 15:23
* @Version V1.0
*/
public class DoubleLockBlockingQueue<E> implements BlockingQueue<E> {
E[] array;
// 队列大小
AtomicInteger size = new AtomicInteger(0);
// 头指针
int head = 0;
// 尾指针
int tail = 0;
// 出队锁
ReentrantLock headLock = new ReentrantLock();
// 入队锁
ReentrantLock tailLock = new ReentrantLock();
// 当队列满时,线程先进入等待队列阻塞(此时会释放锁),等到有元素出队之后,再被其他线程唤醒,然后重新执行元素入队操作。
Condition tailWait = tailLock.newCondition();
// 当队列空时,线程需要进入等待队列(此时会释放锁),需要等到有元素入队之后,再被其他线程唤醒,然后执行元素出队操作。
Condition headWait = headLock.newCondition();
public DoubleLockBlockingQueue(int capacity) {
array = (E[]) new Object[capacity];
}
@Override
public void offer(E e) throws InterruptedException {
// 队列当前大小
int beforeAdd;
tailLock.lockInterruptibly();
try {
// tailWait中的线程被唤醒之后,会与其他线程一起争抢锁,如果抢不到,那么就还需要再次进入等待队列,所以需要使用while循环判断,确保队列不是满的才往下执行
// 唤醒后应该重新检查条件
while (isFull()) {
tailWait.await();
}
array[tail] = e;
tail++;
if (tail == array.length) {
tail = 0;
}
// 返回增加之前的值
beforeAdd = size.getAndIncrement();
if (beforeAdd + 1 < array.length) {
// 说明队列还没有满,唤醒阻塞的offer线程
tailWait.signal();
}
} finally {
tailLock.unlock();
}
// 再offer这里加了锁去唤醒非空的poll线程,会导致正常的poll线程也阻塞,抢不到锁,所以需要减少唤醒时的加锁次数
/**
* 当队列原本为空,刚好入队了一个元素时,此时执行唤醒操作,对于剩余的其他阻塞了的poll线程,交给poll线程自己来唤醒
*/
if (beforeAdd == 0) {
headLock.lock();
try{
// 队列从空变化到不空,会唤醒一个等待的 poll 线程,为了避免死锁,需要等tailLock释放了之后才开始加(条件变量的 await(), signal() 等方法需要先获得与之关联的锁)
headWait.signal();
}finally {
headLock.unlock();
}
}
}
@Override
public void offer(E e, long timeout) throws InterruptedException {
int beforeAdd;
tailLock.lockInterruptibly();
try {
long time = TimeUnit.MILLISECONDS.toNanos(timeout);
// 唤醒后应该重新检查条件
while (isFull()) {
if (time <= 0) {
return;
}
// 重新更新等待时间
time = tailWait.awaitNanos(time);
}
array[tail] = e;
tail++;
if (tail == array.length) {
tail = 0;
}
beforeAdd = size.getAndIncrement();
if (beforeAdd + 1 < array.length) {
// 如果入队之前+1 还有剩余空间,说明队列中元素还有剩余,可以唤醒阻塞的polll线程
tailWait.signal();
}
} finally {
tailLock.unlock();
}
// 如果入队之前队列时空的,那么此时只有一个元素,就唤醒阻塞的poll线程
if (beforeAdd == 0) {
headLock.lock();
try {
// 此时入队成功 唤醒在headWait中阻塞的线程(headWait中的线程是因为队列没有元素所以阻塞,此时队列中成功入队了一个元素,唤醒线程去取队列的元素)
headWait.signal();
} finally {
headLock.unlock();
}
}
}
@Override
public E poll() throws InterruptedException {
E e;
// size减之前的值
int beforeDecrement;
headLock.lockInterruptibly();
try {
// 唤醒后应该重新检查条件
while (isEmpty()) {
headWait.await();
}
e = array[head];
array[head] = null;
head++;
if (head == array.length) {
head = 0;
}
beforeDecrement = size.getAndDecrement();
if (beforeDecrement > 1) {
// 说明队列中还有剩余元素(并非当前poll完了之后就为空了),所以还可以唤醒阻塞的poll线程
headWait.signal();
}
} finally {
headLock.unlock();
}
// 队列刚从满变成不满的时候,去唤醒等待队列中的offer线程,剩余阻塞的offer线程由offer线程自己唤醒
if (beforeDecrement == array.length) {
tailLock.lock();
try {
// 队列从满变化到不满,唤醒等待队列中的offer线程
tailWait.signal();
} finally {
tailLock.unlock();
}
}
return e;
}
@Override
public boolean isEmpty() {
return size.get() == 0;
}
@Override
public boolean isFull() {
return size.get() == array.length;
}
}