Future
future谷歌的翻译是未来,为什么会用它命名呢?这里举个例子你在网上购物当你支付完成后,就是一个Future,因为你需要等待快递的到来而这一段时间你可以去做别的事情,当然这个Future你是可以取消的,也可以中断如果是比喻快递的话,你可以取消订单后拒收,这些操作都是针对任务的操作,但是任务是在异步执行中你不能直接进行管理所以就需要future来帮忙,比如取消订单就需要通过淘宝等购物网站而对于你来说淘宝就是future有他来控制任务的行动及处理。
再讲线程池的时候有说过关于他的介绍,忘记的朋友可以回顾一下,下面介绍netty中Future的结构与它的实现。
上方图有些模糊因为Future有点多所以有些模糊,这会和线程结构一样进行每个类的讲解,当然线程结构还未结束,当future讲解结束在进行,因为future也非常重要如果不将他线程池的实现无法进行,闲话少说,上问中介绍过jdk的Future和netty的Future他两者是继承关系,那么接下来讲解netty的Future的实现。
//该future并没有什么可介绍他仅仅是一个标记接口,并且继承了jdk的ScheduledFuture接口,此接口在上章中已经介绍了。
public interface ScheduledFuture<V> extends Future<V>, java.util.concurrent.ScheduledFuture<V> {
}
//管道任务,管道是netty的核心概念,他将每个请求都当做一个管道,用户可以使用管道进行通讯读取数据等操作,再讲完结构后会讲解管道的概念,这里暂时将ChannelFuture当做一个定义,这里可以看出他继承与Future但是返回的类型确实Void,Void类仅仅是一个站位标志类无任何作用,暂且当做当前的Future并没有结果可以返回,再讲他实现的时候将会细讲。
public interface ChannelFuture extends Future<Void> {
//返回当前的操作管道
Channel channel();
//下面@Override标记的方法都是对父类的重写替换,这里可能有人会有疑问为什么父类定义返回的是Future而这里则是ChannelFuture,这是因为java在1.5后支持方法重写返回类型可以是子类这话可能比较绕,那么就按现在的代码来看在Future中返回的是Future那么在重写的还是允许修改为Future的子类,下面方法的定义具体查看上篇文章。
@Override
ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
@Override
ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
@Override
ChannelFuture sync() throws InterruptedException;
@Override
ChannelFuture syncUninterruptibly();
@Override
ChannelFuture await() throws InterruptedException;
@Override
ChannelFuture awaitUninterruptibly();
//是否为无效的Future,如果是则不允许调用:addListener、addListeners、await、awaitUninterruptibly、sync、syncUninterruptibly这些方法。具体根据类的实现而定。
boolean isVoid();
}
//用于获取操作进度的Future,此接口并没有定义自己的方法仅仅是重写了父类方法与ChannelFuture相同
public interface ProgressiveFuture<V> extends Future<V> {
}
//官方解释说他是一个特殊的Future,因为从Future可以看出我们只能做的就是中断或者取消,而这个Promise是可以设置为成功和失败的,相比之下Promise是可写的,可以更好的控制Future。
public interface Promise<V> extends Future<V> {
//设置当前Promise执行成功并且通知监听的监听器们,如果失败则抛出异常
Promise<V> setSuccess(V result);
//尝试设置成功,如果设置成功那么与上方方法一样并且返回true,如果失败则返回false
boolean trySuccess(V result);
//与setSuccess相同只不过这里是设置失败。如果设置结果失败则抛出异常
Promise<V> setFailure(Throwable cause);
//与trySuccess相同只不过这里是设置失败,如果设置结果失败则返回false
boolean tryFailure(Throwable cause);
//设置当前操作不允许被取消
//返回true:标记为无法取消执行或者当前任务已经正常完成并且并没有被取消
//返回false:当前任务已经被取消了
boolean setUncancellable();
//剩下方法是对Future的重写参考ChannelFuture类。
}
//用于ChannelGroup的批量处理,具体根据实现而定下面将会介绍
//这里他实现了Iterable,Iterable接口在之前已经介绍过如果有遗忘的同学可以看看上篇文章的介绍。
public interface ChannelGroupFuture extends Future<Void>, Iterable<ChannelFuture> {
//返回相关联的ChannelGroup,前面介绍有说过此类是ChannelGroup的批量处理,那么自然是有关联的这个方法就是返回相关的ChannelGroup。
ChannelGroup group();
//通过传入的管道获取对应处理的Future
ChannelFuture find(Channel channel);
//批量处理的结果,如果传入的处理都成功了那么则返回true,有一个任务失败都会返回false。
@Override
boolean isSuccess();
//获取操作中的异常,这个异常也是经过处理的,因为可能是多个任务处理失败。具体看他的实现。
@Override
ChannelGroupException cause();
//当处理的任务中部分是成功的则为true
boolean isPartialSuccess();
//当处理的任务中部分失败了则为true
boolean isPartialFailure();
//获取处理任务的迭代器,这样就可以遍历批量中的任务并进行操作
@Override
Iterator<ChannelFuture> iterator();
//剩下方法是对Future的重写参考ChannelFuture类。
}
结构图中的第二层接口定义基本结束,接下来讲解他们的实现。
//此类事结构图中唯一一个直接与Future打交道的类,而且还是个抽象类。
public abstract class AbstractFuture<V> implements Future<V> {
//对Future的实现获取执行结果
@Override
public V get() throws InterruptedException, ExecutionException {
//等待执行结果,此方法会进行阻塞,等待执行结束为止或者被中断,如果被中断则跑出异常。
await();
//当执行结束获取当前的执行过程中的异常,可以看出await等待如果执行内部出错是不会跑出异常的。
Throwable cause = cause();
//如果执行过程中没有异常则获取执行的结果getNow非阻塞方法。
if (cause == null) {
return getNow();
}
//否则则判断是否被取消,如果是被取消则后续可以做处理毕竟是人为的。
if (cause instanceof CancellationException) {
throw (CancellationException) cause;
}
//如果不是被取消那么就代表是执行逻辑出错了那么则封装异常并且抛出异常。
throw new ExecutionException(cause);
}
//也是获取执行结果只不过设置了超时时间
@Override
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
//等待一定时间的执行如果超时则返回false否则是true
if (await(timeout, unit)) {
//能进来说明并未超时,则获取执行过程中的异常,一下的处理参考上方的get方法。
Throwable cause = cause();
if (cause == null) {
return getNow();
}
if (cause instanceof CancellationException) {
throw (CancellationException) cause;
}
throw new ExecutionException(cause);
}
//到这一步则代表即使超时了也并没有完成任务则抛出TimeOut异常
throw new TimeoutException();
}
}
第一个类攻关结束了...是的这个类就是这么简单,但是发现我们只看到了两个方法的实现,而在这两个方法内有更多的方法调用,接下来进行讲述他的实现类。
//一个完成的Future,这个CompleteFuture是一个结果父级,此类大多方法都是拥有默认值的,具体的差异在子类重写。
public abstract class CompleteFuture<V> extends AbstractFuture<V> {
//定义一个事件的执行器,可能会有人疑惑为什么这里要执行器,上面讲述过Future是一个异步任务既然是异步自然就是多线程而Future并没有线程的管理都是对任务的管理所以需要单独的一个执行器来进行执行这个任务从而产生关联
private final EventExecutor executor;
//注意他的作用域子类可见,初始化数据则是上面的执行器
protected CompleteFuture(EventExecutor executor) {
this.executor = executor;
}
//获取当前事件执行的执行器。
protected EventExecutor executor() {
return executor;
}
//添加监听器
@Override
public Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
//判断传入的监听器是否为空如果是则跑出异常
if (listener == null) {
throw new NullPointerException("listener");
}
//给指定监听器通知当前的Future 具体添加流程在将DefaultPromise时会详细介绍。
DefaultPromise.notifyListener(executor(), this, listener);
//返回当前的Future,因为这里并没有重写接口定义所以返回值还是Future接口
return this;
}
//批量添加监听器
@Override
public Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners) {
//如果传入监听器数组为null则抛出异常
if (listeners == null) {
throw new NullPointerException("listeners");
}
//如果不为null则进行遍历
for (GenericFutureListener<? extends Future<? super V>> l: listeners) {
//当前数组中有那个监听器是null的时候则跳出循环,笔者认为这里使用continue会更好一点,当前不要传入null是最好的。
if (l == null) {
break;
}
//给指定监听器通知当前的Future
DefaultPromise.notifyListener(executor(), this, l);
}
//返回当前
return this;
}
//这里并没有实现删除监听器,或者说这个抽象类不允许删除监听器
@Override
public Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener) {
// NOOP
return this;
}
//同上方法
@Override
public Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners) {
// NOOP
return this;
}
//下面的方法实现都很简单这里就不做解释了,只要知道每个方法的定义自然知道为什么这么返回的,不清楚的请看前文定义。
@Override
public Future<V> await() throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException();
}
return this;
}
@Override
public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException();
}
return true;
}
@Override
public Future<V> sync() throws InterruptedException {
return this;
}
@Override
public Future<V> syncUninterruptibly() {
return this;
}
@Override
public boolean await(long timeoutMillis) throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException();
}
return true;
}
@Override
public Future<V> awaitUninterruptibly() {
return this;
}
@Override
public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
return true;
}
@Override
public boolean awaitUninterruptibly(long timeoutMillis) {
return true;
}
@Override
public boolean isDone() {
return true;
}
@Override
public boolean isCancellable() {
return false;
}
@Override
public boolean isCancelled() {
return false;
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}
}
//完成成功结果类,继承与CompleteFuture,此类的所有方法都是符合完成标记的,比如:isDone为true、isCancellable为false、isCancelled为false等父级的方法的固定返回值。
public final class SucceededFuture<V> extends CompleteFuture<V> {
//定义了一个结果对象,用来存储任务的执行返回值
private final V result;
//传入了执行器,父级使用的,传入结果
public SucceededFuture(EventExecutor executor, V result) {
super(executor);
this.result = result;
}
//在父类中并没有去实现这个方法,而AbstractFuture实现get的时候就有此方法的调用所以此类非常有必要实现
@Override
public Throwable cause() {
return null;
}
//设置是执行成功的
@Override
public boolean isSuccess() {
return true;
}
//返回创建时的结果
@Override
public V getNow() {
return result;
}
}
//失败结果的定义与SucceededFuture方法定义相反
public final class FailedFuture<V> extends CompleteFuture<V> {
//失败了必然是有异常的,此类为定义的异常类
private final Throwable cause;
//传入执行器与失败的异常,并且判断了如果异常时null则抛出NullPointer异常
public FailedFuture(EventExecutor executor, Throwable cause) {
super(executor);
if (cause == null) {
throw new NullPointerException("cause");
}
this.cause = cause;
}
//此方法在AbstractFuture实现get的时候就有此方法的调用
@Override
public Throwable cause() {
return cause;
}
//都是失败了自然是false
@Override
public boolean isSuccess() {
return false;
}
//同步获取结果则抛出创建时传入的异常
@Override
public Future<V> sync() {
PlatformDependent.throwException(cause);
return this;
}
//同步获取结果则抛出创建时传入的异常与上方方法相同,两个方法获取的差异看前方的定义。
@Override
public Future<V> syncUninterruptibly() {
PlatformDependent.throwException(cause);
return this;
}
//失败自然没有结果则返回null
@Override
public V getNow() {
return null;
}
}
//与CompleteFuture类一样也是定义结果的只不过这里是针对管道操作进行的定义而CompleteFuture是针对所有任务定义的,此类是在CompleteFuture的基础上提出关于管道的任务做了更细致的定义。并且这个类实现了前面讲述的ChannelFuture接口
abstract class CompleteChannelFuture extends CompleteFuture<Void> implements ChannelFuture {
//既然是关于管道的自然需要相关联
private final Channel channel;
//既然是继承于CompleteFuture自然需要一个执行器,判断传入的管道不能为null
protected CompleteChannelFuture(Channel channel, EventExecutor executor) {
super(executor);
if (channel == null) {
throw new NullPointerException("channel");
}
this.channel = channel;
}
//获取执行器
@Override
protected EventExecutor executor() {
//获取执行器,我们看了这么多的CompleteFuture实现发现这个执行器并不是非常重要,都没有进行非null校验但是如果真正需要使用的时候那么就必须不是null,所以这里做了处理当传入的执行器是null的时候回使用管道中的执行器,从这里可以看出管道中也是有执行器,再讲管道的时候将会详细介绍
EventExecutor e = super.executor();
if (e == null) {
//如果创建结果时并没有传入执行器则返回管道的执行器
return channel().eventLoop();
} else {
return e;
}
}
//此方法参照CompleteFuture
@Override
public ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener) {
super.addListener(listener);
return this;
}
//此方法参照CompleteFuture
@Override
public ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners) {
super.addListeners(listeners);
return this;
}
//此方法参照CompleteFuture
@Override
public ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener) {
super.removeListener(listener);
return this;
}
//此方法参照CompleteFuture
@Override
public ChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners) {
super.removeListeners(listeners);
return this;
}
//下面方法实现与ChannelFuture所以返回的类型修改为ChannelFuture,并没有特殊的处理所以知道即可,毕竟此类事完成结果类这些方法并没有什么用。
@Override
public ChannelFuture syncUninterruptibly() {
return this;
}
@Override
public ChannelFuture sync() throws InterruptedException {
return this;
}
@Override
public ChannelFuture await() throws InterruptedException {
return this;
}
@Override
public ChannelFuture awaitUninterruptibly() {
return this;
}
//返回传入的管道
@Override
public Channel channel() {
return channel;
}
@Override
public Void getNow() {
return null;
}
//此对象并不是无效对象。具体查看ChannelFuture的接口定义
@Override
public boolean isVoid() {
return false;
}
}
//继承于CompleteChannelFuture抽象类,此类并没有什么可说的,具体含义参照SucceededFuture接口的讲解
final class SucceededChannelFuture extends CompleteChannelFuture {
SucceededChannelFuture(Channel channel, EventExecutor executor) {
super(channel, executor);
}
@Override
public Throwable cause() {
return null;
}
@Override
public boolean isSuccess() {
return true;
}
}
//继承于CompleteChannelFuture抽象类,此类并没有什么可说的,具体含义参照FailedFuture接口的讲解
final class FailedChannelFuture extends CompleteChannelFuture {
private final Throwable cause;
FailedChannelFuture(Channel channel, EventExecutor executor, Throwable cause) {
super(channel, executor);
if (cause == null) {
throw new NullPointerException("cause");
}
this.cause = cause;
}
@Override
public Throwable cause() {
return cause;
}
@Override
public boolean isSuccess() {
return false;
}
@Override
public ChannelFuture sync() {
PlatformDependent.throwException(cause);
return this;
}
@Override
public ChannelFuture syncUninterruptibly() {
PlatformDependent.throwException(cause);
return this;
}
}
到此关于CompleteFuture的实现介绍完了,总结:此类并无实际意义仅仅是对返回Future的约束,适用于已知结果的Future的封装,因为里面并没有真正的处理逻辑。
接下来讲解AbstractFuture另一个实现VoidChannelPromise,但是在讲他之前需要再讲一个定义。
//此接口继承了两个接口ChannelFuture与Promise具体讲解请看上文。
//前面介绍Promise的时候说过他是一个特殊的Future他可以手动设置成功,设置成功需要一个结果值而这里的定义则是结果值是一个Void无效的类型可以看出只要是实现当前的Future则都没有返回值即使有返回值也是Void。
public interface ChannelPromise extends ChannelFuture, Promise<Void> {
//这里再一次重新定义了ChannelFuture的channel方法,这个重写毫无意义。。估计是为了以后修改,修改方式则是将Channel改成他的子类实现,暂时不管看当前接口到时候的实现即可
@Override
Channel channel();
//这里仅仅是对父类的一个重写,此重写是为了返回this为ChannelPromise的对象这样方便调用ChannelPromise中声明的方法
@Override
ChannelPromise setSuccess(Void result);
//既然是无效值说明不设置也可以所以这里讲上方的传参去掉了
ChannelPromise setSuccess();
//尝试成功也是一样,去除了无效的结果值
boolean trySuccess();
//一下方法参考setSuccess(Void result);的解释
@Override
ChannelPromise setFailure(Throwable cause);
@Override
ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelPromise addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
@Override
ChannelPromise removeListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelPromise removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
@Override
ChannelPromise sync() throws InterruptedException;
@Override
ChannelPromise syncUninterruptibly();
@Override
ChannelPromise await() throws InterruptedException;
@Override
ChannelPromise awaitUninterruptibly();
//如果isVoid是true则返回全新的ChannelPromise,否则返回this
ChannelPromise unvoid();
}
//此类是无效的类,用到此类的地方都是用作站位,具体的在使用的地方详细介绍。他的存在是为了减少侦听器通知和维护侦听器集合的开销,但是作者表明希望删除它估计以后的版本这个类会消失,github issus 8518可查看作者的解释
public final class VoidChannelPromise extends AbstractFuture<Void> implements ChannelPromise {
//既然实现了ChannelPromise此属性是必备的,ChannelPromise中有个channel方法用来返回
private final Channel channel;
//是否你传播异常,如果此属性不是null则进行传递,因为handle是多个顺序执行的第一个handle出错传给下一个handle然后继续传播
private final ChannelFutureListener fireExceptionListener;
//传入一个管道并且传入是否需要传播异常
public VoidChannelPromise(final Channel channel, boolean fireException) {
if (channel == null) {
throw new NullPointerException("channel");
}
this.channel = channel;
//当传播异常为true的时候将会创建一个监听器的匿名类,当完成任务后判断是否发生了异常如果发生则开始传播。
if (fireException) {
//监听器后续会介绍
fireExceptionListener = new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
//此方法当任务完成后进入,先获取当前的任务执行过程是否有异常。
Throwable cause = future.cause();
if (cause != null) {
//如果有异常则进行传播
fireException0(cause);
}
}
};
} else {
//如果设置不传播则赋值为null
fireExceptionListener = null;
}
}
//之前说过此类仅仅是一个标识类所以并没有监听器可维护所以内部调用了一个抛出异常的方法。
@Override
public VoidChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener) {
fail();
return this;
}
//同上
@Override
public VoidChannelPromise addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners) {
fail();
return this;
}
//删除直接就没有进行开发可跳过
@Override
public VoidChannelPromise removeListener(GenericFutureListener<? extends Future<? super Void>> listener) {
// NOOP
return this;
}
//同上
@Override
public VoidChannelPromise removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners) {
// NOOP
return this;
}
//此方法还好,好在还判断了是否中断如果中断则抛出异常,否则返回当前类
@Override
public VoidChannelPromise await() throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException();
}
return this;
}
//同addListener方法
@Override
public boolean await(long timeout, TimeUnit unit) {
fail();
return false;
}
//同addListener方法
@Override
public boolean await(long timeoutMillis) {
fail();
return false;
}
//同addListener方法
@Override
public VoidChannelPromise awaitUninterruptibly() {
fail();
return this;
}
//同addListener方法
@Override
public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
fail();
return false;
}
//同addListener方法
@Override
public boolean awaitUninterruptibly(long timeoutMillis) {
fail();
return false;
}
//返回创建时传入的Channel
@Override
public Channel channel() {
return channel;
}
//是否完成:false 使用标识的时候需要用到false所以默认是false
@Override
public boolean isDone() {
return false;
}
//同上
@Override
public boolean isSuccess() {
return false;
}
//同上
@Override
public boolean setUncancellable() {
return true;
}
//同上
@Override
public boolean isCancellable() {
return false;
}
//同上
@Override
public boolean isCancelled() {
return false;
}
//同上
@Override
public Throwable cause() {
return null;
}
//同addListener方法
@Override
public VoidChannelPromise sync() {
fail();
return this;
}
//同addListener方法
@Override
public VoidChannelPromise syncUninterruptibly() {
fail();
return this;
}
//如果设置异常则进行传播
@Override
public VoidChannelPromise setFailure(Throwable cause) {
fireException0(cause);
return this;
}
@Override
public VoidChannelPromise setSuccess() {
return this;
}
//如果尝试设置异常则进行传播
@Override
public boolean tryFailure(Throwable cause) {
fireException0(cause);
return false;
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}
@Override
public boolean trySuccess() {
return false;
}
//之前一直调用fail方法,此方法仅仅是抛出了异常,因为是标识类所以是无效的future从msg信息也能得出结论
private static void fail() {
throw new IllegalStateException("void future");
}
@Override
public VoidChannelPromise setSuccess(Void result) {
return this;
}
@Override
public boolean trySuccess(Void result) {
return false;
}
@Override
public Void getNow() {
return null;
}
//前面介绍定义的时候就说过当isVoid为true的时候创建一个新的Promise传出否则返回this,可以结合下面的isVoid看出与定义是一样的,这里DefaultChannelPromise类在后面会介绍。
@Override
public ChannelPromise unvoid() {
ChannelPromise promise = new DefaultChannelPromise(channel);
if (fireExceptionListener != null) {
promise.addListener(fireExceptionListener);
}
return promise;
}
@Override
public boolean isVoid() {
return true;
}
//传播异常,之前说了是管道的handle所以这里调用的管道的pipeline方法进行设置此方法在将管道实现的时候将会详细介绍
private void fireException0(Throwable cause) {
//如果监听器不是null 并且 当前管道是注册的状态则进行传播
if (fireExceptionListener != null && channel.isRegistered()) {
channel.pipeline().fireExceptionCaught(cause);
}
}
}
future第一篇到此结束接下来讲述DefaultPromise由于较大所以进行分章讲解。