多线程Future

Future介绍

传统的设计模式如下:

调用者调用某个方法,会一直阻塞直到方法返回,才能做接下来的步骤。

Futrue模式就是Action先给Invoker一个未来(future),其实也就是票据,Invoker就可以继续接下来的步骤而无需等待Action结果的返回。等Invoker做完事情再去拿Action返回的结果。这就是一个异步化的流程。

在多线程中经常举的一个例子就是:网络图片的下载,刚开始是通过模糊的图片来代替最后的图片,等下载图片的线程下载完图片后在替换。而在这个过程中可以做一些其他的事情。

Future代码实现

【自定义Future】

阻塞式实现:

public class FutureInAction {

    public static void main(String[] args){
        String value = block(()->{
            try {
                Thread.sleep(10000L);  
                return "I am finished";
            } catch (InterruptedException e) {
                return "error";
            }
        });  // 此处会产生阻塞,直到结果返回
        System.out.print(value);
    }

    private static <T> T block(Callable<T> callable){
        return callable.action();
    }

    private interface Callable<T>{
        T action();
    }
}

结果迟迟都不会返回,代码阻塞在System.out.print(value);上了。

Future实现:

public class FutureInAction {

    public static void main(String[] args){
        Future<String> future = invoke(()->{
            try {
                Thread.sleep(10000L); // 设定方法执行十秒钟,十秒钟过后才返回结果
                return "I am finished";
            } catch (InterruptedException e) {
                return "error";
            }
        });  // 此处不会产生阻塞,会立刻返回
        System.out.println(future.get());  // 可以做任意其他操作
        System.out.println(future.get());
        System.out.println(future.get());

         // 如果方法还没有执行完,则等待10秒钟
         // 因为我们设定了方法执行时间为10秒,所以这里等待10秒,肯定能拿到结果
        while(!future.isDown()){ 
            try {
                Thread.sleep(10000L);   
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println(future.get());
    }

    private static <T> Future<T> invoke(Callable<T> callable){
        AtomicReference<T> result = new AtomicReference<>();
        AtomicBoolean finished = new AtomicBoolean(false);

        Thread t = new Thread(()->{
            T value = callable.action();
            result.set(value);
            finished.set(true);
        });
        t.start();

        Future<T> future = new Future<T>() {
            @Override
            public T get() {
                return result.get();
            }

            @Override
            public boolean isDown() {
                return false;
            }
        };

        return future;
    }

    private interface Future<T>{
        T get();
        boolean isDown();
    }

    private interface Callable<T>{
        T action();
    }
}

运行之后立刻输出:

null
null
null

等了10秒之后,才输出:

I am finished

也就是结果立即得到了返回,而不会阻塞在方法的执行上面。在我们拿到结果之前我们还可以做其他操作。

【JDK 自带Future、Callable、ExecutorService】

上面的代码是我们自定义实现的Future,其实JDK有自带Future类供我们使用。

public class FutureInAction {

    public static void main(String[] args)
            throws ExecutionException, InterruptedException, TimeoutException {
        /**
         * 创建了一个线程池,这个线程池只有一根线程
         * 这个线程可以复用,一个任务结束了,它还会继续保持线程等待另外一个任务去执行
         * 所以用ExecutorService的时候记得shutdown()
         */
        ExecutorService es = Executors.newSingleThreadExecutor();
        Future<String> future = es.submit(()->{
            try {
                Thread.sleep(10000L);
                return "I am finished";
            } catch (InterruptedException e) {
                return "error";
            }
        });

        // ....
        // ....
        // ....

        // String value = future.get();  //会阻塞直到结果返回
        // String value = future.get(10,TimeUnit.MILLISECONDS);  //最多等10秒钟,等不到就继续往下,如果等不到结果那么下面就会抛出异常
        // System.out.print(value);
        while(!future.isDone()){ 
            Thread.sleep(10000L);
        }
        System.out.print(future.get());  // 输出结果之后程序不会停止,仍在运行
        es.shutdown(); // 程序真正地关闭
    }
}

【实现一个异步基于事件回调的Future程序】

上面的代码我们都需要用一个isDown()方法来确定结果是否获取到了,但是这种实现方法有个弊端就是,如果方法还没有执行完,就会在这个方法上面产生阻塞,等十秒钟过后我们才能拿到结果。

现在如果我们想要实现如下效果:当Action结果返回的时候通知我们去取结果,而不用我们自己手动去判断结果是否完成。

public class FutureInAction {

    public static void main(String[] args){
        Future<String> future = invoke(()->{
            try {
                Thread.sleep(10000L);
                return "I am finished";
            } catch (InterruptedException e) {
                return "error";
            }
        });
        future.setCompletable(new Completable<String>() {
            @Override
            public void complete(String s) {
                System.out.println(s);
            }

            @Override
            public void exception(Throwable cause) {
                System.out.println(cause);
            }
        });

        System.out.println("......");
        System.out.println(future.get());
        System.out.println(future.get());
    }

    private static <T> Future<T> invoke(Callable<T> callable){
        AtomicReference<T> result = new AtomicReference<>();
        AtomicBoolean finished = new AtomicBoolean(false);

        Future<T> future = new Future<T>() {
            private Completable<T> completable;

            @Override
            public T get() {
                return result.get();
            }

            @Override
            public boolean isDown() {
                return false;
            }

            @Override
            public void setCompletable(Completable<T> completable) {
                this.completable = completable;
            }

            @Override
            public Completable<T> getCompletable() {
                return completable;
            }
        };


        Thread t = new Thread(()->{
            try{
                T value = callable.action();
                result.set(value);
                finished.set(true);
                if(future.getCompletable()!=null)
                    future.getCompletable().complete(value);
            }catch (Throwable cause){
                if(future.getCompletable()!=null)
                    future.getCompletable().exception(cause);
            }
        });
        t.start();
        return future;

    }

    private interface Future<T>{
        T get();
        boolean isDown();
        void setCompletable(Completable<T> completable);
        Completable<T> getCompletable();
    }

    private interface Callable<T>{
        T action();
    }

    private interface Completable<T>{
        void complete(T t);
        void exception(Throwable cause);
    }
}

运行之后立刻输出:

......
null
null

等待了10秒之后才输出

I am finished
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容