RxJava 辅助操作符

ReactiveX 系列文章目录


using

创建一个资源用于发射,当取消订阅时也释放资源。

public static <T, D> Observable<T> using(Callable<? extends D> resourceSupplier, Function<? super D, ? extends ObservableSource<? extends T>> sourceSupplier, Consumer<? super D> disposer, boolean eager)

第一个参数就是提供发射的资源,最后一个参数 eager,默认为 true。

lateinit var disposable : Disposable
// 第一个参数提供了一个集合资源
Observable.using({ listOf(1,2,3,4,5,6,7,8) }, { list ->
    Observable.create(ObservableOnSubscribe<Int> {
        for (i in list) {
            it.onNext(i)
            if (i == 7)
                it.onComplete()
        }
    }) // 第二个参数是 Observable,发射资源
}, { Log.e("RX", "dispose $it,在这里释放资源")}) // 第三个参数用于最后释放第一个参数的资源
        .subscribe(object: Observer<Int> {
            override fun onComplete() { Log.e("RX", "onComplete") }

            override fun onSubscribe(d: Disposable) { disposable = d}

            override fun onNext(t: Int) {
                Log.e("RX", "onNext $t")
                if (t == 5) disposable?.dispose()
            }

            override fun onError(e: Throwable) { Log.e("RX", "onError") }
        })

日志:

onNext 1
onNext 2
onNext 3
onNext 4
onNext 5
dispose [1, 2, 3, 4, 5, 6, 7, 8],在这里释放资源

上例中,Observer 在收到 5 这个整数时,就 dispose(),最后进了 using 第三个参数设置的 disposer 释放资源。

去掉调用 dispose(),这样发到 7 的时候,发射 onComplete,此时日志是

onNext 1
onNext 2
onNext 3
onNext 4
onNext 5
onNext 6
onNext 7
dispose [1, 2, 3, 4, 5, 6, 7, 8],在这里释放资源
onComplete

给 using 方法加最后一个参数 false,日志如下,相比为 true 的就是等 onComplete 之后才去释放资源。

onNext 1
onNext 2
onNext 3
onNext 4
onNext 5
onNext 6
onNext 7
onComplete
dispose [1, 2, 3, 4, 5, 6, 7, 8],在这里释放资源

delay

public final Observable<T> delay(long delay, TimeUnit unit)
public final Observable<T> delay(long delay, TimeUnit unit, boolean delayError)
public final Observable<T> delay(long delay, TimeUnit unit, Scheduler scheduler)
public final Observable<T> delay(long delay, TimeUnit unit, Scheduler scheduler, boolean delayError)


public final <U> Observable<T> delay(final Function<? super T, ? extends ObservableSource<U>> itemDelay)
// 先调用 delaySubscription,然后调用上面的 delay 方法
public final <U, V> Observable<T> delay(ObservableSource<U> subscriptionDelay,
       Function<? super T, ? extends ObservableSource<V>> itemDelay) {
   return delaySubscription(subscriptionDelay).delay(itemDelay);
}

会延迟 onNext 和 onComplete,不会延迟 onError。就是整个发射事件被延迟了。

Observable.create(ObservableOnSubscribe<Int> {
   it.onNext(1)
   it.onNext(2)
   it.onError(Throwable())
   it.onNext(5)
})
.delay(5, TimeUnit.SECONDS, true)
.subscribe (object: Observer<Int> {
   override fun onComplete() {
   }

   override fun onSubscribe(d: Disposable) {
   }

   override fun onNext(t: Int) { Log.e("RX", "$t")
   }

   override fun onError(e: Throwable) { Log.e("RX", "error")
   }

})

会先收到 1,2 然后收到 error,如果 delay 的第三个参数为 false,直接收到 error。

Observable.just(1, 2, 3)
    .delay(5, TimeUnit.SECONDS)
    .subscribe { Log.e("RX", "$it") }

Function 方法返回的 Observable,onNext 或 onComplete 后外面的 Observable 才发射。

Log.e("RX", "start")
Observable.just(10).delay {
    Thread.sleep(3000)
    Observable.just("a")
}.subscribe( { Log.e("RX", "收到") } )

delaySubscription

public final <U> Observable<T> delaySubscription(ObservableSource<U> other)
// 先用 timer 封装一下,调用上面的方法
public final Observable<T> delaySubscription(long delay, TimeUnit unit, Scheduler scheduler) {
    return delaySubscription(timer(delay, unit, scheduler));
}
// 调用上面三个参数的方法
public final Observable<T> delaySubscription(long delay, TimeUnit unit)

delay 是发射延迟,delaySubscription 发射正常,是观察者订阅延迟。

Observable.just(1, 2, 3)
       .delaySubscription(5, TimeUnit.SECONDS)
       .subscribe {
           Log.e("RX", "$it")
       }

第一个构造方法含义和 delay 那个类似,也是 Function 里的方法返回的那个 Observable 发射了,外面的订阅才开始。

materialize/dematerialize

materialize 将原来的 onNext,onError,onComplete 全都变成一个通知 Notification(RX 框架里的类),然后都通过 onNext 发射出去。

val materializeObservable = Observable.just(1,2,3)
                    .materialize()

materializeObservable.subscribe( {
    Log.e("RX", "value=${it.value},complete=${it.isOnComplete}")
})

日志:

value=1,complete=false
value=2,complete=false
value=3,complete=false
value=null,complete=true

dematerialize 是 materialize 的逆过程,将 onNext 发射的 Notification 变成原来的样子。

materializeObservable.dematerialize<Int>()
.subscribe(object : Observer<Int> {
   override fun onComplete() { Log.e("RX", "onComplete")}
   override fun onSubscribe(d: Disposable) {}
   override fun onNext(t: Int) { Log.e("RX", "onNext=$t") }
   override fun onError(e: Throwable) {}
})
onNext=1
onNext=2
onNext=3
onComplete

doXXX

包括:

  • doOnSubscribe
  • doOnNext
  • doAfterNext
  • doOnTerminate
  • doAfterTerminate
  • doOnError
  • doOnComplete
  • doOnDispose
  • doFinally
  • doOnLifecycle
  • doOnEach
lateinit var disposable: Disposable

// 封装了 doXXX 方法
val transformer = ObservableTransformer<Int, Int> {
    it.doOnNext { Log.e("RX", "doOnNext $it") } // Observable 调 onNext 后执行
            .doAfterNext { Log.e("RX", "doAfterNext $it") } // Observer 接收到 onNext 后执行
            .doOnComplete { Log.e("RX", "doOnComplete") } // Observable 调 onComplete 后执行
            .doOnError { Log.e("RX", "doOnError") } // Observable 调 doOnError 后执行
            .doOnTerminate { Log.e("RX", "doOnTerminate") } //  Observable 调 onComplete 或 onError 后执行
            .doAfterTerminate { Log.e("RX", "doAfterTerminate") } // Observer 接收到 onComplete 或 onError 后执行
            .doOnDispose { Log.e("RX", "doOnDispose") } // Observer 调用 dispose() 后执行
            .doFinally { Log.e("RX", "doFinally") } // Observable 调 onComplete 或 onError 后,或者 Observer 调用 dispose()
            .doOnEach {
                // Observable 调用任何方法都会收到一个通知
                val str = if (it.isOnComplete) "onComplete" else (if (it.isOnError) "onError" else "${it.value}")
                Log.e("RX", "doOnEach Notification $str")
            }
            .doOnEach(object : Observer<Int> { // 调用任何方法都会用一个 Observer 来收到这个事件
                override fun onComplete() { Log.e("RX", "doOnEach Observer onComplete") }
                override fun onSubscribe(d: Disposable) { Log.e("RX", "doOnEach Observer onSubscribe") }
                override fun onNext(t: Int) { Log.e("RX", "doOnEach Observer onNext $t") }
                override fun onError(e: Throwable) { Log.e("RX", "doOnEach Observer onError") }
            })
            .doOnLifecycle(Consumer<Disposable> { Log.e("RX", "doOnLifecycle onSubscribe") }, // Observable 被订阅后执行
                    Action { Log.e("RX", "doOnLifecycle onDispose") }) // Observable 被取消订阅后执行
            .doOnSubscribe { Log.e("RX", "doOnSubscribe") } // Observable 被订阅后执行
}

// 发射 onComplete
val observable1 = Observable.create(ObservableOnSubscribe<Int> {
    Log.e("RX", "before emit 1")
    it.onNext(1)
    Log.e("RX", "after emit 1, before emit 2")
    it.onNext(2)
    Log.e("RX", "after emit 2, before emit complete")
    it.onComplete()
    Log.e("RX", "after emit complete")
})

// 发射 onError
val observable2 = Observable.create(ObservableOnSubscribe<Int> {
    Log.e("RX", "before emit 1")
    it.onNext(1)
    Log.e("RX", "after emit 1, before emit 2")
    it.onNext(2)
    Log.e("RX", "after emit 2, before emit error")
    it.onError(Throwable())
    Log.e("RX", "after emit error")
})

val observer1 = object : Observer<Int> { // 正常的一个 Observer
    override fun onComplete() { Log.e("RX", "Observer onComplete") }
    override fun onSubscribe(d: Disposable) { Log.e("RX", "Observer onSubscribe") }
    override fun onNext(t: Int) { Log.e("RX", "Observer onNext $t") }
    override fun onError(e: Throwable) { Log.e("RX", "Observer onError") }
}

val observer2 = object : Observer<Int> { // 取消订阅的一个 Observer
    override fun onComplete() { Log.e("RX", "Observer onComplete") }
    override fun onSubscribe(d: Disposable) {
        disposable = d
        Log.e("RX", "Observer onSubscribe")
    }
    override fun onNext(t: Int) { Log.e("RX", "Observer onNext $t") }
    override fun onError(e: Throwable) {
        disposable.dispose()
        Log.e("RX", "Observer onError and dispose")
    }
}

observable1.compose(transformer).subscribe(observer1)

日志

05-16 11:29:42.568 18180-18180/pot.ner347.androiddemo E/RX: doOnLifecycle onSubscribe
    doOnSubscribe
    Observer onSubscribe
    before emit 1
05-16 11:29:42.569 18180-18180/pot.ner347.androiddemo E/RX: doOnNext 1
05-16 11:29:42.570 18180-18180/pot.ner347.androiddemo E/RX: doOnEach Notification 1
    doOnEach Observer onNext 1
    Observer onNext 1
    doAfterNext 1
    after emit 1, before emit 2
    doOnNext 2
    doOnEach Notification 2
    doOnEach Observer onNext 2
    Observer onNext 2
05-16 11:29:42.571 18180-18180/pot.ner347.androiddemo E/RX: doAfterNext 2
    after emit 2, before emit complete
    doOnComplete
    doOnTerminate
    doOnEach Notification onComplete
    doOnEach Observer onComplete
    Observer onComplete
    doFinally
    doAfterTerminate
    after emit complete

observable2.compose(transformer).subscribe(observer1)

日志

05-16 11:32:23.905 19052-19052/pot.ner347.androiddemo E/RX: doOnLifecycle onSubscribe
    doOnSubscribe
    Observer onSubscribe
    before emit 1
    doOnNext 1
05-16 11:32:23.907 19052-19052/pot.ner347.androiddemo E/RX: doOnEach Notification 1
    doOnEach Observer onNext 1
    Observer onNext 1
    doAfterNext 1
    after emit 1, before emit 2
    doOnNext 2
    doOnEach Notification 2
    doOnEach Observer onNext 2
    Observer onNext 2
    doAfterNext 2
    after emit 2, before emit error
    doOnError
05-16 11:32:23.908 19052-19052/pot.ner347.androiddemo E/RX: doOnTerminate
    doOnEach Notification onError
    doOnEach Observer onError
    Observer onError
    doFinally
    doAfterTerminate
    after emit error

observable2.compose(transformer).subscribe(observer2)

日志

05-16 11:42:35.507 20216-20216/pot.ner347.androiddemo E/RX: doOnLifecycle onSubscribe
    doOnSubscribe
05-16 11:42:35.508 20216-20216/pot.ner347.androiddemo E/RX: Observer onSubscribe
    before emit 1
    doOnNext 1
05-16 11:42:35.509 20216-20216/pot.ner347.androiddemo E/RX: doOnEach Notification 1
    doOnEach Observer onNext 1
    Observer onNext 1
    doAfterNext 1
    after emit 1, before emit 2
    doOnNext 2
    doOnEach Notification 2
05-16 11:42:35.509 20216-20216/pot.ner347.androiddemo E/RX: doOnEach Observer onNext 2
    Observer onNext 2
    doAfterNext 2
    after emit 2, before emit error
    doOnError
    doOnTerminate
05-16 11:42:35.510 20216-20216/pot.ner347.androiddemo E/RX: doOnEach Notification onError
    doOnEach Observer onError
    doOnLifecycle onDispose
    doOnDispose
    doFinally
    Observer onError and dispose
    doAfterTerminate
    after emit error

onTerminateDetach

在执行 dispose() 解除订阅时,将内部对外部观察者的引用 actual 置为 null,看网上文章主要用于防止内存泄漏问题,因为 RxJava 使用中用了许多匿名内部类。比如这篇文章:一张图搞定-RxJava2的线程切换原理和内存泄露问题

serialize

Observable 可以异步调用观察者的方法,可能是从不同的线程调用。这可能会让 Observable 行为不正确,它可能会在某一个 onNext 调用之前尝试调用 onCompleted 或 onError 方法,或者从两个不同的线程同时调用 onNext 方法。

使用 serialize 可以纠正 Observable 的行为,保证它的行为是正确的且是同步的。

subscribe/subscribeWith

订阅,主要是有几个重载方法。

// 用的是一些默认的实现
public final Disposable subscribe() {
    return subscribe(Functions.emptyConsumer(), Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
}
public final Disposable subscribe(Consumer<? super T> onNext)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
            Action onComplete)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
            Action onComplete, Consumer<? super Disposable> onSubscribe)
public final void subscribe(Observer<? super T> observer)

// 订阅后返回这个观察者对象
public final <E extends Observer<? super T>> E subscribeWith(E observer) {
    subscribe(observer);
    return observer;
}

timeInterval

public final Observable<Timed<T>> timeInterval() {
    return timeInterval(TimeUnit.MILLISECONDS, Schedulers.computation());
}
public final Observable<Timed<T>> timeInterval(Scheduler scheduler) {
    return timeInterval(TimeUnit.MILLISECONDS, scheduler);
}
public final Observable<Timed<T>> timeInterval(TimeUnit unit) {
    return timeInterval(unit, Schedulers.computation());
}

public final Observable<Timed<T>> timeInterval(TimeUnit unit, Scheduler scheduler)

拦截源 Observable 发射的数据项,算出两个连续发射数据之间的时间间隔,将这个间隔和原始数据封装成 Timed 发射出来。

新的 Observable 的第一个发射数据是在 Observer 订阅源Observable 到源 Observable 发射第一项数据之间的时间长度。源 Observable 发射最后一项数据到发射 onComplete 之间的时间间隔不会发射。

Observable.create(ObservableOnSubscribe<Int> {
    it.onNext(1)
    Thread.sleep(100)
    it.onNext(2)
    Thread.sleep(200)
    it.onNext(3)
    Thread.sleep(150)
    it.onNext(4)
    Thread.sleep(250)
    it.onComplete()
}).timeInterval().subscribe(object: Observer<Timed<Int>> {
    override fun onComplete() { Log.e("RX", "onComplete") }

    override fun onSubscribe(d: Disposable) {}

    override fun onNext(t: Timed<Int>) {
        t.time()
        Log.e("RX", "onNext ${t.time()},${t.value()}")
    }

    override fun onError(e: Throwable) {}
})

日志

onNext 0,1
onNext 101,2
onNext 200,3
onNext 151,4
onComplete

timestamp

timeInterval 是将时间间隔和源数据封装,而 timestamp 是将发射时的时间戳和源数据封装。

Observable.create(ObservableOnSubscribe<Int> {
    it.onNext(1)
    Thread.sleep(100)
    it.onNext(2)
    Thread.sleep(200)
    it.onNext(3)
    Thread.sleep(150)
    it.onNext(4)
    Thread.sleep(250)
    it.onComplete()
}).timestamp().subscribe(object: Observer<Timed<Int>> {
    override fun onComplete() { Log.e("RX", "onComplete") }

    override fun onSubscribe(d: Disposable) {}

    override fun onNext(t: Timed<Int>) {
        t.time()
        Log.e("RX", "onNext ${t.time()},${t.value()}")
    }

    override fun onError(e: Throwable) {}

})

日志

onNext 1526714465003,1
onNext 1526714465104,2
onNext 1526714465305,3
onNext 1526714465457,4
onComplete

timeout

// Function 里返回的 Observable 结束之前,源 Observable 还没发射数据的话就超时
public final <V> Observable<T> timeout(Function<? super T, ? extends ObservableSource<V>> itemTimeoutIndicator)
public final <V> Observable<T> timeout(Function<? super T, ? extends ObservableSource<V>> itemTimeoutIndicator, ObservableSource<? extends T> other)

// 超时进 onError
public final Observable<T> timeout(long timeout, TimeUnit timeUnit) 
// 超时使用备用的 Observable 发射
// 如果没超时,先发源 Observable,再发备用的 Observable
public final Observable<T> timeout(long timeout, TimeUnit timeUnit, ObservableSource<? extends T> other) 
public final Observable<T> timeout(long timeout, TimeUnit timeUnit, Scheduler scheduler, ObservableSource<? extends T> other) 
public final Observable<T> timeout(long timeout, TimeUnit timeUnit, Scheduler scheduler)

public final <U, V> Observable<T> timeout(ObservableSource<U> firstTimeoutIndicator, Function<? super T, ? extends ObservableSource<V>> itemTimeoutIndicator)

public final <U, V> Observable<T> timeout(ObservableSource<U> firstTimeoutIndicator, Function<? super T, ? extends ObservableSource<V>> itemTimeoutIndicator, ObservableSource<? extends T> other)
val ob = Observable.interval(10, 10, TimeUnit.MILLISECONDS).take(3)
val observer = object : Observer<Long> {
    override fun onComplete() { Log.e("RX", "onComplete") }

    override fun onSubscribe(d: Disposable) {
    }

    override fun onNext(t: Long) {Log.e("RX", "onNext $t") }

    override fun onError(e: Throwable) {Log.e("RX", "onError") }
}
val other = Observable.just(20L,30L)

// 超时
// onError
ob.timeout(5, TimeUnit.MILLISECONDS).subscribe(observer)

// 超时
// onNext 20
// onNext 30
// onComplete
ob.timeout(5, TimeUnit.MILLISECONDS, other).subscribe(observer)

// Function 返回的 Observable 在 2ms 后发个 0 就结束了,此时源 Observable 还没发射,于是超时,但是第一个数据发出来了
// onNext 0
// onError
ob.timeout({Observable.timer(2, TimeUnit.MILLISECONDS)}).subscribe(observer)

// 控制第一个数据也有超时限制
// onError
ob.timeout(Observable.timer(5, TimeUnit.MILLISECONDS),
                    Function<Long, ObservableSource<Long>>{Observable.timer(5, TimeUnit.MILLISECONDS)})
                    .subscribe(observer)
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,826评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,968评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,234评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,562评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,611评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,482评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,271评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,166评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,608评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,814评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,926评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,644评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,249评论 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,866评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,991评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,063评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,871评论 2 354

推荐阅读更多精彩内容

  • 作者: maplejaw本篇只解析标准包中的操作符。对于扩展包,由于使用率较低,如有需求,请读者自行查阅文档。 创...
    maplejaw_阅读 45,664评论 8 93
  • 本篇文章介主要绍RxJava中操作符是以函数作为基本单位,与响应式编程作为结合使用的,对什么是操作、操作符都有哪些...
    嘎啦果安卓兽阅读 2,860评论 0 10
  • 创建操作 用于创建Observable的操作符Create通过调用观察者的方法从头创建一个ObservableEm...
    rkua阅读 1,826评论 0 1
  • 注:只包含标准包中的操作符,用于个人学习及备忘参考博客:http://blog.csdn.net/maplejaw...
    小白要超神阅读 932评论 0 3
  • 正式上班第二天,想说的太多了,没逻辑了,哈哈,首先,新年快乐,额,不喜欢在城市过年,没啥意思,起初,玩得...
    1大太阳阅读 313评论 0 1