intro
"森林里的一棵树倒下来,如果周围没有人听见,那么就等于说树的倒下是寂静无声的."
随着产品功能的增加,公司的业务代码逻辑趋于复杂,阅读难度也随着提升,故想引入 Rxjava .先前对之只有简单的了解,只知道其是响应式编程.但响应式编程又是什么呢?在计算机中,响应式编程是一种面向数据流和变化传播的编程范式.我的理解其实就是类似事件监听,"你去完成某项操作,完成了告诉我就行."也就是一种简单的观察者模式.在这其中,如果事件间有什么逻辑关系你可以在你的发送事件那里进行处理,形成事件流.许许多多的类似操作就成了事件池.这样,我们可以把所有的异步操作都扔到这个池中,当然我们也只需在监听拿到这个事件池中发出的事件进行处理即可.
以下是官网介绍:a library for composing asynchronous and event-based programs using observable sequences for the Java VM
RxJava就是一个实现异步操作的库.请记住,异步异步异步,你引入这个库到你项目中并在学习Rxjava过程中也请时刻牢记这个词.
为什么响应式和为什么Rxjava
完全同步操作编写的应用在实际项目中是不存在的,异步源操作存在使得我们编程的复杂度大大提升.各种异步源的操作是不稳定的,我们不能假设其操作永远成功然后输出我们需要的结果.反之需要记录各种异步源操作的结果与状态,将之反应到相应属性中或者是后续动作.例如,我们可以在每次异步调用后提供一个回调处理 Runnable 去进行后续的响应操作.也就是,"我操作成功了你拿我的结果去搞事情吧".这其实不是说得这么简单,我们需要对可能发生的问题进行处理,也就是"我操作搞砸了你之后应怎么办".
这都是对于异步操作的描述,而我们维护这些操作的状态.随着多个异步调用的出现,我们需要维护更多的状态与属性,或是一个异步操作是在另一个异步操作之后进行调用的或是更为复杂逻辑.当然,在Android中我们还需考虑其回调操作所在的线程,避免崩溃.许许多多的状态维护与其他的线程操作间的协调处理会带来极高的复杂度.试想,一个界面需要在几个网络请求完成后才显示界面,这样你需分别判断维护各个请求成功与否的状态等等问题.响应式其实就是为了解决这个问题,为了让我们能直接与异步源操作直接关联,通过订阅关系,在你数据发生变更的时候我们才做出响应.当然,这种响应模型应该要能处理多个异步源的请求,我们所编写的代码应该位于模型与异步源的中间,作为状态仲裁器 (state arbiter) 来使用,而不用试图去协调所有异步源的操作.这样,我们终于可以把异步源操作连接在一起了,而不用去试图管理所有的状态和事件而去编写混乱的代码了.
这也就是我们需要 Rxjava 的原因所在.
RxJava2
RxJava由用于表示数据源的一组类,用于侦听数据源的一组类,用于修改与合并数据的一组方法组成.
数据源可以分为有尽源和无尽源两种.在无尽源的情况下,系统接收处理事件往往会爆掉内存如果没有对之进行处理的话. RxJava2 表示提供事件源的过程提供了两种类型: Flowable 和 Observable. 区别就是 Flowable 支持背压,即我们上述所说的进行处理,背压控制方式,增大其事件源的事件池的大小,控制事件一次所能接收的数目等方法.以下是 RxJava给我们提供的接口方法:
BackpressureStrategy.BUFFER BackpressureStrategy.DROP BackpressureStrategy.LATEST / onBackpressureBuffer() onBackpressureDrop() onBackpressureLatest()
关于事件源的发出与接收是以发送方为主导的,我可以发送无数个,你也可以接收无数个.事件源发送完成了 onComplete 或是发生错误 onError, 下游接收方则会停止接收事件, 但之后如果事件源还有事件发送事件源则会继续进行他的工作而不管他的事件会不会被接收到.
事件源发出就是直接发出去的吗?不是的.事件源通过onNext,just等接口方法发出去时,会在内存中申请一个事件池的东西,我们可以这样叫他, 我们接收的事件是再从里面拿出来的. 在 onSubscribe 中的 Disposable 对象上,我们可以通过 request 方法请求我们需要或是能处理的事件数目. 这也是处理背压的一种方式了!
修改合并数据的一组方法,如 map, flatmap, zip 等, 切换线程的方法 observerOn, subscribeOn.注意的是不要把计算工作放在 Schedulers.io()中,避免创建不必要的线程,也不要把 I/O 操作置于 Schedulers.computation() 中浪费 CPU 资源. 多个 subscribeOn 对线程的指定其实只有第一个会有效用, 而 observerOn 则是对后续接收事件流所在线程的指定. 修改合并数据的一组方法和切换线程的操作原理都是通过 lift 方法对事件进行"代理监听". 我是这样理解的. 当你需要切换线程时或是需要对接收发送的数据进行处理, lift 先在另一线程或是进行数据处理之后通知本身说我这边的变换已经完成了,你可以把你的事件发送到目标 Subscriber 了. 多次变换情况一样.
必须记住要去管理返回的 Diaposable , 避免代码在 Activity 消失之后还继续运行造成的内存泄露!
最后,正襟危坐跑一下Learn RxJava By Examples是个不错的选择.
常用的方法:
创建操作符:create()、just()(并发,不超过10个)、fromArray(T... items)、fromCallable()、fromFuture()(增加 cancel()等方法操作的Callable)、fromIterable(Iterable<? extends T> source)、defer()(被订阅后才会创建)、timer()、iterval()(自增1的 timer)、intervalRange()、range()、rangeLong()、empty()&never()&error()
转换操作符:map()(可转换发送的数据类型)、flatMap()(整合加工)、concatMap()(有序的flatMap)、buffer()、groupBy()(进行分组,每个分组都会返回一个被观察者)、scan()(将数据按照一定逻辑聚合起来)、window()(规定几个数据分为一组)
组合操作符:concat()(多个组合,按照之前顺序、最多4个)、concatArray()(可多于4个)、merge()(并行发送事件)、concatArrayDelayError()&mergeArrayDelayError()(在 concatArray() 和 mergeArray() 两个方法当中,如果其中有一个被观察者发送了一个 Error 事件,那么就会停止发送事件,如果你想 onError() 事件延迟到所有被观察者都发送完事件后再执行的话)、zip()(会将多个被观察者合并,根据各个被观察者发送事件的顺序一个个结合起来,最终发送的事件数会和源Observable中最少事件的数量一样)、combineLatest()&combineLatestDelayError()(类似zip(),发送事件的序列是与发送的时间线有关)、reduce()(与 scan() 操作符的作用也是将发送数据以一定逻辑聚合起来,这两个的区别在于 scan() 每处理一次数据就会将事件发送给观察者,而 reduce() 会将所有数据聚合在一起才会发送事件给观察者)、collect()(收集到数据结构中)、startWith()&startWithArray()(发送前追加事件)、count()(返回被观察者发送事件的数量)
功能操作符:delay()、doOnEach()(每发送一事件前都会先回调这个方法)、doOnNext()(onNext()之前)、doAfterNext()、doOnComplete()、doOnError()、doOnSubscribe()、doOnDispose()(Disposable 的 dispose() 之后)、doOnLifecycle()(在回调 onSubscribe 之前回调该方法的第一个参数的回调方法,可以使用该回调方法决定是否取消订阅)、doOnTerminate()&doAfterTerminate()(doOnTerminate 是在 onError 或者 onComplete 发送之前回调,而 doAfterTerminate 则是 onError 或者 onComplete 发送之后回调)、doFinally()、onErrorReturn()(当接受到一个 onError() 事件之后回调,返回的值会回调 onNext() 方法,并正常结束该事件序列)、onErrorResumeNext()(返回一个新的 Observable)、onExceptionResumeNext()(只能捕获Exception)、retry(long times)(重新发送所有事件序列)、retryUntil(final BooleanSupplier stop)(判断是否继续发送事件)、retryWhen()、repeat()、repeatWhen()、subscribeOn()(指定被观察者的线程,要注意的时,如果多次调用此方法,只有第一次有效)、observeOn()(指定观察者的线程,每指定一次就会生效一次)
过滤操作符:filter()、ofType(final Class<U> clazz)(可以过滤不符合该类型事件)、skip(long count)(跳过事件数量)、distinct()(去掉重复事件)、distinctUntilChanged()(去掉连续重复事件)、take(long count)(观察者接收事件数量)、debounce()(如果两件事件发送的时间间隔小于设定的时间间隔则前一件事件就不会发送给观察者)、firstElement()&lastElement()(事件序列的)、elementAt()&elementAtOrError()
条件操作符:all()(判断事件序列是否全部满足某个事件)、takeWhile()(满足条件时就会发送该数据)、skipWhile()、takeUntil()(当事件满足此条件时,下一次的事件就不会被发送了)、skipUntil()、sequenceEqual()(两个Observable发送的事件是否相同)、contains()、isEmpty()、amb()、defaultEmpty()(只发送一个 onComplete() 事件,则可以利用这个方法发送一个值)、
references