简单示例
1.创建被观察者
val observable = Observable.create<String> {
//被订阅时, 自动执行
it.onNext("连载1")
it.onNext("连载2")
it.onNext("连载3")
it.onComplete()
}
2. 创建观察者
val observer: Observer<String> = object : Observer<String> {
var disposable: Disposable? = null
val tag = "#############" + this.javaClass.name
override fun onSubscribe(d: Disposable) {
disposable = d
Log.i(tag, "onSubscribe")
}
override fun onError(e: Throwable) {
Log.i(tag, e.message + "")
}
override fun onComplete() {
Log.i(tag, "onComplete")
}
override fun onNext(t: String) {
if (t == "2") {
disposable?.dispose()
return
}
Log.i(tag, t + "")
}
}
3. 被观察者 添加 观察者
observable.subscribe(observer)
异步链式调用
Observable.create<Int> {
it.onNext(1)
Thread.sleep(1000)
it.onNext(2)
Thread.sleep(1000)
it.onNext(3)
it.onComplete()
}
.observeOn(AndroidSchedulers.mainThread())//观察者回调在主线程
.subscribeOn(Schedulers.io())//被观察者在子线程执行
/*
*subscribe()方法
* 可接收 Observer接口
* 或者 Observer接口的onNext()方法
* 或者 不关心任何事件,则不传参数
*/
.subscribe {
Log.i("########${this.javaClass.name}", "$it 秒")
}
扩展方法
private var disposable: Disposable? = null
@SuppressLint("CheckResult")
private fun subscribe() {
//just 连续发送最多10个数据
disposable = Observable.just("just1", "just2", "just3")
.subscribe {
Log.i("#######", it)
}
Observable.fromArray(listOf("from1", "from2", "from3"))
.subscribe { list ->
list.forEach {
Log.i("#######", it)
}
}
}
override fun onDestroy() {
super.onDestroy()
disposable?.dispose()
}
subscribe()最终返回一个Disposable,我们可已在Activity结束时取消订阅.
我们也可以对所有的订阅进行集中处理
val compositeDisposable: CompositeDisposable = CompositeDisposable()
private fun compositeDispose() {
object : Observer<String> {
override fun onComplete() = Unit
override fun onNext(t: String) = Unit
override fun onError(e: Throwable) = Unit
override fun onSubscribe(d: Disposable) {
compositeDisposable.add(d)
}
}
}
override fun onDestroy() {
super.onDestroy()
compositeDisposable.clear()
}