Netty源码分析1 - Promise 异步框架的设计

promise-framework是一个从netty抽取出来的通用promise异步框架,并简化了Listener架构。

github:https://github.com/zhaojigang/concurrent-framework
  • 一、使用姿势
  • 二、代码架构
  • 三、代码分析
  • 附、bug记录

一、使用姿势

1.1、回调方式(推荐 - 完全异步)

    @Test
    public void testListenerNotifyLater() {
        int numListenersBefore = 2; // 设置结果前设置两个listener
        int numListenersAfter = 3; // 设置结果后设置三个listener

        CountDownLatch latch = new CountDownLatch(numListenersBefore + numListenersAfter);
        DefaultPromise<Void> promise = new DefaultPromise<>();

        for (int i = 0; i < numListenersBefore; i++) {
            promise.addListener(new FutureListener<Void>() {
                @Override
                public void operationComplete(Future<Void> future) throws Exception {
                    latch.countDown();
                }
            });
        }

        new Thread(new Runnable() {
            @Override
            public void run() {
                promise.setSuccess(null);

                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        for (int i = 0; i < numListenersAfter; i++) {
                            promise.addListener(future -> {
                                latch.countDown();
                            });
                        }
                    }
                }).start();
            }
        }).start();

        try {
            Assert.assertTrue(latch.await(100, TimeUnit.SECONDS), "expect notify " + (numListenersBefore + numListenersAfter) + " listeners");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

流程(非常重要):

  • 首先在main线程中为DefaultPromise实例添加了两个listener
  • 之后启动另外一个线程A去设置值(此时就会回调已经加入到当前的DefaultPromise实例中的两个listener#operationComplete(Future<Void> future),然后删除这两个listener,也就是说一个listener只能被通知一遍)
  • 之后线程A又启动了另外的一条线程B为当前的DefaultPromise实例添加了3个listener,注意,此时每添加一个listener,就会立即回调其operationComplete方法,因为当前的DefaultPromise.isDone()==true了,就是说当前的DefaultPromise实例已经完成了。

1.2、阻塞get方式 - (不推荐 - 可能阻塞)

    @Test
    private void testFutureStyleWithWaitNotifyAll() throws ExecutionException, InterruptedException {
        Promise<Model> promise = new DefaultPromise<>();

        /**
         * 一个线程在执行get(),进行wait()
         */
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Object result = promise.get();// 等待条件
                    // 之后做相应的业务逻辑
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        // sleep 2s 使第一个线程先等待着
        Thread.sleep(2000);

        /**
         * 另外一个线程在设置值,notifyAll唤醒wait()线程
         */
        new Thread(new Runnable() {
            @Override
            public void run() {
                promise.setSuccess(new Model(1L));
            }
        }).start();
    }

步骤:

  • 标准的future阻塞姿势:一个线程执行get(),进行wait()阻塞;另外一个线程设置值,执行notifyAll()

二、代码架构

DefaultPromise.png

说明:

  • java.util.concurrent.Future:Java并发包提供的Future类。
  • io.hulk.promise.framework.Future:继承java.util.concurrent.Future,增强功能。
  • AbstractFuture:实现了java.util.concurrent.Future的get()和get(long timeout, TimeUnit unit)阻塞等待模式,使用模板模式搭建这两个方法的基本骨架。
  • Promise:可写的Future,提供setSuccess()等接口方法。
  • DefaultPromise:最终的实现类,该实现类实现了观察者模式。

三、代码分析

3.1 java.util.concurrent.Future

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

方法较为简陋,例如只有isDone(),而没有isSuccess()这样的方法,没有添加listener的接口,也没有设置是否可以取消的接口。所以使用io.hulk.promise.framework.Future增强java.util.concurrent.Future。

3.2 io.hulk.promise.framework.Future

/**
 * from netty4.1
 */
public interface Future<V> extends java.util.concurrent.Future<V> {
    /**
     * Returns {@code true} if and only if the I/O operation was completed successfully.
     */
    boolean isSuccess();

    /**
     * returns {@code true} if and only if the operation can be cancelled via {@link #cancel(boolean)}.
     */
    boolean isCancellable();

    /**
     * Returns the cause of the failed I/O operation if the I/O operation has failed.
     *
     * @return the cause of the failure. {@code null} if succeeded or this future is not completed yet.
     */
    Throwable cause();

    /**
     * Adds the specified listener to this future.
     * The specified listener is notified when this future is {@linkplain #isDone() done}.
     * If this future is already completed, the specified listener is notified immediately.
     */
    Future<V> addListener(FutureListener<V> listener);

    /**
     * Adds the specified listeners to this future.
     * The specified listeners is notified when this future is {@linkplain #isDone() done}.
     * If this future is already completed, the specified listeners is notified immediately.
     */
    Future<V> addListeners(List<FutureListener<V>> listeners);

    /**
     * Removes the first occurrence of the specified listener from this future.
     * The specified listener is no longer notified when this future is {@linkplain #isDone() done}.
     * If the specified listener is not associated with this future, this method does nothing and returns silently.
     */
    Future<V> removeListener(FutureListener<V> listener);

    /**
     * Removes the first occurrence for each of the listeners from this future.
     * The specified listeners is no longer notified when this future is {@linkplain #isDone() done}.
     * If the specified listeners is not associated with this future, this method does nothing and returns silently.
     */
    Future<V> removeListeners(List<FutureListener<V>> listeners);

    /**
     * Waits for this future until it is done, and rethrows the cause of the failure if this future failed.
     *
     * @throws InterruptedException
     *         if the current thread was interrupted
     */
    Future<V> sync() throws InterruptedException;

    /**
     * Waits for this future until it is done, and rethrows the cause of the failure if this future failed.
     * This method catches an {@link InterruptedException} and discards it silently.
     * 即不响应中断
     */
    Future<V> syncUninterruptibly();

    /**
     * Waits for this future to be completed.
     *
     * @throws InterruptedException
     *         if the current thread was interrupted
     */
    Future<V> await() throws InterruptedException;

    /**
     * Waits for this future to be completed without interruption.
     * This method catches an {@link InterruptedException} and discards it silently.
     */
    Future<V> awaitUninterruptibly();

    /**
     * Waits for this future to be completed within the specified time limit.
     *
     * @return {@code true} if and only if the future was completed within the specified time limit
     * @throws InterruptedException if the current thread was interrupted
     */
    boolean await(long timeout, TimeUnit timeUnit) throws InterruptedException;

    /**
     * Waits for this future to be completed within the specified time limit.
     *
     * @return {@code true} if and only if the future was completed within the specified time limit without interruption.
     * This method catches an {@link InterruptedException} and discards it silently.
     */
    boolean awaitUninterruptibly(long timeout, TimeUnit timeUnit);

    /**
     * Waits for this future to be completed within the specified time limit.
     *
     * @return {@code true} if and only if the future was completed within the specified time limit
     * @throws InterruptedException if the current thread was interrupted
     */
    boolean await(long timeoutMillis) throws InterruptedException;

    /**
     * Waits for this future to be completed within the specified time limit.
     *
     * @return {@code true} if and only if the future was completed within the specified time limit without interruption.
     * This method catches an {@link InterruptedException} and discards it silently.
     */
    boolean awaitUninterruptibly(long timeoutMillis);

    /**
     * Return the result without blocking. If the future is not done yet this will return {@code null}.
     *
     * As it is possible that a {@code null} value is used to mark the future as successful you also need to check
     * if the future is really done with {@link #isDone()} and not relay on the returned {@code null} value.
     */
    V getNow();

    /**
     * {@inheritDoc}
     *
     * If the cancellation was successful it will fail the future with an {@link java.util.concurrent.CancellationException}.
     *
     */
    @Override
    boolean cancel(boolean mayInterruptIfRunning); // {@inheritDoc} 用在一个@Override的方法上,表示为父类的方法添加详细的注释
}

3.3 FutureListener

/**
 * Listens to the result of a {@link Future}.
 * The result of the asynchronous operation is notified once
 * this listener is added by calling {@link Future#addListener(FutureListener)}.
 *
 * @author zhaojigang
 * @date 2018/7/16
 */
public interface FutureListener<V> {
    /**
     * Invoked when the operation associated with the {@link Future} has been completed.
     *
     * @param future the source {@link Future} which called this callback
     */
    void operationComplete(Future<V> future) throws Exception;
}

注意:operationComplete(Future<V> future)中的future参数就是调用该方法的那个Future实例,在DefaultPromise中会有体现。

3.4 AbstractFuture

/**
 * from netty4.1
 */
public abstract class AbstractFuture<V> implements Future<V>{
    @Override
    public V get() throws InterruptedException, ExecutionException {
        /**
         * 阻塞等到await()调用完成,即失败或返回结果
         */
        await();
        /**
         * 获取失败异常信息
         */
        Throwable cause = cause();
        /**
         * 如果异常信息为null,直接获取响应结果
         */
        if (cause == null) {
            return getNow();
        }
        /**
         * 如果返回结果result == CancellationException(即执行了cancel()),则抛出该异常
         * 否则,抛出ExecutionException
         */
        if (cause instanceof CancellationException) {
            throw (CancellationException)cause;
        }
        throw new ExecutionException(cause);
    }

    @Override
    public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        if(await(timeout, unit)) {
            Throwable cause = cause();
            if (cause == null) {
                return getNow();
            }
            if (cause instanceof CancellationException) {
                throw (CancellationException)cause;
            }
            throw new ExecutionException(cause);
        }
        /**
         * 如果没有在指定的时间内await没有完成,抛出超时异常
         */
        throw new TimeoutException();

    }
}

使用模板模式,定义好get()和get(long timeout, TimeUnit unit)的基本框架,至于具体await()/await(timeout, unit)/cause()/getNow()等方法就由具体的类来实现了。

值得注意的是:

  • 如果一个task被成功的cancel()了,会直接抛出CancellationException。
  • get()/get(long timeout, TimeUnit unit)是阻塞获取结果的,所以netty不推荐使用这种方式。

3.5 Promise

/**
 * from netty4.1
 * Special {@link Future} which is writable.
 *
 * 添加设置操作
 * 将Future中返回值为Future的全部override为Promise
 * @author zhaojigang
 * @date 2018/7/16
 */
public interface Promise<V> extends Future<V> {
    /**
     * Marks this future as a success and notifies all listeners.
     * If it is success or failed already it will throw an {@link IllegalStateException}.
     */
    Promise<V> setSuccess(V result);

    /**
     * Marks this future as a success and notifies all listeners.
     *
     * @return {@code true} if and only if successfully marked this future as a success.
     *         Otherwise {@code false} because this future is already marked as either a success or a failure.
     */
    boolean trySuccess(V result);

    /**
     * Marks this future as a failure and notifies all listeners.
     * If it is success or failed already it will throw an {@link IllegalStateException}.
     */
    Promise<V> setFailure(Throwable cause);

    /**
     * Marks this future as a failure and notifies all listeners.
     *
     * @return {@code true} if and only if successfully marked this future as a failure.
     *         {@code false} because this future is already marked as either a success or a failure.
     */
    boolean tryFailure(Throwable cause);

    /**
     * Make this future impossible to cancel.
     *
     * @return {@code true} if and only if successfully marked this future as uncancellable
     *                      or it is already done without being cancelled.
     *         {@code false} if this future has been cancelled already.
     */
    boolean setUncancellable();

    @Override
    Promise<V> addListener(FutureListener<V> listener);

    @Override
    Promise<V> addListeners(List<FutureListener<V>> listeners);

    @Override
    Promise<V> removeListener(FutureListener<V> listener);

    @Override
    Promise<V> removeListeners(List<FutureListener<V>> listeners);

    @Override
    Promise<V> sync() throws InterruptedException;

    @Override
    Promise<V> syncUninterruptibly();

    @Override
    Promise<V> await() throws InterruptedException;

    @Override
    Promise<V> awaitUninterruptibly();
}

Promise是一种可进行写的Future,具有设置成功结果,设置失败结果等功能,这样可以在成功或失败的时候回调注册到当前Promise实例的listeners了。就是一种完全异步的方式了,而AbstractFuture#get可能需要阻塞,所以netty推荐我们使用listener回调模式。

3.6 DefaultPromise

/**
 * from netty4.1
 * <p>
 * 一、DefaultPromise状态转换图:
 * A {@link DefaultPromise} is either <em>uncompleted</em> or <em>completed</em>.
 * When an I/O operation begins, a new future object is created.
 * The new future is uncompleted initially - it is neither succeeded, failed, nor cancelled
 * because the I/O operation is not finished yet.
 * If the I/O operation is finished either successfully, with failure, or by cancellation,
 * the future is marked as completed with more specific information, such as the cause of the
 * failure.
 * Please note that even failure and cancellation belong to the completed state.
 * <pre>
 *                                      +---------------------------+
 *                                      | Completed successfully    |
 *                                      +---------------------------+
 *                                 +---->      isDone() = true      |
 * +--------------------------+    |    |   isSuccess() = true      |
 * |        Uncompleted       |    |    +===========================+
 * +--------------------------+    |    | Completed with failure    |
 * |      isDone() = false    |    |    +---------------------------+
 * |   isSuccess() = false    |----+---->      isDone() = true      |
 * | isCancelled() = false    |    |    |       cause() = non-null  |
 * |       cause() = null     |    |    +===========================+
 * +--------------------------+    |    | Completed by cancellation |
 *                                 |    +---------------------------+
 *                                 +---->      isDone() = true      |
 *                                      | isCancelled() = true      |
 *                                      +---------------------------+
 * </pre>
 * <p>
 * <p>
 * 二、DefaultPromise实现了两种执行机制:
 * 1、future:wait/notify实现,可能要阻塞,使用方最终调用到DefaultPromise父类AbstractFuture#get或者DefaultPromise#syncXxx
 * 2、listener:其实就是callback实现,不需要阻塞,当setSuccess/trySuccess/setFailure/tryFailure/cancel会直接调用listener(回调函数)当然如果有等待条件的其他线程,也会notifyAll
 * <p>
 * 推荐使用第二种,完全异步的。
 */
public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultPromise.class);
    /**
     * 返回结果result的原子更新器
     */
    private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER
            = AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");
    /**
     * 返回结果
     */
    private volatile Object result;
    /**
     * 成功的返回标记
     */
    private static final Object SUCCESS = new Object();
    /**
     * 不可取消的标记
     */
    private static final Object UNCANCELLABLE = new Object();
    /**
     * wait线程的数量,注意该参数的修改要进行同步(恰好该参数的所有修改地方都在一个synchronized中)
     */
    private short waiters;
    /**
     * cancel()时要将此项异常塞入result
     */
    private static final Throwable CANCELLATION_CAUSE = new CancellationException(DefaultPromise.class.getName() + " invoked cancel()");
    /**
     * Threading - synchronized(this) 事件监听器列表
     * If {@code empty}, it means either 1) no listeners were added yet or 2) all listeners were notified.
     * 也就是说 一个listener通知过一次就会被删除,不会再通知第二次
     */
    private List<FutureListener<V>> listeners;
    /**
     * Threading - synchronized(this). We must prevent concurrent notification and FIFO listener notification.
     */
    private boolean notifyingListeners;

    /**
     * future返回是否成功
     *
     * @return {@code true} 结果不为空 && 结果不是Throwable(失败)&& 结果不是UNCANCELLABLE(不能取消)
     */
    @Override
    public boolean isSuccess() {
        /**
         * 将成员变量result转换为局部变量进行操作的原因?
         *
         * 一、是因为 在下面的代码中会多次调用this.result,当外界的this.result引用发生变化时,由于this.result是被volatile修饰的,如果直接使用this.result将会导致多次获取的result不同,
         * 但是this.result引用发生变化时,局部变量result不会发生变化(注意修改的是this.result引用的值,而不是this.result指向的地址的值,类似下边的程序)
         * <pre>
         *         public static void main(String[] args) {
         *          DefaultPromiseTest test = new DefaultPromiseTest();
         *
         *          Model m2 = test.m;
         *          System.out.println(m2);
         *
         *          test.m = new Model(200L); // 注意:这里不是this.m.setId(300),所以下面的m2不变
         *          System.out.println(m2);
         *        }
         *
         * </pre>
         *
         * 二、由于this.result是被volatile修饰的,每次获取都要强制从主存中获取,无法从工作线程直接获取,所以代价较大,而且将频繁操作的成员变量局部化更方便JIT优化
         * https://blog.csdn.net/shaomingliang499/article/details/50549306
         */
        Object result = this.result;
        return result != null && !(result instanceof Throwable) && result != UNCANCELLABLE;
    }

    /**
     * 等待线程是否可取消
     *
     * @return {@code true} 如果返回结果result为null,表示没有返回成功,也没有返回失败,也没有设置不可取消,此时可以取消
     */
    @Override
    public boolean isCancellable() {
        return result == null;
    }

    /**
     * 查询cause:如果result instanceof Throwable,那么表示返回结果出错了,否则 cause = null,表示一定没有错误
     *
     * @return
     */
    @Override
    public Throwable cause() {
        Object result = this.result;
        return result instanceof Throwable ? (Throwable) result : null;
    }

    @Override
    public Promise<V> setSuccess(V result) {
        if (setSuccess0(result)) {
            notifyListeners();
            return this;
        }
        throw new IllegalStateException("complete already: " + this);
    }

    @Override
    public boolean trySuccess(V result) {
        if (setSuccess0(result)) {
            notifyListeners();
            return true;
        }
        return false;
    }

    @Override
    public Promise<V> setFailure(Throwable cause) {
        if (setFailure0(cause)) {
            notifyListeners();
            return this;
        }
        throw new IllegalStateException("complete already: " + this);
    }

    @Override
    public boolean tryFailure(Throwable cause) {
        if (setFailure0(cause)) {
            notifyListeners();
            return true;
        }
        return false;
    }

    @Override
    public boolean setUncancellable() {
        Object result = this.result;
        /**
         * 从uncompleted设置为UNCANCELLABLE,如果设置成功,直接返回
         */
        if (result == UNCANCELLABLE || RESULT_UPDATER.compareAndSet(this, null, UNCANCELLABLE)) {
            return true;
        }

        /**
         * 如果completed 而且又没被cancel(),此时返回true
         */
        return isDone0(result) && !isCancelled();
    }

    @Override
    public Promise<V> addListener(FutureListener<V> listener) {
        ObjectUtil.checkNotNull(listener, "listener");
        /**
         * 防止多个线程同时操作listeners队列
         */
        synchronized (this) {
            if (listeners == null) {
                listeners = new ArrayList<>();
            }
            listeners.add(listener);
        }

        /**
         * 如果该listener是后加入的,则直接唤醒
         */
        if (isDone()) {
            notifyListeners();
        }

        return this;
    }

    @Override
    public Promise<V> addListeners(List<FutureListener<V>> listeners) {
        ObjectUtil.checkNotNull(listeners, "listeners");
        synchronized (this) {
            if (this.listeners == null) {
                listeners = new ArrayList<>();
            }
            this.listeners.addAll(listeners);
        }

        if (isDone()) {
            notifyListeners();
        }

        return this;
    }

    @Override
    public Promise<V> removeListener(FutureListener<V> listener) {
        ObjectUtil.checkNotNull(listeners, "listeners");
        ObjectUtil.checkNotNull(listener, "listener");
        synchronized (this) {
            listeners.remove(listener);
        }
        return this;
    }

    @Override
    public Promise<V> removeListeners(List<FutureListener<V>> listeners) {
        ObjectUtil.checkNotNull(this.listeners, "listeners");
        ObjectUtil.checkNotNull(listeners, "listeners");
        synchronized (this) {
            this.listeners.removeAll(listeners);
        }
        return this;
    }

    @Override
    public Promise<V> sync() throws InterruptedException {
        /**
         * 如果await()发生了异常,这里正好直接抛出
         */
        await();
        /**
         * 如果await()返回了错误,也直接抛出
         */
        rethrowIfFailed();
        return this;
    }

    private void rethrowIfFailed() {
        Throwable cause = cause();
        if (cause == null) {
            return;
        }
        throw (RuntimeException) cause;
    }

    @Override
    public Promise<V> syncUninterruptibly() {
        awaitUninterruptibly();
        rethrowIfFailed();
        return this;
    }

    @Override
    public Promise<V> await() throws InterruptedException {
        if (isDone()) {
            return this;
        }

        if (Thread.interrupted()) {
            throw new InterruptedException(toString());
        }

        /**
         * wait()/notify()机制:
         * 前提:每个对象都有一个锁 + 一个锁等待队列 + 一个条件等待队列。
         * 线程协作:wait()/notify()通常都是由两个线程来协作的,一个wait()等待条件,另一个notify()唤醒等待线程
         * 为什么加锁:wait()/notify()是必须加锁执行的(内部执行机制),否则会抛出异常IllegalMonitorStateException,锁对象是当前实例。
         *
         * wait内部执行机制:
         * 1、把当前线程放入锁对象的条件等待队列,之后释放锁(注意:一定会释放锁,否则notify的线程将无法获取该对象锁),进入阻塞状态WAITING或TIMED_WAITING
         * 2、当等待时间到了或者被其他线程notify/notifyAll了,则等待的当前线程从条件等待队列中移除出来,之后再尝试获取锁,如果没有获取锁,进入锁等待队列,线程状态改为BLOCKED;如果获取了锁,从wait调用中返回
         *
         * 为什么要写成:
         * <pre>
         *    while (!isDone()) {
         *      wait();
         *    }
         * </pre>
         * 而不是
         * <pre>
         *     if(!isDone()) {
         *         wait();
         *     }
         * </pre>
         *
         * wait()表示阻塞等待,正常情况下while和if形式是等价的,但是为了防止wait()被意外唤醒,所以需要在wait()之后继续进行判断
         */
        synchronized (this) {
            while (!isDone()) {
                /**
                 * 执行wait()之前:waiters加1
                 */
                incWaiters();
                try {
                    wait();
                } finally {
                    /**
                     * wait()结束之后,waiters减1
                     */
                    decWaiters();
                }
            }
        }
        return this;
    }

    @Override
    public Promise<V> awaitUninterruptibly() {
        if (isDone()) {
            return this;
        }

        boolean interrupted = false;
        synchronized (this) {
            while (!isDone()) {
                incWaiters();
                try {
                    wait();
                } catch (InterruptedException e) {
                    // Interrupted while waiting.
                    interrupted = true;
                } finally {
                    decWaiters();
                }
            }
        }

        /**
         * 捕获了中断异常,默默执行中断
         */
        if (interrupted) {
            Thread.currentThread().interrupt();
        }

        return this;
    }

    @Override
    public boolean await(long timeout, TimeUnit timeUnit) throws InterruptedException {
        return await0(timeUnit.toNanos(timeout), true);
    }

    @Override
    public boolean awaitUninterruptibly(long timeout, TimeUnit timeUnit) {
        try {
            return await0(timeUnit.toNanos(timeout), false);
        } catch (InterruptedException e) {
            throw new InternalError();
        }
    }

    @Override
    public boolean await(long timeoutMillis) throws InterruptedException {
        return await0(TimeUnit.MILLISECONDS.toNanos(timeoutMillis), true);
    }

    @Override
    public boolean awaitUninterruptibly(long timeoutMillis) {
        try {
            return await0(TimeUnit.MILLISECONDS.toNanos(timeoutMillis), false);
        } catch (InterruptedException e) {
            throw new InternalError();
        }
    }

    @Override
    public V getNow() {
        Object result = this.result;
        if (result instanceof Throwable || result == SUCCESS || result == UNCANCELLABLE) {
            return null;
        }
        return (V) result;
    }

    /**
     * 查看java.util.concurrent.Future#cancel()的注释,
     * This attempt will fail if the task has already completed(成功 || 失败 || 已被取消), has already been cancelled,
     * or could not be cancelled for some other reason
     *
     * @param mayInterruptIfRunning this value has no effect in this implementation.
     * @return
     */
    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        if (RESULT_UPDATER.compareAndSet(this, null, CANCELLATION_CAUSE)) {
            checkNotifyWaiters();
            notifyListeners();
            return true;
        }
        return false;
    }

    @Override
    public boolean isCancelled() {
        return result instanceof CancellationException;
    }

    @Override
    public boolean isDone() {
        return isDone0(this.result);
    }

    /**
     * 分析并发问题:
     * 1、假设没有notifyingListeners:
     * 当前线程A执行到while(true)的时候,假设另一条线程B也添加了FutureListener并进入了第一个同步块,此时线程B也进入了while(true),
     * B开始执行后来的这些FutureListeners,之后A才开始执行一开始的FutureListeners,这样就不能保证FIFO的执行FutureListener
     * <p>
     * 2、加入notifyingListeners:
     * 在线程A执行到第二个synchronized块中的if (this.listeners == null)中之前,线程B进入第一个同步块,由于notifyingListeners = true,则直接返回了,
     * 而B后来添加的FutureListeners,A会在第二个同步快判断的时候发现当前的this.listeners.size>0,会继续赋值给本地变量继续第二轮循环.
     * <p>
     * 这里有一个疑问:当外界的this.listeners发生变化时,temListeners是否变化,假设A执行到while(true),B执行了addListener,则此时外界的this.listener改变了值,temListener是否发生变化
     */
    private void notifyListeners() {
        List<FutureListener<V>> temListeners;
        synchronized (this) {
            if (notifyingListeners || this.listeners == null) {
                return;
            }
            notifyingListeners = true;
            temListeners = this.listeners;
            this.listeners = null; // 通知完之后就置空,不再通知第二次
        }

        while (true) {
            notifyListeners0(temListeners);
            synchronized (this) {
                if (this.listeners == null) {
                    notifyingListeners = false;
                    return;
                }
                temListeners = this.listeners;
                this.listeners = null;
            }
        }
    }

    private void notifyListeners0(List<FutureListener<V>> listeners) {
        for (FutureListener<V> listener : listeners) {
            try {
                listener.operationComplete(this);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 设置成功标志
     * result != null ? result : SUCCESS
     */
    private boolean setSuccess0(V result) {
        return setValue0(result != null ? result : SUCCESS);
    }

    /**
     * 设置失败标志
     * result == cause
     */
    private boolean setFailure0(Throwable cause) {
        return setValue0(ObjectUtil.checkNotNull(cause, "cause"));
    }

    private boolean setValue0(Object result) {
        /**
         * 更新result结果,唤醒所有阻塞线程
         * 将result从null置为result 或者 从UNCANCELLABLE置为result(因为有可能是先将result置为UNCANCELLABLE的)
         */
        if (RESULT_UPDATER.compareAndSet(this, null, result) ||
                RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, result)) {
            checkNotifyWaiters();
            return true;
        }
        return false;
    }

    private void checkNotifyWaiters() {
        synchronized (this) {
            if (waiters > 0) {
                notifyAll();
            }
        }
    }

    public boolean isDone0(Object result) {
        return result != null && result != UNCANCELLABLE;
    }

    private void incWaiters() {
        if (++waiters > Short.MAX_VALUE) {
            throw new IllegalStateException("too many waiters :" + this);
        }
    }

    private void decWaiters() {
        --waiters;
    }

    /**
     * 关于中断:
     * <p>
     * 1、前提:线程有六种状态,{@link Thread#getState()}
     * NEW:A thread that has not yet started is in this state.
     * RUNNABLE:A thread executing in the Java virtual machine is in this state.
     * it may be waiting for other resources from the operating system such as processor.
     * BLOCKED:A thread that is blocked waiting for a monitor lock is in this state.
     * A thread in the blocked state is waiting for a monitor lock
     * to enter a synchronized block/method or
     * reenter a synchronized block/method after calling {@link Object#wait() Object.wait}.
     * WAITING:A thread that is waiting indefinitely for another thread to perform a particular action is in this state.
     * TIMED_WAITING:A thread that is waiting for another thread to perform an action for up to a specified waiting time is in this state.
     * TERMINATED:A thread that has exited is in this state.
     * <p>
     * 2、在不同阶段调用中断Thread.currentThread().interrupt()
     * NEW/TERMINATED:interrupt()中断没有任何效果,中断位isInterrupted=false
     * RUNNABLE: interrupt()中断没有效果,中断位isInterrupted=true,在run()方法中自己选择合适的点去处理
     * BLOCKED:interrupt()中断位isInterrupted=true,不会使当前线程跳出锁等待队列,也就是说依然在等待锁
     * WAITING/TIMED_WAITING: interrupt()抛出InterruptedException,设置isInterrupted=false,所以根据需要,需要自己去设置中断位
     *
     * @param timeoutNanos  纳秒级别的超时时间
     * @param interruptable 是否可中断
     * @return
     * @throws InterruptedException
     */
    private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException {
        /**
         * 如果completed,直接返回
         */
        if (isDone()) {
            return true;
        }

        /**
         * 如果传入的超时时间<=0,直接result
         */
        if (timeoutNanos <= 0) {
            return isDone();
        }

        /**
         * 如果可中断 && 线程已被中断,抛出中断异常
         */
        if (interruptable && Thread.interrupted()) {
            throw new InterruptedException(toString());
        }

        boolean interrupted = false;
        long startTimeNanos = System.nanoTime();
        try {
            while (true) {
                synchronized (this) {
                    if (isDone()) {
                        return true;
                    }
                    incWaiters();
                    try {
                        wait(timeoutNanos / 1000000, (int) timeoutNanos % 1000000);
                    } catch (InterruptedException e) {
                        if (interruptable) {
                            throw e;
                        } else {
                            /**
                             * 对于中断来讲,抛出了中断异常时,Thread.currentThread().isInterrupted() == false,即不会设置中断标志位。
                             * 需要通过Thread.currentThread().interrupt()来设置中断标志位,来使外界自己根据中断位去做一些事
                             * Waits for this future to be completed without interruption. 所以在最后的finally才会中断
                             */
                            interrupted = true;
                        }
                    } finally {
                        decWaiters();
                    }
                }

                if (isDone()) {
                    return true;
                }

                /**
                 * 防护性判断
                 */
                if (System.nanoTime() - startTimeNanos >= timeoutNanos) {
                    return isDone();
                }
            }
        } finally {
            if (interrupted) {
                /**
                 * 此时线程处于RUNNABLE状态,执行interrupt()设置中断标志位
                 */
                Thread.currentThread().interrupt();
            }
        }
    }

    @Override
    public String toString() {
        StringBuilder builder = new StringBuilder();
        builder.append(this.getClass().getSimpleName()).append("@").append(Integer.toHexString(hashCode()));

        Object result = this.result;
        if (result == SUCCESS) {
            builder.append("success");
        } else if (result == UNCANCELLABLE) {
            builder.append("uncancellable");
        } else if (result instanceof Throwable) {
            builder.append(result);
        } else if (result != null) {
            builder.append("success " + result);
        } else {
            builder.append("incompleted");
        }

        return builder.toString();
    }
}

说明:

  • notifyListeners0(List<FutureListener<V>> listeners)方法中调用listener.operationComplete(this);而this就是当前的DefaultPromise实例。
  • 通过使用notifyingListeners属性来实现监听器的先入先出。
  • 实现机制:在业务逻辑执行前添加监听器addListener(FutureListener<V> listener)在执行完业务逻辑之后,执行setSuccess/trySuccess/setFailure/tryFailure等方法,此时会执行notifyAll()并回调添加进来的监听器。假设有线程阻塞在get()方法上时,在此时会做唤醒。

注意:

  • DefaultPromise的状态机流转图:见类注释。
  • DefaultPromise可以实现的两种使用机制:见类注释。再强调一点,建议使用回调方式。
  • 学习使用AtomicReferenceFieldUpdater来实现属性的cas更新。
  • 学习成员变量局部化的做法:不只是防止引用的并发修改,还是优化性能的一种方式。
  • 学习wait/notify的实现机制:最佳实践见《Effective Java 中文版 第2版》的“第69条”
  • 学习线程的六种状态与中断对各种状态的影响。

附、Bug记录

  • netty的getNow有bug,我 这里 提了个issue,netty也将在4.1.28版本修复该bug。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,293评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,604评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,958评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,729评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,719评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,630评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,000评论 3 397
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,665评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,909评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,646评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,726评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,400评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,986评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,959评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,996评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,481评论 2 342

推荐阅读更多精彩内容

  • 主要内容 1.什么是异步 程序或系统中关于异步的概念使用的比较多,那么什么是异步呢?下面举个生活中最常见的情景来进...
    topgunviper阅读 2,188评论 0 12
  • 译序 本指南根据 Jakob Jenkov 最新博客翻译,请随时关注博客更新:http://tutorials.j...
    高广超阅读 5,073评论 1 68
  • 季文子三思而后行。子闻之曰:“再,斯可矣。”三思而后行,一思再思就好了,思那么多干嘛? 是的“思”超过三次就会褪去...
    家悦hideyuki阅读 476评论 0 0
  • 直播算时间总是算不会,从症结来看是把之前的经验生搬硬套,用A的经验解决B的情况。比如第四版块时间不够了,那前面第三...
    话木阅读 172评论 0 0
  • 当我每次拿起笔来,准备写作的时候,脑海里也像大多数写作者一样,常常打几个问号。 我为什么喜欢写作文?我想写些什么?...
    乡村奶奶阅读 124评论 3 3