给Android开发者的RxJava详解:
- 与setOnClickListener的区别是:
setOnClickListener()
订阅以后,不是马上回调,而是在逻辑处理到位了该回调了再回调。而subscribe()
是把计划表的call()
逻辑和回调逻辑提前写好,在订阅的一瞬间直接全发出去。 -
onStart()
方法是在subscribe()
里发出去的,因此只能和订阅发生在同一线程,如果要切线程的话使用doOnSubscribe()
。 - 在一个正确的事件处理顺序中应该在最后调用
onComplete()
或onError()
来标志事件序列结束。使用不完整回调其实也是可以的。 - 在不再使用时要调用
unsubscribe()
来解除订阅,因为Observable会持有Observer的引用,如果是Activity会引起内存泄漏。 - Observer是一个接口,Subscriber才是实现类,最终Observer也是转为Subscriber使用的。
- 默认是在哪个线程调用
subscribe()
,就在哪个线程处理call()
方法。但一般call()
逻辑都是耗时的应该在子线程执行然后切回主线程。 - 事件产生和消费:
• subscribeOn(): 指定subscribe()
所发生的线程,即订阅和call()
方法逻辑执行的线程,或者叫做事件产生的线程。
• observeOn(): 指定Subscriber所运行在的线程,即onNext()
回调所处的线程,或者叫做事件消费的线程。 - 变换,就是将事件序列中的对象或整个序列进行包装、加工处理,转换成不同的事件或事件序列重新发出(顺序是不变的)。
-
flatMap()
中使用传入的事件对象创建一个Observable对象,并不发送这个Observable,而是将它激活,于是它开始发送事件。每一个创建出来的Observable发送的事件,都被汇入原始的总Observable,它负责将这些事件统一交给Subscriber的回调方法。
这就可以做嵌套异步逻辑,最原始的异步逻辑是依赖于很多其他异步逻辑的,可以等其他的都执行完了自己再一并发送最终的回调。 - 各种变换虽然功能各有不同,但实质上都是针对事件序列的处理和再发送。
- 变换的原理:
• 多种变换中,最下层的Subscriber会有一个最终类型,然后往上订阅上层Observable,上层Observable知道下层Subscriber的类型但不知道调用顺序;
• 因此上层Observable会再生成一个新的Subscriber,并在该Subscriber回调中发送下层Subscriber的回调,这个新的Subscriber会发给上层用于订阅上层的顺序;
• 直到订阅到最上层的Observable时,Subscriber已经是最初的类型了,完了最上层的Observable是知道发送顺序的,开始往下发送事件;
• 每一层收到事件时,将收到的类型通过本层func.call()
转为下层需要的类型,然后通过下层的Subscriber再发送出去,最终回调到最底层的Subscriber,类型也是最终的类型了。 - 如果需要对很多Observable做一组相同的
lift()
变换时,可以将这组变换业务封装到Observable.Transformer的call()
方法里,然后给其余Observable.compose(liftAll)
使用。 - 线程控制:
•observeOn()
可以多次调用,每次调用后影响它之后的map()
处理逻辑,因为事件消费是往下发的,每次切线程后,后面的onNext()
线程就被改了;
•subscribeOn()
其实也能多次调用,只不过最终以第一次的为准,因为事件产生是往上订阅的,最终最上面的那次调用会决定最终的订阅call()
所发生的线程;
• 由于onStart()
是在subscribe()
方法里调用的,因此它改不了线程了只能跟subscribe()
在同一线程;
• 而doOnSubscribe()
是在Observable里调用的,因为事件产生是往上订阅的,因此它的call()
发生的线程处于离它最近的下面的subscribeOn()
指定的线程。
• 还有一个doOnNext()
方法会在onNext()
发送之前调用,在此先不要observeOn()
,执行完一些预备逻辑后再observeOn()
,用于发事件前处理一些前置逻辑(就是名字顾名思义点而已)。
创建Observable:
- 完整创建:
• create():完整创建1个被观察者对象。 - 快速创建:
• just():快速创建1个被观察者对象,最多10个事件。
• fromArray():快速创建1个被观察者对象,数组,大于10个事件。
• fromIterable():快速创建1个被观察者对象,list,大于10个事件。
• empty():仅发送Complete事件,直接通知完成 。
• error():仅发送Error事件,直接通知异常 。
• never():不发送任何事件。 - 延迟创建:
• defer():直到有观察者订阅时,才动态创建被观察者对象和发送事件。
• timer():延迟指定时间后,发送1个数值0,Long类型。
• interval():每隔指定时间就发送事件,从0开始、无限递增1的整数序列。
• intervalRange():每隔指定时间就发送事件,可指定发送的数据的数量,从0开始、无限递增1的整数序列。
• range():连续发送1个事件序列,可指定范围从0开始,无限递增1的整数序列,无延迟发送事件。
• rangeLong():类似于range(),区别在于该方法支持数据类型为Long。
变换操作符:
- 常用变换符:
• Map():对被观察者发送的每个事件都通过指定的函数处理,从而变换成另外一种事件。
• FlatMap():将被观察者发送的事件序列进行拆分和单独转换,再合并成一个新的事件序列,整个一起发出去,事件序列可能会乱。
• ConcatMap():与FlatMap()的区别在于重新合并生成的事件序列与旧的序列顺序是一致的,不会乱序。
• Buffer():定期从被观察者需要发送的总事件序列中获取一定数量的事件放到缓存区中,然后发送缓冲区的数据,直到遍历完为止。
组合/合并操作符:
- 组合多个被观察者:
• concat()/concatArray():组合多个Observable一起发送数据,合并后按发送顺序串行执行(一个一个发),concat()组合被观察者数量<=4个,concatArray()可>4个。
• merge()/mergeArray():组合多个Observable一起发送数据,合并后按时间线并行执行(两个两个发),merge()组合Observable数量<=4个,mergeArray()可>4个
• concatDelayError()/ mergeDelayError():任一个Observable出错以后先缓存起来,等每个被观察者发送完事件后再回调onError()。
concat()/merge()与zip()区别:
• concat()/ merge()是每个Observable的事件都会发出来,然后可以在Subscriber的onNext()
中处理(多次被调用),完了在onComplete()
中处理最终逻辑。
• zip()可以将多个Observable发出的事件汇总起来再做一次统一的逻辑处理,然后在Subscriber的onNext()
中处理的就是最终逻辑了(一次调用)。
- 合并多个事件:
• Zip():合并多个Observable发送的事件,生成一个新的事件序列(处理新的业务逻辑)并发送,多个Observable中以数量最少的那个为准。
• combineLatest():当两个Observable中的任何一个发送了数据后,将先发送了数据的Observable的最新(最后)一个数据与另外一个Observable发送的每个数据结合,按时间合并发送。
• combineLatestDelayError():可处理onError()情况。
• reduce():把被观察者需要发送的事件聚合成1个事件并发送(阶乘)。
• collect():将被观察者Observable发送的数据事件收集到一个数据结构里再发送。
zip()与combineLatest()的区别:
• zip()按个数合并,即1对1合并。
• combineLatest()按时间合并,即在同一个时间点上的合并。
- 发送事件前追加发送事件:
• startWith()/startWithArray():在一个被观察者发送事件前,追加发送一些数据再发送,越晚追加的越先发送。
• count():只统计被观察者发送事件的数量。
几个操作符的区别:
- merge():可以合并多个Observable的输出,它们的数据可能会交错发射(concat可以保持顺序)。如果某个原始Observable出现onError,merge后的Observable就会onError。
- mergeDelayError():原始observable出现onError时,错误通知会被保留,直到所有数据发射完毕后才执行onError。
如果有多个原始observable出现了onError,这些onError通知会被合并成一个CompositeException ,保留在它的 List<Throwable> exceptions异常列表里。如果只有一个原始observable出现了onError,则会直接使用这个onError通知,而不会生成CompositeException。
有坑:
如果先mergeDelayError再指定线程的话,mergeDelayError没有起到延迟通知onError的作用,第一个observable出现错误的时候,整个合并的observable也onError了,第二个observable无法输出。但是如果改成每个observable单独subscribeOn和observeOn,然后再mergeDelayError,就正常进行了。
RxJava一些使用场景:
- Scheduler线程切换:这种场景经常会在“后台线程取数据,主线程展示”的模式中看见。
- 使用debounce做textSearch:用简单的话讲就是当N个结点发生的时间太靠近(即发生的时间差小于设定的值T),debounce就会自动过滤掉前N-1个结点。
- Retrofit结合RxJava做网络请求框架:RxJava和Retrofit如何结合来实现更简洁的代码。
- RxJava代替EventBus进行数据传递(RxBus):RxBus并不是一个库,而是一种模式,是使用了RxJava的思想来达到EventBus的数据传递效果。
- 使用combineLatest合并最近N个结点:注册的时候所有输入信息(邮箱、密码、电话号码等)合法才点亮注册按钮。
- 使用merge合并两个数据源:一组数据来自网络,一组数据来自文件,需要合并两组数据一起展示。
- 使用concat和first做缓存:依次检查memory、disk和network中是否存在数据,任何一步一旦发现数据后面的操作都不执行(取第一次的有效事件)。
- 使用timer做定时操作:2秒后输出日志“hello world”,然后结束。
- 使用interval做周期性操作:每隔2秒输出日志“helloworld”。
- 使用throttleFirst防止按钮重复点击:debounce也能达到同样的效果。
- 使用schedulePeriodically做轮询请求:Schedulers.newThread().createWorker().schedulePeriodically(new Action0())。
- RxJava进行数组/list的遍历:Observable.from(names).subscribe(new Action1<String>())。
- 解决嵌套回调(callback hell)问题:NetworkService.getToken().flatMap().subscribe()。
- 响应式的界面:比如勾选了某个checkbox,马上执行操作,自动更新对应的preference。
以上部分内容摘自:
《给 Android 开发者的 RxJava 详解》——扔物线
Carson_Ho的简书文章