在 Java 中,我们常常开启多个线程来提高运行效率。但是有时候我们需要这些线程的返回值。应该如何获取线程的返回值呢?
我现在有10万个整数,我需要开5个线程来找它们的最大值。每个线程处理2万个整数,然后返回这2万个整数的最大值。最终我们再找出整体的最大值。
1. 使用线程安全的共享变量
我们使用一个类似于 LinkedList 的共享变量来存放这5个线程各自找到的最大值。但是 LinkedList 是线程不安全的,所以可以换为 ConcurrentLinkedQueue 。
import java.util.Random;
import java.util.concurrent.ConcurrentLinkedQueue;
public class MaxValueFinder {
public static void main(String[] args) throws InterruptedException {
int[] nums = generateNums(100000);
ConcurrentLinkedQueue<Integer> resultQueue = new ConcurrentLinkedQueue<>();
Thread[] threads = new Thread[5];
for (int i = 0; i < threads.length; i++) {
int startIndex = i * (nums.length / threads.length);
int endIndex = (i + 1) * (nums.length / threads.length);
threads[i] = new MaxValueThread(nums, startIndex, endIndex, resultQueue);
threads[i].start();
}
for (Thread thread : threads) {
thread.join();
}
int maxValue = resultQueue.poll();
while (!resultQueue.isEmpty()) {
int value = resultQueue.poll();
if (value > maxValue) {
maxValue = value;
}
}
System.out.println("Max value: " + maxValue);
}
private static int[] generateNums(int n) {
int[] nums = new int[n];
Random random = new Random();
for (int i = 0; i < n; i++) {
nums[i] = random.nextInt();
}
return nums;
}
static class MaxValueThread extends Thread {
private int[] nums;
private int startIndex;
private int endIndex;
private ConcurrentLinkedQueue<Integer> resultQueue;
MaxValueThread(int[] nums, int startIndex, int endIndex, ConcurrentLinkedQueue<Integer> resultQueue) {
this.nums = nums;
this.startIndex = startIndex;
this.endIndex = endIndex;
this.resultQueue = resultQueue;
}
@Override
public void run() {
int maxValue = nums[startIndex];
for (int i = startIndex + 1; i < endIndex; i++) {
if (nums[i] > maxValue) {
maxValue = nums[i];
}
}
resultQueue.add(maxValue);
}
}
}
在这个示例中,我们创建了一个ConcurrentLinkedQueue来存储线程的计算结果。在每个线程的run方法中,线程会计算它被分配的一部分数组元素的最大值,并将结果添加到ConcurrentLinkedQueue中。在主线程中,我们从ConcurrentLinkedQueue中取出所有结果,找出最大值并输出。
需要注意的是,ConcurrentLinkedQueue 虽然是线程安全的,但是并不能保证它的顺序,因此在处理结果时需要考虑顺序的问题。在这个示例中,我们只需要找出最大值,因此顺序不影响结果。如果需要按照顺序处理结果,可以考虑使用其他的线程安全集合。
2. 使用Future+Callable
import java.util.Random;
import java.util.concurrent.*;
public class MaxFinder {
private static final int THREAD_COUNT = 5;
private static final int ARRAY_SIZE = 100000;
private static final int SUB_ARRAY_SIZE = ARRAY_SIZE / THREAD_COUNT;
private static final int MAX_VALUE = 1000000;
public static void main(String[] args) {
int[] arr = generateRandomArray(ARRAY_SIZE);
ExecutorService executorService = Executors.newFixedThreadPool(THREAD_COUNT);
Future<Integer>[] futures = new Future[THREAD_COUNT];
for (int i = 0; i < THREAD_COUNT; i++) {
int startIndex = i * SUB_ARRAY_SIZE;
int endIndex = (i + 1) * SUB_ARRAY_SIZE;
futures[i] = executorService.submit(new MaxFinderTask(arr, startIndex, endIndex));
}
int max = Integer.MIN_VALUE;
for (int i = 0; i < THREAD_COUNT; i++) {
try {
int result = futures[i].get();
if (result > max) {
max = result;
}
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
System.out.println("Max value: " + max);
executorService.shutdown();
}
private static int[] generateRandomArray(int size) {
int[] arr = new int[size];
Random random = new Random();
for (int i = 0; i < size; i++) {
arr[i] = random.nextInt(MAX_VALUE);
}
return arr;
}
private static class MaxFinderTask implements Callable<Integer> {
private final int[] arr;
private final int startIndex;
private final int endIndex;
public MaxFinderTask(int[] arr, int startIndex, int endIndex) {
this.arr = arr;
this.startIndex = startIndex;
this.endIndex = endIndex;
}
@Override
public Integer call() throws Exception {
int max = Integer.MIN_VALUE;
for (int i = startIndex; i < endIndex; i++) {
if (arr[i] > max) {
max = arr[i];
}
}
return max;
}
}
}
这个程序中,我们使用ExecutorService创建了一个线程池,然后使用Future来获取每个线程的结果。在主线程中,我们使用Future.get()方法来等待每个线程的结果,并使用它们来找到整个数组的最大值。注意,在这个例子中,我们使用Callable接口来创建每个线程的任务,并在任务的call()方法中返回结果。
3. 使用Stream
Stream(流)有点类似于 Golang 的管道,非常方便。
Java 8引入了Stream API,可以对集合和数组等数据源进行流式操作,其中包括并行处理数据源的能力。下面是一个使用Stream来实现任务的示例代码:
int[] nums = ...; // 10万个整数
int parallelism = 5; // 并行度
int max = Arrays.stream(nums)
.parallel()
.unordered()
.mapToInt(Integer::intValue)
.limit((long) Math.ceil(nums.length / (double) parallelism))
.max()
.getAsInt();
这个示例代码首先将整数数组转换为流,然后使用parallel()方法启用并行流处理,使用unordered()方法告诉Stream不必保证元素顺序,使用mapToInt(Integer::intValue)方法将流中的元素转换为整数类型,使用limit()方法限制每个线程需要处理的元素个数,最后使用max()方法找到流中的最大值并返回。
这个示例代码使用了Java 8的lambda表达式和方法引用,可以让代码更加简洁和易读。