java.util.concurrent
包中的Java BlockingQueue
接口表示一个线程可以安全放入以及从中获取实例的队列。在本文中,我将向你展示如何使用BlockingQueue
。
BlockingQueue 使用
一个BlockingQueue
通常用于在线程上生成对象,另一个线程消耗对象。这是一个说明这个原则的图表:
生产线程将一直生成新对象并将它们插入队列,直到达到队列的容量上限。如果阻塞队列达到其上限,则在尝试插入新对象时会阻止生产线程。它将一直被阻塞,直到消费线程将一个对象从队列中取出。
消费线程不断将对象从阻塞队列中取出并处理它们。如果消费线程试图将对象从空队列中取出实例,那么消费线程将被阻塞,直到生产线程向队列放入一个对象。
BlockingQueue 方法
BlockingQueue
有4组不同的方法用于插入,删除和检查队列中的元素。当不能立即执行所请求的操作时,每组方法的行为会不同。这是一个方法表:
抛出异常 | 返回特殊值 | 阻塞 | 超时 | |
---|---|---|---|---|
插入 | add(o) |
offer(o) |
put(o) |
offer(o, timeout, timeunit) |
删除 | remove(o) |
poll() |
take() |
poll(timeout, timeunit) |
访问 | element() |
peek() |
这4种不同的行为意味着:
-
抛出异常:
如果请求的操作现在无法完成,则抛出异常。 -
特殊值:
如果请求的操作现在无法完成,则返回特殊值(一般为 true / false). -
阻塞:
如果请求的操作现在无法完成,则方法调用将阻塞,直到操作能够进行。 -
超时:
如果请求的操作现在无法完成,则方法调用将阻塞直到它能够进行,但等待不超过给定的超时。返回一个特殊值,告知操作是否成功(通常为true / false)
无法插入null
到BlockingQueue
中。如果你尝试插入null
, BlockingQueue
则会抛出一个NullPointerException
异常。
你可以访问BlockingQueue
内的所有元素,而不仅仅是开头和结尾的元素。例如,假设你已将一个对象入队等待处理,但你的应用程序决定取消它。你可以调用remove(o)
这样的操作来删除队列中的特定对象。但是,这是个效率很低的操作,所以除非你真的需要,否则你不应该使用Collection
中的这些方法。
BlockingQueue 实现
由于BlockingQueue
是一个接口,你需要使用它的一个实现来使用它。java.util.concurrent
包中具有以下实现BlockingQueue
接口的类:
Java BlockingQueue 示例
这是一个Java BlockingQueue
示例。该示例使用实现BlockingQueue
接口的ArrayBlockingQueue
类。
首先, BlockingQueueExample
类在不同的线程中启动Producer
和Consumer
。Producer
将一个字符串插入共享的BlockingQueue
,而Consumer
使用它们。
public class BlockingQueueExample {
public static void main(String[] args) throws Exception {
BlockingQueue queue = new ArrayBlockingQueue(1024);
Producer producer = new Producer(queue);
Consumer consumer = new Consumer(queue);
new Thread(producer).start();
new Thread(consumer).start();
Thread.sleep(4000);
}
}
这是Producer
类。注意每次put()
调用之间它都会睡一秒钟。这将导致Consumer
阻塞,为了等待获取队列中的对象。
public class Producer implements Runnable{
protected BlockingQueue queue = null;
public Producer(BlockingQueue queue) {
this.queue = queue;
}
public void run() {
try {
queue.put("1");
Thread.sleep(1000);
queue.put("2");
Thread.sleep(1000);
queue.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
这是Consumer
类。它从队列中取出对象,然后将它们打印到System.out
。
public class Consumer implements Runnable{
protected BlockingQueue queue = null;
public Consumer(BlockingQueue queue) {
this.queue = queue;
}
public void run() {
try {
System.out.println(queue.take());
System.out.println(queue.take());
System.out.println(queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
下面是一个测试:
import org.junit.Test;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class BlockingExample {
@Test
public void test() throws Exception {
BlockingQueue<String> queue = new ArrayBlockingQueue<>(1024);
Producer producer = new Producer(queue);
Consumer consumer = new Consumer(queue);
Thread thread1 = new Thread(producer);
Thread thread2 = new Thread(consumer);
thread1.start();
thread2.start();
thread1.join();
thread2.join();
}
private static class Producer implements Runnable {
private BlockingQueue<String> queue;
Producer(BlockingQueue<String> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
queue.put("1");
Thread.sleep(1000);
queue.put("2");
Thread.sleep(1000);
queue.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private static class Consumer implements Runnable {
private BlockingQueue<String> queue = null;
Consumer(BlockingQueue<String> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
System.out.println(queue.take());
System.out.println(queue.take());
System.out.println(queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
结果如下:
可以看出执行了2s 15ms才执行完
下面是BlockingQueue
的源码。注意每个方法文档中提及的异常,同时此接口还提供了两个方法int drainTo(Collection<? super E> c)
,int drainTo(Collection<? super E> c, int maxElements)
,这两个方式的作用是将队列中的元素增加到参数指定的集合中。
public interface BlockingQueue<E> extends Queue<E> {
/**
* 将指定值插入到队列中,如果没有超出容量限制那么立刻完成并返回
* {@code true} 成功
* {@code IllegalStateException} 如果现在没有足够容量
* 当使用一个容量限制的队列,使用{@link #offer(Object) offer}更好
*
* @param e the element to add
* @return {@code true} (as specified by {@link Collection#add})
* @throws IllegalStateException if the element cannot be added at this
* time due to capacity restrictions
* @throws ClassCastException if the class of the specified element
* prevents it from being added to this queue
* @throws NullPointerException if the specified element is null
* @throws IllegalArgumentException if some property of the specified
* element prevents it from being added to this queue
*/
boolean add(E e);
/**
* Inserts the specified element into this queue if it is possible to do
* so immediately without violating capacity restrictions, returning
* {@code true} upon success and {@code false} if no space is currently
* available. When using a capacity-restricted queue, this method is
* generally preferable to {@link #add}, which can fail to insert an
* element only by throwing an exception.
*
* @param e the element to add
* @return {@code true} if the element was added to this queue, else
* {@code false}
* @throws ClassCastException if the class of the specified element
* prevents it from being added to this queue
* @throws NullPointerException if the specified element is null
* @throws IllegalArgumentException if some property of the specified
* element prevents it from being added to this queue
*/
boolean offer(E e);
/**
* Inserts the specified element into this queue, waiting if necessary
* for space to become available.
*
* @param e the element to add
* @throws InterruptedException if interrupted while waiting
* @throws ClassCastException if the class of the specified element
* prevents it from being added to this queue
* @throws NullPointerException if the specified element is null
* @throws IllegalArgumentException if some property of the specified
* element prevents it from being added to this queue
*/
void put(E e) throws InterruptedException;
/**
* Inserts the specified element into this queue, waiting up to the
* specified wait time if necessary for space to become available.
*
* @param e the element to add
* @param timeout how long to wait before giving up, in units of
* {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the
* {@code timeout} parameter
* @return {@code true} if successful, or {@code false} if
* the specified waiting time elapses before space is available
* @throws InterruptedException if interrupted while waiting
* @throws ClassCastException if the class of the specified element
* prevents it from being added to this queue
* @throws NullPointerException if the specified element is null
* @throws IllegalArgumentException if some property of the specified
* element prevents it from being added to this queue
*/
boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;
/**
* Retrieves and removes the head of this queue, waiting if necessary
* until an element becomes available.
*
* @return the head of this queue
* @throws InterruptedException if interrupted while waiting
*/
E take() throws InterruptedException;
/**
* Retrieves and removes the head of this queue, waiting up to the
* specified wait time if necessary for an element to become available.
*
* @param timeout how long to wait before giving up, in units of
* {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the
* {@code timeout} parameter
* @return the head of this queue, or {@code null} if the
* specified waiting time elapses before an element is available
* @throws InterruptedException if interrupted while waiting
*/
E poll(long timeout, TimeUnit unit)
throws InterruptedException;
/**
* Returns the number of additional elements that this queue can ideally
* (in the absence of memory or resource constraints) accept without
* blocking, or {@code Integer.MAX_VALUE} if there is no intrinsic
* limit.
*
* <p>Note that you <em>cannot</em> always tell if an attempt to insert
* an element will succeed by inspecting {@code remainingCapacity}
* because it may be the case that another thread is about to
* insert or remove an element.
*
* @return the remaining capacity
*/
int remainingCapacity();
/**
* Removes a single instance of the specified element from this queue,
* if it is present. More formally, removes an element {@code e} such
* that {@code o.equals(e)}, if this queue contains one or more such
* elements.
* Returns {@code true} if this queue contained the specified element
* (or equivalently, if this queue changed as a result of the call).
*
* @param o element to be removed from this queue, if present
* @return {@code true} if this queue changed as a result of the call
* @throws ClassCastException if the class of the specified element
* is incompatible with this queue
* (<a href="{@docRoot}/java/util/Collection.html#optional-restrictions">optional</a>)
* @throws NullPointerException if the specified element is null
* (<a href="{@docRoot}/java/util/Collection.html#optional-restrictions">optional</a>)
*/
boolean remove(Object o);
/**
* Returns {@code true} if this queue contains the specified element.
* More formally, returns {@code true} if and only if this queue contains
* at least one element {@code e} such that {@code o.equals(e)}.
*
* @param o object to be checked for containment in this queue
* @return {@code true} if this queue contains the specified element
* @throws ClassCastException if the class of the specified element
* is incompatible with this queue
* (<a href="{@docRoot}/java/util/Collection.html#optional-restrictions">optional</a>)
* @throws NullPointerException if the specified element is null
* (<a href="{@docRoot}/java/util/Collection.html#optional-restrictions">optional</a>)
*/
boolean contains(Object o);
/**
* 从队列中删除所有能访问的元素并且将他们增加到指定的集合中。
* 这个操作可能比重复的调用poll方法效率更高。当尝试将元素增加
* 到集合中时如果失败啊,那么可能队列和集合中都包含或都不包含
* 此元素。如果指定的集合是它自己,那么抛出一个IllegalArgumentException。
* 当此操作进行时,如果指定的集合被修改,那么这个操作将是未定义的。
*
* @param c the collection to transfer elements into
* @return the number of elements transferred
* @throws UnsupportedOperationException if addition of elements
* is not supported by the specified collection
* @throws ClassCastException if the class of an element of this queue
* prevents it from being added to the specified collection
* @throws NullPointerException if the specified collection is null
* @throws IllegalArgumentException if the specified collection is this
* queue, or some property of an element of this queue prevents
* it from being added to the specified collection
*/
int drainTo(Collection<? super E> c);
/**
* Removes at most the given number of available elements from
* this queue and adds them to the given collection. A failure
* encountered while attempting to add elements to
* collection {@code c} may result in elements being in neither,
* either or both collections when the associated exception is
* thrown. Attempts to drain a queue to itself result in
* {@code IllegalArgumentException}. Further, the behavior of
* this operation is undefined if the specified collection is
* modified while the operation is in progress.
*
* @param c the collection to transfer elements into
* @param maxElements the maximum number of elements to transfer
* @return the number of elements transferred
* @throws UnsupportedOperationException if addition of elements
* is not supported by the specified collection
* @throws ClassCastException if the class of an element of this queue
* prevents it from being added to the specified collection
* @throws NullPointerException if the specified collection is null
* @throws IllegalArgumentException if the specified collection is this
* queue, or some property of an element of this queue prevents
* it from being added to the specified collection
*/
int drainTo(Collection<? super E> c, int maxElements);
}