RxJava初识与入门理解

响应式编程

响应式编程是一种面向数据流变化传播编程范式

- 数据流:只能以事先规定好的顺序被读取一次的数据的一个序列
- 变化传播:类似观察者模式,变化了通知别人
- 编程范式:计算机编程的基本风格或模式(如:面向对象/面向过程编程)

RxJava

1.异步数据处理库;

2.扩展的观察者模式(比传统的观察者模式多了oncompleted和onerror事件回调,使观察者模式更加完善)

RxJava 是以观察者模式为核心,可以通过强大的操作符,对事件中的消息进行加工包装,并且可以轻松实现线程调度的一个框架。

RxJava特点:

1. jar包<1MB

2.轻量级框架

3.支持java8, lambda表达式

4.支持java6+,android2.3+

5.支持同步和异步数据处理

背压概念

  • 异步环境下产生的问题
  • 发送和处理数据不统一
  • 是一种流速控制解决策略

RxJava2基本要素

1. Observable:
- 被观察者,不支持背压;
- 通过Observable创建一个可观察的序列(Creat);
- 通过subscribe去注册一个观察者
2. Observer:
- 用于接收数据,观察者
- 作为Observable的subscribe方法的参数
3. Disposable:
- 和Rxjava1的Subscription的作用差不多
- 用于取消订阅和获取当前的订阅状态
4. ObservableOnSubscribe:
- 当订阅时会触发此接口调用
- 在Observable内部,实际作用是向观察者发射数据
5. Emitter
- 一个发射数据的接口,和Observer的方法类似
- 本质是对Observer和Subscriber的包装

观察者四要素

Observable :被观察者,用来生产发送事件;
Observer:观察者,接收被观察者传来的事件;
Event:包装事件发送中的消息,在事件的传递过程中,可以通过操作符对事件进行各种加工(转换,过滤,组合……);
Subscribe:被观察者和观察者通过订阅产生关系后,才具备事件发送和接收能力;

1.Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer。

2.与传统观察者模式不同, RxJava 的事件回调方法除了普通事件 onNext() (相当于 onClick() / onEvent())之外,还定义了两个特殊的事件:onCompleted() 和 onError()。

3.onCompleted(): 事件队列完结。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。RxJava 规定,当不会再有新的 onNext() 发出时,需要触发 onCompleted() 方法作为标志。

4.onError(): 事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。

5.在一个正确运行的事件序列中, onCompleted() 和 onError() 有且只有一个,并且是事件序列中的最后一个。需要注意的是,onCompleted() 和 onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。

RxAndroid

1.是RxJava针对Android平台的一个扩展,用于安卓开发

2.提供响应扩展组件,快速易于开发安卓应用程序
  • RxJava1中 Action0-Action9 ,在RxJava2中其中Action0 改成了Action,Action1改成的Consumer,Action2改名成了BiConsumer,Action3-Action9都不在使用了。
  • 同样RxJava1中Func1在RxJava2中改名成Function ,Func2改名成BiFunction,Func3 - Func9 改名成 Function3 - Function9,FuncN 改名成 Function
  • FuncX 和 ActionX 的相同点是包装含有一个参数的方法,区别在 FuncX 包装的是有返回值的方法。

操作符

(1)Creat Observable(创建)

        Creat:普通创建

        Just:简化版的creat,将传入的参数依次发送出来。

        From:可创建数组or列表

        Defer:延迟创建Observable,当调用Subscribe时才会去创建Observable

        Empty/Never/Throw: 创建空的/不进行任何事件传递/只执行onError方法

        Interval: 定时传递数据操作

        Range:创建一定范围的对象数据

        Repeat:多次重复发送数据

(2)Transforming Observable(转换)

转换常用操作符

        Map: 一个对象转换成另一个对象

        FlatMap:一个对象转换成多个Observable对象

flatMap() 和 map() 有一个相同点:它也是把传入的参数转化之后返回另一个对象。但需要注意,和 map() 不同的是, flatMap() 中返回的是个 Observable 对象,并且这个 Observable 对象并不是被直接发送到了 Subscriber 的回调方法中。
flatMap() 的原理是这样的:

  1. 使用传入的事件对象创建一个 Observable 对象;
  2. 并不发送这个 Observable, 而是将它激活,于是它开始发送事件;
  3. 每一个创建出来的 Observable 发送的事件,都被汇入同一个 Observable ,而这个 Observable 负责将这些事件统一交给 Subscriber 的回调方法。

(3)Filtering Observable(过滤)

        Debounce :指定时间间隔未发生事件,才会发送该数据

        Distinct:去掉重复数据

        ElementAt:取出置顶位置数据

        Filter:按照置顶规则过滤,得到想要的数据

        First:取第一位

        IngnoreElements:不回调OnError,回调OnCompleled,出错时回调OnError

        Last:取最后一位

        Sample:定时取样

        Skip:跳过指定数据

        SkipLast:跳过数据最后几个,取出前面数据

        Take:取前面几个数据

        TakeLast:取最后几个数据

(4)Combining Observable(组合)

        ZIP:两个数据组合

        Merge:按照时间戳顺序,组合两个数据成一个

        StartWith:在当前插入数据/Observable

(5)Error Handling Operators(错误处理操作符)

        Catch:onErrorReturn(数据错误就结束),onErrorResumeNext(数据错误,自动补全数据,继续),                                        onExceptionResumeNext(数据错误,自动补全数据,继续,结束后会抛出异常)

        Retry:重试 retry,RetryWhen

RxJavaSchedulers(调度器,线程控制)

RxJava是以一种简单的方式解决多线程问题的机制,在哪个线程调用 subscribe(),就在哪个线程生产事件;在哪个线程生产事件,就在哪个线程消费事件。如果需要切换线程,就需要用到 Scheduler (调度器)
  • Scheduler.io 用于I/O读写操作(读写文件,读写数据库,网络信息交互等耗时操作),行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。

  • Scheduler.computation:计算所使用的,这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。

  • Scheduler.immediate():立即执行 :当前线程,相当于不指定线程,默认

  • Scheduler.newThread() :开启新线程,在新线程执行操作

  • Scheduler.trampoline() :按照顺序队列,并运行每一个任务 repeat(),retry()

  • 另外, Android 还有一个专用的 AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行。

有了这几个 Scheduler ,就可以使用 subscribeOn() 和 observeOn() 两个方法来对线程进行控制了。
SubscribeOn RxJava提供了subscribeOn()方法来用于指定Observable(被观察者)对象所发生的线程,只执行第一次操作

ObserveOn RxJava提供了ObserveOn()方法用于每个Subscriber(observer观察者)对象所发生的线程,在那个线程,执行后面的操作

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 我从去年开始使用 RxJava ,到现在一年多了。今年加入了 Flipboard 后,看到 Flipboard 的...
    Jason_andy阅读 10,912评论 7 62
  • 转一篇文章 原地址:http://gank.io/post/560e15be2dca930e00da1083 前言...
    jack_hong阅读 4,455评论 0 2
  • 前言我从去年开始使用 RxJava ,到现在一年多了。今年加入了 Flipboard 后,看到 Flipboard...
    占导zqq阅读 12,990评论 6 151
  • (2016-07-29-星期五 15:03:48)
    菜五阅读 2,087评论 0 0
  • 饥饿, 吃食呢 不是阳光 不是金属 不是人类的皇冠 是身上的花 可她在背上 不属于我 数字 还有数字! 请捡起我 ...
    心种阅读 1,170评论 0 1