rxjava相关基础知识

RxJava2.0简介(一)
1.定义
RxJava是响应式编程(Reactive Extensions)在JVM平台上的实现,即用Java语言实现的一套基于观察者模式的异步编程接口。

响应式编程是一种基于异步数据流概念的编程模式。数据流就像一条河:它可以被观测,被过滤,被操作,或者为新的消费者与另外一条流合并为一条新的流。

响应式编程的一个关键概念是事件。事件可以被等待,可以触发过程,也可以触发其它事件。事件是唯一的以合适的方式将我们的现实世界映射到我们的软件中:如果屋里太热了我们就打开一扇窗户。同样的,当我们的天气app从服务端获取到新的天气数据后,我们需要更新app上展示天气信息的UI;汽车上的车道偏移系统探测到车辆偏移了正常路线就会提醒驾驶者纠正,就是是响应事件。

今天,响应式编程最通用的一个场景是UI:我们的移动App必须做出对网络调用、用户触摸输入和系统弹框的响应。在这个世界上,软件之所以是事件驱动并响应的是因为现实生活也是如此。

2.引用
在app的gradle文件中添加这两行代码即可。

implementation 'io.reactivex.rxjava2:rxjava:2.0.1'
implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'

3.使用
3.1 Observable和Observer


image

上面是一根水管,称为上游,下面是一根水管,称为下游。上游把事件送到下游,这就是RxJava的基本原理,基本原理了解后剩下的知识不过是对原理的修修补补。

这两根水管连接起来,上游发送事件1,2,3,而下游则依次接受到事件1,2,3。

在RxJava中,上游被称为Observable(观察得到的),下游被称为Observer(观察者),连接Observable与Observer的方法被称为subscribe(),用RxJava写的话就是这样:

Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onComplete();
            }
        });

        Observer<Integer> observer = new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.i(TAG, "onSubscribe: ");
            }

            @Override
            public void onNext(Integer value) {
                Log.i(TAG, "onNext: " + value);
            }

            @Override
            public void onError(Throwable e) {
                Log.i(TAG, "onError: ");
            }

            @Override
            public void onComplete() {
                Log.i(TAG, "onComplete: ");
            }
        };
        observable.subscribe(observer);
2020-03-27 01:25:10.920 7786-7786/com.example.uitestdemo I/TestRxActivity: onSubscribe: 
2020-03-27 01:25:10.920 7786-7786/com.example.uitestdemo I/TestRxActivity: emitter1: 
2020-03-27 01:25:10.920 7786-7786/com.example.uitestdemo I/TestRxActivity: onNext: 1
2020-03-27 01:25:10.921 7786-7786/com.example.uitestdemo I/TestRxActivity: emitter2: 
2020-03-27 01:25:10.921 7786-7786/com.example.uitestdemo I/TestRxActivity: onNext: 2
2020-03-27 01:25:10.921 7786-7786/com.example.uitestdemo I/TestRxActivity: emitter3: 
2020-03-27 01:25:10.921 7786-7786/com.example.uitestdemo I/TestRxActivity: onNext: 3
2020-03-27 01:25:10.921 7786-7786/com.example.uitestdemo I/TestRxActivity: onComplete: 
2020-03-27 01:25:10.921 7786-7786/com.example.uitestdemo I/TestRxActivity: onComplete: 
2020-03-27 01:25:10.921 7786-7786/com.example.uitestdemo I/TestRxActivity: emitter4: 

3.2 ObservableEmitter和Disposable

 Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                Log.i(TAG, "subscribe: 1");
                e.onNext(1);
                Log.i(TAG, "subscribe: 2");
                e.onNext(2);
                Log.i(TAG, "subscribe: 3");
                e.onNext(3);
                Log.i(TAG, "subscribe: complete");
                e.onComplete();
                Log.i(TAG, "subscribe: 4");
                e.onNext(4);
            }
        }).subscribe(new Observer<Integer>() {

            private Disposable disposable;
            private int count;

            @Override
            public void onSubscribe(Disposable d) {
                Log.i(TAG, "onSubscribe: ");
                disposable = d;
            }

            @Override
            public void onNext(Integer value) {
                Log.i(TAG, "onNext: " + value);
                count++;
                if (count == 2) {
                    disposable.dispose();
                    Log.i(TAG, "onNext: dispose");
                    Log.i(TAG, "onNext: " + disposable.isDisposed());
                }
            }

            @Override
            public void onError(Throwable e) {
                Log.i(TAG, "onError: ");
            }

            @Override
            public void onComplete() {
                Log.i(TAG, "onComplete: ");
            }
        });
2020-03-27 01:24:56.928 7786-7786/com.example.uitestdemo I/TestRxActivity: onSubscribe: 
2020-03-27 01:24:56.929 7786-7786/com.example.uitestdemo I/TestRxActivity: subscribe: 1
2020-03-27 01:24:56.929 7786-7786/com.example.uitestdemo I/TestRxActivity: onNext: 1
2020-03-27 01:24:56.929 7786-7786/com.example.uitestdemo I/TestRxActivity: subscribe: 2
2020-03-27 01:24:56.929 7786-7786/com.example.uitestdemo I/TestRxActivity: onNext: 2
2020-03-27 01:24:56.929 7786-7786/com.example.uitestdemo I/TestRxActivity: onNext: dispose
2020-03-27 01:24:56.929 7786-7786/com.example.uitestdemo I/TestRxActivity: onNext: true
2020-03-27 01:24:56.929 7786-7786/com.example.uitestdemo I/TestRxActivity: subscribe: 3
2020-03-27 01:24:56.929 7786-7786/com.example.uitestdemo I/TestRxActivity: subscribe: complete
2020-03-27 01:24:56.929 7786-7786/com.example.uitestdemo I/TestRxActivity: subscribe: 4

在上面的代码中,除过Observable与Observer外,还混入了两个奇怪的东西,ObservableEmitter和Disposable。

Emitter是发射器的意思,而ObservableEmiiter的含义便可想而知了,用来发射事件的。 ObservableEmitter可以发送三种类型的事件,通过调用emitter的onNext(T value)、onComplete()和onError(Throwable error)就可以分别发出next事件、complete事件和error事件。在我们发射时还必须满足以下几个规则:

1.上游可以发送无限个onNext, 下游也可以接收无限个onNext。

2.当上游发送了一个onComplete后, 上游onComplete之后的事件将会继续发送, 而下游收到onComplete事件之后将不再继续接收事件。

3.当上游发送了一个onError后, 上游onError之后的事件将继续发送, 而下游收到onError事件之后将不再继续接收事件。

4.上游可以不发送onComplete或onError事件。

5.最为关键的是onComplete和onError必须唯一并且互斥, 即不能发多个onComplete, 也不能发多个onError, 也不能先发一个onComplete, 然后再发一个onError, 反之亦然。

上面的几条都好理解,关键是最后一条,这是需要我们在代码中控制的,虽然违反最后一条并不一定导致程序崩溃。比如发送多个onComplete是可以正常运行的, 依然是收到第一个onComplete就不再接收了, 但若是发送多个onError, 则收到第二个onError事件会导致程序会崩溃。

3.3 RxJava中的线程操作
在默认情况下,RxJava中的Observable与Observer都运行在主线程中,试一下。

     Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                Log.i(TAG, "subscribe: " + Thread.currentThread().getName());
            }
        }).subscribeOn(Schedulers.io())
                .subscribeOn(AndroidSchedulers.mainThread())
                .observeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.i(TAG, "accept: " + integer);
                        Log.i(TAG, "accept: " + Thread.currentThread().getName());
                    }
                });
2020-03-27 01:24:41.831 7786-8259/com.example.uitestdemo I/TestRxActivity: subscribe: RxCachedThreadScheduler-4
2020-03-27 01:24:41.832 7786-7786/com.example.uitestdemo I/TestRxActivity: accept: 1
2020-03-27 01:24:41.832 7786-7786/com.example.uitestdemo I/TestRxActivity: accept: main

先看两个方法。

subscribeOn() 指定的是上游发送事件的线程,,observeOn() 指定的是下游接收事件的线程。

多次指定上游的线程只有第一次指定的有效,,也就是说多次调用subscribeOn() 只有第一次的有效, 其余的会被忽略。

多次指定下游的线程是可以的,,也就是说每调用一次observeOn() , 下游的线程就会切换一次。

当然,在RxJava中也内置了多种线程选线供我们使用。这些内置的Scheduler已经足够满足我们开发的需求, 因此我们应该使用内置的这些选项,,在RxJava内部使用的是线程池来维护这些线程, 所以效率也比较高。

名称 含义
Schedulers.io() 代表io操作的线程, 通常用于网络,读写文件等io密集型的操作
Schedulers.computation() 代表CPU计算密集型的操作, 例如需要大量计算的操作
Schedulers.newThread() 代表一个常规的新线程
AndroidSchedulers.mainThread() 代表Android的主线程

RxJava2.0操作符
Map与FlatMap
那什么是操作符?

RxJava中的操作符就是为了提供函数式的特性,函数式最大的好处就是使得数据处理简洁易懂。操作符实质上就是RxJava函数式编程模式的体现。在我看来,函数就是变换关系的简称,比如在有一个数字集合A,又有一个数字集合B,从数字集合A变换到数字集合B的的这种关系,可以将其称为函数。

RxJava创建型操作符

测试

create()操作符
just()操作符
fromArray()操作符
fromIterable()操作符
differ()操作符
timer()操作符
interval()操作符
intervalRange()操作符
range()操作符
rangeLong()操作符
其他

RxJava变换操作符
map()操作符
flatMap()操作符
concatMap()操作符
buffer()操作符

RxJava组合/合并操作符
concat()操作符
concatArray()操作符
merge()操作符
mergeArray()操作符
concatDelayError()操作符
mergeDelayError()操作符
zip()操作符
combineLatest()操作符
combineLatestDelayError()操作符
reduce()操作符
collect()操作符
startWith()操作符
count()操作符

RxJava功能型操作符
subscribe()操作符
subscribeOn() & observeOn()操作符
delay()操作符
do操作符
onErrorReturn()操作符
onErrorResumeNext()
onExceptionResumeNext()操作符
retry()操作符
retryUntil()操作符
retryWhen()操作符
repeat()操作符
repeatWhen()操作符
debounce()操作符

RxJava过滤操作符
filter()操作符
ofType()操作符
skip() / skipLast()操作符
distinct() / distinctUntilChanged()操作符
take() & takeLast()操作符
throttleFirst() / throttleLast()操作符
sample()操作符
throttleWithTimeout()操作符
firstElement() / lastElement()操作符
elementAt()操作符
elementAtOrError()操作符
ignoreElements() 操作符

RxJava条件/布尔操作符
all()操作符
takeWhile()操作符
skipWhile()操作符
takeUntil()操作符
skipUntil()操作符
sequenceEqual()操作符
contains()操作符
isEmpty()操作符
amb()操作符
defaultIfEmpty()操作符

<a id="测试">测试</a>

<span id="jump">跳转到的地方</span>

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }
        }).map(new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) throws Exception {
                return "这是:" + integer;
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.i(TAG, s);
            }
        });
2020-03-27 01:32:03.263 8792-8792/com.example.uitestdemo I/TestRxActivity: 这是:1
2020-03-27 01:32:03.263 8792-8792/com.example.uitestdemo I/TestRxActivity: 这是:2
2020-03-27 01:32:03.263 8792-8792/com.example.uitestdemo I/TestRxActivity: 这是:3

可以看到,我们在Observable中输入的是int类型,但是到了Consumer上,却变为了String类型,起作用的就是中间的map函数,它将String类型变为了int类型。

接下来是FlatMap操作符,这是一个很强大的操作符。

FlatMap将一个发送事件的上游的Observable,变为多个发送事件的Observable,然后将它们发射的事件合并后放进一个单独的Observable中。

上游的Observable每发送一个事件,faltMap都会创建一个新的水管,发送转换之后的事件,但是flatMap并不保证转换后事件的顺序性,并不能一定保证事件1在事件2的前面,如果需要顺序保证可以使用concatMap。

     Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
            }
        }).flatMap(new Function<Integer, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(Integer integer) throws Exception {
                final List<String> list = new ArrayList<>();
                for (int i = 0; i < 15; i++) {
                    list.add("Value is:" + integer + "  " + i);
                }
                return Observable.fromIterable(list).delay(1000, TimeUnit.MILLISECONDS);
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.i(TAG, "accept: " + s);
            }
        });


2020-03-27 01:38:53.241 9715-9809/com.example.uitestdemo I/TestRxActivity: accept: Value is:1  0
2020-03-27 01:38:53.242 9715-9809/com.example.uitestdemo I/TestRxActivity: accept: Value is:1  1
2020-03-27 01:38:53.242 9715-9809/com.example.uitestdemo I/TestRxActivity: accept: Value is:1  2
2020-03-27 01:38:53.243 9715-9809/com.example.uitestdemo I/TestRxActivity: accept: Value is:1  3
2020-03-27 01:38:53.245 9715-9809/com.example.uitestdemo I/TestRxActivity: accept: Value is:1  4
2020-03-27 01:38:53.246 9715-9810/com.example.uitestdemo I/TestRxActivity: accept: Value is:2  0
2020-03-27 01:38:53.246 9715-9810/com.example.uitestdemo I/TestRxActivity: accept: Value is:1  5
2020-03-27 01:38:53.247 9715-9809/com.example.uitestdemo I/TestRxActivity: accept: Value is:1  6
2020-03-27 01:38:53.248 9715-9810/com.example.uitestdemo I/TestRxActivity: accept: Value is:2  1
2020-03-27 01:38:53.248 9715-9809/com.example.uitestdemo I/TestRxActivity: accept: Value is:1  7
2020-03-27 01:38:53.249 9715-9810/com.example.uitestdemo I/TestRxActivity: accept: Value is:2  2
2020-03-27 01:38:53.249 9715-9809/com.example.uitestdemo I/TestRxActivity: accept: Value is:1  8
2020-03-27 01:38:53.250 9715-9810/com.example.uitestdemo I/TestRxActivity: accept: Value is:2  3
2020-03-27 01:38:53.250 9715-9809/com.example.uitestdemo I/TestRxActivity: accept: Value is:1  9
2020-03-27 01:38:53.250 9715-9809/com.example.uitestdemo I/TestRxActivity: accept: Value is:3  0

concatMap,它和flatMap几乎是一模一的,只不过concatMap是严格按照上游发送的顺序来发送的。

  Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
            }
        }).concatMap(new Function<Integer, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(Integer integer) throws Exception {
                List<String> stringList = new ArrayList<>();
                for (int i = 0; i < 5; i++) {
                    stringList.add("Value is" + integer + "  " + i);
                }
                return Observable.fromIterable(stringList).delay(100, TimeUnit.MILLISECONDS);
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.i(TAG, "accept: " + s);
            }
        });


2020-03-27 01:44:36.816 10168-10239/com.example.uitestdemo I/TestRxActivity: accept: Value is1  0
2020-03-27 01:44:36.817 10168-10239/com.example.uitestdemo I/TestRxActivity: accept: Value is1  1
2020-03-27 01:44:36.818 10168-10239/com.example.uitestdemo I/TestRxActivity: accept: Value is1  2
2020-03-27 01:44:36.819 10168-10239/com.example.uitestdemo I/TestRxActivity: accept: Value is1  3
2020-03-27 01:44:36.819 10168-10239/com.example.uitestdemo I/TestRxActivity: accept: Value is1  4
2020-03-27 01:44:36.849 6767-7737/? I/SmartThreadExecutor: handleMessage: start check 0
2020-03-27 01:44:36.850 6767-7737/? I/SmartThreadExecutor: handleMessage: check end 0 -275 false
2020-03-27 01:44:36.924 10168-10240/com.example.uitestdemo I/TestRxActivity: accept: Value is2  0
2020-03-27 01:44:36.926 10168-10240/com.example.uitestdemo I/TestRxActivity: accept: Value is2  1
2020-03-27 01:44:36.927 10168-10240/com.example.uitestdemo I/TestRxActivity: accept: Value is2  2
2020-03-27 01:44:36.927 10168-10240/com.example.uitestdemo I/TestRxActivity: accept: Value is2  3
2020-03-27 01:44:36.928 10168-10240/com.example.uitestdemo I/TestRxActivity: accept: Value is2  4
2020-03-27 01:44:37.035 10168-10241/com.example.uitestdemo I/TestRxActivity: accept: Value is3  0
2020-03-27 01:44:37.037 10168-10241/com.example.uitestdemo I/TestRxActivity: accept: Value is3  1
2020-03-27 01:44:37.038 10168-10241/com.example.uitestdemo I/TestRxActivity: accept: Value is3  2
2020-03-27 01:44:37.038 10168-10241/com.example.uitestdemo I/TestRxActivity: accept: Value is3  3
2020-03-27 01:44:37.039 10168-10241/com.example.uitestdemo I/TestRxActivity: accept: Value is3  4

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,547评论 6 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,399评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,428评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,599评论 1 274
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,612评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,577评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,941评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,603评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,852评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,605评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,693评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,375评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,955评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,936评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,172评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,970评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,414评论 2 342

推荐阅读更多精彩内容