RxJava(RxKotlin)、RxAndroid 简单使用

RxJava(RxKotlin)、RxAndroid

ps:文章中涉及到的代码均使用 Kotlin 实现,即需要导入 RxKotlin,同时也涉及到了 RxAndroid 相关内容

导入方法:

  1. 在项目的 build.gradle 文件中添加 RxKotlin 的版本信息
buildscript {
    ext.rx_kotlin_version = '1.0.0'
    ext.rx_android_version = '1.2.1'
}
  1. 在 module 的 build.gradle 文件中添加 RxKotlin 以及 RxAndroid 的依赖
dependencies {
    // RxKotlin RxAndroid
    implementation "io.reactivex:rxkotlin:$rx_kotlin_version"
    implementation "io.reactivex:rxandroid:$rx_android_version"
}

1. 一些常用的网站

  1. RxJava文档
  2. RxJava中文文档
  3. RxJava经典资料

2. 观察者模式的四大要素

  1. Observable 被观察者
  2. Observer 观察者
  3. subscribe 订阅
  4. 事件

3. 操作符

3.1 Creating 操作符

create
just
from
range
repeat
interval
defer
empty / never
timer
start
  • create 操作符,直接创建一个 Subscriber 对象
Observable.create<String> {
    it.onNext("Hello Rx!")
    it.onCompleted()
}.subscribe(object : Subscriber<String>() {
    override fun onNext(t: String) {
        println("onNext() --> $t")
    }
    override fun onCompleted() {
        println("onCompleted()")
    }
    override fun onError(e: Throwable?) {
        println("onError()")
    }
})
onNext() --> Hello Rx!
onCompleted()
  • just 操作符将一系列对象逐个发射出去,注意集合对象将作为一个整体进行发射
Observable.just(1, 1.0, "String", true)
        .subscribe(object : Subscriber<Any>() {
            override fun onNext(t: Any) {
                println("onNext() --> $t")
            }
            override fun onCompleted() {
                println("onCompleted()")
            }
            override fun onError(e: Throwable?) {
                println("onError()")
            }
        })

Observable.just(listOf(1, 2, 3, 4, 5))
        .subscribe(object : Subscriber<List<Int>>() {
            override fun onNext(t: List<Int>) {
                t.forEach { println("onNext() --> $it") }
            }
            override fun onCompleted() {
                println("onCompleted()")
            }
            override fun onError(e: Throwable?) {
                println("onError()")
            }
        })
onNext() --> 1
onNext() --> 1.0
onNext() --> String
onNext() --> true
onCompleted()
onNext() --> 1
onNext() --> 2
onNext() --> 3
onNext() --> 4
onNext() --> 5
onCompleted()
  • from 操作符可以将集合中的元素逐个发射出去
Observable.from(listOf(5, 4, 3, 2, 1, 0))
        .subscribe(object : Subscriber<Int>() {
            override fun onNext(t: Int) {
                println("onNext() --> $t")
            }
            override fun onCompleted() {
                println("onCompleted()")
            }
            override fun onError(e: Throwable?) {
                println("onError()")
            }
        })
onNext() --> 5
onNext() --> 4
onNext() --> 3
onNext() --> 2
onNext() --> 1
onNext() --> 0
onCompleted()
  • range 在一定范围内向观察者发射整型数据,repeat 重复发射,默认重复无数次
Observable.range(1, 3)
        .repeat(2)
        .subscribe(object : Subscriber<Int>() {
            override fun onNext(t: Int) {
                println("onNext() --> $t")
            }
            override fun onCompleted() {
                println("onCompleted()")
            }
            override fun onError(e: Throwable?) {
                println("onError()")
            }
        })
onNext() --> 1
onNext() --> 2
onNext() --> 3
onNext() --> 1
onNext() --> 2
onNext() --> 3
onCompleted()
  • interval 定时向观察者发送一个 Long 类型的数字(逐个叠加)
Observable.interval(2, 2, TimeUnit.SECONDS)
        .subscribe(object : Subscriber<Long>() {
            override fun onNext(t: Long) {
                println("onNext() --> $t")
            }
            override fun onCompleted() {
                println("onCompleted()")
            }
            override fun onError(e: Throwable?) {
                println("onError()")
            }
        })
onNext() --> 0
onNext() --> 1
onNext() --> 2
onNext() --> 3
...
  • defer 延迟创建 Observable 对象,只有在调用 subscribe() 方法时,才会创建 Observable 对象
var arg = "初始值"
val observable = Observable.defer { Observable.just(arg) }
arg = "再次赋值"
observable.subscribe(object : Subscriber<String>() {
    override fun onNext(t: String) {
        println("onNext() --> $t")
    }
    override fun onCompleted() {
        println("onCompleted()")
    }
    override fun onError(e: Throwable?) {
        println("onError()")
    }
})
onNext() --> 再次赋值
onCompleted()

3.2 Transforming 操作符

map
flatMap
groupBy
buffer
scan
window
  • map
Observable.just(123, 234).map {
       "¥ $it"
   }.subscribe(object : Subscriber<String>() {
       override fun onNext(t: String) {
           println("onNext() --> $t")
       }
       override fun onCompleted() {
           println("onCompleted()")
       }
       override fun onError(e: Throwable?) {
           println("onError()")
       }
   })
onNext() --> ¥ 123
onNext() --> ¥ 234
onCompleted()
  • flatMap
Observable.just(123, 234, 345)
        .flatMap {
            Observable.just("$ $it")
        }.subscribe(object : Subscriber<String>() {
            override fun onNext(t: String) {
                println("onNext() --> $t")
            }
            override fun onCompleted() {
                println("onCompleted()")
            }
            override fun onError(e: Throwable?) {
                println("onError()")
            }
        })
onNext() --> $ 123
onNext() --> $ 234
onNext() --> $ 345
onCompleted()
  • groupBy
Observable.just(1, 2, 3, 4, 5, 6)
        .groupBy { it % 2 }
        .subscribe(object : Observer<GroupedObservable<Int, Int>> {
            override fun onError(e: Throwable?) {
            }
            override fun onNext(t: GroupedObservable<Int, Int>) {
                t.subscribe(object : Subscriber<Int>() {
                    override fun onNext(r: Int) {
                        println("group -> ${t.key}, value -> $r")
                    }
                    override fun onCompleted() {
                    }
                    override fun onError(e: Throwable?) {
                    }
                })
            }
            override fun onCompleted() {
            }
        })
group -> 1, value -> 1
group -> 0, value -> 2
group -> 1, value -> 3
group -> 0, value -> 4
group -> 1, value -> 5
group -> 0, value -> 6
  • buffer
Observable.range(0, 7)
        .buffer(3)
        .subscribe(object : Subscriber<List<Int>>() {
            override fun onNext(t: List<Int>) {
                println(t)
            }
            override fun onCompleted() {
            }
            override fun onError(e: Throwable?) {
            }
        })
[0, 1, 2]
[3, 4, 5]
[6]
  • scan
Observable.range(1, 5)
        .scan { sum, num -> sum + num }
        .subscribe(object : Subscriber<Int>() {
            override fun onNext(t: Int) {
                println("sum = $t")
            }
            override fun onCompleted() {
            }
            override fun onError(e: Throwable?) {
            }
        })
sum = 1
sum = 3
sum = 6
sum = 10
sum = 15

3.3 Filtering 操作符

debounce // 在一定时间间隔内没有操作,数据才会发射给观察者
distinct // 去重
elementAt // 取指定位置的一个数据
filter // 按照指定的规则进行条件的过滤
first // 取第一个数据
last // 取最后一个数据
ignoreElements // 忽略所有数据,不向观察者发送任何数据,只回调 onCompleted() 或 onError()
sample // 取样
skip //  跳过
skipLast // 跳过最后几项
take
takeLast
  • debounce
Observable.create<Int> {
    for (i in 1..10) {
        try {
            it.onNext(i)
            if (i % 2 == 0) {
                Thread.sleep(1000)
            } else {
                Thread.sleep(2000)
            }
        } catch (e: Exception) {
            it.onError(e)
        }
    }
    it.onCompleted()
}
        .subscribeOn(Schedulers.io())
        .debounce(2, TimeUnit.SECONDS)
        .subscribe(object : Subscriber<Int>() {
            override fun onNext(t: Int) {
                println("onNext() --> $t")
            }
            override fun onCompleted() {
                println("onCompleted()")
            }
            override fun onError(e: Throwable?) {
                println("onError()")
            }
        })
onNext() --> 1
onNext() --> 3
onNext() --> 5
onNext() --> 7
onNext() --> 9
onNext() --> 10
onCompleted()
  • distinct
Observable.just(1, 2, 3, 4, 2, 3)
        .distinct()
        .subscribe(object : Subscriber<Int>() {
            override fun onNext(t: Int) {
                println("onNext() --> $t")
            }
            override fun onCompleted() {
                println("onCompleted()")
            }
            override fun onError(e: Throwable?) {
                println("onError()")
            }
        })
onNext() --> 1
onNext() --> 2
onNext() --> 3
onNext() --> 4
onCompleted()

3.4 Combining 操作符(组合)

zip
merge
startWith
combineLatest
join
switchOnNext
  • zip 用来合并两个 Observable 发射的数据项,根据 Func2() 函数指定的规则生成一个新的 Observable 并发射出去,当其中一个 Observable 发射数据结束或者出现异常后,另一个 Observable 也将停止发射数据
val observable1 = Observable.just(10, 20, 30)
val observable2 = Observable.just(1, 2, 3, 4)
Observable.zip(observable1, observable2) { o1, o2 ->
    o1 + o2
}.subscribe(object : Subscriber<Int>() {
    override fun onNext(t: Int) {
        println("onNext() --> $t")
    }
    override fun onCompleted() {
        println("onCompleted()")
    }
    override fun onError(e: Throwable?) {
        println("onError()")
    }
})
onNext() --> 11
onNext() --> 22
onNext() --> 33
onCompleted()
  • merge 将两个 Observable 发射的数据项按照发射时间顺序合并成一个 Observable 进行发射
val observable1 = Observable.just(10, 20, 30)
val observable2 = Observable.just(1, 2, 3, 4)
Observable.merge(observable1, observable2)
        .subscribe(object : Subscriber<Int>() {
            override fun onNext(t: Int) {
                println("onNext() --> $t")
            }
            override fun onCompleted() {
                println("onCompleted()")
            }
            override fun onError(e: Throwable?) {
                println("onError()")
            }
        })
onNext() --> 10
onNext() --> 20
onNext() --> 30
onNext() --> 1
onNext() --> 2
onNext() --> 3
onNext() --> 4
onCompleted()
  • startWith 用于在一个 Observable 发射数据前插入一个 Observable
val observable1 = Observable.just(10, 20, 30)
val observable2 = Observable.just(1, 2, 3, 4)
observable1.startWith(observable2)
        .subscribe(object : Subscriber<Int>() {
            override fun onNext(t: Int) {
                println("onNext() --> $t")
            }
            override fun onCompleted() {
                println("onCompleted()")
            }
            override fun onError(e: Throwable?) {
                println("onError()")
            }
        })
onNext() --> 1
onNext() --> 2
onNext() --> 3
onNext() --> 4
onNext() --> 10
onNext() --> 20
onNext() --> 30
onCompleted()
  • combineLatest 用于将两个 Observable 发射的临近的数据项通过 Func2() 函数指定的规则组合成一个新的 Observable
val observable1 = Observable.just(10, 20, 30)
val observable2 = Observable.just(1, 2, 3, 4)
Observable.combineLatest(observable1, observable2) { o1, o2 ->
    o1 + o2
}.subscribe(object : Subscriber<Int>() {
    override fun onNext(t: Int) {
        println("onNext() --> $t")
    }
    override fun onCompleted() {
        println("onCompleted()")
    }
    override fun onError(e: Throwable?) {
        println("onError()")
    }
onNext() --> 31
onNext() --> 32
onNext() --> 33
onNext() --> 34
onCompleted()
  • join
val observable1 = Observable.just(10, 20, 30)
val observable2 = Observable.just(1, 2, 3, 4)
observable1.join(observable2,
        { Observable.timer(2, TimeUnit.SECONDS) },
        { Observable.timer(0, TimeUnit.SECONDS) },
        { t1, t2 -> Observable.just(t1 + t2) })
        .subscribe(object : Subscriber<Observable<Int>>() {
            override fun onNext(t: Observable<Int>) {
                t.subscribe(object : Subscriber<Int>() {
                    override fun onNext(data: Int) {
                        println("onNext() --> $data")
                    }
                    override fun onCompleted() {
                        println("onCompleted()")
                    }
                    override fun onError(e: Throwable?) {
                        println("onError()")
                    }
                })
            }
            override fun onCompleted() {
                println("onCompleted()")
            }
            override fun onError(e: Throwable?) {
                println("onError()")
            }
        })
onNext() --> 11
onCompleted()
onNext() --> 21
onCompleted()
onNext() --> 31
onCompleted()
onNext() --> 12
onCompleted()
onNext() --> 22
onCompleted()
onNext() --> 32
onCompleted()
onNext() --> 13
onCompleted()
onNext() --> 23
onCompleted()
onNext() --> 33
onCompleted()
onNext() --> 14
onCompleted()
onNext() --> 24
onCompleted()
onNext() --> 34
onCompleted()
onCompleted()

4. 线程调度(结合 RxAndroid)

  1. Schedulers.io() I/O 线程,执行耗时操作
  2. AndroidSchedulers.mainThread() Android 中的UI线程,执行UI更新
  3. subscribeOn() 调度被观察者运行的线程
  4. observeOn() 调度观察者运行的线程
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容