RxJava2入门教程一

版权所有,转载请注明出处:linzhiyong https://www.jianshu.com/p/559c5e7376a2 https://blog.csdn.net/u012527802/article/details/81117684

本文主要介绍RxJava2是什么、RxJava2的基本使用。
Github:https://github.com/ReactiveX/RxJava
RxDoc:http://reactivex.io/RxJava/2.x/javadoc/
RxJava2Demo:https://github.com/linzhiyong/RxJava2Demo

目录

  1. RxJava2介绍
    1.1 什么是RxJava
    1.2 RxJava2与RxJava区别
  2. RxJava2操作符介绍
    2.1 创建操作符
    2.2 线程相关操作符
    2.3 事件监听操作符
    2.4 过滤操作符
    2.5 其他操作符
  3. 总结

1. RxJava2介绍

1.1 什么是RxJava

Github的介绍

RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.
It extends the observer pattern to support sequences of data/events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety and concurrent data structures.

Google翻译一下

RxJava是一个在Java虚拟机上的响应式扩展:一个用于通过使用可观察序列来编写异步和基于事件的程序的库。
它扩展了观察者模式以支持数据/事件序列,并添加了允许您以声明方式组合序列的运算符,同时抽象出对低级线程,同步,线程安全和并发数据结构等问题的关注。

总结一下,RxJava是基于响应式编程思想,实现并扩展了观察者模式,可以进行异步操作的库。

注:这里简单介绍一下,响应式编程、观察者模式的概念
1、响应式编程,简称RP(Reactive Programming),是一个专注于数据流和变化传递的异步编程范式。一个简单的例子:在Excel中,在A1单元格输入数字,在B1输入数字,设置C1的内容为SUM(A1, B1),一旦A1或B1数值变化,那么C1将会立即更新;
2、观察者模式,属于行为型模式,又叫发布-订阅(Publish/Subscribe)模式、模型-视图 (Model/View)模式、源-监听器(Source/Listener)模式或从属者(Dependents)模式,作用是定义对象间的一种一对多的依赖关系,当一个对象的状态发生改变时,所有依赖于它的对象都得到通知并被自动更新。例如:拍卖的时候,拍卖师观察最高标价,然后通知给其他竞价者竞价。

1.2 RxJava2与RxJava区别

1、在Rxjava 2中,有一个新的Reactive类型:Flowable,它与Observable很相似,但是有一个关键的不同是,Flowable支持背压 backpressure。支持backpressure,如果事件的消费者不能及时消费生产的事件时可以定义一个处理事件的策略,开发者要自己实现这个事件处理策略。[关于背压];
2、Rxjava2中onNext设计成限制不能传null;

    /**
     * Signal a normal value.
     * @param value the value to signal, not null
     */
    void onNext(@NonNull T value);

3、增加Single角色,和Observable,Flowable一样会发送数据,不同的是订阅后只能接受到一次;
4、Actions和Functions方法名的修改和次要类的删除;
5、。。。

1.3 RxJava2的角色和事件介绍

角色 功能
被观察者(Observable/Flowable) 产生事件(Flowable支持背压)
观察者(Observer/Subscriber) 响应事件并做出处理
事件(event) 被观察者和观察者的消息载体
订阅(subscribe) 关联被观察者和观察者
订阅控制(Disposable/Subscription) 用于取消被观察者与观察者的关系,Subscription支持背压拉取消息

RxJava2事件消费者包括Observer、Subscriber、Consumer。其中使用Consumer的话,RxJava内部默认实现Observer/Subscriber,然后在onNext()方法中回调Consumer的accept()。

下面主要介绍一下Emitter、Observer/Subscriber,Emitter接口作为被观察者的事件发射器,有3中事件:onNext、onError、onComplete;Observer/Subscriber作为观察者的事件接收器,两个接口除了名字完全相同,只是为了区分Observable/Flowable的使用,包含4种事件:onSubscribe、onNext、onError、onComplete;

事件 说明
onSubscribe 观察者订阅被观察者时,触发该事件,同时返回一个订阅控制对象(Disposable/Subscription)
onNext 被观察者通过onNext可以发送多个事件,观察者可以通过onNext接收多个事件
onError 被观察者发送onError事件后,其他事件被终止发送,观察者收到onError事件后会终止接受其他事件
onComplete 被观察者发送onComplete事件后,其他事件被终止发送,观察者收到onComplete事件后会终止接受其他事件

注:onError和onComplete是互斥的事件,一个正确运行的事件序列中, onCompleted和onError只能有一个,并且是事件序列中的最后一个。

看一个订阅的示例

Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            // 使用Emitter事件发射器发射事件
            emitter.onNext("这是事件1");
            emitter.onNext("这是事件2");
//                emitter.onError(new Exception("这里事件发生了异常。"));
            emitter.onNext("这是事件3");
            emitter.onNext("这是事件4");
            emitter.onComplete();
        }
    });

    // 定义观察者,接收事件
    Observer<String> observer = new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
            // 订阅成功回调该方法,返回控制对象
            // d.dispose();
            Log.i(TAG, "--onSubscribe--");
        }
        @Override
        public void onNext(String s) {
            // 这里接收被观察者发出的事件
            Log.i(TAG, "--onNext--" + s);
        }
        @Override
        public void onError(Throwable e) {
            // 错误事件
            Log.i(TAG, "--onError--" + e.getMessage());
        }
        @Override
        public void onComplete() {
            // 完成事件
            Log.i(TAG, "--onComplete--");
        }
    };

    // 观察者订阅被观察者
    observable.subscribe(observer);
}

看一下控制台日志:

I/com.lzy.org.rxjava2.RxJava2Test2: --onSubscribe--
I/com.lzy.org.rxjava2.RxJava2Test2: --onNext--这是事件1
I/com.lzy.org.rxjava2.RxJava2Test2: --onNext--这是事件2
I/com.lzy.org.rxjava2.RxJava2Test2: --onNext--这是事件3
I/com.lzy.org.rxjava2.RxJava2Test2: --onNext--这是事件4
I/com.lzy.org.rxjava2.RxJava2Test2: --onComplete--

2 RxJava2常用操作符介绍

2.1 创建操作符

名称 功能介绍
create public static <T> Observable<T> create(ObservableOnSubscribe<T> source)
创建一个被观察者,同时定义并发送事件,手动维护事件的发送和结束
just public static <T> Observable<T> just(T item1, ... T item10)
创建一个被观察者,并发送事件,发送的事件不可以超过10个
fromArray public static <T> Observable<T> fromArray(T... items)
创建一个被观察者,并发送事件,参数接收一个事件数组
fromIterable public static <T> Observable<T> fromIterable(Iterable<? extends T> source)
创建一个被观察者,并发送事件,参数接收一个事件集合,如List
fromCallable public static <T> Observable<T> fromCallable(Callable<? extends T> supplier)
参数Callable 是 java.util.concurrent 中的 Callable,Callable 和 Runnable 的用法基本一致,只是它会返回一个结果值,这个结果值就是发给观察者的
fromFuture public static <T> Observable<T> fromFuture(Future<? extends T> future)
参数中的 Future 是 java.util.concurrent 中的 Future,Future 的作用是增加了 cancel() 等方法操作 Callable,可以通过 get() 方法来获取 Callable 返回的值
defer public static <T> Observable<T> defer(Callable<? extends ObservableSource<? extends T>> supplier)
只有观察者订阅的时候才会创建新的被观察者,所以每订阅一次就会通知一次观察者
timer public static Observable<Long> timer(long delay, TimeUnit unit)
延时发送,当到指定时间后就会发送一个 0L 的值给观察者
interval public static Observable<Long> interval(long period, TimeUnit unit)
public static Observable<Long> interval(long period, TimeUnit unit, Scheduler scheduler)
定时发送,每隔一段时间就会发送一个事件,这个事件从0开始递增
intervalRange public static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit)
可以指定发送事件的开始值和发送数量,其他与 interval() 的功能一样
range/rangeLong public static Observable<Integer> range(final int start, final int count)
public static Observable<Long> rangeLong(long start, long count)
发送一定范围的事件序列

2.2 线程相关操作符

名称 功能介绍
subscribeOn public final Single<T> subscribeOn(final Scheduler scheduler)
指定被观察者事件发送的线程,如果多次调用此方法,只有第一次有效
observeOn public final Observable<T> observeOn(Scheduler scheduler)
指定观察者处理事件的线程,每指定一次就会生效一次

注:Scheduler类型

类型 使用方式 含义 使用场景
IoScheduler Schedulers.io() io操作线程 读写SD卡文件、查询数据库、访问网络等IO密集型等操作
NewThreadScheduler Schedulers.newThread() 创建新线程 耗时操作等
SingleScheduler Schedulers.single() 单例线程 只需一个单例线程时
ComputationScheduler Schedulers.computation() CPU计算操作线程 图片压缩取样、xml、json解析等CPU密集型计算
TrampolineScheduler Schedulers.trampoline() 当前线程 需要在当前线程立即执行任务时
HandlerScheduler AndroidSchedulers.mainThread() Android主线程 更新U等I
ExecutorScheduler Schedulers.from(Executor executor) 自定义线程 自定义任务等

2.3 事件监听操作符

名称 功能介绍
doOnNext public final Observable<T> doOnNext(Consumer<? super T> onNext)
每次发送onNext之前回调
doOnEach public final Observable<T> doOnEach(final Observer<? super T> observer)
每次发送事件之前回调
doAfterNext public final Observable<T> doAfterNext(Consumer<? super T> onAfterNext)
每次发送onNext事件之后回调
doOnError public final Observable<T> doOnError(Consumer<? super Throwable> onError)
发送 onError() 之前回调
doOnComplete public final Observable<T> doOnComplete(Action onComplete)
发送 onComplete() 之前回调
doOnSubscribe public final Observable<T> doOnSubscribe(Consumer<? super Disposable> onSubscribe)
发送 onSubscribe() 之前回调
doOnDispose public final Observable<T> doOnDispose(Action onDispose)
调用 Disposable 的 dispose() 之后回调
doOnTerminate
doAfterTerminate

doOnTerminate 是在 onError 或者 onComplete 发送之前回调,而 doAfterTerminate 则是 onError 或者 onComplete 发送之后回调(取消订阅,方法失效)
doFinally public final Observable<T> doFinally(Action onFinally)
无论是否取消订阅,在所有事件发送完毕之后回调该

2.4 过滤操作符

名称 功能介绍
filter public final Flowable<T> filter(Predicate<? super T> predicate)
通过一定的逻辑来过滤被观察者发送的事件,返回 true 则发送事件,否则不发送
ofType public final <U> Observable<U> ofType(final Class<U> clazz)
事件类型过滤
distinct public final Observable<T> distinct()
去重操作符,去掉重复的事件
distinctUntilChanged public final Observable<T> distinctUntilChanged()
过滤掉连续重复的事件
skip public final Observable<T> skip(long count)
跳过世界集合中的某些事件,count 代表跳过事件的数量
debounce public final Observable<T> debounce(long timeout, TimeUnit unit)
如果两件事件发送的时间间隔小于设定的时间间隔timeout,则前一件事件就不会发送
take public final Observable<T> take(long count)
取指定数量的事件
firstElement / lastElement public final Maybe<T> firstElement()
firstElement() 获取事件序列的第1个元素,lastElement() 获取事件序列的最后1个元素
elementAt / elementAtOrError public final Maybe<T> elementAt(long index)
elementAt()可以从事件序列中取值指定index的事件,如果不存在,则无响应。
如果想要在获取不到事件的时候发出响应使用elementAtOrError()

2.5 其他操作符

名称 功能介绍
map public final <R> Observable<R> map(Function<? super T, ? extends R> mapper)
遍历被观察者发送的事件,可以对事件进行二次处理
flatMap public static <T> Observable<T> fromIterable(Iterable<? extends T> source)
可以将事件序列中的元素进行整合加工,返回一个新的被观察者,flatMap 并不能保证事件的顺序
concatMap public final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper)
功能与flatMap,但是concatMap可以保证事件的顺序
buffer public final Observable<List<T>> buffer(int count, int skip)
从需要发送的事件中获取一定数量的事件,并将这些事件放到缓冲区中一次性发出
groupBy public final <K> Observable<GroupedObservable<K, T>> groupBy(Function<? super T, ? extends K> keySelector)
将发送的数据进行分组,每个分组都会返回一个被观察者
scan public final Observable<T> scan(BiFunction<T, T, T> accumulator)
将时间以一定的逻辑聚合起来
reduce public final Maybe<T> reduce(BiFunction<T, T, T> reducer)
操作符的作用也是将发送数据以一定逻辑聚合起来, 区别在于 scan() 每处理一次数据就会将事件发送给观察者,而 reduce() 会将所有数据聚合在一起才会发送事件给观察者
window public final Observable<Observable<T>> window(long count)
将事件按照count指定的数量分组,一次性发送一组事件
concat/concatArray public static <T> Observable<T> concat(ObservableSource<? extends T> source1, ..., ObservableSource<? extends T> N4)
public static <T> Observable<T> concatArray(ObservableSource<? extends T>... sources)
可以将多个观察者组合在一起,然后按照之前发送顺序发送事件。功能与concat相同,只是concatArray参数接收数组。
需要注意的是,concat() 最多只可以发送4个事件。如果其中有一个被观察者发送了一个 onError 事件,那么就会停止发送事件
merge/mergeArray public static <T> Observable<T> merge(Iterable<? extends ObservableSource<? extends T>> sources)
...
功能与concat相同,只是merge可以并发,不是按照被观察者的顺序发送事件
concatArrayDelayError
mergeArrayDelayError
public static <T> Observable<T> concatArrayDelayError(ObservableSource<? extends T>... sources)
如果事件发送过成中出现了onError事件,该方法可以延迟到所有被观察者都发送完事件后再执行onError
zip public static <T, R> Observable<R> zip(Iterable<? extends ObservableSource<? extends T>> sources, Function<? super Object[], ? extends R> zipper)
会将多个被观察者合并,根据各个被观察者发送事件的顺序一个个结合起来,最终发送的事件数量会与源 Observable 中最少事件的数量一样
collect public final <U> Single<U> collect(Callable<? extends U> initialValueSupplier, BiConsumer<? super U, ? super T> collector)
将要发送的事件收集到定义的数据结构中
startWith/startWithArray public final Observable<T> startWith(T item)
public final Observable<T> startWithArray(T... items)
在发送事件之前追加事件,startWith() 追加一个事件,startWithArray() 可以追加多个事件。追加的事件会先发出
count public final Single<Long> count()
返回被观察者发送事件的数量
delay public final Observable<T> delay(long delay, TimeUnit unit)
延时发送事件

还有其他操作符,像onErrorResumeNext、retry、repeat、all、skipWhile等等等,这里就不一一列举了,如果需要,可以去自行查询RxjavaDoc

3 总结

本章主要是介绍了RxJava2,比较了与RxJava的差异,还列举了一堆让人看着头疼的操作符,都是些概念性的东西,最关键的是学会如何把RxJava应用到自己的项目中。流行的框架固然好,但是盲目的追风只会导致项目更加难以开发和维护,更不要谈什么效率了。适合你的才是最好的。

下一章节,会具体介绍RxJava2的基本使用方法。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,294评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,493评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,790评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,595评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,718评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,906评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,053评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,797评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,250评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,570评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,711评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,388评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,018评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,796评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,023评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,461评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,595评论 2 350

推荐阅读更多精彩内容