empty
,它会发送一个 Completed 事件,创建一个不发射任何数据但是正常终止的 Observable
// empty
let emptySeq: Observable<Int> = empty()
let subscription = emptySeq.subscribe { event in
print(event)
}
// 输出Completed
never
不会发送任何事件,创建一个不发射数据也不终止的 Observable
// never
let neverSeq: Observable<String> = never()
let subscription = neverSeq.subscribe { event in
print(event)
}
// 什么都没用打印
just
,会先发送Next(Value),然后发送 Completed,创建一个发射指定值的 Observable。
// just
let justSeq = just("RxSwift")
let subscription = justSeq.subscribe { event in
print(event)
}
// 打印
// Next(RxSwift)
// Completed
sequenceOf
把一系列元素转换为事件
// sequenceOf
let sequenceElements = sequenceOf(1, 2, 3, 4)
let subscription = sequenceElements.subscribe { event in
print(event)
}
from
通过 asObservable,把其他数据类型转换为 Observable
// asObservable, 把序列转换成Observable
let sequenceFromArray = [1, 2, 3, 4].asObservable()
let subscription = sequenceFromArray.subscribe { event in
print(event)
}
// 输出
// Next(1)
// Next(2)
// Next(3)
// Next(4)
// Completed
create
,通过它重新创建一个Observable,通过.on发射Next和Completed事件
// create创建序列,通过.on添加事件
let myCreate = { (element: Int) -> Observable<Int> in
return create { observer in
observer.on(.Next(element))
observer.on(.Completed)
return NopDisposable.instance
}
}
let subscription = myCreate(1).subscribe { event in
print(event)
}
// 输出
// Next(1)
// Completed
failWith
// failWith,只会发送Error事件
let error = NSError(domain: "error", code: 1001, userInfo: nil)
let errorSeq: Observable<Int> = failWith(error)
let subscription = errorSeq.subscribe { event in
print(event)
}
// 输出
// Error(Error Domain=error Code=1001 "(null)")
deferred
直到有订阅者订阅才创建 Observable,并为每一个订阅者创建一个新的 Observable,在某些情况下,要等到最后一分钟才能获取到最新的数据,就需要这样做
// deferred 会等到有订阅者,订阅它才创建Observable
let deferredSeq: Observable<Int> = deferred {
print("create")
return create { observer in
observer.on(.Next(0))
observer.on(.Next(1))
observer.on(.Next(2))
return NopDisposable.instance
}
}
deferredSeq.subscribe { event in
print(event)
}
/*
create
Next(0)
Next(1)
Next(2)
*/
Subjects
PublishSubject
// PublishSubject,会发送从订阅者,订阅之后的事件
let subject = PublishSubject<String>()
subject.subscribe { event in
print("1->\(event)")
}
subject.on(.Next("a"))
subject.on(.Next("b"))
subject.subscribe { event in
print("2->\(event)")
}
subject.on(.Next("c"))
// 输出
/*
1->Next(a)
1->Next(b)
1->Next(c)
2->Next(c)
*/
BehaviorSubject
// BehaviorSubject 它开始发射原始 Observable 最近发射的数据,然后继续发送其他来自Observable的数据
let subject = BehaviorSubject(value: "z")
subject.subscribe { event in
print("1->\(event)")
}
subject.on(.Next("a"))
subject.on(.Next("b"))
subject.subscribe { event in
print("2->\(event)")
}
subject.on(.Next("c"))
subject.on(.Completed)
// 输出
1->Next(a)
1->Next(b)
1->Next(c)
2->Next(c)
ReplaySubject
// ReplaySubject,当有订阅者,订阅了的时候他能重发事件,bufferSize指定重发的次数,补发1次
let subject = ReplaySubject<String>.create(bufferSize: 1)
subject.subscribe { event in
print("1->\(event)")
}
subject.on(.Next("a"))
subject.on(.Next("b"))
subject.subscribe { event in
print("2->\(event)")
}
subject.on(.Next("c"))
// 输出
/*
1->Next(a)
1->Next(b)
2->Next(b)
1->Next(c)
2->Next(c)
*/
Transform
map
// map,通过对Observable发射映射的函数对每一项做转换
let originalSeq = sequenceOf(1, 2, 3)
originalSeq.map { $0 * 2 }.subscribe { print($0) }
/*
输出
Next(2)
Next(4)
Next(6)
Completed
*/
flatMap
// flatMap将一个发射数据的Observable变换为多个Observables,然后将它们发射的数据合并后放进一个单独的Observable
let seqInt = sequenceOf(1, 2, 3)
let seqString = sequenceOf("a", "b", "c")
seqInt.flatMap { int in seqString }.subscribe { print($0) }
/*
Next(a)
Next(b)
Next(c)
Next(a)
Next(b)
Next(c)
Next(a)
Next(b)
Next(c)
Completed
*/
scan
// scan连续的对序列中的元素应用某一个元素。然后把结果作为下一次的输入
let seqToSum = sequenceOf(1, 2, 3, 4)
seqToSum.scan(0) { (sum, ele) -> Int in
sum + ele
}.subscribe { print($0) }
/*
输入
Next(1)
Next(3)
Next(6)
*/
Filtering
// filter,过滤不符合要求的元素
let seq = sequenceOf(0, 1, 2, 3, 4, 5, 6, 7, 8)
seq.filter { $0 % 2 == 0 }
.subscribe { print($0) }
/*
输出
Next(0)
Next(2)
Next(4)
Next(6)
Next(8)
Completed
*/
distinct
// distinct,去掉相邻的相同元素
let seq = sequenceOf(1, 2, 2, 2, 3, 1, 1, 4, 1, 4)
seq.distinctUntilChanged().subscribe { print($0) }
/*
输出
Next(1)
Next(2)
Next(3)
Next(1)
Next(4)
Next(1)
Next(4)
Completed
*/
take
// take保留前几项的数据
let subscription = sequenceOf(1, 2, 3, 4 , 5).take(3).subscribe { print($0) }
/*
Next(1)
Next(2)
Next(3)
Completed
*/
Combining(组合操作)
startWith
// startWith在数据序列前出入指定数据
let subscription = sequenceOf(2, 3, 4, 5).startWith(1).subscribe { print($0) }
/*
Next(1)
Next(2)
Next(3)
Next(4)
Next(5)
Completed
*/
combineLatest
// combineLatest,当两个Observables中任意一个发送了数据,使用一个函数把两个Observable中最近的数据进行结合
let stringOb = PublishSubject<String>()
let intOb = PublishSubject<Int>()
combineLatest(stringOb, intOb) {
"\($0) \($1)"
}.subscribe { print($0) }
stringOb.on(.Next("A"))
intOb.on(.Next(1))
stringOb.on(.Next("B"))
stringOb.on(.Next("C"))
/*
Next(A 1)
Next(B 1)
Next(C 1)
*/
zip
// zip它会等到Observable中的数据一一对应结合时,再发送数据
let stringOb = PublishSubject<String>()
let intOb = PublishSubject<Int>()
zip(stringOb, intOb) { "\($0) \($1)" }.subscribe { print($0) }
stringOb.on(.Next("A"))
stringOb.on(.Next("B"))
intOb.on(.Next(1))
intOb.on(.Next(2))
merge
// merge,合并对个Observables的发射物
let subject1 = PublishSubject<Int>()
let subject2 = PublishSubject<Int>()
sequenceOf(subject1, subject2).merge().subscribe { print($0) }
subject1.onNext(1)
subject2.onNext(2)
subject1.onNext(3)
/*
Next(1)
Next(2)
Next(3)
*/
switchLatest
// switchLatest,将一个Observable<Observable<T>>转为为这样的Observable,它逐个发射数据的Observable
let var1 = Variable(0)
let var3 = Variable(var1)
let d = var3.switchLatest().subscribe { print($0) }