前言
应该有小伙伴和我一样吧,在学习了南尘大大的'这可能是最好的RxJava 2.x 入门教程系列专栏'后,虽然对RxJava的使用都会有一定程度认识了,但是南尘大大的完结篇怎么打不开啊o(╥﹏╥)o,而且南尘大大在介绍操作符的时候都是简单介绍加代码实现,对此我还是存在一些疑惑和不理解的,加上如果不是常用的话,这么多操作符还是很容易忘记的,所以就来写一篇总结吧!
这里给出南尘大神的文章链接:
这可能是最好的 RxJava 2.x 入门教程(一)
这可能是最好的 RxJava 2.x 入门教程(二)
这可能是最好的 RxJava 2.x 入门教程(三)
这可能是最好的 RxJava 2.x 入门教程(四)
这可能是最好的 RxJava 2.x 入门教程(五)
抛物线大神的文章链接:
给 Android 开发者的 RxJava 详解
正文
RxJava 到底是什么
GitHub 主页上的介绍是这样的"a library for composing asynchronous and event-based programs using observable sequences for the Java VM"(一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库)。这就是 RxJava ,概括得非常精准。
通俗点说就是一个可以帮你解决异步迷之缩进烦恼的一个程序库,它和你之前学习的基本操作符是一样的,只是它比较高级一点。
RxJava 好在哪
一个词:简洁
一段话:在我们编程的时候保持我们的逻辑思路的简洁,在维护时帮助我们更容易理解。
好了好了,不要在用鄙视的眼神看我(*❦ω❦) ,我说了这是总结!总结!!下面我们直接上代码。
Observable.just(1, 2, 3).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
/* 业务处理... */
}
});
这是RxJava最常见的使用或者说是结构,分为被观察者(发射器)、操作符(N个)、发射调度方、接收调度方、观察者(接收器)。
Observable(发射器)相关
-
Obesevable
没什么好说的,最常用于创建被观察者的类。 -
Flowable
该类与Obeservable
的区别是它支持背压,使用方式基本相同。背压是指在异步场景中,被观察者发射事件的速度远快于观察者的处理速度的情况下,一种告诉上游的被观察者降低发射速度的策略。 -
Single/Completable/Maybe
其实这三者都差不多,Single
顾名思义,只能发射一个事件,和Observable
接受可变参数完全不同。而Completable
侧重于观察结果,而Maybe
是上面两种的结合体。也就是说,当你只想要某个事件的结果(true or false)的时候,你可以使用这种观察者模式。
Observer(接收器)相关
-
Observer
处理onSubscribe()
、onNext()
、onError()
、onComplete()
四个方法的回调实现。-
onSubscribe()
:返回接收器的控制器(Disposable
),控制事件的接收。 -
onNext()
:接收事件并处理。 -
onCompleted()
: 事件队列完结。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。RxJava 规定,当不会再有新的onNext()
发出时,需要触发onCompleted()
方法作为标志。 -
onError()
: 事件队列异常。在事件处理过程中出异常时,onError()
会被触发,同时队列自动终止,不允许再有事件发出。 - 在一个正确运行的事件序列中,
onCompleted()
和onError()
有且只有一个,并且是事件序列中的最后一个。需要注意的是,onCompleted()
和onError()
二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。
-
线程调度相关
线程控制——Scheduler
(调度器):在不指定线程的情况下, RxJava 遵循的是线程不变的原则,即:在哪个线程调用 subscribe()
,就在哪个线程生产事件;在哪个线程生产事件,就在哪个线程消费事件。如果需要切换线程,就需要用到 Scheduler
(调度器)。
-
Schedulers.immediate()
: 直接在当前线程运行,相当于不指定线程。这是默认的Scheduler
。 -
Schedulers.newThread()
: 总是启用新线程,并在新线程执行操作。 -
Schedulers.io()
: I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的Scheduler
。行为模式和newThread()
差不多,区别在于io()
的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下io()
比newThread()
更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。 -
Schedulers.computation()
: 计算所使用的Scheduler
。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在computation()
中,否则 I/O 操作的等待时间会浪费 CPU。 - 另外, Android 还有一个专用的
AndroidSchedulers.mainThread()
,它指定的操作将在 Android 主线程运行。
操作符相关
创建Observables
创建一个新的源Observable
。
-
Create
— 操作符应该是最常见的操作符了,主要用于产生一个Obserable
被观察者对象。 -
Defer
— 每次订阅都会创建一个新的Observable
,并且如果没有被订阅,就不会产生新的Observable
。 -
Empty
/Never
/Throw
—Empty
是创建一个不发射且可终止item
的Observable
;Never
是创建一个不发射且不终止item
的Observable
;Throw
是创建一个不发射且已错误终止的item
的Observable
。 -
From
— 从数组或者序列化集合中获取数据创建Observable
。 -
Interval
— 创建一个Observable
用于间隔时间执行某个操作,其接受三个参数,分别是第一次发射延迟,间隔时间,时间单位,默认由新线程调度发射。 -
Just
— 从对象数组中获取并创建Observable
。 -
Range
— 创建一个发射特定范围的连续整数的item
的Observable
。 -
Repeat
— 创建一个可以多次发射特定item
的Observable
。 -
Timer
— 创建一个延时的Observable
,并只发出一次item
,默认由新线程调度发射。
变换Observables
变换从源Observable
已发出的item
。
-
Buffer
— 将源Observable
中的数据按skip
(步长) 分成最大不超过count
的迭代器集合,然后生成一个Observable
,发出的迭代器集合个数 =items
的个数 -skip
(步长)。 -
FlatMap
— 把一个源Observable
通过某种方法转换为多个Observable
,然后再把这些分散的Observable
结合到同一个Observable
,无法保证发射顺序。需要顺序发射可使用concatMap
操作符。 -
GroupBy
— 对源Observable
的item
进行分组,得到一个分组的Observable
(结果集)进行发射,可通过getKey()
方法获取结果集的Key
值。注意,但你不对结果集处理时(既不订阅执行也不对其进行别的操作符运算),应使用take(0)
操作符,以防止内存泄漏。 -
Map
— 将已发射的item
根据函数变化。 -
Scan
— 通过函数方法进行两两递归处理每一个源Observable
的item
,得到最后的item
发射。该操作符与reduce
作用类似,但是它不仅关注结果,也关注过程。 -
Window
— Window与Buffer
相似 ,但不是从源Observable
发射items
,而是发射Observable
,每个Observable
都是从源Observable
发出一个子item
,直到收到onCompleted
通知终止。
过滤Observables
从源Observable
选择性的发射item
。
-
Debounce
— 去除发射频率过快的item
。 -
Distinct
— 去掉重复的item
。 -
ElementAt
— 仅发射指定索引的item
。 -
Filter
— 常用的过滤操作符,通过test()
函数过滤。 -
First
— 只发射第一个item
或符合条件的第一个item
。 -
IgnoreElements
— 忽略所有源Observable
发射的item
,只接收onCompleted
和onError
事件。 -
Last
— 只发射最后一个item
或者符合条件的最后一个item
。 -
Sample
— 在指定的时间间隔范围内对源Observable
发射item
的最后一个进行周期性采样。 -
Skip
— 跳过count
个数目的item
。 -
SkipLast
— 跳过倒序count
个数目的item
。 -
Take
— 最多接收count
个数目的item
。 -
TakeLast
— 最多接收倒序count
个数目的item
。
结合Observables
结合多个源Observable
去创建一个Observable
。
-
And
/Then
/When
— 作用和zip()
类似,使用Pattern
和Plan
作为中介,将发射的数据集合并到一起,来完成一些复杂的结构。 -
CombineLatest
— 将两个源Observable
中的其中的一个发射的item
与另一个源Observable
中最后发射的item
以一定的规则进行合并。 -
Join
— 该操作符与CombineLatest
的作用类似,不同之处在于它可以控制每个Observable
发射的生命周期。 -
Merge
— 结合多个源Observable
,不需要等待其中发射器的发射完毕就可以发射其他的发射器的item
,可接受可变参数,也支持迭代器集合。 -
StartWith
— 在源Observable
发射item
前,插入指定的一些数据。 -
Switch
— 能够从一个Observable
自动取消订阅来订阅一个新的Observable
(停止旧的Observable
并订阅新的Observable
)。 -
Zip
— 合并多个源Observable
,最终合并的item
数目与源Obeservable
中item
数目最少的相同。
处理错误的操作符
处理错误通知,让Observable
能够正常终止。
-
Catch
— 类似于java
中的try/catch
,拦截onError
的调用,让Observable
不会因为错误的产生而终止。 -
Retry
— 当源Observable
遇到错误,重新订阅它并期望它能正常终止。需指定最多重新订阅的次数。注意由于重新订阅,可能会造成数据项重复。
公用操作符
一些辅助处理的工具操作符。
-
Delay
— 延迟一段指定的时间再发射item
,注意当接收到onError
通知时,会立即终止,如需延迟订阅源Observable
,需使用delaySubscription
操作符。 -
Do
— 在Observable
的生命周期期间执行一些操作。(常用的有doOnNext
、doOnComplete
、doOnError
) -
Materialize
/Dematerialize
—materialize
将来自源Observable
的通知(onNext/onError/onComplete
)都转换为一个Notification
对象,然后再按原来的顺序一次发射出去。dematerialize
则相反。 -
ObserveOn
— 指定Observable
自身在哪个调度器上执行(即在那个线程上运行)。 -
Serialize
— 强制Observable
按次序发射数据并且要求功能是完好的。 -
Subscribe
— 订阅,使Obervable
与Observer
联合起来。 -
SubscribeOn
— 指定Observable
在哪个调度器上发送通知给观察者(调用观察者的onNext
,onCompleted
,onError
方法)。 -
TimeInterval
— 拦截源Observable
发射的item
,转换为两次发射的时间间隔的TimeInterval
对象。 -
Timeout
— 如果原始Observable
过了指定的一段时长没有发射任何item
,Timeout
操作符会以一个onError
通知终止这个Observable
,或者继续一个备用的Observable
。 -
Timestamp
— 将源Observable
发射的item
转换为一个包含发射时间的Timestamped
对象。 -
Using
— 指示Observable
创建一个只在它的生命周期内存在的资源,当Observable
终止时这个资源会被自动释放。
条件和布尔操作符
检测Observable发射的item的操作符。
-
All
— 判定源Observable
发射的所有item
是否都满足某个条件,不满足则抛弃。 -
Amb
— 传入多个源Observable
时,只会发射其中先发射item
或通知(onError
或onCompleted
)的Observable
的所有数据。 -
Contains
— 判断源Observable
发射的所有item
中,是否包含Contains
操作符的传入值,不存在的抛弃。 -
DefaultIfEmpty
— 当源Observable
没有发射任何item
时,使用defaultIfEmpty
操作符会为你发射一个你提供默认值。 -
SequenceEqual
— 判定两个源Observables
是否发射相同的数据序列。(相同的数据,相同的顺序,相同的终止状态) -
SkipUntil
— 忽略源Observable
发射的item
直到指定的Observable
开始发射的那一刻,源Observable
才开始正常发射。 -
SkipWhile
— 忽略源Observable
发射的item
直到指定的某个条件变为false
的那一刻,源Observable
才开始正常发射。 -
TakeUntil
— 源Observable
正常发射的item
直到指定的Observable
开始发射或者发射终止通知(onError/onComplete
)的那一刻,源Observable
才停止发射并终止。 -
TakeWhile
— 源Observable
正常发射的item
直到指定的某个条件变为false
的那一刻,源Observable
才停止发射并终止。
数学运算及聚合操作符
对整个序列进行操作的操作符。
-
Average
— 计算源Observable
中所有item
的平均值并发射。 -
Concat
— 从多个源Observable
中发射,排序接收。 -
Count
— 计算源Observable
中所有item
的个数并发射。 -
Max
— 计算源Observable
中所有item
的最大值并发射。 -
Min
— 计算源Observable
中所有item
的最小值并发射。 -
Reduce
— 通过函数方法处理每一个源Observable
的item
,得到最后的item
发射。该操作符与scan
作用类似,但是它只关注结果。 -
Sum
— 计算源Observable
中所有item
的和并发射。
背压操作符
-
backpressure operators
— 应对源Observable
发射item
的速率太快,导致Observer
无法及时消费而产生异常的策略。 -
onBackpressureBuffer
— 若源Observable
发射item
的速率太快,导致Observer
无法及时消费时,缓存所有当前无法消费的item
,直到Observer
可以处理为止。 -
onBackpressureDrop
— 若源Observable
发射item
的速率太快,导致Observer
无法及时消费时,则将当前item
抛弃。
可连接的Observable操作符
可动态控制发射的Observable操作符。
-
Connect
— 指示一个ConnectableObservable
开始发射数据。ConnectableObservable
在被订阅时并不开始发射数据,只有在它的connect()
被调用时才开始。 -
Publish
— 将普通的Observable
转换为ConnectableObservable
,ConnectableObservable
是Observable
的子类。 -
RefCount
— 将普通的ConnectableObservable
转换为Observable
。 -
Replay
— 缓存Observable
订阅之前已经发射的数据,这样即使有Observable
在其发射数据开始之后进行订阅也能收到之前发射过的数据。Replay
操作符能指定缓存的大小或者时间,这样能避免耗费太多内存。
转换操作符
-
To
— 将源Observable
转换为另一个对象或数据结构(迭代器集合),如果原Observable
发射完他的数据需要一段时间,使用To
操作符得到的Observable
将阻塞等待原Observable
发射完后再将数据序列打包后发射出去。
好了,终于完工了,写这篇文章的初衷只是为了自己方便记忆,希望能帮助到各位小伙伴~~ 撒花~!✿✿ヽ(°▽°)ノ✿✿