PromiseTask源码讲解
在讲解PromiseTask之前需要先介绍几个之前没有讲述的接口定义,虽然PromiseTask继承与DefaultPromise但是他们之间还是有差距的,之前一直再说future的定义是一个任务管理器,那么DefaultPromise则就是实现管理器的公用方法,仅仅是对任务执行的描述,但是并没有真正的操作任务,而PromiseTask则对任务做了操作的处理,所以他有一些独特的接口需要实现,那么下面将是对这些接口的定义。
//jdk定义的接口此接口就是为了将Runnable 和 future 整合到一起,这样对此接口的实现都是可以进行独立运行的future。
//感兴趣的读者可以去查找下在netty中是否使用DefaultPromise做了单独的执行任务。
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}
//jdk定义接口,线程执行时必须传入的接口定义,从而可以看出PromiseTask在内部实现了run方法从而达到多线程运行的效果。
public interface Runnable {
public abstract void run();
}
//这里继承的Future接口是jdk的如果有疑问请查看第一章线程结构中有讲解此接口的定义
//PromiseTask继承与DefaultPromise并且实现了RunnableFuture接口
class PromiseTask<V> extends DefaultPromise<V> implements RunnableFuture<V> {
//之前在讲解线程结构的时候有讲解过Callable的定义,如果有遗忘的读者请回看回忆一下。
//之前一直在说Future是任务管理,其中管理中有结果result,但是Runnable接口的run方法并没有返回值,所以这里的做法是将Runnable接口转换成Callable接口
//转换很简单就是创建了一个适配器并且传入结果和runnable接口
static <T> Callable<T> toCallable(Runnable runnable, T result) {
return new RunnableAdapter<T>(runnable, result);
}
//定义的内部适配器类,此类继承与Callable
private static final class RunnableAdapter<T> implements Callable<T> {
//定义Runnable接口的属性
final Runnable task;
//和结果集属性
final T result;
//构造器中传入Runnable和result
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
//当运行Callable的时候则会调用此方法,此方法的实现也很简单,调用Runnable的run方法并且返回创建适配器时传入的result
@Override
public T call() {
task.run();
return result;
}
//重写toString方法并且返回一些详细信息
@Override
public String toString() {
return "Callable(task: " + task + ", result: " + result + ')';
}
}
//定义了Callable用于存储线程执行时需要执行的任务
protected final Callable<V> task;
//构造器,传入一个线程执行器和不要执行的Runnable,最后还传入了返回结果。
//之前说定义说过个Runnable是没有返回值的所以需要在定义的时候就设置好结果,所以如果使用Runnable就必须要传入结果
PromiseTask(EventExecutor executor, Runnable runnable, V result) {
this(executor, toCallable(runnable, result));
}
//构造器,传入执行器和callable需要执行的有结果任务。因为传入的就是个执行有返回值的任务所以不用再传入结果
//而上方的构造器就是讲Runnable封装了下传入此构造器
PromiseTask(EventExecutor executor, Callable<V> callable) {
super(executor);
task = callable;
}
//hashCode 对象标识
@Override
public final int hashCode() {
return System.identityHashCode(this);
}
//重写了equals,任务直接的比较
@Override
public final boolean equals(Object obj) {
return this == obj;
}
//之前提到的run方法的实现。
//此处可能稍微有点抽象,因为并没有看到线程并且传入的执行器也没有执行,那是因为此方法在调用的时候已经有单独的一个线程在调用了,所以看起来和正常方法没有什么不一样的。
@Override
public void run() {
try {
//首先判断当前任务状态是否为不可取消状态,因为如果设置到这个状态则表示当前的任务正在运行.
if (setUncancellableInternal()) {
//如果没有运行那么则调用task.call方法获取执行结果
V result = task.call();
//执行完成则设置结果
setSuccessInternal(result);
}
} catch (Throwable e) {
//如果报错则设置异常结果
setFailureInternal(e);
}
}
//剩下的方法都是默认的一些处理因为他下面还会有继承所以大多方法并没有特殊含义。
//下面的方法再前面定义的时候都已经讲过所以这再不会进行讲解,因为并没有特殊的实现,要么是调用父级要么是直接返回值数据。
@Override
public final Promise<V> setFailure(Throwable cause) {
throw new IllegalStateException();
}
protected final Promise<V> setFailureInternal(Throwable cause) {
super.setFailure(cause);
return this;
}
@Override
public final boolean tryFailure(Throwable cause) {
return false;
}
protected final boolean tryFailureInternal(Throwable cause) {
return super.tryFailure(cause);
}
@Override
public final Promise<V> setSuccess(V result) {
throw new IllegalStateException();
}
protected final Promise<V> setSuccessInternal(V result) {
super.setSuccess(result);
return this;
}
@Override
public final boolean trySuccess(V result) {
return false;
}
protected final boolean trySuccessInternal(V result) {
return super.trySuccess(result);
}
@Override
public final boolean setUncancellable() {
throw new IllegalStateException();
}
protected final boolean setUncancellableInternal() {
return super.setUncancellable();
}
//此处并没有重写父级的toString方法而是重写了父类的toStringBuilder方法用于消息的处理。
@Override
protected StringBuilder toStringBuilder() {
StringBuilder buf = super.toStringBuilder();
buf.setCharAt(buf.length() - 1, ',');
return buf.append(" task: ")
.append(task)
.append(')');
}
}
到此PromiseTask到此结束了,接下来讲解ScheduledFutureTask的实现,但是在讲解他之前需要再说几个定义和其内部的实现,因为内部依赖否则会讲的非常模糊。
由于ScheduledFuture已经在第二篇中讲解过所以这里不再讲解
//此接口是为了维护队列内部数据使用的,只有在DefaultPriorityQueue中使用到了,其他地方不应该调用此接口定义的方法
//这里的维护仅仅是记录ScheduledFutureTask具体任务在队列中的下标地址,而下面的两个方法都是对下标做的操作。
//这里比较抽象,读者可以这样想,任务并不是一个是多个但是他们都需要执行,只能一个一个去执行,这样就需要一个队列去进行排序排到第几个则为了后面能快速获取到当前任务的下标所以这里实现了这个接口这算是个优化点,因为如果不进行记录那么可能会导致,如果获取当前下标则需要进行遍历比较,这样是非常消耗性能的,所以这个做法挺不错,可以拿来借鉴。
public interface PriorityQueueNode {
//定义了一个常量值,不存在队列中的index
int INDEX_NOT_IN_QUEUE = -1;
//获取在传入队列中的下标地址,具体看实现。
int priorityQueueIndex(DefaultPriorityQueue<?> queue);
//设置当前的任务在队列中的下标位置,int i则是对于的index
void priorityQueueIndex(DefaultPriorityQueue<?> queue, int i);
}
在讲解前说过,会有几个定义,但是在笔者阅读总结的时候发现,剩下的定义是讲解过的所以这里只讲述为讲过的定义。接下来就是对ScheduledFutureTask的实现讲解。
//延迟任务管理,不仅支持延迟执行也可以根据周期一直运行。
final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFuture<V>, PriorityQueueNode {
//下一次任务的执行id,
private static final AtomicLong nextTaskId = new AtomicLong();
//任务的创建时间,创建了当前任务则代表已经开始,因为如果是延迟任务那么就要从创建开始进行计时。
private static final long START_TIME = System.nanoTime();
//获取当前时间减去创建时间的时间差
static long nanoTime() {
return System.nanoTime() - START_TIME;
}
//获取最后一次的执行时间,此方法一般用于循环任务
static long deadlineNanos(long delay) {
//使用当前时间减去任务开始时间并且加上周期不管怎么算都会是下一次的执行时间的间隔
//这里稍微有点绕,此处并不是使用具体的时间进行比较的而是使用时间段进行比较的,比如开始时间是00:00:00而当前时间是00:00:01他们的时间段就是1s而下一次执行周期计算应该是2s如果这样比较那么此条件不成立则不执行,直到当前时间00:00:02的时候才进行执行。而此方法就是获取下一次执行周期的计算结果。
long deadlineNanos = nanoTime() + delay;
//这里防止计算错误导致程序错误所以做了对应的处理
return deadlineNanos < 0 ? Long.MAX_VALUE : deadlineNanos;
}
//获取当前的id,CAS之前说到过,这里是将nextTaskId字段进行加一进行返回
private final long id = nextTaskId.getAndIncrement();
//记录任务执行的周期叠加数,之前介绍了他的计算是计算的时间段而每个时间段执行都需要叠加上周期这样才能保证执行时间的准确
//这里提一下可能有人会发现START_TIME是static描述的不管是那个对象来使用都会是一样开始时间,而为了保证执行的准确性再添加任务的时候回将已过去的周期叠加到此字段,就是调用了deadlineNanos方法,这里提到可能会有些抽象,后面使用的时候自然就会清楚。
private long deadlineNanos;
//周期时长,这里需要注意这个周期有三个状态
//等于0的时候不会循环执行
//小于0则使用scheduleWithFixedDelay方法的算法,下一次执行时间是上次执行结束的时间加周期
//大于0则使用scheduleAtFixedRate方法的算法,下一次执行时间是上一次执行时间加周期
//大于小于的两者差距在前面详细的介绍过遗忘的读者可以再去阅读。
private final long periodNanos;
//刚才讲述了PriorityQueueNode有说存储当前node是在队列中的那个下标,而此变量则是对列存储的下标
private int queueIndex = INDEX_NOT_IN_QUEUE;
//构造器,传入执行器、运行的Runnable,因为是Runnable所以传入了result,执行的时间。
//可以看出此方法是延迟执行任务的构造,因为没有传入周期,执行一次即可结束。
//此处的执行时间是执行开始时间,而这个时间的算法就是deadlineNanos方法的调用
ScheduledFutureTask(
AbstractScheduledEventExecutor executor,
Runnable runnable, V result, long nanoTime) {
this(executor, toCallable(runnable, result), nanoTime);
}
//构造器,传入执行器,运行的Callable,执行时间,周期
//此处只支持period大于0或者小于0,如果等于0则会抛出异常
//而period就是对periodNanos的赋值之前讲述过他的差异
ScheduledFutureTask(
AbstractScheduledEventExecutor executor,
Callable<V> callable, long nanoTime, long period) {
super(executor, callable);
if (period == 0) {
throw new IllegalArgumentException("period: 0 (expected: != 0)");
}
deadlineNanos = nanoTime;
periodNanos = period;
}
//此处是对第一个构造的一个调用实现,因为第一个构造传入的是Runnable而ScheduledFutureTask使用的任务是Callable所以第一个构造调用了一个转换的方法然后调用此构造
//可以看出默认周期是0则代表此构造是不重复运行的
ScheduledFutureTask(
AbstractScheduledEventExecutor executor,
Callable<V> callable, long nanoTime) {
super(executor, callable);
deadlineNanos = nanoTime;
periodNanos = 0;
}
//获取执行器
@Override
protected EventExecutor executor() {
return super.executor();
}
//获取执行时间
public long deadlineNanos() {
return deadlineNanos;
}
//获取当前时间还有多久到下一次执行时间
//获取获取时间和下一次执行时间的差,如果当前时间已经超过下一次执行时间则返回0
public long delayNanos() {
return Math.max(0, deadlineNanos() - nanoTime());
}
//上一个方法使用的是当前时间而此方法使用的是传入的指定时间
public long delayNanos(long currentTimeNanos) {
return Math.max(0, deadlineNanos() - (currentTimeNanos - START_TIME));
}
//将获取到的时长转为指定的时间类型,获取到的试纳秒如果传入的unit是秒或者毫秒则会转成对象的时长返回
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(delayNanos(), TimeUnit.NANOSECONDS);
}
//之前再说Delayed接口的时候,此结构基础过Comparable接口所以整个方法实现的Comparable接口的方法
//此方法是比较两个ScheduledFutureTask的周期任务是下次执行的时长,因为既然是在队列中那么每次弹出的任务都会是头部的,所以是为了将先执行的任务排到队列头使用。
//此函数具体的返回值需要根据使用出做出判定此处不做解释
@Override
public int compareTo(Delayed o) {
//如果两个对象比较相等则返回0
if (this == o) {
return 0;
}
ScheduledFutureTask<?> that = (ScheduledFutureTask<?>) o;
//当前的执行时间减去传入的执行时间,获取的就是他们的差数
long d = deadlineNanos() - that.deadlineNanos();
//如果小于0 则代表当前的时间执行早于传入的时间则返回-1
if (d < 0) {
return -1;
//如果大于0则代表当前任务晚于传入的时间则返回1
} else if (d > 0) {
return 1;
//如果他俩下一个周期时间相等则代表d是0,则判断他当前的id是否小于传入的id,如果小则代表当前任务优先于传入的任务则返回-1
} else if (id < that.id) {
return -1;
//如果两个id相等则抛出异常
} else if (id == that.id) {
throw new Error();
//否则传入的任务优先于当前的任务,此处结论是根据调用出总结。
} else {
return 1;
}
}
//最终的运行run方法
@Override
public void run() {
//如果当前线程不是传入的执行器线程则会抛出断言异常当然如果运行时没有开启断言关键字那么次代码无效
assert executor().inEventLoop();
try {
//检查是否周期为0之前说过如果是0则不进行循环
if (periodNanos == 0) {
//与父级的使用相同设置为状态为正在运运行
if (setUncancellableInternal()) {
//执行任务
V result = task.call();
//设置为成功
setSuccessInternal(result);
}
} else {
//检查当前的任务是否被取消了
if (!isCancelled()) {
//如果没有则调用call,因为能进入这里都是循环执行的任务所以没有返回值
task.call();
//并且判断当前的执行器是否已经关闭
if (!executor().isShutdown()) {
//将当前的周期时间赋值给p
long p = periodNanos;
//如果当前周期大于0则代表当前时间添加周期时间
//这里需要注意当前时间包括了不包括执行时间
//这样说可能有点绕,这样理解这里的p是本次执行是在开始的准时间,什么是准时间?就是无视任务的执行时间以周期时间和执行开始时间计算。
//scheduleAtFixedRate方法的算法,通过下面的deadlineNanos+=p也是可以看出的。
if (p > 0) {
deadlineNanos += p;
} else {
//此处小于0 则就需要将当前程序的运行时间也要算进去所以使用了当前时间加周期,p因为小于0所以负负得正了
deadlineNanos = nanoTime() - p;
}
//如果还没有取消当前任务
if (!isCancelled()) {
//获取任务队列并且将当前的任务在丢进去,因为已经计算完下一次执行的时间了所以当前任务已经是一个新的任务,最起码执行时间改变了
Queue<ScheduledFutureTask<?>> scheduledTaskQueue =
((AbstractScheduledEventExecutor) executor()).scheduledTaskQueue;
assert scheduledTaskQueue != null;
scheduledTaskQueue.add(this);
}
}
}
}
} catch (Throwable cause) {
//如果出现异常则设置为失败
setFailureInternal(cause);
}
}
//取消当前任务所以需要从任务队列中移除当前任务
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
boolean canceled = super.cancel(mayInterruptIfRunning);
if (canceled) {
((AbstractScheduledEventExecutor) executor()).removeScheduled(this);
}
return canceled;
}
//取消不删除则直接调用父级方法不做任务的删除,
boolean cancelWithoutRemove(boolean mayInterruptIfRunning) {
return super.cancel(mayInterruptIfRunning);
}
//和之前已经重写了父类的toString打印的详细信息
@Override
protected StringBuilder toStringBuilder() {
StringBuilder buf = super.toStringBuilder();
buf.setCharAt(buf.length() - 1, ',');
return buf.append(" id: ")
.append(id)
.append(", deadline: ")
.append(deadlineNanos)
.append(", period: ")
.append(periodNanos)
.append(')');
}
//获取在队列中的位置
@Override
public int priorityQueueIndex(DefaultPriorityQueue<?> queue) {
return queueIndex;
}
//设置当前任务在队列中的位置
@Override
public void priorityQueueIndex(DefaultPriorityQueue<?> queue, int i) {
queueIndex = i;
}
}
到此PromiseTask的分支基本讲解完毕了还有一个RunnableScheduledFutureTask它的定义是一个内部类到时候再讲对于执行器的时候会将此类一起讲解。