1. 概念和特性
很多线程问题可以使用一个或多个队列优雅而安全地解决。
比如说,生产者线程向队列插入元素,消费者线程负责获取元素。
利用这种方式,我们可以安全地从一个线程向另一个线程传递参数。
阻塞队列(BlockingQueue)是协调多个线程之间合作的有用工具。
当试图向阻塞队列添加元素而队列已满,或者从队列移出元素而队列为空的时候,将导致线程阻塞。
阻塞队列的应用场景:
工作线程可以周期性地将中间结果存储在阻塞队列中。
其它工作线程移除中间结果,并进一步进行修改。
这样,队列可以自动地平衡负载。
2. BlockingQueue的方法
方法 | 动作 | 特殊情况 |
---|---|---|
add | 添加一个元素 | 如果队列满,抛出IllegalStateException |
element | 返回队头元素 | 如果对列空,抛出NoSuchElementException |
offer | 添加一个元素并返回true | 如果队列满,则返回false |
peek | 返回队头元素 | 如果队列空,则返回null |
poll | 移除并返回队头元素 | 如果队列空,则返回null |
put | 添加一个元素 | 如果队列满,则阻塞 |
remove | 移除并返回队头元素 | 如果对列空,抛出NoSuchElementException |
take | 移除并返回队头元素 | 如果对列空,则阻塞 |
offer和poll方法还可以有超时时间的参数
// 在100毫秒的时间内向队尾插入一个元素;超时返回false
boolean success = q.offer(x, 100, TimeUnit.MILLISECONDS);
// 在100毫秒的时间内移除队头元素;超时返回null
Object head = q.poll(100, TimeUnit.MILLISECONDS);
3. Java并发包的阻塞队列实现
java.util.concurrent包提供了阻塞队列的几种实现。
类名 | 解释 |
---|---|
LinkedBlockingQueue | 双端队列,容量没有上界,但是也可以选择指定一个最大容量。 |
ArrayBlockingQueue | 在构造是需要指定容量,并且有一个可选的参数来指定是否需要公平性。如果设置了公平性,那么等待时间最长的线程会优先得到处理。公平性会降低性能。 |
PriorityBlockingQueue | 优先队列,元素按照它们的优先级顺序移除。这个队列没有容量上限。但是队列为空获取元素也会造成阻塞。 |
4. 程序实战
SearchFileByBlockingQueueDemo.java
package zeus.playground.concurrent.queue;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Scanner;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class SearchFileByBlockingQueueDemo {
private static final int FILE_QUEUE_SIZE = 10;
private static final int SEARCH_THREADS = 100;
private static final Path DUMMY = Path.of("");
private static BlockingQueue<Path> queue = new ArrayBlockingQueue<>(FILE_QUEUE_SIZE);
public static void main(String[] args) {
try (var in = new Scanner(System.in)) {
System.out.print("Enter base directory (e.g. /opt/jdk-9-src): ");
String directory = in.nextLine();
System.out.print("Enter keyword (e.g. volatile): ");
String keyword = in.nextLine();
Runnable enumerator = () -> {
try {
enumerate(Path.of(directory));
queue.put(DUMMY);
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
}
};
// 工作线程负责向队列添加元素
new Thread(enumerator).start();
for (int i = 1; i <= SEARCH_THREADS; i++) {
Runnable searcher = () -> {
try {
var done = false;
while (!done) {
Path file = queue.take();
if (file == DUMMY) {
queue.put(file);
done = true;
} else {
search(file, keyword);
}
}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
}
};
// 工作线程负责从队列移除元素
new Thread(searcher).start();
}
}
}
/**
* Recursively enumerates all files in a given directory and its subdirectories.
* @param directory the directory in which to start
*/
public static void enumerate(Path directory) throws IOException, InterruptedException {
try (Stream<Path> children = Files.list(directory)) {
for (Path child : children.collect(Collectors.toList())) {
if (Files.isDirectory(child)) {
enumerate(child);
} else {
queue.put(child);
}
}
}
}
/**
* Searches a file for a given keyword and prints all matching lines.
*
* @param file the file to search
* @param keyword the keyword to search for
*/
public static void search(Path file, String keyword) throws IOException {
try (var in = new Scanner(file, StandardCharsets.UTF_8)) {
int lineNumber = 0;
while (in.hasNextLine()) {
lineNumber++;
String line = in.nextLine();
if (line.contains(keyword)) {
System.out.printf("%s:%d:%s%n", file, lineNumber, line);
}
}
}
}
}
上面程序通过阻塞队列的方式搜索指定文件夹下的所有文件是否有某个关键字。
程序通过一批工作线程来完成:
一个生产者工作线程通过调用递归方法来遍历文件夹,并把文件路劲塞入阻塞队列。
一批消费者工作线程通过从阻塞队列中队头挨个取文件路径,再根据文件路径搜索指定文件是否含有某个关键字。