前言:
阻塞队列是jdk5 java.util.concurrent包中五个模块之一,一般用于消费者、生产者的情景,与一般队列的不同就是阻塞队列内部加入了Lock和Condition可以在某个线程试图向队列添加元素而该队列已经满时,或者从队列移出元素而队列为空时,自动阻塞该线程。
阻塞队列的作用和使用场景
所以在协调多个线程之间的合作时,阻塞队列是一个有用的工具。生产线程可以周期性地将中间结果存储在阻塞队列中,而消费线程移除中间结果并加以逻辑处理。阻塞队列最大的作用就是可以自动平衡负载,因为如果生产者线程集运行得比消费者线程集慢,消费者线程就会发生阻塞,反之也是一样。假设一个场景:我们写一个监控文件变化的程序,开一个线程获取变化了的文件名,拿到之后我们会做解析,查找数据库等等其他操作。如果我们对这两个操作分开统计它们的处理时间,会发现后面的解析需要的时间远远大于前面得到文件名的时间。如果这个线程每一秒轮询一次,而解析查询操作有可能会超出一秒,这样就会让大量的变化文件来不及处理。这个使用就需要用到阻塞线程了,可以使用生产线程来进行文件变化检测,而多个消费线程来进行文件处理操作。
JDK 7 提供了7个阻塞队列
**1、ArrayBlockingQueue **数组结构组成的有界阻塞队列。
此队列按照先进先出(FIFO)的原则对元素进行排序,但是默认情况下不保证线程公平的访问队列,即如果队列满了,那么被阻塞在外面的线程对队列访问的顺序是不能保证线程公平(即先阻塞,先插入)的。
2、LinkedBlockingQueue一个由链表结构组成的有界阻塞队列
此队列按照先出先进的原则对元素进行排序
3、PriorityBlockingQueue 支持优先级的无界阻塞队列
4、DelayQueue 支持延时获取元素的无界阻塞队列,即可以指定多久才能从队列中获取当前元素
5、SynchronousQueue不存储元素的阻塞队列,每一个put必须等待一个take操作,否则不能继续添加元素。并且他支持公平访问队列。
6、LinkedTransferQueue由链表结构组成的无界阻塞TransferQueue队列。相对于其他阻塞队列,多了tryTransfer和transfer方法
transfer方法
如果当前有消费者正在等待接收元素(take或者待时间限制的poll方法),transfer可以把生产者传入的元素立刻传给消费者。如果没有消费者等待接收元素,则将元素放在队列的tail节点,并等到该元素被消费者消费了才返回。
tryTransfer方法
用来试探生产者传入的元素能否直接传给消费者。,如果没有消费者在等待,则返回false。和上述方法的区别是该方法无论消费者是否接收,方法立即返回。而transfer方法是必须等到消费者消费了才返回。
7、LinkedBlockingDeque链表结构的双向阻塞队列,优势在于多线程入队时,减少一半的竞争。
阻塞队列的方法
方法 | 正常动作 | 特殊情况下动作 |
---|---|---|
add | 添加一个元素 | 如果队列满,则抛出IllegalStateException异常 |
offer | 添加一个元素并返回bollean | 如果队列满,返回false |
put | 添加一个元素 | 如果队列满,则阻塞该线程 |
element | 返回队列的头元素 | 如果队列为空,则抛出NoSuchElementException |
peek | 返回队列的头元素 | 如果队列为空,则返回null |
poll | 移除并返回队列的头元素 | 如果队列为空,返回null |
remove | 移除并返回队列的头元素 | 如果队列为空,则抛出NoSuchElementException |
take | 移除并返回队列的头元素 | 如果队列为空 ,则阻塞该线程 |
阻塞队列测试使用
public class BlockQueueTest {
private static BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<String>(10);
/**
* 消费者线程,每一秒消费一次
*/
static class Consumer implements Runnable{
BlockingQueue queue;
public Consumer(BlockingQueue queue){
this.queue = queue;
}
public void run() {
System.out.println("开始消费----");
while (true){
try {
Object poll = queue.take();
Thread.sleep(500);
System.out.println(Thread.currentThread().getName()+"消费"+(String)poll);
} catch (InterruptedException e) {
//被中断,捕获后清除中断状态
Thread.currentThread().interrupt();
}
}
}
}
static class Producer implements Runnable{
BlockingQueue queue;
Producer(BlockingQueue queue){
this.queue = queue;
}
public void run() {
System.out.println("开始生产-------");
while (true){
try {
queue.put("test");
System.out.println("生产线程生产test");
} catch (InterruptedException e) {
//捕获中断,清除中断状态
Thread.currentThread().interrupt();
}
}
}
}
public static void main(String[] args) {
//开启生产线程
new Thread(new Producer(blockingQueue)).start();
//开启三个消费线程
new Thread(new Consumer(blockingQueue)).start();
new Thread(new Consumer(blockingQueue)).start();
new Thread(new Consumer(blockingQueue)).start();
}
参考
- 《Java核心技术》