这节就认认真真的开始讲 Observable.
创建一个 Observable
两段能跑的代码
本节代码中的 observer 就是上一节中的
// 3.1.kt
import io.reactivex.Observable
fun main(args: Array<String>) {
val observable: Observable<Int> = Observable.create<Int> {
// it: ObservableEmitter<String!>
it.onNext(1)
it.onNext(2)
it.onNext(3)
it.onComplete()
}
observable.subscribe(observer) // observer 同上一节
}
输出
New Subscription
Next 1
Next 2
Next 3
All Completed
再来一段报错的代码
// 3.2.kt
import io.reactivex.Observable
fun main(args: Array<String>) {
val observable: Observable<Int> = Observable.create<Int> {
// it: ObservableEmitter<String!>
it.onNext(1)
it.onNext(2)
it.onNext(3)
it.onError(Exception("My Custom Exception"))
}
observable.subscribe(observer) // observer 同上一节
}
输出
New Subscription
Next 1
Next 2
Next 3
Error Occured My Custom Exception
上面两段代码的主旨 -- Observable.create
我们可以通过 Observable.create
来创建 Observable
这个函数是一个高阶函数,它接收另一个(以 ObservableEmitter
为参数的)函数为参数,返回一个 Observable。我们可以在函数中利用 ObservableEmitter
弹射消息。(在这里 it 即为 ObservableEmitter
(Kotlin 中 Lambda 函数如果只有一个参数,可以省略声明,在函数体中以 it 指代))
ObservableEmitter
目前能用上的有三个方法
- onNext: 弹射消息
- onComplete: 弹射结束标志
- onError: 弹射错误
注意,这三者都可以省略
// 3.3.kt
// 省略 onComplete
import io.reactivex.Observable
fun main(args: Array<String>) {
val observable: Observable<Int> = Observable.create<Int> {
it.onNext(1) // 其实它也可省,可以删除看看效果
}
observable.subscribe(observer)
}
输出
New Subscription
Next 1
可以看到订阅依然会停止,但是不会调用 observer 的 onComplete
方法(和 3.1.kt
输出结果比较一下,少了最后一行)
弹射结束标志后 Observable 还可以弹射消息,但是 Observer 却不会对这些消息作出反应。可以看下面的例子。
// 3.4.kt
import io.reactivex.Observable
fun main(args: Array<String>) {
val observable: Observable<Int> = Observable.create<Int> {
it.onComplete()
it.onNext(1)
}
observable.subscribe(observer)
}
输出
New Subscription
All Completed
All Completed 之后没有继续输出 Next 1
创建 Observable 的基本方法 -- Observable.fromX 与 Observable.just
Observable.fromX
// 3.5.kt
import io.reactivex.Observable
fun main(args: Array<String>) {
val list = listOf(1, 2, 3, 4)
val observable: Observable<Int> = Observable.fromIterable(list)
// 我们也可以用 list.toObservable() 替代 Observable.fromIterable(list)
observable.subscribe(observer)
}
输出
New Subscription
Next 1
Next 2
Next 3
Next 4
All Completed
Observable.just
// 3.6.kt
import io.reactivex.Observable
fun main(args: Array<String>) {
Observable.just(54).subscribe(observer)
Observable.just(listOf(1, 2, 3)).subscribe(observer)
Observable.just(1, 2, 3).subscribe(observer)
}
输出
New Subscription
Next 54
All Completed
New Subscription // 注释1
Next [1, 2, 3]
All Completed
New Subscription // 注释2
Next 1
Next 2
Next 3
All Completed
可以看到 just 会把每个参数当作一个整体(注释1)(而 from 会把参数展开,下面有直观对比的例子)。如果有多个参数会把它们一个一个的弹出来(注释2)。
just 与 fromX 的区别
// 3.7.kt
import io.reactivex.Observable
fun main(args: Array<String>) {
// 下面两者等效
Observable.just(1, 2, 3).subscribe(observer)
Observable.fromIterable(listOf(1, 2, 3)).subscribe(observer)
}
输出
New Subscription
Next 1
Next 2
Next 3
All Completed
New Subscription
Next 1
Next 2
Next 3
All Completed
just fromX 与 create 的区别
create
中需要显式弹射结束标志(it.onComplete()
),Observer 才会调用 onComplete
方法 (3.3.kt
中,没有 it.onComplete()
输出中就没有 All Completed)
just
fromX
会自动弹射结束标志 (3.6.kt
3.7.kt
3.8.kt
中均有输出 All Completed)
创建 Observable 的其它常用方法
empty 没有值
// 3.8.kt
import io.reactivex.Observable
fun main(args: Array<String>) {
Observable.empty<String>().subscribe(observer)
}
输出
New Subscription
All Completed
interval 隔一定的时间弹射一个值
// 3.9.kt
import io.reactivex.Observable
import java.util.concurrent.TimeUnit
fun main(args: Array<String>) {
Observable.interval(600, TimeUnit.MILLISECONDS).subscribe(observer)
Thread.sleep(2000) // 要有这一行
}
输出
New Subscription
Next 0
Next 1
Next 2
timer 一段时间后弹射一个值
// 3.10.kt
import io.reactivex.Observable
import java.util.concurrent.TimeUnit
fun main(args: Array<String>) {
Observable.timer(20, TimeUnit.MILLISECONDS).subscribe(observer)
Thread.sleep(50) // 要有这一行
}
输出
New Subscription
Next 0
All Completed
range 一个范围内依次弹射
// 3.11.kt
import io.reactivex.Observable
fun main(args: Array<String>) {
Observable.range(1, 3).subscribe(observer)
}
输出
New Subscription
Next 1
Next 2
Next 3
All Completed
这节基本上OK了,下一节主要内容是 Observer
subscribe
与 Hot/Cold Observable
RxKotlin 例子不超过15行教程 1----环境配置与初体验
RxKotlin 例子不超过15行教程 2----Observable Observer 与 Subscribe 简介
RxKotlin 例子不超过15行教程 3----Observable 的创建
RxKotlin 例子不超过15行教程 4----Observer Subscribe 与 Hot/Cold Observable
RxKotlin 例子不超过15行教程 5----Subject
RxKotlin 例子不超过15行教程 6----Operator 与 Marble Diagram
RxKotlin 例子不超过15行教程 7----Backpressure Flowable 与 Subscriber 简介