首先介绍RxJava之前,先了解一些基础知识
1.RxJava的Rx是什么?什么是RxJava?
RxJava
的全称是ReactiveXJava
,RxJava
在 GitHub 主页上的自我介绍是
"a library for composing asynchronous and event-based programs using observable sequences for the Java VM"(一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库)。
那么,什么是ReactiveX
?
reactiveX:是一个跨语言的标准、规范
实现的语言:scala、kotlin、groovy、js、android、java等
2.观察者模式是什么?
详情请看博客JAVA设计模式之观察者模式
3.为什么使用RxJava?
原因如下:
- 简洁
- 层次/逻辑结构清晰
来张图对比一下
乍一看,好像左边代码更多,仔细看的话左边是“链式结构”,右边是“缩进结构”
左边的逻辑结构很清晰,右边的缩进已经很多了如果再复杂点,缩进就会让人看的头疼
4.RxJava基本概念了解
- Observable :可观察者,即被观察者
- Observer:观察者
- subscribe:订阅
- 事件
联系:Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer。
4.1创建 Observer
4.2创建Observable
这里只是其中一种,后面再介绍操作符的时候会更多,大概就是这样的结构
4.3 Subscribe
创建了 Observable 和 Observer 之后,再用 subscribe() 方法将它们联结起来,整条链子就可以工作了
5. RxJava的线程控制Scheduler
概念解析:
在不指定线程的情况下, RxJava 遵循的是线程不变的原则,即:在哪个线程调用 subscribe(),就在哪个线程生产事件;在哪个线程生产事件,就在哪个线程消费事件。如果需要切换线程,就需要用到 Scheduler (调度器),相当于线程控制器,RxJava 通过它来指定每一段代码应该运行在什么样的线程。
Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。默认的 Scheduler。
Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。
Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
另外, Android 还有一个专用的 AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行。
有了这几个 Scheduler ,就可以使用 subscribeOn() 和 observeOn() 两个方法来对线程进行控制了。
subscribeOn(): 指定 subscribe() 所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程。或者叫做事件产生的线程。
observeOn(): 指定 Subscriber 所运行在的线程。或者叫做事件消费的线程。
上面这段代码中,由于 subscribeOn(Schedulers.io()) 的指定,被创建的事件的内容 1、2、3、4 将会在 IO 线程发出;而由于 observeOn(AndroidScheculers.mainThread()) 的指定,因此 subscriber 数字的打印将发生在主线程 。事实上,这种在 subscribe() 之前写上两句subscribeOn(Scheduler.io()) 和 observeOn(AndroidSchedulers.mainThread()) 的使用方式非常常见,它适用于多数的 『后台线程取数据,主线程显示』的程序策略。
变换
概要
RxJava 提供了对事件序列进行变换的支持,这是它的核心功能之一。所谓变换,就是将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或事件序列。
变换的原理:lift()
实质上都是针对事件序列的处理和再发送
而在 RxJava 的内部,它们是基于同一个基础的变换方法: lift(Operator)。首先看一下 lift() 的内部实现(仅核心代码):
它生成了一个新的 Observable 并返回,而且创建新 Observable 所用的参数 OnSubscribe 的回调方法 call() 中的实现竟然看起来和前面讲过的 Observable.subscribe() 一样!然而它们并不一样哟~不一样的地方关键就在于第二行 onSubscribe.call(subscriber) 中的 onSubscribe 所指代的对象不同
如下图所示
更多详情参见大神解析 扔物线
7.RxJava一些常用操作符,没有运行结果图,需要自己去实现查看
1. Observable.fromArray(T ... item)
fromArray可以接受可变长参数对象(其实就是一个数组),把数组中的每个元素发射出来
2. Observable.fromIterable(Iterable<? extends T> source)
此方法接收一个继承自Iterable接口的参数,简单的说就是java中的集合类。因此你可以传入一个list集合等等
3. Observable.interval(long initialDelay, long period, TimeUnit unit)
此方法返回一个每隔指定的时间间隔就发射一个序号的 Observable 对象,可用来做倒计时心跳包等操作,无限发送,除非调用dispose()可以终止。
4. Observable.timer(long delay, TimeUnit unit)
创建一个在指定延迟时间后发射一条数据( 固定值:0 )的 Observable 对象,可用来做定时操作。
5. Observable.range(final int start, final int count)
此方法可以发射一个指定范围的数
6. Map
map 操作符是可以将返回的数据变换成别的数据类新,比如你可以使用 map 操作符将原来要发射的字符串数据变换成数值型在发射出去。
7. flatMap
不同于map的是flatMap 返回的是一个全新的Observable 对象。
8. filter
这个操作符可以作为数据筛选器
下面的例子当数据>3才能放过,因此输出4 5
9. take
此操作符用于指定想要的数据数量,下面的例子最终只输出1和2
10. distinct
简单的说就是去除重复的值
1,2,3,3,4,5 -> 1,2,3,4,5
11. Concat
对于单一的把两个发射器连接成一个发射器
12. Buffer
buffer 操作符接受两个参数,buffer(count,skip),作用是将 Observable 中的数据按 skip (步长) 分成最大不超过 count 的 buffer ,然后生成一个 Observable 。
13. Timer
timer 很有意思,相当于一个定时任务, 默认在新线程
14. Interval
interval 操作符用于间隔时间执行某个操作,其接受三个参数,分别是第一次发送延迟,间隔时间,时间单位。
Timer 对比 interval
15. Skip
接受一个 long 型参数 count ,代表跳过 count 个数目开始接收。
16. Just
一个简单的发射器依次调用 onNext() 方法。
17. Single
Single 只会接收一个参数,而 SingleObserver 只会调用 onError() 或者 onSuccess()。
18. Debounce
去除发送频率过快的项
去除发送间隔时间小于 500 毫秒的发射事件,所以 1 和 3 被去掉了
19. Defer
简单地时候就是每次订阅都会创建一个新的 Observable,并且如果没有被订阅,就不会产生新的 Observable。
20. Last
last 操作符仅取出可观察到的最后一个值,或者是满足某些条件的最后一项。
若last(3)则取出3,若last()则取出最后一个4
21. Merge
merge 的作用是把多个 Observable 结合起来,接受可变参数,也支持迭代器集合。注意它和 concat 的区别在于,不用等到 发射器 A 发送完所有的事件再进行发射器 B 的发送。
输出1,2,3,6
22. Reduce
reduce 操作符每次用一个方法处理一个值,可以有一个 seed 作为初始值。
23. Scan
scan 操作符作用和上面的 reduce 一致,唯一区别是 reduce 只追求结果,而 scan 会始终如一地把每一个步骤都输出