《线程池系列七》-Guava ListenableFuture AbstractFuture实现原理讲解

该篇文章与线程池的关系不是很大,由于它与FutureTask的实现非常的相似,因此放在了线程池系列。在学习ListenableFuture时一定要对比着FutureTask的相关知识点学习,了解两者的共同点、区别、适应场景。本节将详细讲解ListenableFure的默认实现AbstractFuture类,并结合FutureTask展开一些列的问题讨论。有关FutureTask的知识,请阅读我的另一篇文章《线程池系列一》-FutureTask原理讲解与源码剖析。本节标题与《线程池系列六》-Guava ListenableFutureTask非常相似,但是这两篇文章完全不同,ListenableFutureTask是基于FutureTask的实现,而本文主要讲解的是ListenableFuture的默认实现AbstractFuture类的使用

ListenableFuture<V>

ListenableFuture<V>接口继承Future<V>接口。Future接口是JDK定义的接口,其定义了与任务操作相关的方法,例如任务的取消:cancel(),判断任务的状态:isCancelled()、isDone(),获取任务结果:get()等。
ListenableFuture<V>是Guava(google提供的一个java开发包)定义的接口,其在Future接口的基础上添加了addListener()方法,源码如下:

void addListener(Runnable listener, Executor executor);

该方法主要用于给Future任务添加监听任务,需要主要的是:

  • listener任务必须指定执行该任务的线程池executor(该executor一定不要与执行Future任务的线程池是同一个,否则会出现死锁情况)
  • 监听任务执行的时机为Future任务执行完成(包括正确执行完成和任务抛异常终止)或者被取消。

AbstractFuture<V>内部类Sync

AbstractFuture的实现与FutureTask的实现非常相似,FutureTask使用unsafe包的CAS实现,而AbstractFuture使用的是AQS(AbstractQueuedSynchronizer,jdk锁实现的模板类,本文不做讲解),AbstractFuture将99%的操作全部都交于内部类Sync实现,下面讲解一下Sync类的实现

  • 任务的状态信息

Sync将任务状态分为5种,分别为running、completing、completed、cancelled、interrupted。其与FutureTask任务的状态对比如下图所示:

FutureTask-AbstractFuture状态对比图.png

从图中可以看出,AbstractFure相比与FutureTask少了两种状态Exceptional和interrupting状态,其中Exceptional状态,在AbstractFure中通过添加一个Throwable类型的结果来实现(如果Throwable对象的值不为null,则说明是exceptional),而interrupting状态再FutureTask中的作用就不是很大,在AstractFurue中并没有设计该状态。

  • 成员变量

与FutureTask不同,FutureTask中只有一个Object对象用来存放Future执行的结果,可以是正常结果,也可以是异常。在AbstractFuture中,将两种结果分开,正常结果放在value中,异常结果放在exception中,源码如下:

private V value;
private Throwable exception;

这也是为什么不设置exceptional状态就能区分正常和异常结果的原因。

  • 锁方法的重写

该锁使用的是共享锁来实现,主要涉及两个方法:

  1. tryAcquireShared(int ignored) 该方法是在获取锁时调用,多个线程可以同时获取锁
  2. tryReleaseShared(int finalState) 该方法是在释放锁时调用

在锁的时候时,我们一般都是先尝试获取锁,然后处理临界资源,处理完成后释放锁。而在AbstractFuture中并不是这种常规的使用方式,其实现是必须有一个线程先调用releaseShared(int arg)释放锁,其他线程才能调用acquireShared(int arg)获取锁,否则,所有的获取锁线程都将会堵塞
源码如下:

protected int tryAcquireShared(int ignored) {
    if (isDone()) {
        return 1;
    }
    return -1;
}
    
@Override
protected boolean tryReleaseShared(int finalState) {
    setState(finalState);
    return true;
}

从源码中可以看出,调用tryAcquireShared()方法就是判断任务有没有完成(isDone()方法),任务完成成功获取锁,任务没有完成则等待。
那么问题就是任务什么时候完成,任务完成都会调用tryReleaseShared()方法,该方法用于任务完成或者取消时设置最终状态。

  • 成员方法

一. 阻塞与非阻塞的get()方法
get()方法或间接调用获取锁操作,如果成功获取锁,则返回对应的结果,如果获取不到锁,则返回对应的异常

V get(long nanos) throws TimeoutException, CancellationException,
        ExecutionException, InterruptedException {

    //间接调用tryAcquireShared(arg)方法,支持中断,最长等待nanos时间
    if (!tryAcquireSharedNanos(-1, nanos)) {
        throw new TimeoutException("Timeout waiting for task.");
    }

    return getValue();
}
    
V get() throws CancellationException, ExecutionException,
        InterruptedException {

    //间接调用tryAcquireShared(arg)方法,支持中断,直到获取锁返回
    acquireSharedInterruptibly(-1);
    return getValue();
}

从源码中可以看出,两种get()方法都会间接的调用tryAcquireShared(arg)方法,且都支持中断。在成功获取锁之后,两者都会调用getValue()方法,其源码如下:

private V getValue() throws CancellationException, ExecutionException {
    int state = getState();
    switch (state) {
        case COMPLETED:
            if (exception != null) {
                throw new ExecutionException(exception);
            } else {
                return value;
            }

        case CANCELLED:
        case INTERRUPTED:
            throw cancellationExceptionWithCause(
                    "Task was cancelled.", exception);

        default:
            throw new IllegalStateException(
                    "Error, synchronizer in invalid state: " + state);
    }
}

该方法主要针对三种终止状态做处理:

  1. completed状态,该状态又分为正常结束和异常终止,通过判断exception是否为null进行区分,如果异常,则抛出异常,否则返回正常结束的结果
  2. cancelled和interrupted状态,统一处理抛出取消异常
  3. 其他情况,讲道理不会出现的状态,如果出现了抛出非法状态异常

二. 设值方法
该类方法都会间接调用 tryReleaseShared(int finalState)方法,使锁处于可获取状态,表示任务执行完成。其中包括set(@Nullable V v)设值正常值、setException(Throwable t)设值异常、cancel(boolean interrupt)取消任务,期源码如下:

boolean set(@Nullable V v) {
    return complete(v, null, COMPLETED);
}

boolean setException(Throwable t) {
    return complete(null, t, COMPLETED);
}

boolean cancel(boolean interrupt) {
    return complete(null, null, interrupt ? INTERRUPTED : CANCELLED);
}

从源码中可以看出三者都调用了complete(@Nullable V v, @Nullable Throwable t, int finalState)方法,下面对该方法进行详细的讲解,其核心逻辑如下:

  1. 将任务状态由running修改为completing状态
  2. 如果状态修改成功,则对结果value和异常exception赋值,异常的赋值主要看方法参数finalState,改状态如果为cancelled或者interrupted则为异常赋值,否则赋值为参数t的值(t可能为null),然后调用releaseShared(finalState)(该方法间接调用tryReleaseShared(arg))方法更新任务状态为最终状态。
  3. 如果状态修改失败,判断状态是否为completing状态,如果是则说明任务已经执行完成,只在赋值阶段,执行acquireShared(-1)获取锁操作,使自己阻塞至任务完成

源码如下:

private boolean complete(@Nullable V v, @Nullable Throwable t,
                         int finalState) {
    boolean doCompletion = compareAndSetState(RUNNING, COMPLETING);
    if (doCompletion) {
        // If this thread successfully transitioned to COMPLETING, set the value
        // and exception and then release to the final state.
        this.value = v;
        // Don't actually construct a CancellationException until necessary.
        this.exception = ((finalState & (CANCELLED | INTERRUPTED)) != 0)
                ? new CancellationException("Future.cancel() was called.") : t;
        releaseShared(finalState);
    } else if (getState() == COMPLETING) {
        // If some other thread is currently completing the future, block until
        // they are done so we can guarantee completion.
        acquireShared(-1);
    }
    return doCompletion;
}

AbstractFuture的状态转换全部都在该方法中了。
三. 状态判断方法
主要判断任务状态state的值,state的值存放在AQS中,再次不过多讲解,源码如下:

boolean isDone() {
    return (getState() & (COMPLETED | CANCELLED | INTERRUPTED)) != 0;
}
    
boolean isCancelled() {
    return (getState() & (CANCELLED | INTERRUPTED)) != 0;
}

boolean wasInterrupted() {
    return getState() == INTERRUPTED;
}

AbstractFuture<V>

  • 成员变量
  private final Sync<V> sync = new Sync<V>();
  private final ExecutionList executionList = new ExecutionList();

其中,sync已经讲解,ExecutionList主要用于执行listener,其实用可以参考我的另一篇博客《线程池系列六》-Guava ListenableFutureTask中有关于ExecutionList的详细讲解

  • 成员方法

大部分的成员方法都是直接调用sync的对应方法,并没有做过多的操作,只是简单的将方法暴露给外部使用而已。其中,get(long timeout, TimeUnit unit)、get()、isDone()、isCancelled()、wasInterrupted()都是直接调用的sync的方法。
除了上述方法外,还有赋值操作和取消操作的方法,由于该类方法设计到任务完成回调listener方法的关系,因此不是简单的调用sync的方法,其实现如下所示:

  protected boolean set(@Nullable V value) {
    boolean result = sync.set(value);
    if (result) {
      executionList.execute();
    }
    return result;
  }

如果设置成功,表示任务结束,则执行listener方法(executionList.execute())
setException(Throwable throwable)方法与set(@Nullable V value)操作一直,多了一个非空判断而已,不再讲解。
cancel() 如果取消成功,则执行listener回调,如果参数为真,则interruptTask();该方法为抽象方法,子类可以实现。cancel()的源码如下:

  public boolean cancel(boolean mayInterruptIfRunning) {
    if (!sync.cancel(mayInterruptIfRunning)) {
      return false;
    }
    executionList.execute();
    if (mayInterruptIfRunning) {
      interruptTask();
    }
    return true;
  }

AbstractFuture与FutureTask的区别

  1. AbstractFuture通过AQS实现,FutureTask通过unsafe CAS实现,本质是一样的
  2. AbstractFuture 有五种状态,两种任务结果value和exception,而FutureTask有七种状态,任务执行结果只有一个outcome
  3. AbstractFuture没有实现Runnable接口,不能作为任务放到线程池中执行,而FutureTask可以
  4. AbstractFuture有接口回调,FutureTask没有,但是留下了回调的接口,可以重写done()方法

AbstractFuture的使用场景

AbstractFuture是一个抽象类,我们需要自定义子类来使用AbstractFuture,又因此AbstractFuture并没有实现Runnable接口,因此其不适合和线程池配合使用(子类同时实现Runnable接口也是可以的)。它经常用来与AsyncHttpClient配合使用,使用异步HttpClient发起请求,在请求的回调中根据请求的返回结果执行AbstractFuture对象set()、setException()、cancel()操作。

欢迎扫描下方二维码,关注公众号,我们可以进行技术交流,共同成长

qrcode_for_gh_5580beb3cba1_430.jpg
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,324评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,303评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,192评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,555评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,569评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,566评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,927评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,583评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,827评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,590评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,669评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,365评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,941评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,928评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,159评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,880评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,399评论 2 342

推荐阅读更多精彩内容