- 原文链接: Multicasting in RxJava
- 原文作者: Daniel Lew
- 译文出自: 小鄧子的简书
- 译者: 小鄧子
- 状态: 完成
在RxJava中使用多点传播技巧是减少冗余工作的取胜之匙。
如果你想多点传播一个事件,也就是向所有的下游操作符或订阅者发送同一个事件。这在做耗时操作如网络请求等场景来讲是非常有用的。你不需要为每个订阅者做重复的网络请求,只需执行一次,然后传播响应结果即可。
这里有两种方式可以实现事件多播:
- 使用
ConnectableObservable
(通过publish()
或者replay()
[1]) - 使用
Subject
ConnectableObservable
或者Subject
的操作符逻辑值只会被执行一次,利用这种原理就可以实现向下游Subscriber
的事件广播了。
必须牢记的是:事件流以ConnectableObservable
或者Subject
作为多点传播的启动点,因此,这之后的逻辑会重复执行,并传播给每一个Subscriber
。
让我们通过以下示例,来了解它是如何发挥作用的:
Observable<String> observable = Observable.just("Event")
.publish()
.autoConnect(2)
.map(s -> {
System.out.println("Expensive operation for " + s);
return s;
});
observable.subscribe(s -> System.out.println("Sub1 got: " + s));
observable.subscribe(s -> System.out.println("Sub2 got: " + s));
// Output:
// Expensive operation for Event
// Sub1 got: Event
// Expensive operation for Event
// Sub2 got: Event
这个例子中,我们使用了ConnectableObservable
,一个耗时的map()
逻辑以及两个Subscribers
。令人惊讶的结果是,这个耗时的map()
逻辑执行了两次,尽管我们已经试图通过publish()
来阻止这种现象的发生。
通过图表来更加清晰的描述:
如果你想让map()
中的逻辑只发生一次,你需要把它放到调用publish()
操作符之前:
Observable<String> observable = Observable.just("Event")
.map(s -> {
System.out.println("Expensive operation for " + s);
return s;
})
.publish()
.autoConnect(2);
observable.subscribe(s -> System.out.println("Sub1 got: " + s));
observable.subscribe(s -> System.out.println("Sub2 got: " + s));
// Output:
// Expensive operation for Event
// Sub1 received: Event
// Sub2 received: Event
更新后的图表如下:
我们应该从中吸取什么教训呢?如果你想通过事件广播减少冗余操作,请保证在正确的启动点上实现。
很多人都在使用Subject
,我们不在这里对它品头论足。不得不说的是,它们都具有多点传播的特性,但是你要记住的是它们只会在发送事件这个启动点之后开始多点传播。也就是说,如果你在Subject
的下游添加了大量耗时操作符,那么你就需要考虑在下游的某个地方添加另外的publish()
。
-
share()
和cache()
也可供选择使用,因为ConnectableObservable.share()
的内置操作符就是publish().refCount()
,同样地,由于具有相同的处理能力,cache()
也可以通过replay().autoConnect()
来重新创建。 ↩