上一节中我们说到了 Hot Observable
的一个实现 ---- ConnectableObservable
。这一节中我们说一说 Hot Observable
的另一种实现 ---- Subject
Subject
按照惯例,先来一段能跑的代码
// 5.1.kt
import io.reactivex.Observable
import io.reactivex.subjects.PublishSubject
import java.util.concurrent.TimeUnit
fun main(args: Array<String>) {
val observable = Observable.interval(10, TimeUnit.MILLISECONDS)
val subject = PublishSubject.create<Long>() // 注释1
observable.subscribe(subject) // 描点1 Subject 充当 Observer 角色
subject.subscribe({ println("Received $it") }) // 描点2 Subject 充当 Observable 角色
Thread.sleep(60)
}
输出
Received 0
Received 1
Received 2
Received 3
Received 4
Received 5
注释1
我们可以用 PublishSubject.create()
来创建 PublishSubject
(PublishSubject
下方介绍)
-
PublishSubject
是Subject
的一种 -
Subject
是Hot Observable
的一种
(这里有空我补一个关系图)
Subject
是 Observable
与 Observer
的组合体
-
Observable
的所有操作符
它都有(操作符
会在之后的章节介绍) - 它也可以像
Observer
一样接收值 - 左耳朵进右耳朵出(对,我妈经常这么说我)。如果你向它的
Observer
接口传入值(描点1
), 它会把这些值选择性地从Observable
接口处弹出去(描点2
)
(PublishSubject
会把所有从Observer
接口传入的值按照时间顺序全部传出去)
这么做有什么用处,既然我们可以直接从 源Observable
订阅,为什么要在中间加一层 PublishSubject
? 来看下一个例子
PublishSubject 的作用
// 5.2.kt
import io.reactivex.Observable
import java.util.concurrent.TimeUnit
fun main(args: Array<String>) {
val observable = Observable.interval(100, TimeUnit.MILLISECONDS)
observable.subscribe({ println("Subscription 1 Received $it") })
Thread.sleep(200)
observable.subscribe({ println("Subscription 2 Received $it") })
Thread.sleep(300)
}
输出
Subscription 1 Received 0
Subscription 1 Received 1
Subscription 1 Received 2
Subscription 2 Received 0 // 注释1
Subscription 1 Received 3
Subscription 2 Received 1
Subscription 1 Received 4
Subscription 2 Received 2
注释1
订阅2
从 0
开始接收消息(因为它订阅的 observable
是一个 Cold Observable
,所以会从头发送)
这里的输出结果和下面对比一下
// 5.3.kt
import io.reactivex.Observable
import io.reactivex.subjects.PublishSubject
import java.util.concurrent.TimeUnit
fun main(args: Array<String>) {
val observable = Observable.interval(100, TimeUnit.MILLISECONDS)
val subject = PublishSubject.create<Long>()
observable.subscribe(subject)
subject.subscribe({ println("Subscription 1 Received $it") })
Thread.sleep(300)
subject.subscribe({ println("Subscription 2 Received $it") })
Thread.sleep(200)
}
输出
Subscription 1 Received 0
Subscription 1 Received 1
Subscription 1 Received 2
Subscription 1 Received 3
Subscription 2 Received 3 // 注释1
Subscription 1 Received 4
Subscription 2 Received 4
注释1
订阅2
从 3
开始接收消息(它错过了 0
1
2
, 我们说过 Subject
是 Hot Observable
的一种)
在这里,我们通过 PublishSubject
把原来的 Cold
变成了 Hot
(上一节的 publish
也能实现此功能,只不过得到的是 ConnectableObservable
)
Subject 的各种实现
AsyncSubject
下面这张图是为了阐述 ReactiveX 原理常用的 Marble Diagram
,我会在明天专门去说 Marble Diagram
如何看(之前我也是各种看不懂,捂脸)
(图片来自 ReactiveX documentation)
AsyncSubject
会从 源Observable
(Subject
的 Observer
接口传入值来自 源Observable
) 接收所有值,并把最后一个值从 Observable
接口处弹出去,看一个例子
// 5.4.kt
import io.reactivex.Observable
import io.reactivex.subjects.AsyncSubject
fun main(args: Array<String>) {
val observable = Observable.just(1, 2, 3, 4)
val subject = AsyncSubject.create<Int>()
observable.subscribe(subject)
subject.subscribe(observer)
}
输出
New Subscription
Next 4
All Completed
我们可以不订阅任何的 Observable
而直接调用 Subject
的 onNext
方法(Observer
接口)传入值(其实上面 Subject
订阅 Observable
的时候,Subject
会在内部对每一个从 Observable
得到的值调用 onNext
方法)。就像这个例子
// 5.5.kt
import io.reactivex.subjects.AsyncSubject
fun main(args: Array<String>) {
val subject = AsyncSubject.create<Int>()
subject.onNext(1)
subject.onNext(2)
subject.subscribe(observer) // 订阅1
subject.onNext(3)
subject.subscribe(observer) // 订阅2
subject.onNext(4)
subject.onComplete()
}
输出
New Subscription
New Subscription
Next 4 // 订阅1(我知道你要问为什么不输出 2 而是 4,下面有解释)
All Completed
Next 4 // 订阅2
All Completed
AsyncSubject
当且仅当调用 onComplete
方法时才会弹出值(和( ConnectableObservable
与 connect
方法的关系)差不多)
所以 订阅1
并没有输出 Next 2 而是输出 Next 4。
PublishSubject
PublishSubject
会把所有从 Observer
接口传入的值按照时间顺序全部传出
(图片来自 ReactiveX documentation)
BehaviorSubject
把 PublishSubject
与 AsyncSubject
组合在一起差不多就是 BehaviorSubject
。
BehaviorSubject
会弹出订阅 BehaviorSubject
之前的最后一个值(AsyncSubject
的特性)和订阅 BehaviorSubject
之后的所有值(PublishSubject
的特性)
// 5.6.kt
import io.reactivex.subjects.BehaviorSubject
fun main(args: Array<String>) {
val subject = BehaviorSubject.create<Int>()
subject.onNext(1)
subject.onNext(2)
subject.subscribe(observer)
subject.onNext(3)
subject.subscribe(observer)
subject.onNext(4)
subject.onComplete()
}
输出
/*
New Subscription
Next 2 // 订阅1 获取到了 `2` 而跳过了 `1`
Next 3 // 订阅1 获取到了订阅之后的值
New Subscription
Next 3 // 订阅2
Next 4 // 订阅1
Next 4 // 订阅2
All Completed
All Completed
*/
ReplaySubject
它和 Cold Observable
的性质差不多(我还不知道它有什么用,麻烦哪位同学告诉我,我加在这里,先谢过了)
(图片来自 ReactiveX documentation)
// 5.7.kt
import io.reactivex.subjects.ReplaySubject
fun main(args: Array<String>) {
val subject = ReplaySubject.create<Int>()
subject.onNext(1)
subject.onNext(2)
subject.subscribe(observer)
subject.onNext(3)
subject.subscribe(observer)
subject.onComplete()
}
输出
/*
New Subscription
Next 1
Next 2
Next 3
New Subscription
Next 1
Next 2
Next 3
All Completed
All Completed
*/
这节 OK 了,明天我们来一起学习一下看图(Marble Diagram)识字....
RxKotlin 例子不超过15行教程 1----环境配置与初体验
RxKotlin 例子不超过15行教程 2----Observable Observer 与 Subscribe 简介
RxKotlin 例子不超过15行教程 3----Observable 的创建
RxKotlin 例子不超过15行教程 4----Observer Subscribe 与 Hot/Cold Observable
RxKotlin 例子不超过15行教程 5----Subject
RxKotlin 例子不超过15行教程 6----Operator 与 Marble Diagram
RxKotlin 例子不超过15行教程 7----Backpressure Flowable 与 Subscriber 简介