在并发编程中,我们经常用到非阻塞的模型,在之前的多线程的三种实现中,不管是继承Thread类还是实现Runnable接口,都无法保证获取到之前的执行结果。但是通过实现Callable接口,并用Future可以来接收多线程执行结果。
英文含义:
callable 可收回的
future 未来
Executor [ɪɡˈzekjətər] (一哥在kei特) 执行器
Future表示一个可能还没有完成的异步任务的结果,针对这个结果可以添加callable以便在任务执行成功或失败后作出相应的操作。
线程池提交实现Callable接口的线程会返回一个Future对象。
Java并发编程:Callable、Future和FutureTask
1. Callable和Runnable
Runnable是一个接口,它里面只声明了一个run方法。
public interface Runnable {
public abstract void run();
}
但是run()方法返回值是void类型,所以在执行任务之后无法返回任何结果。
而Callable位于java.util.concurrent(并发包)包下,他也是一个接口,它里面也只声明了一个方法。
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}
并且,这个接口是一个泛型接口,call方法返回类型就是传递进来的类型。
2. Future
一般情况下,Callable配合ExecutorSevice来使用,获取到的返回值就是Future类型
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,get方法会阻塞直到任务的完成。
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
cancel(mayInterruptIfRunning) 取消正在运行时的线程。需要注意的时,本质上是依赖线程进程中的(sleep,wait,join-->本质上均是NEW状态)的interrupt方法抛出异常来进行线程的中断的。详情请参考 FutureTask的cancel方法真的能终止正在运行的线程吗?
isCanselled方法表示任务是否被取消成功,如果任务正常完成前被取消成功,则返回true。
isDone 方法表示任务是否已经完成,若任务完成,则返回true。
get()方法用来获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回。
get(long timeout,TimeUnit unit)用来获取执行结果,如果在指定时间内,还未获取到结果,那么会抛出
java.util.concurrent.TimeoutException异常。
Future提供了三种功能:
- 判断任务是否完成;
- 能够中断任务;
- 能够获取到任务执行结果;
3. FutureTask

Future同时实现了runnable接口和Future接口。故它既可以作为Runnable接口被线程执行,又可以作为Future得到Callable的返回值。
1. FutureTask的参数:Callable<V>
@Test
public void testTask() throws InterruptedException, ExecutionException {
ExecutorService executorService = Executors.newSingleThreadExecutor();
//线程池提交任务
FutureTask<Integer> futureTask = new FutureTask<Integer>(() -> {
System.out.println("子线程在进行计算");
Thread.sleep(3000);
int sum = 0;
for (int i = 0; i < 100; i++)
sum += i;
return sum;
});
//因为FutureTask实现了Runnable接口,故可以使用Thread启用
Thread thread=new Thread(futureTask);
thread.start();
//也可以使用线程池的方式使用
executorService.submit(futureTask);
executorService.shutdown(); //关闭线程池
Thread.sleep(1000);
System.out.println("task运行结果"+futureTask.get());
System.out.println("所有任务执行完毕");
}
}
如果为了取消性而使用Future但又不提供可用的结果,则可以声明Future<?>形式参数,并返回null作为底层任务结果~
4. 项目实战,策略模式抽取公共逻辑
目的是为了保证输入的原始数据和输出的数据保存强一致。
/**
* 任务的批量处理,此方法会对异常进行处理,保证输入和输出的数据是有序的。
*
* @param sources 源数据
* @param function 策略方法
* @param executor 线程池
* @param <T> 请求数据的泛型
* @param <R> 响应数据的泛型
* @return 并发执行的最终结果
*/
public static <T, R> List<BatchFutureResult<T, R>> futureAllFunction(List<T> sources,
Function<T, R> function,
ExecutorService executor) {
return futureAllFunction(sources, function, null, executor);
}
/**
*
* 任务的批量处理,此方法会对异常进行处理,保证输入和输出的数据是有序的。
*
* @param sources 源数据
* @param function 策略方法
* @param timeout 失效时间,为空即不设置
* @param executor 线程池
* @param <T> 请求数据的泛型
* @param <R> 响应数据的泛型
* @return 并发执行的最终结果
*/
public static <T, R> List<BatchFutureResult<T, R>> futureAllFunction(List<T> sources,
Function<T, R> function,
Long timeout,
ExecutorService executor) {
/**
* 获取FutureTask任务,并放入到线程池中。
*/
List<FutureTask<BatchFutureResult<T, R>>> futureTasks = sources.stream().map(t -> {
FutureTask<BatchFutureResult<T, R>> resultFutureTask = new FutureTask<>(() -> {
BatchFutureResult<T, R> batchFutureResult = new BatchFutureResult<>();
R apply = null;
Integer status = StatusEnum.SUCCESS.getStatus();
try {
apply = function.apply(t);
} catch (Exception e) {
log.error("批量任务异常", e);
batchFutureResult.setMessage(ExceptionUtil.getLogErrorMessage(e));
status = StatusEnum.ERROR.getStatus();
}
batchFutureResult.setSource(t);
batchFutureResult.setResult(apply);
batchFutureResult.setStatus(status);
return batchFutureResult;
});
//放入线程池等待处理
executor.submit(resultFutureTask);
return resultFutureTask;
}).collect(Collectors.toList());
List<BatchFutureResult<T, R>> results = new ArrayList<>();
//循环获取结果
for (int i = 0; i < sources.size(); i++) {
//获取数据
T source = sources.get(i);
FutureTask<BatchFutureResult<T, R>> futureTask = futureTasks.get(i);
//抓取结果
BatchFutureResult<T, R> batchFutureResult = new BatchFutureResult<>();
try {
if (timeout == null) {
batchFutureResult = futureTask.get();
} else {
batchFutureResult = futureTask.get(timeout, TimeUnit.MILLISECONDS);
}
} catch (Exception e) {
log.error("", e);
batchFutureResult.setStatus(StatusEnum.ERROR.getStatus());
batchFutureResult.setMessage(ExceptionUtil.getLogErrorMessage(e));
batchFutureResult.setSource(source);
}
//放入数据
results.add(batchFutureResult);
}
return results;
}
@Data
public class BatchFutureResult<T, R> {
/**
* 是否成功
*/
private Integer status;
/**
* 异常时的原因
*/
private String message;
/**
* 原数据
*/
private T source;
/**
* 结果数据
*/
private R result;
}
@Getter
public enum StatusEnum {
SUCCESS(1, "成功"),
ERROR(2, "失败");
StatusEnum(Integer status, String desc) {
this.status = status;
this.desc = desc;
}
private Integer status;
private String desc;
}