你想要发射一些数据,但是你不确定这些数据将会在什么时候被发射出来,也不知道他们的数量。
显然 使用just和from不能帮助你完成这个任务 而你也无法使用create来创建一个可以在某些情况下自旋的Observable(这些创建方法的具体区别以后再讲)
最佳的情况就是你拥有一个对象,它既是一个Observable——这样的话使用者可以链接他们,也是一个Observer——这样的话你就可以发射数据(onNext)和终结事件(onCompleted)了。
①PublisSubject
Subject可以被看做为【热Observable】,就算没有观察者(订阅者)监听它,它也会自动工作。
在RxJava中,这样的Subject叫做PublishSubject,我们可以这样创建它:
Subject<String, String> changeEvents = PublishSubject.create();
显然 使用create工厂方面能够帮助我们避免重复输入泛型参数,但是我们为什么不能再Rx.Net中 使用new PublishSubject<>()呢?理由在于流式API的产生原理。
如果你还记得的话,当你通过create方法创建Observable的时候,你需要提供一个OnSubscriber实例,当一个订阅者订阅这个
Observable的时候,最终会调用OnSubscribe的call方法。
不像【冷Observable】(create方式创建的就是冷的),Subject必须追踪他们的使用者,这种追踪既发生在内部的OnSubscribe中,也发生在Subject自己身上。很不幸,Java并不允许一个在构造函数中的内部类去访问父类的成员,因此,这种共享的变量必须提取到另外一个独立的类里面然后展示给他们。这种重要的机制就隐藏在PublishSubject.create工厂方法里面。(看了下源码,这个类叫做SubjectSubscriptionManager)
②ReplaySubject
有时候你并不想立刻分发时间,而是等待一个合适的时机。
比如说,你是一个电视台网络并且每周播放大量的电视连续剧。但是,你的一些使用者并不总是能够跟得上播放的节奏,但是他们也不想错过某一集。因此,他们的智能电视提供了一种缓存电视连续剧的能力,在某一个节点开始,允许订阅者以它自己的步子观察序列,并且不错过任何一个。
在RxJava里,这样的Subject叫做ReplaySubject。默认情况下,你可以创建一个无边界的ReplaySubject对象,它可以缓存所有它接收到的数据并且重播给他的订阅者,包括那些已经终结的事件。
然而,在一些用例中,你希望限制一个最大事件或者是希望缓存的事件数量,RxJava提供了以下三种额外的重播模式:
①createWithSize(n):最多保留n个元素
②createWithTime(t,u):保留那些比t年轻的元素
③createWithTimeAndSize(n,t,u):最多保留n个元素并且要比t年轻
在大多数情况下,这些就足够用了。
③AsyncSubject
然而,考虑这样一种情形,你进行了一种异步运算并且只想发射完成事件中的那个结果值。
ReplaySubject确实也起作用(从头开始计算一遍)但是这样做的话不仅冗余代价也太高昂了。
因此RxJava提供了另外一种Subject叫做AsyncSubject,它能记住它最后接收到的元素并且一旦onCompleted方法被调用(需要你显示调用它),所有当前的和未来的监听器都将会接受到completion事件的那个单一的值。但是和ReplaySubject不同,一旦一个事件调用了onError方法,那么所有之前接受到的值都将被忽略并且所有的订阅者都将接收到Throwable这个错误提示对象。
④BehaviorSubject
在RxJava中的最后一种情况是你只想保留最后一个事件。一样,ReplaySubject也能够办到(将元素设为1),但是和ReplaySubject不同,你一旦调用onCompleted方法,它会驱逐保存的值,那些在子序列中的订阅者无法接收到这个值而只是会接受到onCompleted(或者onError)事件而已。