一、前言
最近一直在研究Java并发‘’相关的知识,从Thread到ThreadPoolExecutor,从synchronize到AQS。此次研究一下 FutureTask以及其相关的类的工作方式,与FutureTask相关的类包括:Callable、Runnable、Future和ThreadPoolExecutor等,我们一点一点的来慢慢引入。
二、Callable 和 Runnable
2.1 Runnable
Runnable 是一个接口,它只有一个 run 方法。关于这个接口存在的意义,有一篇文章中说的非常贴切。
创建线程最重要的是传递一个run()方法, 这个run方法定义了这个线程要做什么事情, 它被抽象成了Runnable接口。
这句话可以作为理解接口的一种角度,接口的本质是定义规则,以及这些规则包含哪些动作。对于 Runnable 接口,它就定义了一个规则 —— 可运行。
public interface Runnable {
public abstract void run();
}
从方法中可以看出,Runnable 中的run方法没有返回值,也无法抛出异常。这就意味着,我们用Runnable,就只能去执行一个没有反馈的任务,任务是否执行完毕,执行过程中是否存在异常,主线程无法感知到,全凭 Runnable 自己去处理。如果我们遇到使用多线程去做结果归纳的需求该怎么办?我们希望主线程在子线程执行完毕后拿到子线程的计算结果,进行归纳。
2.2 Callable
Callable 也是一个接口,这个接口所定义的规则是 call 方法。
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}
对比Callable接口与Runnable接口, 我们可以发现它们最大的不同点在于:
- Callable有返回值
- Callable可以抛出异常
关于有返回值这点,我们并不意外,因为这就是我们的需求,call方法的返回值类型采用的泛型,该类型是我们在创建Callable对象的时候指定的。
除了有返回值外,相较于Runnable接口,Callable还可以抛出异常,这点看上去好像没啥特别的,但是却有大用处——这意味着如果在任务执行过程中发生了异常,我们可以将它向上抛出给任务的调用者来妥善处理,我们甚至可以利用这个特性来中断一个任务的执行。而Runnable接口的run方法不能抛出异常,只能在方法内部catch住处理,丧失了一定的灵活性。
好了,既然Callable可以返回运算结果,我们来尝试获取一下:
public static void main(String[] args) {
Callable<String> myCallable = () -> {
try {
Thread.sleep(2000);
} catch (Exception e) {
}
return "done";
};
System.out.println("callable 开始执行");
try {
String result = myCallable.call();
System.out.println("Callable 执行的结果是: " + result);
} catch (Exception e) {
System.out.println("There is a exception.");
}
}
我们成功的调用了call方法,并且拿到了执行结束的结果“done”,看样子像是满足了前面的需求。但是还存在几个问题:
- call方法是在当前线程中直接调用的, 无法利用多线程。
- call方法可能是一个特别耗时的操作, 这将导致程序停在myCallable.call()调用处, 无法继续运行, 直到call方法返回。
- 如果call方法始终不返回, 我们没办法中断它的运行。
实际上这段代码自始至终只有主线程一条线程去完成这件事的。因此, 理想的操作应当是, 我们将call方法提交给另外一个线程执行, 并在合适的时候, 判断任务是否完成, 然后获取线程的执行结果或者撤销任务, 这种思路的实现就是Future接口。
三、Future
Future接口被设计用来代表一个异步操作的执行结果。你可以用它来获取一个操作的执行结果、取消一个操作、判断一个操作是否已经完成或者是否被取消。
public interface Future<V> {
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
}
Future接口一共定义了5个方法:
- get():该方法用来获取执行结果, 如果任务还在执行中, 就阻塞等待;
- get(long timeout, TimeUnit unit):该方法同get方法类似, 所不同的是, 它最多等待指定的时间, 如果指定时间内任务没有完成, 则会抛出TimeoutException异常;
- cancel(boolean mayInterruptIfRunning):该方法用来尝试取消一个任务的执行, 它的返回值是boolean类型, 表示取消操作是否成功.
- isCancelled():该方法用于判断任务是否被取消了。如果一个任务在正常执行完成之前被cancel掉了, 则返回true
- isDone():如果一个任务已经结束, 则返回true。注意, 这里的任务结束包含了以下三种情况:
- 任务正常执行完毕
- 任务抛出了异常
- 任务已经被取消
3.1 cancel 方法需要注意的地方
关于cancel方法,这里要补充说几点:
首先有以下三种情况之一的,cancel操作一定是失败的:
任务已经执行完成了
任务已经被取消过了
任务因为某种原因不能被取消
其它情况下,cancel操作将返回true。值得注意的是,cancel操作返回true并不代表任务真的就是被取消了,这取决于发动cancel状态时任务所处的状态:如果发起cancel时任务还没有开始运行,则随后任务就不会被执行;
-
如果发起cancel时任务已经在运行了,则这时就需要看mayInterruptIfRunning参数了:
- 如果mayInterruptIfRunning 为true, 则当前在执行的任务会被中断
- 如果mayInterruptIfRunning 为false, 则可以允许正在执行的任务继续运行,直到它执行完
总结来说,Future提供了三种功能:
- 判断任务是否完成;
- 能够中断任务;
- 能够获取任务执行结果。
我们回顾一下我们的需求:
我们将call方法提交给另外一个线程执行, 并在合适的时候, 判断任务是否完成, 然后获取线程的执行结果或者撤销任务。
我们需要将 Callable 交给线程池,让线程池去开启一条子线程去执行 call方法,并借助 Future 去操作任务。我们看一下具体用法:
public static void main(String[] args) throws ExecutionException, InterruptedException {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5,
10,
200,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(5),
new ThreadFactoryBuilder().build(),
new ThreadPoolExecutor.AbortPolicy());
Map<String, String> map = Maps.newHashMap();
Future<Map<String, String>> future1 = executor.submit(() -> {
Thread.sleep(3000);
Map<String, String> map1 = Maps.newHashMap();
map1.put("name", "zhangsan");
System.out.println("子线程执行完毕");
return map1;
});
executor.shutdown();
System.out.println("---main over---");
map.putAll(future1.get());
System.out.println("result: " + map);
}
执行中取消:
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(
5,
10,
200,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(5),
new ThreadFactoryBuilder().build(),
new ThreadPoolExecutor.AbortPolicy());
public static void main(String[] args) throws ExecutionException, InterruptedException {
cancelWhenRunning();
}
private static void cancelWhenRunning() throws ExecutionException, InterruptedException {
Map<String, String> map = Maps.newHashMap();
Future<Map<String, String>> future1 = executor.submit(() -> {
Thread.sleep(3000);
Map<String, String> map1 = Maps.newHashMap();
map1.put("name", "zhangsan");
System.out.println("子线程执行完毕");
return map1;
});
executor.shutdown();
System.out.println("---main over---");
try {
Thread.sleep(1000);
} catch (Exception e) {
}
future1.cancel(true);
map.putAll(future1.get());
System.out.println("result: " + map);
}
执行结果:
---main over---
Exception in thread "main" java.util.concurrent.CancellationException
at java.util.concurrent.FutureTask.report(FutureTask.java:121)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at com.angryjoe.concurrency.treadpool.futrue.FutrueApp2.cancelWhenRunning(FutrueApp2.java:58)
at com.angryjoe.concurrency.treadpool.futrue.FutrueApp2.main(FutrueApp2.java:21)
mayInterruptIfRunning 为 true,子线程没有执行完成,就被取消掉了。取消之后在调用 get 方法则会抛出异常。
如果mayInterruptIfRunning 为 false,可以发现,取消之后在调用 get 方法依然会抛出异常,但是子线程可以执行结束。
---main over---
Exception in thread "main" java.util.concurrent.CancellationException
at java.util.concurrent.FutureTask.report(FutureTask.java:121)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at com.angryjoe.concurrency.treadpool.futrue.FutrueApp2.cancelWhenRunning(FutrueApp2.java:58)
at com.angryjoe.concurrency.treadpool.futrue.FutrueApp2.main(FutrueApp2.java:21)
子线程执行完毕
四、FutureTask
FutureTask是Future接口的一个实现类。
public class FutureTask<V> implements RunnableFuture<V>
FutureTask类实现了RunnableFuture接口,我们看一下RunnableFuture接口的实现:
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}
FutureTask实现了该接口,也就是相当于它同时实现了Runnable接口和Future接口,所以它既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值。
有的同学可能会对这个接口产生疑惑,既然已经继承了Runnable,该接口自然就继承了run方法,为什么要在该接口的内部再写一个run方法?
单纯从理论上来说,这里确实是没有必要的,再多写一遍,我觉得大概就是为了看上去直观一点,便于文档或者UML图展示。
实际上 FutureTask 与 Future 的用法区别不大:
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(
5,
10,
200,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(5),
new ThreadFactoryBuilder().build(),
new ThreadPoolExecutor.AbortPolicy());
public static void main(String[] args) throws ExecutionException, InterruptedException {
futureTask();
}
private static void futureTask() throws ExecutionException, InterruptedException {
Map<String, String> map = Maps.newHashMap();
FutureTask<Map<String, String>> mapFutureTask = new FutureTask<>(() -> {
Thread.sleep(3000);
Map<String, String> map1 = Maps.newHashMap();
map1.put("name", "zhangsan");
System.out.println("子线程执行完毕");
return map1;
});
executor.submit(mapFutureTask);
executor.shutdown();
System.out.println("---main over---");
map.putAll(mapFutureTask.get());
System.out.println("result: " + map);
}
执行结果与 Future 无异。通过看demo可发现两点不同:
- 在使用 Future 时,我们是直接将 Callable 交给线程池去执行 submit 方法,而且要获取 submit 的返回值 Future
- 在使用 FutureTask 时,我们先将 Callable 交给 FutureTask,然后再将 FutureTask 交给线程池去执行 submit 方法,此处就不需要 submit 的返回值
五、总结
本文层层介绍 Callable、Future、FutureTask的基本特征和使用方法。下一篇将跟随 FutureTask的源码来探究其工作原理。
PS:当然,我自己是无法写出很浅显易懂富有条理的文章,所以当然有参考~
https://www.cnblogs.com/dolphin0520/p/3949310.html
https://segmentfault.com/a/1190000016542779#item-2-1
这两篇文章讲的非常好,此处推荐阅读原文。
(侵删)
如果有解释不清或者缺失的地方,还望在下方留言,大家一起学习,一起交流,一起进步。