RxJava2 入门详细笔记(2)

六、过滤操作符

6.1、filter()

通过一定逻辑来过滤被观察者发送的事件,如果返回 true 则会发送事件,否则不会发送

        Observable.just(1, 2, 3, 4).filter(new Predicate<Integer>() {
            @Override
            public boolean test(Integer integer) throws Exception {
                return integer % 2 == 0;
            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e(TAG, "accept : " + integer);
            }
        });
10-06 07:57:48.196 12753-12753/? E/MainActivity: accept : 2
10-06 07:57:48.196 12753-12753/? E/MainActivity: accept : 4

6.2、ofType()

过滤不符合该类型的事件

        Observable.just(1, 2, "Hi", 3, 4, "Hello").ofType(Integer.class).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e(TAG, "accept : " + integer);
            }
        });
10-06 07:59:41.265 12857-12857/leavesc.hello.rxjavademo E/MainActivity: accept : 1
10-06 07:59:41.265 12857-12857/leavesc.hello.rxjavademo E/MainActivity: accept : 2
10-06 07:59:41.265 12857-12857/leavesc.hello.rxjavademo E/MainActivity: accept : 3
10-06 07:59:41.265 12857-12857/leavesc.hello.rxjavademo E/MainActivity: accept : 4

6.3、skip()

以正序跳过指定数量的事件

        Observable.just(1, 2, 3, 4).skip(2).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e(TAG, "accept : " + integer);
            }
        });
10-06 08:01:09.183 12971-12971/leavesc.hello.rxjavademo E/MainActivity: accept : 3
10-06 08:01:09.183 12971-12971/leavesc.hello.rxjavademo E/MainActivity: accept : 4

6.4、skipLast()

以反序跳过指定数量的事件

        Observable.just(1, 2, 3, 4).skipLast(2).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e(TAG, "accept : " + integer);
            }
        });
10-06 08:02:00.753 13079-13079/leavesc.hello.rxjavademo E/MainActivity: accept : 1
10-06 08:02:00.753 13079-13079/leavesc.hello.rxjavademo E/MainActivity: accept : 2

6.5、distinct()

过滤事件序列中的重复事件

        Observable.just(1, 2, 1, 2, 3, 4, 3).distinct().subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e(TAG, "accept : " + integer);
            }
        });
10-06 08:03:27.402 13189-13189/leavesc.hello.rxjavademo E/MainActivity: accept : 1
10-06 08:03:27.402 13189-13189/leavesc.hello.rxjavademo E/MainActivity: accept : 2
10-06 08:03:27.402 13189-13189/leavesc.hello.rxjavademo E/MainActivity: accept : 3
10-06 08:03:27.402 13189-13189/leavesc.hello.rxjavademo E/MainActivity: accept : 4

6.6、distinctUntilChanged()

过滤掉连续重复的事件

        Observable.just(1, 2, 2, 1, 3, 4, 3, 3).distinctUntilChanged().subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e(TAG, "accept : " + integer);
            }
        });
10-06 08:04:44.531 13294-13294/leavesc.hello.rxjavademo E/MainActivity: accept : 1
10-06 08:04:44.541 13294-13294/leavesc.hello.rxjavademo E/MainActivity: accept : 2
10-06 08:04:44.541 13294-13294/leavesc.hello.rxjavademo E/MainActivity: accept : 1
10-06 08:04:44.541 13294-13294/leavesc.hello.rxjavademo E/MainActivity: accept : 3
10-06 08:04:44.541 13294-13294/leavesc.hello.rxjavademo E/MainActivity: accept : 4
10-06 08:04:44.541 13294-13294/leavesc.hello.rxjavademo E/MainActivity: accept : 3

6.7、take()

控制观察者接收事件的数量

        Observable.just(1, 2, 2, 1, 3, 4, 3, 3).take(3).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e(TAG, "accept : " + integer);
            }
        });
10-06 08:05:43.520 13397-13397/? E/MainActivity: accept : 1
10-06 08:05:43.520 13397-13397/? E/MainActivity: accept : 2
10-06 08:05:43.520 13397-13397/? E/MainActivity: accept : 2

6.8、debounce()

如果两个事件发送的时间间隔小于设定的时间间隔,则前一件事件不会发送给观察者

 Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                Thread.sleep(900);
                emitter.onNext(2);
            }
        }).debounce(1, TimeUnit.SECONDS).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e(TAG, "accept : " + integer);
            }
        });
10-06 08:08:59.337 13509-13523/leavesc.hello.rxjavademo E/MainActivity: accept : 2

6.9、firstElement() && lastElement()

firstElement() 取事件序列的第一个元素,lastElement() 取事件序列的最后一个元素

        Observable.just(1, 2, 3, 4, 5).firstElement().subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e(TAG, "accept : " + integer);
            }
        });

6.10、elementAt() & elementAtOrError()

elementAt() 可以指定取出事件序列中事件,但是输入的 index 超出事件序列的总数的话就不会触发任何调用,想触发异常信息的话就用 elementAtOrError()

        Observable.just(1, 2, 3, 4, 5).elementAt(5).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e(TAG, "accept : " + integer);
            }
        });

以上代码不会触发任何

改用为 elementAtOrError(),则会抛出异常

        Observable.just(1, 2, 3, 4, 5).elementAtOrError(5).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e(TAG, "accept : " + integer);
            }
        });
Process: leavesc.hello.rxjavademo, PID: 13948
    io.reactivex.exceptions.OnErrorNotImplementedException: The exception was not handled due to missing onError handler in the subscribe() method call. Further reading: https://github.com/ReactiveX/RxJava/wiki/Error-Handling | null
        at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704)
        at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701)
        at io.reactivex.internal.observers.ConsumerSingleObserver.onError(ConsumerSingleObserver.java:46)
        at io.reactivex.internal.operators.observable.ObservableElementAtSingle$ElementAtObserver.onComplete(ObservableElementAtSingle.java:115)
        at io.reactivex.internal.operators.observable.ObservableFromArray$FromArrayDisposable.run(ObservableFromArray.java:111)
        at io.reactivex.internal.operators.observable.ObservableFromArray.subscribeActual(ObservableFromArray.java:37)
        at io.reactivex.Observable.subscribe(Observable.java:12090)
        at io.reactivex.internal.operators.observable.ObservableElementAtSingle.subscribeActual(ObservableElementAtSingle.java:37)
        at io.reactivex.Single.subscribe(Single.java:3438)
        at io.reactivex.Single.subscribe(Single.java:3424)

七、条件操作符

7.1、all()

判断事件序列是否全部满足某个事件,如果都满足则返回 true,反之则返回 false

        Observable.just(1, 2, 3, 4, 5).all(new Predicate<Integer>() {
            @Override
            public boolean test(Integer integer) throws Exception {
                return integer % 2 == 0;
            }
        }).subscribe(new Consumer<Boolean>() {
            @Override
            public void accept(Boolean aBoolean) throws Exception {
                Log.e(TAG, "accept: " + aBoolean);
            }
        });
10-06 08:16:10.212 14043-14043/leavesc.hello.rxjavademo E/MainActivity: accept: false

7.2、takeWhile()

发射原始 Observable,直到指定的某个条件不成立的那一刻,它停止发射原始 Observable,并终止自己的 Observable

Observable.just(1, 2, 3, 4, 5, 1, 2).takeWhile(new Predicate<Integer>() {
            @Override
            public boolean test(Integer integer) throws Exception {
                return integer < 4;
            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e(TAG, "accept: " + integer);
            }
        });
10-06 14:03:42.110 20095-20095/leavesc.hello.rxjavademo E/MainActivity: accept: 1
10-06 14:03:42.110 20095-20095/leavesc.hello.rxjavademo E/MainActivity: accept: 2
10-06 14:03:42.110 20095-20095/leavesc.hello.rxjavademo E/MainActivity: accept: 3

7.3、skipWhile()

订阅原始的 Observable,但是忽略它的发射物,直到指定的某个条件变为 false 时才开始发射原始 Observable

            Observable.just(1, 2, 4, 1, 3, 4, 5, 1, 5)
                .skipWhile(new Predicate<Integer>() {
                    @Override
                    public boolean test(Integer integer) throws Exception {
                        return integer < 3;
                    }
                })
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.e(TAG, "integer " + integer);
                    }
                });
10-06 13:59:40.583 19764-19764/leavesc.hello.rxjavademo E/MainActivity: integer 4
10-06 13:59:40.593 19764-19764/leavesc.hello.rxjavademo E/MainActivity: integer 1
10-06 13:59:40.593 19764-19764/leavesc.hello.rxjavademo E/MainActivity: integer 3
10-06 13:59:40.593 19764-19764/leavesc.hello.rxjavademo E/MainActivity: integer 4
10-06 13:59:40.593 19764-19764/leavesc.hello.rxjavademo E/MainActivity: integer 5
10-06 13:59:40.593 19764-19764/leavesc.hello.rxjavademo E/MainActivity: integer 1
10-06 13:59:40.593 19764-19764/leavesc.hello.rxjavademo E/MainActivity: integer 5

7.4、takeUntil()

用于设置一个条件,当事件满足此条件时,此事件会被发送,但之后的事件就不会被发送了

Observable.just(1, 2, 4, 1, 3, 4, 5, 1, 5)
                .takeUntil(new Predicate<Integer>() {
                    @Override
                    public boolean test(Integer integer) throws Exception {
                        return integer > 3;
                    }
                })
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.e(TAG, "integer " + integer);
                    }
                });
10-06 08:54:24.833 17208-17208/? E/MainActivity: integer 1
10-06 08:54:24.833 17208-17208/? E/MainActivity: integer 2
10-06 08:54:24.833 17208-17208/? E/MainActivity: integer 4

7.5、skipUntil()

skipUntil() 中的 Observable 发送事件了,原始的 Observable 才会发送事件给观察者

Observable.intervalRange(1, 6, 0, 1, TimeUnit.SECONDS)
                .skipUntil(Observable.intervalRange(10, 3, 1, 1, TimeUnit.SECONDS))
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.e(TAG, "onSubscribe");
                    }

                    @Override
                    public void onNext(Long along) {
                        Log.e(TAG, "onNext : " + along);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.e(TAG, "onError");
                    }

                    @Override
                    public void onComplete() {
                        Log.e(TAG, "onComplete");
                    }
                });
10-06 08:51:16.926 16877-16877/leavesc.hello.rxjavademo E/MainActivity: onSubscribe
10-06 08:51:17.946 16877-16892/leavesc.hello.rxjavademo E/MainActivity: onNext : 2
10-06 08:51:18.936 16877-16892/leavesc.hello.rxjavademo E/MainActivity: onNext : 3
10-06 08:51:19.946 16877-16892/leavesc.hello.rxjavademo E/MainActivity: onNext : 4
10-06 08:51:20.936 16877-16892/leavesc.hello.rxjavademo E/MainActivity: onNext : 5
10-06 08:51:21.946 16877-16892/leavesc.hello.rxjavademo E/MainActivity: onNext : 6
10-06 08:51:21.946 16877-16892/leavesc.hello.rxjavademo E/MainActivity: onComplete

7.6、sequenceEqual()

判断两个 Observable 发送的事件是否相同,如果两个序列是相同的(相同的数据,相同的顺序,相同的终止状态),它就发射 true,否则发射 false

        Observable.sequenceEqual(Observable.just(1, 2, 3), Observable.just(1, 2, 3))
                .subscribe(new Consumer<Boolean>() {
                    @Override
                    public void accept(Boolean aBoolean) throws Exception {
                        Log.e(TAG, "accept aBoolean : " + aBoolean);
                    }
                });
10-06 08:46:59.369 16492-16492/leavesc.hello.rxjavademo E/MainActivity: accept aBoolean : true

7.7、contains()

判断事件序列中是否含有某个元素,如果有则返回 true,如果没有则返回 false

        Observable.just(1, 2, 3, 4).contains(2).subscribe(new Consumer<Boolean>() {
            @Override
            public void accept(Boolean aBoolean) throws Exception {
                Log.e(TAG, "accept aBoolean : " + aBoolean);
            }
        });
10-06 08:45:58.100 16386-16386/leavesc.hello.rxjavademo E/MainActivity: accept aBoolean : true

7.8、isEmpty()

判断事件序列是否为空

        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onComplete();
            }
        }).isEmpty().subscribe(new Consumer<Boolean>() {
            @Override
            public void accept(Boolean aBoolean) throws Exception {
                Log.e(TAG, "accept aBoolean: " + aBoolean);
            }
        });
10-06 08:43:43.201 16278-16278/leavesc.hello.rxjavademo E/MainActivity: accept aBoolean: true

7.9、amb()

amb() 接收一个 Observable 集合,但是只会发送最先发送事件的 Observable 中的事件,不管发射的是一项数据还是一个 onErroronCompleted 通知,其余 Observable 将会被丢弃

        List<Observable<Long>> list = new ArrayList<>();
        list.add(Observable.intervalRange(1, 3, 2, 1, TimeUnit.SECONDS));
        list.add(Observable.intervalRange(10, 3, 0, 1, TimeUnit.SECONDS));
        Observable.amb(list).subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
                Log.e(TAG, "accept: " + aLong);
            }
        });
10-06 08:41:45.783 16053-16068/leavesc.hello.rxjavademo E/MainActivity: accept: 10
10-06 08:41:46.783 16053-16068/leavesc.hello.rxjavademo E/MainActivity: accept: 11
10-06 08:41:47.783 16053-16068/leavesc.hello.rxjavademo E/MainActivity: accept: 12

7.10、defaultIfEmpty()

如果 Observable 没有发射任何值,则可以利用这个方法发送一个默认值

        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onComplete();
            }
        }).defaultIfEmpty(100).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e(TAG, "accept: " + integer);
            }
        });
10-06 08:40:04.754 15945-15945/leavesc.hello.rxjavademo E/MainActivity: accept: 100
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,377评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,390评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,967评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,344评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,441评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,492评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,497评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,274评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,732评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,008评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,184评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,837评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,520评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,156评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,407评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,056评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,074评论 2 352

推荐阅读更多精彩内容