在Java中,线程(或者任务?)分为两类。
- Runnable 无返回值,执行特定的逻辑。包入Thread或者线程池使用
- Callable 用户自定义返回值。与线程池结合使用。异步化的 java.util.function.Supplier
FutureTask 其实与Callable是息息相关的,在我们submit一个任务后,线程池会返回一个FutureTask实例,用于同步获取异步执行任务的返回值以及其状态。
这里我们先考虑使用Runnable自行实现一个带返回值的任务。
v1.0
public class FutureTask<T> implements Runnable {
private volatile T outcome;
private final Object lock = new Object();
private Callable callable;
@Override
public void run() {
// biz operation
synchronized (lock) {
outcome = callable.call();
lock.notifyAll();
}
}
public T get() throws Exception {
synchronized (lock) {
if (outcome == null) {
lock.wait();
}
return outcome;
}
}
// 这里的实现非常简易 0.0 便于理解
public static void main(String[] args) throws Exception {
Callable<String> callable = new Callable<String>() {
@Override
protected String call() {
return "Hello world.";
}
};
new Thread(callable).start();
callable.get();
}
}
这里我们发现好像有点问题,线程任务是能跑起来了,但是好像无法取消?
v2.0
那我们加上取消功能
public class Callable<T> implements Runnable {
private volatile T outcome;
private final Object lock = new Object();
private Thread runner;
private Callable callable;
@Override
public void run() {
synchronized (lock) {
if (runner != null)
return;
else
runner = Thread.currentThread();
}
// biz operation
synchronized (lock) {
outcome = callable.call();
lock.notifyAll();
}
}
public T get() throws Exception {
synchronized (lock) {
if (outcome == null) {
lock.wait();
}
return outcome;
}
}
public void cancel() {
if (runner != null && !runner.isInterrupted() && outcome == null)
runner.interrupt();
}
}
现在已经支持了取消,但好像还缺少状态管理?
v3.0
public abstract class Callable<T> implements Runnable {
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int CANCELLED = 2;
private final Object lock = new Object();
private int state = NEW;
private volatile T outcome;
private Thread runner;
private Callable callable;
@Override
public void run() {
synchronized (lock) {
if (runner != null)
return;
else
runner = Thread.currentThread();
}
// biz operation
synchronized (lock) {
outcome = callable.call();
lock.notifyAll();
state = COMPLETING;
}
}
public int getState() {
return state;
}
public T get() throws Exception {
synchronized (lock) {
if (outcome == null) {
lock.wait();
}
return outcome;
}
}
public void cancel() {
synchronized (lock) {
if (outcome == null && state == COMPLETING) {
state = CANCELLED;
}
if (runner != null && !runner.isInterrupted() && outcome == null) {
runner.interrupt();
}
}
}
}
这里我们添加了简单的状态跟踪,状态简单分为新建,完成,取消。
不过好像还缺点东西,状态还可以细化分为正常,业务异常,中断等等,不过再细化下去便是一个FutureTask了。
现在我们对FutureTask有了个整体直观的印象。FutureTask便是JDK提供的“Callable”的官方实现。支持状态查询,运行时操作,同步获取返回值等。和我们简易版的Callable相比,FutureTask 使用Unsafe实现粒度更细的锁,细化了状态与程序报错信息等。