响应式编程
响应式编程是一种面向数据流
和变化传播
的编程范式
- 数据流:只能以事先规定好的顺序被读取一次的数据的一个序列
- 变化传播:类似观察者模式,变化了通知别人
- 编程范式:计算机编程的基本风格或模式(如:面向对象/面向过程编程)
RxJava
1.异步数据处理库;
2.扩展的观察者模式(比传统的观察者模式多了oncompleted和onerror事件回调,使观察者模式更加完善)
RxJava 是以观察者模式为核心,可以通过强大的操作符,对事件中的消息进行加工包装,并且可以轻松实现线程调度的一个框架。
RxJava特点:
1. jar包<1MB
2.轻量级框架
3.支持java8, lambda表达式
4.支持java6+,android2.3+
5.支持同步和异步数据处理
背压概念
- 异步环境下产生的问题
- 发送和处理数据不统一
- 是一种流速控制解决策略
RxJava2基本要素
1. Observable:
- 被观察者,不支持背压;
- 通过Observable创建一个可观察的序列(Creat);
- 通过subscribe去注册一个观察者
2. Observer:
- 用于接收数据,观察者
- 作为Observable的subscribe方法的参数
3. Disposable:
- 和Rxjava1的Subscription的作用差不多
- 用于取消订阅和获取当前的订阅状态
4. ObservableOnSubscribe:
- 当订阅时会触发此接口调用
- 在Observable内部,实际作用是向观察者发射数据
5. Emitter
- 一个发射数据的接口,和Observer的方法类似
- 本质是对Observer和Subscriber的包装
观察者四要素
Observable :被观察者,用来生产发送事件;
Observer:观察者,接收被观察者传来的事件;
Event:包装事件发送中的消息,在事件的传递过程中,可以通过操作符对事件进行各种加工(转换,过滤,组合……);
Subscribe:被观察者和观察者通过订阅产生关系后,才具备事件发送和接收能力;
1.Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer。
2.与传统观察者模式不同, RxJava 的事件回调方法除了普通事件 onNext() (相当于 onClick() / onEvent())之外,还定义了两个特殊的事件:onCompleted() 和 onError()。
3.onCompleted(): 事件队列完结。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。RxJava 规定,当不会再有新的 onNext() 发出时,需要触发 onCompleted() 方法作为标志。
4.onError(): 事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。
5.在一个正确运行的事件序列中, onCompleted() 和 onError() 有且只有一个,并且是事件序列中的最后一个。需要注意的是,onCompleted() 和 onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。
RxAndroid
1.是RxJava针对Android平台的一个扩展,用于安卓开发
2.提供响应扩展组件,快速易于开发安卓应用程序
- RxJava1中 Action0-Action9 ,在RxJava2中其中Action0 改成了Action,Action1改成的Consumer,Action2改名成了BiConsumer,Action3-Action9都不在使用了。
- 同样RxJava1中Func1在RxJava2中改名成Function ,Func2改名成BiFunction,Func3 - Func9 改名成 Function3 - Function9,FuncN 改名成 Function
- FuncX 和 ActionX 的相同点是包装含有一个参数的方法,区别在 FuncX 包装的是有返回值的方法。
操作符
(1)Creat Observable(创建)
Creat:普通创建
Just:简化版的creat,将传入的参数依次发送出来。
From:可创建数组or列表
Defer:延迟创建Observable,当调用Subscribe时才会去创建Observable
Empty/Never/Throw: 创建空的/不进行任何事件传递/只执行onError方法
Interval: 定时传递数据操作
Range:创建一定范围的对象数据
Repeat:多次重复发送数据
(2)Transforming Observable(转换)
转换常用操作符
Map: 一个对象转换成另一个对象
FlatMap:一个对象转换成多个Observable对象
flatMap() 和 map() 有一个相同点:它也是把传入的参数转化之后返回另一个对象。但需要注意,和 map() 不同的是, flatMap() 中返回的是个 Observable 对象,并且这个 Observable 对象并不是被直接发送到了 Subscriber 的回调方法中。
flatMap() 的原理是这样的:
- 使用传入的事件对象创建一个 Observable 对象;
- 并不发送这个 Observable, 而是将它激活,于是它开始发送事件;
- 每一个创建出来的 Observable 发送的事件,都被汇入同一个 Observable ,而这个 Observable 负责将这些事件统一交给 Subscriber 的回调方法。
(3)Filtering Observable(过滤)
Debounce :指定时间间隔未发生事件,才会发送该数据
Distinct:去掉重复数据
ElementAt:取出置顶位置数据
Filter:按照置顶规则过滤,得到想要的数据
First:取第一位
IngnoreElements:不回调OnError,回调OnCompleled,出错时回调OnError
Last:取最后一位
Sample:定时取样
Skip:跳过指定数据
SkipLast:跳过数据最后几个,取出前面数据
Take:取前面几个数据
TakeLast:取最后几个数据
(4)Combining Observable(组合)
ZIP:两个数据组合
Merge:按照时间戳顺序,组合两个数据成一个
StartWith:在当前插入数据/Observable
(5)Error Handling Operators(错误处理操作符)
Catch:onErrorReturn(数据错误就结束),onErrorResumeNext(数据错误,自动补全数据,继续), onExceptionResumeNext(数据错误,自动补全数据,继续,结束后会抛出异常)
Retry:重试 retry,RetryWhen
RxJavaSchedulers(调度器,线程控制)
RxJava是以一种简单的方式解决多线程问题的机制,在哪个线程调用 subscribe(),就在哪个线程生产事件;在哪个线程生产事件,就在哪个线程消费事件。如果需要切换线程,就需要用到 Scheduler (调度器)
Scheduler.io 用于I/O读写操作(读写文件,读写数据库,网络信息交互等耗时操作),行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
Scheduler.computation:计算所使用的,这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
Scheduler.immediate():立即执行 :当前线程,相当于不指定线程,默认
Scheduler.newThread() :开启新线程,在新线程执行操作
Scheduler.trampoline() :按照顺序队列,并运行每一个任务 repeat(),retry()
另外, Android 还有一个专用的 AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行。
有了这几个 Scheduler ,就可以使用 subscribeOn() 和 observeOn() 两个方法来对线程进行控制了。
SubscribeOn RxJava提供了subscribeOn()方法来用于指定Observable(被观察者)对象所发生的线程,只执行第一次操作
ObserveOn RxJava提供了ObserveOn()方法用于每个Subscriber(observer观察者)对象所发生的线程,在那个线程,执行后面的操作