Subject
是一种桥梁和代理,在ReactiveX
的一些实现中,它既可以当作observer
也可以当做Observable
。因为它是一个observer
,所以它可以订阅一个或多个Observable
,同时因为它是一个Observable
,它可以传递它观察到的事件,重新发送他们,它也可以发送新的事件。
当Subject
订阅一个Observable
时,它可以触发Observable
发送事件(如果那是一个“cold” Observable
,以为“cold” Observable
会在被订阅之后开始发送信息)。这一特点可以让Subject
成为一个“hot”Observable,这个“hot” Observable是一种原来“cold”Observable
的变体。
AsyncSubject
一个AsyncSubject
只在原始Observable
完成后,发射来自原始Observable
的最后一个值。(如果原始Observable
没有发射任何值,AsyncObject
也不发射任何值)它会把这最后一个值发射给任何后续的观察者。
然而,如果原始的Observable
因为发生了错误而终止,AsyncSubject
将不会发射任何数据,只是简单的向前传递这个错误通知。
let subject = AsyncSubject<String>()
subject.onNext("Episode1 updated")
_ = subject.subscribe(onNext: {
print("Sub1 - what happened: \($0)")
})
subject.onNext("Episode2 updated")
let error = NSError.init(domain: "", code: 2, userInfo: ["213" :232])
// subject.onError(error)
subject.onCompleted()
PublishSubject
PublishSubject
只会把在订阅发生的时间点之后来自原始Observable
的数据发射给观察者。需要注意的是,PublishSubject
可能会一创建完成就立刻开始发射数据(除非你可以阻止它发生),因此这里有一个风险:在Subject
被创建后到有观察者订阅它之前这个时间段内,一个或多个数据可能会丢失。如果要确保来自原始Observable的所有数据都被分发,你需要这样做:或者使用Create
创建那个Observable
以便手动给它引入"冷"Observable
的行为(当所有观察者都已经订阅时才开始发射数据),或者改用ReplaySubject
。
let subject = PublishSubject<String>()
let sub1 = subject.subscribe(onNext: {
print("Sub1 - what happened: \($0)")
})
subject.onNext("Episode1 updated")
/*
但是执行一下就会发现,控制台上不会显示任何订阅消息,也就是说sub1没有订阅到任何内容。这是因为PublishSubject执行的是“会员制”,它只会把最新的消息通知给消息发生之前的订阅者。
*/
sub1.dispose()
let sub2 = subject.subscribe(onNext: {
print("Sub2 - what happened: \($0)")
})
subject.onNext("Episode2 updated")
subject.onNext("Episode3 updated")
sub2.dispose()
BehaviorSubject
当观察者订阅BehaviorSubject
时,它开始发射原始Observable
最近发射的数据(如果此时还没有收到任何数据,它会发射一个默认值),然后继续发射其它任何来自原始Observable
的数据。
/*
如果你希望Subject从“会员制”变成“试用制”,就需要使用BehaviorSubject。
它和PublisherSubject唯一的区别,就是只要有人订阅,
它就会向订阅者发送最新的一次事件作为“试用”。
*/
let subject = BehaviorSubject<String>(value: "RxSwift step by step")
_ = subject.subscribe(onNext: {
print("Sub1 - what happened: \($0)")
})
subject.onNext("Episode1 updated")
/*
由于BehaviorSubject有了一个默认的事件,sub1订阅之后,
就会陆续收到RxSwift step by step和Sub1 - what happened: Episode1 updated的消息了。
此时,如果我们再添加一个新的订阅者:
*/
_ = subject.subscribe(onNext: {
print("Sub2 - what happened: \($0)")
})
/*
此时,sub2就只能订阅到Sub2 - what happened: Episode1 updated消息了。
*/
ReplaySubject
ReplaySubject
会发射所有来自原始Observable
的数据给观察者,无论它们是何时订阅的。也有其它版本的ReplaySubject
,在重放缓存增长到一定大小的时候或过了一段时间后会丢弃旧的数据(原始Observable发射的)。
如果你把ReplaySubject
当作一个观察者使用,注意不要从多个线程中调用它的onNext
方法(包括其它的on系列方法),这可能导致同时(非顺序)调用,这会违反Observable
协议,给Subject
的结果增加了不确定性。
/*
ReplaySubject的行为和BehaviorSubject类似,都会给订阅者发送历史消息。不同地方有两点:
ReplaySubject没有默认消息,订阅空的ReplaySubject不会收到任何消息;
ReplaySubject自带一个缓冲区,当有订阅者订阅的时候,它会向订阅者发送缓冲区内的所有消息;
*/
let subject = ReplaySubject<String>.create(bufferSize: 3)
_ = subject.subscribe(onNext: {
print("Sub1 - what happened: \($0)")
})
subject.onNext("Episode1 updated")
subject.onNext("Episode2 updated")
subject.onNext("Episode3 updated")
_ = subject.subscribe(onNext: {
print("Sub2 - what happened: \($0)")
})
/*
由于subject缓冲区的大小是3,它会自动给sub2发送最新的三次历史事件。在控制台中执行一下,就可以看到注释中的结果了。
*/
Variable
除了事件序列之外,在平时的编程中我们还经常需遇到一类场景,就是需要某个值是有“响应式”特性的,例如可以通过设置这个值来动态控制按钮是否禁用,是否显示某些内容等。为了方便这个操作,RxSwift
还提供了一个特殊的subject
,叫做Variable
。
let stringVariable = Variable("Episode1")
_ = stringVariable
.asObservable()
.subscribe {
print("sub1: \($0)")
}
stringVariable.value = "Episode2"
/*
最后要说明的一点是,Variable只用来表达一个“响应式”值的语义,因此,它有以下两点性质:
绝不会发生.error事件;
无需手动给它发送.complete事件表示完成;
*/