呕心沥血:RxJava2.x创建操作符

RxJava的基本流程以及线程切换可以参考之前的文章

https://www.jianshu.com/p/2adaea7237c4

1、序言

RxJava除了拥有逻辑简洁的事件流链式调用,使用简单外其丰富的操作符基本可以满足日常开发中的各种实现逻辑

Rx的基本操作符分类

RxJava操作符分类.jpg

下面会逐一讲解每一类操作符的使用

2、创建操作符

RxJava创建操作符.jpg

2.1、基本创建操作符

create作为RxJava最基本的创建操作,用来完整的创建一个被观察者Observable对象

通过create创建一个被观察Observable对象

 Observable observable = Observable.create(new ObservableOnSubscribe() {
            @Override
            public void subscribe(ObservableEmitter e) throws Exception {
                //重写subscribe 写入实际的代码逻辑
                if (!e.isDisposed()) {
                    e.onNext("RxText");
                }
            }
        });

创建观察者Observer对象

Observer observer = new Observer() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Object o) {
                LogUtils.showLog("message == " + (String) o);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        };

通过subscribe进行关联

observable.subscribe(observer);

在实际使用时一般链式调用

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                if (!e.isDisposed()) {
                    e.onNext("test");
                }
            }
        })
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(String s) {
                        LogUtils.showLog("s == " + s);
                    }

                    @Override
                    public void onError(Throwable e) {
                       
                    }

                    @Override
                    public void onComplete() {
                      
                    }
                });

2.2、快速创建操作符

使用场景,快速的创建被观察者并进行数据发送

1、just()

作用:快速创建1个被观察者对象Observable

注意:just只能传入最多10个数据

Observable.just(1, 2, 3, 4, 5)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Integer integer) {
                        LogUtils.showLog("integer == " + integer);
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });

输出:

D/hzfTag1204: integer == 1
    integer == 2
    integer == 3
    integer == 4
    integer == 5

2、fromArray()

作用:快速创建一个被观察者,直接发送传入的数组数据,当发送数据大于10时可以考虑采用fromArray

 int[] arrays = {1, 2, 3, 4, 5};
        Observable.fromArray(arrays)
                .subscribe(new Observer<int[]>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(int[] ints) {
                         //遍历数组并输出
                        for (int num : ints) {
                            LogUtils.showLog("num == " + num);
                        }
                       
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });

3、 fromIterable

作用:快速创建一个Observable并将集合当中的数据发送

fromIterable的使用与fromArray一致,数据由fromArray的数组改为集合,不做具体的赘述了

BUT!!! 我在做测试的时候发现了一个好玩儿的事情.....

fromArray中也可以传一个list;但是fromIterable不能传数组

fromIterable不能传数组根据代码很明显,其对参数做了限制

fromIterable参数.png

但fromArray没有做限制,当我用以下代码操作时可以正确拿到list中的数据

        List list = new ArrayList();
        list.add("1");
        list.add("2");
        list.add("3");
        Observable.fromArray(list)
                .subscribe(new Observer<List>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(List ints) {
                        LogUtils.showLog("ints == "+ints.get(2));
                    }

                    @Override
                    public void onError(Throwable e) {
                        LogUtils.showLog("exception == "+e.getMessage().toString());
                    }

                    @Override
                    public void onComplete() {

                    }
                });



fromArray传集合.png

通过断点分析源码fromArray当中的逻辑,当fromArray传入一个数组,会走到item.length == 1这个判断当中,并且最终走的是just操作符


fromArray源码.png

所以通过fromArray传入集合就相当于是just(list);但是为什么会走到items.length == 1这个判断当中?
这里面涉及到Java可变参数的概念,fromArray后面参数是T...意思是参数不确定,可以传多个参数,传入几个参数这个items的length就是多少,所以fromArray不论传的是list还是array,只要传的是一个参数,最终都相当于通过just将数据发送出去了(相当于把对象通过just发出去);当fromArray中的参数大于1时,会将参数封装成为一个T[]数组,再将数组中的每一个元素逐一发送。


ObservableFromArray.png

4、never
作用:只会调用onSubscribe方法,不会回调onNext onError onComplete等回调方法;通过源码可以看出,内部的subscribeActual方法只是调用了onSubscribe,并没有执行其他的回调方法

Rxjava never操作符.png

5、empty
作用:当订阅后,被观察者只会发送一个onComplete事件,最终Observer的回调只有onSubscribe和onComplete会执行

6、error
作用:订阅后仅发送Error事件,error的参数可以自定义异常发送给onError

2.3、延迟创建操作符

延迟创建操作符的需求场景:
(1)当经过n秒后,执行操作x
(2)每经过n秒,周期的执行操作x
延迟创建操作符的分类:


RxJava延迟操作符.jpg

1、timer
作用:快速创建一个Observable,并指定一段时间后发送onNext(0)事件;onNext的参数为long类型,默认数值为0

final long startTime = System.currentTimeMillis();
        Observable.timer(5,TimeUnit.SECONDS)
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Long aLong) {
                        LogUtils.showLog((System.currentTimeMillis() - startTime) + " ms后接收到了数据 " + aLong);
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });

12-05 10:07:11.351 12187-12207/com.hzf.test.myapplication D/hzfTag1205: 5005 ms后接收到了数据 0

2、defer
作用:等到实际subscribe订阅时才会创建一个Observable;可以保证Observable的数据在订阅时是最新的数据

 Observable<String> observable = Observable.defer(new Callable<ObservableSource<? extends String>>() {
            @Override
            public ObservableSource<? extends String> call() throws Exception {
                return Observable.just(test1);
            }
        });

        test1 = "test222";

        observable.subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(String s) {
                LogUtils.showLog("s == "+s);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

12-05 10:22:00.299 12308-12308/com.hzf.test.myapplication D/hzfTag1205: s == test222

通过defer的源码可以出来被观察者的创建是在subscribeActual订阅时
(ObservableDefer)


RxJava defer订阅.png

3、interval
作用:快速创建一个被观察者Observable对象,每隔指定时间发送一个事件
interval的参数最多可用的为4个参数
参数1:初始延迟发送事件的时间
参数2:间隔发送事件的时间
参数3:TimeUnit指定的时间类型
参数4:Scheduler,可以手动创建一个worker指定interval的运行线程(如果不手动选择第四个参数,默认interval发生在子线程)

Observable.interval(3, 5, TimeUnit.SECONDS)
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Long aLong) {
                        LogUtils.showLog("aLong == "+aLong+",当前线程为 "+Thread.currentThread().getName());
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });
    }

运行结果为:
12-05 11:17:32.539 13352-13372/com.hzf.test.myapplication D/hzfTag1205: aLong == 0,当前线程为 RxComputationThreadPool-1
12-05 11:17:37.539 13352-13372/com.hzf.test.myapplication D/hzfTag1205: aLong == 1,当前线程为 RxComputationThreadPool-1
12-05 11:17:42.539 13352-13372/com.hzf.test.myapplication D/hzfTag1205: aLong == 2,当前线程为 RxComputationThreadPool-1
12-05 11:17:47.539 13352-13372/com.hzf.test.myapplication D/hzfTag1205: aLong == 3,当前线程为 RxComputationThreadPool-1

也可以指定调度器,例如:

Observable.interval(3, 5, TimeUnit.SECONDS, new Scheduler() {
            @Override
            public Worker createWorker() {
                return AndroidSchedulers.mainThread().createWorker();
            }
        })
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Long aLong) {
                        LogUtils.showLog("aLong == "+aLong+",当前线程为 "+Thread.currentThread().getName());
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });

12-05 11:20:19.386 13450-13450/com.hzf.test.myapplication D/hzfTag1205: aLong == 0,当前线程为 main
12-05 11:20:24.388 13450-13450/com.hzf.test.myapplication D/hzfTag1205: aLong == 1,当前线程为 main
12-05 11:20:29.388 13450-13450/com.hzf.test.myapplication D/hzfTag1205: aLong == 2,当前线程为 main
12-05 11:20:34.388 13450-13450/com.hzf.test.myapplication D/hzfTag1205: aLong == 3,当前线程为 main

如果运用线程操作符的话,经过我的实验,当调用subscribeOn时是不起作用的,实际发生事件的线程依然是子线程或者指定的调度器;而调用observeOn时,onNext接收事件的线程即为observeOn所指定的线程。

2、intervalRange
作用:快读创建一个被观察者对象,每隔指定时间发送数据,但是与interval不同的是intervalRange可以指定发送的数据数量

参数1:起始的事件序号
参数2:事件数量
参数3:第1次事件延迟发送的时间
参数4:事件间的间隔时间
参数5:时间单位
参数6:Scheduler

Observable.intervalRange(3, 3, 3, 2, TimeUnit.SECONDS)
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Long aLong) {
                        LogUtils.showLog("aLong == " + aLong);
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });

运行结果:
12-05 11:30:08.864 13737-13757/com.hzf.test.myapplication D/hzfTag1205: aLong == 3
12-05 11:30:10.864 13737-13757/com.hzf.test.myapplication D/hzfTag1205: aLong == 4
12-05 11:30:12.864 13737-13757/com.hzf.test.myapplication D/hzfTag1205: aLong == 5

2、range
作用:快速创建一个被观察者,并连续发送一个事件序列,可指定范围。功能与intervalRange类似,但实现的功能会相对简单一些。

参数1:起始数据
参数2:发送多少条数据

//从5开始发送 发送10个数据
        Observable.range(5, 10)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Integer integer) {
                        LogUtils.showLog("integer == " + integer);
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });

2、rangeLong
作用:与range类似,但支持long类型

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,509评论 6 504
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,806评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,875评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,441评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,488评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,365评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,190评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,062评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,500评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,706评论 3 335
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,834评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,559评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,167评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,779评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,912评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,958评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,779评论 2 354

推荐阅读更多精彩内容