Guava——ListenableFuture

缘由

To simplify matters, Guava extends the Future interface of the JDK with ListenableFuture
We strongly advise that you always use ListenableFuture instead of Future in all of your code, because:

  • Most Futures methods require it.
  • It's easier than changing to ListenableFuture later.
  • Providers of utility methods won't need to provide Future and ListenableFuture variants of their methods.

Interface

A ListenableFuture allows you to register callbacks to be executed once the computation is complete, or if the computation is already complete, immediately. This simple addition makes it possible to efficiently support many operations that the basic Future interface cannot support.

The basic operation added by ListenableFuture is addListener(Runnable, Executor), which specifies that when the computation represented by this Future is done, the specified Runnable will be run on the specified Executor.

Adding Callbacks

Most users will prefer to use Futures.addCallback(ListenableFuture<V>, FutureCallback<V>, Executor), or the version which defaults to using MoreExecutors.directExecutor(), for use when the callback is fast and lightweight. A FutureCallback<V> implements two methods:

  • onSuccess(V), the action to perform if the future succeeds, based on its result
  • onFailure(Throwable), the action to perform if the future fails, based on the failure

Creation

Corresponding to the JDK ExecutorService.submit(Callable) approach to initiating an asynchronous computation, Guava provides the ListeningExecutorService interface, which returns a ListenableFuture wherever ExecutorService would return a normal Future. To convert an ExecutorService to a ListeningExecutorService, just useMoreExecutors.listeningDecorator(ExecutorService).

ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
ListenableFuture<Explosion> explosion = service.submit(new Callable<Explosion>() {
  public Explosion call() {
    return pushBigRedButton();
  }
});
Futures.addCallback(explosion, new FutureCallback<Explosion>() {
  // we want this handler to run immediately after we push the big red button!
  public void onSuccess(Explosion explosion) {
    walkAwayFrom(explosion);
  }
  public void onFailure(Throwable thrown) {
    battleArchNemesis(); // escaped the explosion!
  }
});

Alternatively, if you're converting from an API based on FutureTask, Guava offers ListenableFutureTask.create(Callable<V>) and ListenableFutureTask.create(Runnable, V). Unlike the JDK, ListenableFutureTask is not meant to be extended directly.

If you prefer an abstraction in which you set the value of the future rather than implementing a method to compute the value, consider extending AbstractFuture<V> or using SettableFuture directly.

If you must convert a Future provided by another API to an ListenableFuture, you may have no choice but to use the heavyweight JdkFutureAdapters.listenInPoolThread(Future) to convert a Future to a ListenableFuture.
Whenever possible, it is preferred to modify the original code to return a ListenableFuture.

Application

The most important reason to use ListenableFuture is that it becomes possible to have complex chains of asynchronous operations.

* An AsyncFunction<A, B> provides one method, ListenableFuture<B> apply(A input). It can be used to asynchronously transform a value.

ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query);
AsyncFunction<RowKey, QueryResult> queryFunction =
  new AsyncFunction<RowKey, QueryResult>() {
    public ListenableFuture<QueryResult> apply(RowKey rowKey) {
      return dataService.read(rowKey);
    }
  };
ListenableFuture<QueryResult> queryFuture =
    Futures.transformAsync(rowKeyFuture, queryFunction, queryExecutor);

Many other operations can be supported efficiently with a ListenableFuture that cannot be supported with a Future alone. Different operations may be executed by different executors, and a single ListenableFuture can have multiple actions waiting upon it.

When several operations should begin as soon as another operation starts -- "fan-out" -- ListenableFuture just works: it triggers all of the requested callbacks. With slightly more work, we can "fan-in," or trigger a ListenableFuture to get computed as soon as several other futures have all finished: see the implementation of Futures.allAsList for an example.

Avoid nested Futures

In cases where code calls a generic interface and returns a Future, it's possible to end up with nested Futures. For example:

executorService.submit(new Callable<ListenableFuture<Foo>() {
  @Override
  public ListenableFuture<Foo> call() {
    return otherExecutorService.submit(otherCallable);
  }
});

would return a ListenableFuture<ListenableFuture<Foo>>.

This code is incorrect, because if a cancel on the outer future races with the completion of the outer future, that cancellation will not be propagated to the inner future.
It's also a common error to check for failure of the other future using get() or a listener, but unless special care is taken an exception thrown fromotherCallable would be suppressed.
To avoid this, all of Guava's future-handling methods (and some from the JDK) have Async versions that safely unwrap this nesting - transform(ListenableFuture<A>, Function<A, B>, Executor) and transformAsync(ListenableFuture<A>, AsyncFunction<A, B>, Executor), or ExecutorService.submit(Callable) and submitAsync(AsyncCallable<A>, Executor), etc.

CheckedFuture

Guava also provides a CheckedFuture<V, X extends Exception> interface. A CheckedFutureis a ListenableFuture that includes versions of the get methods that can throw a checked exception. This makes it easier to create a future that executes logic which can throw an exception. To convert a ListenableFuture to a CheckedFuture, useFutures.makeChecked(ListenableFuture<V>, Function<Exception, X>).

main class

  • MoreExecutors -- Executors
    该类是final类型的工具类,提供了很多静态方法。例如listeningDecorator方法初始化ListeningExecutorService方法,使用此实例submit方法即可初始化ListenableFuture对象。
  • ListeningExecutorService -- ExecutorService
    该类是对ExecutorService的扩展,重写ExecutorService类中的submit方法,返回ListenableFuture对象。
  • ListenableFuture -- Future
    该接口扩展了Future接口,增加了addListener方法,该方法在给定的excutor上注册一个监听器,当计算完成时会马上调用该监听器。不能够确保监听器执行的顺序,但可以在计算完成时确保马上被调用。
  • FutureCallback jdk没有的东西
    该接口提供了OnSuccess和OnFailuren方法。获取异步计算的结果并回调。
  • Futures
    该类提供和很多实用的静态方法以供使用。
  • ListenableFutureTask -- ListenableFutureTask
    该类扩展了FutureTask类并实现ListenableFuture接口,增加了addListener方法。

Future局限性

Future 具有局限性。在实际应用中,当需要下载大量图片或视频时,可以使用多线程去下载,提交任务下载后,可以从多个Future中获取下载结果,由于Future获取任务结果是阻塞的,所以将会依次调用Future.get()方法,这样的效率会很低。很可能第一个下载速度很慢,则会拖累整个下载速度。
Future主要功能在于获取任务执行结果和对异步任务的控制。但如果要获取批量任务的执行结果,从上面的例子我们已经可以看到,单使用 Future 是很不方便的

  • 没有好的方法去判断第一个完成的任务(可以用 CompletionService 解决,CompletionService 提供了一个 take() 阻塞方法,用以依次获取所有已完成的任务。)
  • Future的get方法 是阻塞的,使用不当会造成线程的浪费。(可以用 Google Guava 库所提供的 ListeningExecutorService 和 ListenableFuture 来解决)
  • 不能防止任务的重复提交。(要做到这件事就需要 Future 最常见的一个实现类 FutureTask 了)

在实际的使用中建议使用Guava ListenableFuture来实现异步非阻塞,目的就是多任务异步执行,通过回调的方方式来获取执行结果而不需轮询任务状态。

Test Code

使用callback

public static void testRateLimiter() {
        ListeningExecutorService executorService = MoreExecutors
                .listeningDecorator(Executors.newCachedThreadPool());

        RateLimiter limiter = RateLimiter.create(5.0); // 每秒不超过4个任务被提交
        List<ListenableFuture<Integer>> listfutures = Lists.newArrayList();
        ListenableFuture<Integer> tmp = null;
        for (int i = 0; i < 10; i++) {
            limiter.acquire(); // 请求RateLimiter, 超过permits会被阻塞
            tmp = executorService.submit(new Task(i));
            tmp.addListener(new Runnable() {
                @Override
                public void run() {
                    System.out.println("add Listener");
                }
            }, executorService);

            Futures.addCallback(tmp, new FutureCallback<Integer>() {
                @Override
                public void onSuccess(Integer result) {
                    System.out.println("suc"+result);
                }

                @Override
                public void onFailure(Throwable t) {
                    System.out.println("fail"+t.toString());
                }
            });

            listfutures.add(tmp);

        }

        listfutures.forEach(e-> {
            try {
                TimeUnit.SECONDS.sleep(2);
                System.out.println("e = " + e.get());
                System.out.println("e = " + e.get());
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            } catch (ExecutionException e1) {
                e1.printStackTrace();
            }
        });
    }

    static class Task implements Callable<Integer> {
        private int number;
        public Task(int i){
            this.number = i;
        }
        @Override
        public Integer call() throws Exception {
            TimeUnit.SECONDS.sleep(2);
            System.out.println("call execute.." + number);
            return number;
        }
    }

使用链式future

那如果需要多重回调呢?

方法 描述
transform 加一个回调函数
allAsList 返回一个ListenableFuture ,该ListenableFuture 返回的result是一个List,List中的值是每个ListenableFuture的返回值,假如传入的其中之一fails或者cancel,这个Future fails 或者canceled
successAsList 返回一个ListenableFuture ,该Future的结果包含所有成功的Future,按照原来的顺序,当其中之一Failed或者cancel,则用null替代
public static void testLinkedFutureLisener() {
        final CountDownLatch countDownLatch = new CountDownLatch(1);

        final ListeningExecutorService poolService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(4));
        ListenableFuture<String> futureBase = poolService.submit(new Task("task1"));
        Futures.addCallback(futureBase, new FutureCallback<String>() {
            @Override
            public void onSuccess(String result) {
                System.out.println("onSuccess result = " + result);
            }

            @Override
            public void onFailure(Throwable t) {
                System.out.println("onFailure result = " + t.toString());

            }
        });

        // 链式1

        ListenableFuture<String> base_1 = Futures.transform(futureBase, new AsyncFunction<String, String>() {
            public ListenableFuture<String> apply(final String input) throws Exception {
                ListenableFuture<String> temp = poolService.submit(new Callable<String>() {
                    public String call() throws Exception {
                        System.out.println("base_1回调线程正在执行...input:"+input);
                        try {
                            Thread.sleep(1000);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                        System.out.println("base_1回调线程 done");

                        return input + " & base_1回调线程的结果 ";
                    }
                });
                return temp;
            }
        }, poolService);

        ListenableFuture<String> base_2 = Futures.transform(futureBase, new AsyncFunction<String, String>() {
            public ListenableFuture<String> apply(final String input) throws Exception {
                ListenableFuture<String> temp = poolService.submit(new Callable<String>() {
                    public String call() throws Exception {
                        System.out.println("base_2回调线程正在执行...input:"+input);
                        try {
                            Thread.sleep(2000);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                        System.out.println("base_2回调线程 done");

                        return input + " & base_2回调线程的结果 ";
                    }
                });
                return temp;
            }
        }, poolService);

        ListenableFuture<String> first = Futures.transform(base_2, new AsyncFunction<String, String>() {
            public ListenableFuture<String> apply(final String input) throws Exception {
                ListenableFuture<String> temp = poolService.submit(new Callable<String>() {
                    public String call() throws Exception {
                        System.out.println("first回调线程正在执行...input:"+input);
                        try {
                            String resBase1 =  base_1.get();
                            System.out.println("resBase1 = " + resBase1);
                            countDownLatch.countDown();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                        System.out.println("first 回调线程 done");

                        return input + " & first回调线程的结果 ";
                    }
                });
                return temp;
            }
        }, poolService);

        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        poolService.shutdown();
    }

// 运行结果:
task1 doing
task1done
onSuccess result = task1
base_2回调线程正在执行...input:task1
base_1回调线程正在执行...input:task1
base_1回调线程 done
base_2回调线程 done
first回调线程正在执行...input:task1 & base_2回调线程的结果 
resBase1 = task1 & base_1回调线程的结果 
first 回调线程 done

Ref:
https://github.com/google/guava/wiki/ListenableFutureExplained
https://blog.csdn.net/pistolove/article/details/51232004

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,444评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,421评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,036评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,363评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,460评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,502评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,511评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,280评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,736评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,014评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,190评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,848评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,531评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,159评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,411评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,067评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,078评论 2 352

推荐阅读更多精彩内容