在前面学习了操作符响应式编程开源库 RxJava2——操作符,通过源码结合文档大概知道了怎么去理解和分析操作符。
下面我们将学习下什么是Hot Observable 和Cold Observable。在前面的学习中我们了解到,当只有我们订阅后,数据才会发射。因为只有调用Observable中的订阅方法 subscribe(Observer<? super T> observer)
后,才会调用 subscribeActual(Observer<? super T> s)
方法进行数据的发送。
官方对于Hot和Cold的定义是:
When does an Observable begin emitting its sequence of items? It depends on the Observable. A “hot” Observable may begin emitting items as soon as it is created, and so any observer who later subscribes to that Observable may start observing the sequence somewhere in the middle. A “cold” Observable, on the other hand, waits until an observer subscribes to it before it begins to emit items, and so such an observer is guaranteed to see the whole sequence from the beginning.
In some implementations of ReactiveX, there is also something called a “Connectable” Observable. Such an Observable does not begin emitting items until its Connect method is called, whether or not any observers have subscribed to it.
被观察者是什么时候发送可观测序列中的元素呢?它取决于被观察者。
当Hot Observable被创建就开始发送数据,所以如果是之后的观察者,只能接收到它订阅之后发送的数据,而不能接收到之前的数据了。
在reactivex的一些实现中,也有一个称为“可连接的”可观察序列。不管是否有观察者订阅,直到调用了它的connect方法,观察者才开始发出元素。
当一个Cold Observable被创建后,在发送数据前,就一直等待被订阅,一旦被订阅就会发送整个序列的数据。
-
Cold Observable:
所有的Observable默认都是Cold Obserable。这就是说我们使用诸如Observable.create()或者Observable.fromArray()这类的方法所创建出来的Observable都是Cold Observable。
任何订阅Cold Observable的订阅者都会接收到独立的数据流。
如果没有订阅者订阅,它就什么事情也不会做,是被动的。 -
Hot Observable:
一旦Hot Observable被创建了,不管有没有订阅者,它们都会开始发送数据。
相同时间开始订阅的订阅者会得到同样的数据。
Cold Observable的创建就是一般的创建方法。API中提供了相关的方法能将Cold Observable转化为Hot Observable。下面来用具体例子看看这两种Observable的区别,进一步对上面的学习进行验证。
List<Integer> list = new ArrayList<>();
Observable.range(0, 5)
.subscribe(count -> list.add(count));
//Cold Observable
Observable<List<Integer>> listObservable = Observable.fromArray(list);
//Hot Observable
listObservable.subscribe(s->log("第一个cold observable"+s));
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
listObservable.subscribe(s->log("第二个cold observable"+s));
listObservable.subscribe(s->log("第三个cold observable"+s));
ConnectableObservable<List<Integer>> publish = listObservable.publish();
publish.subscribe(new Consumer<List<Integer>>() {
@Override
public void accept(List<Integer> integers) throws Exception {
log("hot observable"+integers);
}
});
我们通过 publish()
方法将Cold Observable转化为Hot Observable。我们看下打印结果,结合之前源码的学习(对象的不可变性--Rx中也就是对数据的重新映射)Cold Observable在任何时间订阅都会将一个独立的完整流发送出去。但是我们发现Hot Observable根本没有打印。这里代码基本没有问题了,那么就是方法调用的问题。
这里我们从源码部分来找原因,首先看下
publish()
方法,源码中的解释是它返回一个ConnectableObservable对象,但是当这个对象调用 connect()
方法之前一直处于等待发送数据的状态。所以将一个Cold Observable转化为Hot Observable的方法是调用 publish()
方法,要让它开始发送数据要调用 connect()
方法。
Returns a {@link ConnectableObservable}, which is a variety of ObservableSource that waits until its
{@link ConnectableObservable#connect connect} method is called before it begins emitting items to those
{@link Observer}s that have subscribed to it.
这里有两种特殊情况:
如果我们使用Hot Observable还是想接收到订阅之前的数据呢?这时候需要用 replay()
如果不想没有订阅的时候就一直发送数据,至少有一个订阅者时才发送数据使用 refCount()