RxJava(RxKotlin)、RxAndroid
ps:文章中涉及到的代码均使用 Kotlin 实现,即需要导入 RxKotlin,同时也涉及到了 RxAndroid 相关内容
导入方法:
- 在项目的 build.gradle 文件中添加 RxKotlin 和 RxAndroid 的版本信息
buildscript {
ext.rx_kotlin_version = '1.0.0'
ext.rx_android_version = '1.2.1'
}
- 在 module 的 build.gradle 文件中添加 RxKotlin 以及 RxAndroid 的依赖
dependencies {
// RxKotlin RxAndroid
implementation "io.reactivex:rxkotlin:$rx_kotlin_version"
implementation "io.reactivex:rxandroid:$rx_android_version"
}
1. 一些常用的网站
2. 观察者模式的四大要素
- Observable:被观察者
- Observer:观察者
- subscribe:订阅
- 事件
3. 操作符
3.1 Creating 操作符
create
just
from
range
repeat
interval
defer
empty / never
timer
start
-
create
操作符,从头创建一个 Observable 对象,并在回调中手动调用 Subscriber 的 onNext()、onCompleted() 等方法来发射事件
Observable.create<String> {
it.onNext("Hello Rx!")
it.onCompleted()
}.subscribe(object : Subscriber<String>() {
override fun onNext(t: String) {
println("onNext() --> $t")
}
override fun onCompleted() {
println("onCompleted()")
}
override fun onError(e: Throwable?) {
println("onError()")
}
})
onNext() --> Hello Rx!
onCompleted()
-
just
操作符将一系列对象逐个发射出去,注意集合对象将作为一个整体进行发射
Observable.just(1, 1.0, "String", true)
.subscribe(object : Subscriber<Any>() {
override fun onNext(t: Any) {
println("onNext() --> $t")
}
override fun onCompleted() {
println("onCompleted()")
}
override fun onError(e: Throwable?) {
println("onError()")
}
})
Observable.just(listOf(1, 2, 3, 4, 5))
.subscribe(object : Subscriber<List<Int>>() {
override fun onNext(t: List<Int>) {
t.forEach { println("onNext() --> $it") }
}
override fun onCompleted() {
println("onCompleted()")
}
override fun onError(e: Throwable?) {
println("onError()")
}
})
onNext() --> 1
onNext() --> 1.0
onNext() --> String
onNext() --> true
onCompleted()
onNext() --> 1
onNext() --> 2
onNext() --> 3
onNext() --> 4
onNext() --> 5
onCompleted()
-
from
操作符可以将集合中的元素逐个发射出去
Observable.from(listOf(5, 4, 3, 2, 1, 0))
.subscribe(object : Subscriber<Int>() {
override fun onNext(t: Int) {
println("onNext() --> $t")
}
override fun onCompleted() {
println("onCompleted()")
}
override fun onError(e: Throwable?) {
println("onError()")
}
})
onNext() --> 5
onNext() --> 4
onNext() --> 3
onNext() --> 2
onNext() --> 1
onNext() --> 0
onCompleted()
-
range
在一定范围内向观察者发射整型数据(两个参数依次为起始值和发射的个数),repeat
重复发射,不指定次数时默认无限重复
Observable.range(1, 3)
.repeat(2)
.subscribe(object : Subscriber<Int>() {
override fun onNext(t: Int) {
println("onNext() --> $t")
}
override fun onCompleted() {
println("onCompleted()")
}
override fun onError(e: Throwable?) {
println("onError()")
}
})
onNext() --> 1
onNext() --> 2
onNext() --> 3
onNext() --> 1
onNext() --> 2
onNext() --> 3
onCompleted()
-
interval
定时向观察者发送一个 Long 类型的数字(从 0 开始逐个递增)
Observable.interval(2, 2, TimeUnit.SECONDS)
.subscribe(object : Subscriber<Long>() {
override fun onNext(t: Long) {
println("onNext() --> $t")
}
override fun onCompleted() {
println("onCompleted()")
}
override fun onError(e: Throwable?) {
println("onError()")
}
})
onNext() --> 0
onNext() --> 1
onNext() --> 2
onNext() --> 3
...
-
defer
延迟创建 Observable 对象,只有在调用 subscribe() 方法时,才会创建 Observable 对象
var arg = "初始值"
val observable = Observable.defer { Observable.just(arg) }
arg = "再次赋值"
observable.subscribe(object : Subscriber<String>() {
override fun onNext(t: String) {
println("onNext() --> $t")
}
override fun onCompleted() {
println("onCompleted()")
}
override fun onError(e: Throwable?) {
println("onError()")
}
})
onNext() --> 再次赋值
onCompleted()
3.2 Transforming 操作符
map
flatMap
groupBy
buffer
scan
window
map
Observable.just(123, 234).map {
"¥ $it"
}.subscribe(object : Subscriber<String>() {
override fun onNext(t: String) {
println("onNext() --> $t")
}
override fun onCompleted() {
println("onCompleted()")
}
override fun onError(e: Throwable?) {
println("onError()")
}
})
onNext() --> ¥ 123
onNext() --> ¥ 234
onCompleted()
flatMap
Observable.just(123, 234, 345)
.flatMap {
Observable.just("$ $it")
}.subscribe(object : Subscriber<String>() {
override fun onNext(t: String) {
println("onNext() --> $t")
}
override fun onCompleted() {
println("onCompleted()")
}
override fun onError(e: Throwable?) {
println("onError()")
}
})
onNext() --> $ 123
onNext() --> $ 234
onNext() --> $ 345
onCompleted()
groupBy
Observable.just(1, 2, 3, 4, 5, 6)
.groupBy { it % 2 }
.subscribe(object : Observer<GroupedObservable<Int, Int>> {
override fun onError(e: Throwable?) {
}
override fun onNext(t: GroupedObservable<Int, Int>) {
t.subscribe(object : Subscriber<Int>() {
override fun onNext(r: Int) {
println("group -> ${t.key}, value -> $r")
}
override fun onCompleted() {
}
override fun onError(e: Throwable?) {
}
})
}
override fun onCompleted() {
}
})
group -> 1, value -> 1
group -> 0, value -> 2
group -> 1, value -> 3
group -> 0, value -> 4
group -> 1, value -> 5
group -> 0, value -> 6
buffer
Observable.range(0, 7)
.buffer(3)
.subscribe(object : Subscriber<List<Int>>() {
override fun onNext(t: List<Int>) {
println(t)
}
override fun onCompleted() {
}
override fun onError(e: Throwable?) {
}
})
[0, 1, 2]
[3, 4, 5]
[6]
scan
Observable.range(1, 5)
.scan { sum, num -> sum + num }
.subscribe(object : Subscriber<Int>() {
override fun onNext(t: Int) {
println("sum = $t")
}
override fun onCompleted() {
}
override fun onError(e: Throwable?) {
}
})
sum = 1
sum = 3
sum = 6
sum = 10
sum = 15
3.3 Filtering 操作符
debounce // 在一定时间间隔内没有操作,数据才会发射给观察者
distinct // 去重
elementAt // 取指定位置的一个数据
filter // 按照指定的规则进行条件的过滤
first // 取第一个数据
last // 取最后一个数据
ignoreElements // 忽略所有数据,不向观察者发送任何数据,只回调 onCompleted() 或 onError()
sample // 取样
skip // 跳过
skipLast // 跳过最后几项
take // 只取前几项
takeLast // 只取最后几项
debounce
Observable.create<Int> {
for (i in 1..10) {
try {
it.onNext(i)
if (i % 2 == 0) {
Thread.sleep(1000)
} else {
Thread.sleep(2000)
}
} catch (e: Exception) {
it.onError(e)
}
}
it.onCompleted()
}
.subscribeOn(Schedulers.io())
.debounce(2, TimeUnit.SECONDS)
.subscribe(object : Subscriber<Int>() {
override fun onNext(t: Int) {
println("onNext() --> $t")
}
override fun onCompleted() {
println("onCompleted()")
}
override fun onError(e: Throwable?) {
println("onError()")
}
})
onNext() --> 1
onNext() --> 3
onNext() --> 5
onNext() --> 7
onNext() --> 9
onNext() --> 10
onCompleted()
distinct
Observable.just(1, 2, 3, 4, 2, 3)
.distinct()
.subscribe(object : Subscriber<Int>() {
override fun onNext(t: Int) {
println("onNext() --> $t")
}
override fun onCompleted() {
println("onCompleted()")
}
override fun onError(e: Throwable?) {
println("onError()")
}
})
onNext() --> 1
onNext() --> 2
onNext() --> 3
onNext() --> 4
onCompleted()
3.4 Combining 操作符(组合)
zip
merge
startWith
combineLatest
join
switchOnNext
-
zip
用来合并两个 Observable 发射的数据项,根据 Func2() 函数指定的规则生成一个新的 Observable 并发射出去,当其中一个 Observable 发射数据结束或者出现异常后,另一个 Observable 也将停止发射数据
val observable1 = Observable.just(10, 20, 30)
val observable2 = Observable.just(1, 2, 3, 4)
Observable.zip(observable1, observable2) { o1, o2 ->
o1 + o2
}.subscribe(object : Subscriber<Int>() {
override fun onNext(t: Int) {
println("onNext() --> $t")
}
override fun onCompleted() {
println("onCompleted()")
}
override fun onError(e: Throwable?) {
println("onError()")
}
})
onNext() --> 11
onNext() --> 22
onNext() --> 33
onCompleted()
-
merge
将两个 Observable 发射的数据项按照发射时间顺序合并成一个 Observable 进行发射
val observable1 = Observable.just(10, 20, 30)
val observable2 = Observable.just(1, 2, 3, 4)
Observable.merge(observable1, observable2)
.subscribe(object : Subscriber<Int>() {
override fun onNext(t: Int) {
println("onNext() --> $t")
}
override fun onCompleted() {
println("onCompleted()")
}
override fun onError(e: Throwable?) {
println("onError()")
}
})
onNext() --> 10
onNext() --> 20
onNext() --> 30
onNext() --> 1
onNext() --> 2
onNext() --> 3
onNext() --> 4
onCompleted()
-
startWith
用于在一个 Observable 发射数据前插入一个 Observable
val observable1 = Observable.just(10, 20, 30)
val observable2 = Observable.just(1, 2, 3, 4)
observable1.startWith(observable2)
.subscribe(object : Subscriber<Int>() {
override fun onNext(t: Int) {
println("onNext() --> $t")
}
override fun onCompleted() {
println("onCompleted()")
}
override fun onError(e: Throwable?) {
println("onError()")
}
})
onNext() --> 1
onNext() --> 2
onNext() --> 3
onNext() --> 4
onNext() --> 10
onNext() --> 20
onNext() --> 30
onCompleted()
-
combineLatest
当任意一个 Observable 发射数据时,将两个 Observable 各自最近发射的数据项通过 Func2() 函数指定的规则进行组合,并将组合结果作为新的 Observable 发射出去
val observable1 = Observable.just(10, 20, 30)
val observable2 = Observable.just(1, 2, 3, 4)
Observable.combineLatest(observable1, observable2) { o1, o2 ->
o1 + o2
}.subscribe(object : Subscriber<Int>() {
override fun onNext(t: Int) {
println("onNext() --> $t")
}
override fun onCompleted() {
println("onCompleted()")
}
override fun onError(e: Throwable?) {
println("onError()")
}
})
onNext() --> 31
onNext() --> 32
onNext() --> 33
onNext() --> 34
onCompleted()
join
val observable1 = Observable.just(10, 20, 30)
val observable2 = Observable.just(1, 2, 3, 4)
observable1.join(observable2,
{ Observable.timer(2, TimeUnit.SECONDS) },
{ Observable.timer(0, TimeUnit.SECONDS) },
{ t1, t2 -> Observable.just(t1 + t2) })
.subscribe(object : Subscriber<Observable<Int>>() {
override fun onNext(t: Observable<Int>) {
t.subscribe(object : Subscriber<Int>() {
override fun onNext(data: Int) {
println("onNext() --> $data")
}
override fun onCompleted() {
println("onCompleted()")
}
override fun onError(e: Throwable?) {
println("onError()")
}
})
}
override fun onCompleted() {
println("onCompleted()")
}
override fun onError(e: Throwable?) {
println("onError()")
}
})
onNext() --> 11
onCompleted()
onNext() --> 21
onCompleted()
onNext() --> 31
onCompleted()
onNext() --> 12
onCompleted()
onNext() --> 22
onCompleted()
onNext() --> 32
onCompleted()
onNext() --> 13
onCompleted()
onNext() --> 23
onCompleted()
onNext() --> 33
onCompleted()
onNext() --> 14
onCompleted()
onNext() --> 24
onCompleted()
onNext() --> 34
onCompleted()
onCompleted()
4. 线程调度(结合 RxAndroid)
- Schedulers.io():I/O 线程,执行耗时操作
- AndroidSchedulers.mainThread():Android 中的 UI 线程,执行 UI 更新
- subscribeOn():调度被观察者运行的线程
- observeOn():调度观察者运行的线程