从Runnable到Callable,JDK虽然帮我们封装了异步结果的获取,但并没有为我们封装异步任务两种结果(正常执行或异常)的处理。
虽然我们可以调用Future的get自行获取异步结果,并根据结果(成功或异常)做对应的处理逻辑。但比较呆的点在于Future的get方法在异步任务未完成前会阻塞调用线程。
所以我们一般不这样操作。
我们希望执行异步任务的线程,在异步任务完成后,能自动调用异步结果的处理逻辑,不需要我们另起线程等待异步任务执行完成,这便是常说的异步回调。
虽然JDK没帮我们封装,但却贴心的为我们预留了异步回调的扩展点。这个点就是FutureTask类的done方法。
/**
* Protected method invoked when this task transitions to state
* {@code isDone} (whether normally or via cancellation). The
* default implementation does nothing. Subclasses may override
* this method to invoke completion callbacks or perform
* bookkeeping. Note that you can query status inside the
* implementation of this method to determine whether this task
* has been cancelled.
*/
protected void done() { }
这个done方法会在异步任务执行完后调用,默认是一个空实现,我们可以在子类重写这个方法,并放上异步回调的逻辑。
根据Callable任务的执行原理和JDK为我们预留的扩展点,我们可以方便的实现异步回调。
关于Callable任务的执行原理可以参考:Java Callable任务
异步回调的实现
继承AbstractExecutorService,重写newTaskFor方法:
import java.util.List;
import java.util.concurrent.*;
public class MyExecutorService extends AbstractExecutorService {
// 执行异步任务的线程池
private final ExecutorService taskExecutor;
// 回调逻辑
private final Callback callback;
// 执行回调逻辑的线程池
private final ExecutorService callbackExecutor;
public MyExecutorService(ExecutorService taskExecutor, Callback callback, ExecutorService callbackExecutor) {
this.taskExecutor = taskExecutor;
this.callback = callback;
this.callbackExecutor = callbackExecutor;
}
@Override
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<>(callable) {
// 继承FutureTask,并重写done方法
@Override
protected void done() {
T result;
try {
// 执行到这的时候,异步任务已经执行完了
// 这个get不会阻塞,仅用来获取异步结果
result = get();
} catch (Exception e) {
callbackExecutor.execute(() -> {
// 调用失败处理逻辑
callback.onFailure(e);
});
return;
}
callbackExecutor.execute(() -> {
// 调用成功处理逻辑
callback.onSuccess(result);
});
}
};
}
@Override
public void shutdown() {
taskExecutor.shutdown();
}
@Override
public List<Runnable> shutdownNow() {
return taskExecutor.shutdownNow();
}
@Override
public boolean isShutdown() {
return taskExecutor.isShutdown();
}
@Override
public boolean isTerminated() {
return taskExecutor.isTerminated();
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return taskExecutor.awaitTermination(timeout, unit);
}
@Override
public void execute(Runnable command) {
taskExecutor.execute(command);
}
}
执行到done方法里的get调用时,不会阻塞,因为异步任务的结果已经设置了,可以直接get到执行结果。
定义回调接口:
public interface Callback {
<T> void onSuccess(T t);
void onFailure(Exception e);
}
是不是挺简单的!
使用示例
模拟异步任务成功执行:
import java.util.concurrent.*;
public class JdkCallbackMechanism {
public static void main(String[] args) throws InterruptedException {
// 定义回调处理逻辑
Callback callback = new Callback() {
@Override
public <T> void onSuccess(T t) {
System.out.println("任务成功执行了,结果是:" + t);
}
@Override
public void onFailure(Exception e) {
System.out.println("任务执行失败了,发生了异常:" + e);
}
};
// 定义处理异步任务和回调任务的线程池
ExecutorService taskExecutor = Executors.newFixedThreadPool(3);
ExecutorService callbackExecutor = Executors.newSingleThreadExecutor();
// 初始化MyExecutorService
MyExecutorService myExecutorService = new MyExecutorService(taskExecutor, callback, callbackExecutor);
// 定义异步任务
Callable<Integer> callable = () -> {
System.out.println("异步任务正在执行...");
// 模拟耗时操作
TimeUnit.SECONDS.sleep(15);
return 1;
};
// 将异步任务提交给myExecutorService
myExecutorService.submit(callable);
}
}
执行结果:
模拟异步任务执行失败(用除0异常模拟):
import java.util.concurrent.*;
public class JdkCallbackMechanism {
public static void main(String[] args) throws InterruptedException {
// 定义回调处理逻辑
Callback callback = new Callback() {
@Override
public <T> void onSuccess(T t) {
System.out.println("任务成功执行了,结果是:" + t);
}
@Override
public void onFailure(Exception e) {
System.out.println("任务执行失败了,发生了异常:" + e);
}
};
// 定义处理异步任务和回调任务的线程池
ExecutorService taskExecutor = Executors.newFixedThreadPool(3);
ExecutorService callbackExecutor = Executors.newSingleThreadExecutor();
// 初始化MyExecutorService
MyExecutorService myExecutorService = new MyExecutorService(taskExecutor, callback, callbackExecutor);
// 定义异步任务
Callable<Integer> callable = () -> {
System.out.println("异步任务正在执行...");
// 模拟耗时操作
TimeUnit.SECONDS.sleep(15);
return 1/0;
};
// 将异步任务提交给myExecutorService
myExecutorService.submit(callable);
}
}
执行结果:
模拟取消异步任务:
import java.util.concurrent.*;
public class JdkCallbackMechanism {
public static void main(String[] args) throws InterruptedException {
// 定义回调处理逻辑
Callback callback = new Callback() {
@Override
public <T> void onSuccess(T t) {
System.out.println("任务成功执行了,结果是:" + t);
}
@Override
public void onFailure(Exception e) {
System.out.println("任务执行失败了,发生了异常:" + e);
}
};
// 定义处理异步任务和回调任务的线程池
ExecutorService taskExecutor = Executors.newFixedThreadPool(3);
ExecutorService callbackExecutor = Executors.newSingleThreadExecutor();
// 初始化MyExecutorService
MyExecutorService myExecutorService = new MyExecutorService(taskExecutor, callback, callbackExecutor);
// 定义异步任务
Callable<Integer> callable = () -> {
System.out.println("异步任务正在执行...");
// 模拟耗时操作
TimeUnit.SECONDS.sleep(15);
return 1/0;
};
// 将异步任务提交给myExecutorService
Future<Integer> future = myExecutorService.submit(callable);
TimeUnit.SECONDS.sleep(5);
future.cancel(true);
}
}
执行结果:
最后:工作中,其实已经有很多已经封装好的异步回调框架,不需要我们自己造轮子,比如google的guava。但本文几乎以最精简的代码实现了一个异步回调,有助于大家理解异步回调的原理。