RxJava入门与提高-操作符篇(3)

前言

按照官方的分类,操作符大致分为以下几种:

  • Creating Observables(Observable的创建操作符),比如:Observable.create()、Observable.just()、Observable.from()等等;
  • Transforming Observables(Observable的转换操作符),比如:observable.map()、observable.flatMap()、observable.buffer()等等;
  • Filtering Observables(Observable的过滤操作符),比如:observable.filter()、observable.sample()、observable.take()等等;
  • Combining Observables(Observable的组合操作符),比如:observable.join()、observable.merge()、observable.combineLatest()等等;
  • Error Handling Operators(Observable的错误处理操作符),比如:observable.onErrorResumeNext()、observable.retry()等等;
  • Observable Utility Operators(Observable的功能性操作符),比如:observable.subscribeOn()、observable.observeOn()、observable.delay()等等;
  • Conditional and Boolean Operators(Observable的条件操作符),比如:observable.amb()、observable.contains()、observable.skipUntil()等等;
  • Mathematical and Aggregate Operators(Observable数学运算及聚合操作符),比如:observable.count()、observable.reduce()、observable.concat()等等;
  • 其他如observable.toList()、observable.connect()、observable.publish()等等;

1、创建型操作符

  • create操作符

create操作符是所有创建型操作符的“根”,也就是说其他创建型操作符最后都是通过create操作符来创建Observable的

  • from操作符

from操作符是把其他类型的对象和数据类型转化成Observable

  • just操作符

just操作符也是把其他类型的对象和数据类型转化成Observable,它和from操作符很像,只是方法的参数有所差别

  • defer操作符

defer操作符是直到有订阅者订阅时,才通过Observable的工厂方法创建Observable并执行

  • timer操作符

timer操作符是创建一串连续的数字,产生这些数字的时间间隔是一定的

  • interval操作符

interval操作符是每隔一段时间就产生一个数字,这些数字从0开始,一次递增1直至无穷大;interval操作符的实现效果跟上面的timer操作符的第二种情形一样

  • range操作符

range操作符是创建一组在从n开始,个数为m的连续数字,比如range(3,10),就是创建3、4、5…12的一组数字

  • repeat/repeatWhen操作符

repeat操作符是对某一个Observable,重复产生多次结果

repeatWhen操作符是对某一个Observable,有条件地重新订阅从而产生多次结果


2、Observable的转换操作符

  • buffer

buffer操作符周期性地收集源Observable产生的结果到列表中,并把这个列表提交给订阅者,订阅者处理后,清空buffer列表,同时接收下一次收集的结果并提交给订阅者,周而复始

  • flatmap

把Observable产生的结果转换成多个Observable,然后把这多个Observable“扁平化”成一个Observable,并依次提交产生的结果给订阅者

  • concatMap操作符

flatMap操作符不同的是,concatMap操作符在处理产生的Observable时,采用的是“连接(concat)”的方式,而不是“合并(merge)”的方式,这就能保证产生结果的顺序性,也就是说提交给订阅者的结果是按照顺序提交的,不会存在交叉的情况

  • switchMap

与flatMap操作符不同的是,switchMap操作符会保存最新的Observable产生的结果而舍弃旧的结果

  • groupBy操作符

groupBy操作符是对源Observable产生的结果进行分组,形成一个类型为GroupedObservable的结果集,GroupedObservable中存在一个方法为getKey(),可以通过该方法获取结果集的Key值

  • cast操作符

而cast操作符主要是做类型转换的

  • scan操作符

scan操作符通过遍历源Observable产生的结果,依次对每一个结果项按照指定规则进行运算,计算后的结果作为下一个迭代项参数,每一次迭代项都会把计算结果输出给订阅者

  • window操作符

window操作符非常类似于buffer操作符,区别在于buffer操作符产生的结果是一个List缓存,而window操作符产生的结果是一个Observable,订阅者可以对这个结果Observable重新进行订阅处理


3、Observable的过滤操作符

  • debounce操作符

debounce操作符对源Observable每产生一个结果后,如果在规定的间隔时间内没有别的结果产生,则把这个结果提交给订阅者处理,否则忽略该结果。

  • distinct操作符

distinct操作符对源Observable产生的结果进行过滤,把重复的结果过滤掉,只输出不重复的结果给订阅者,非常类似于SQL里的distinct关键字。

  • elementAt操作符

elementAt操作符在源Observable产生的结果中,仅仅把指定索引的结果提交给订阅者,索引是从0开始的

  • filter操作符

filter操作符是对源Observable产生的结果按照指定条件进行过滤,只有满足条件的结果才会提交给订阅者

  • ofType操作符

ofType操作符类似于filter操作符,区别在于ofType操作符是按照类型对结果进行过滤

  • first操作符

first操作符是把源Observable产生的结果的第一个提交给订阅者,first操作符可以使用elementAt(0)和take(1)替代

  • single操作符

single操作符是对源Observable的结果进行判断,如果产生的结果满足指定条件的数量不为1,则抛出异常,否则把满足条件的结果提交给订阅者

  • last操作符

last操作符把源Observable产生的结果的最后一个提交给订阅者,last操作符可以使用takeLast(1)替代

  • ignoreElements操作符

ignoreElements操作符忽略所有源Observable产生的结果,只把Observable的onCompleted和onError事件通知给订阅者。ignoreElements操作符适用于不太关心Observable产生的结果,只是在Observable结束时(onCompleted)或者出现错误时能够收到通知

  • skip操作符

skip操作符针对源Observable产生的结果,跳过前面n个不进行处理,而把后面的结果提交给订阅者处理

  • skipLast操作符

skipLast操作符针对源Observable产生的结果,忽略Observable最后产生的n个结果,而把前面产生的结果提交给订阅者处理,

  • take操作符

take操作符是把源Observable产生的结果,提取前面的n个提交给订阅者,而忽略后面的结果

  • takeFirst操作符

takeFirst操作符类似于take操作符,同时也类似于first操作符,都是获取源Observable产生的结果列表中符合指定条件的前一个或多个,与first操作符不同的是,first操作符如果获取不到数据,则会抛出NoSuchElementException异常,而takeFirst则会返回一个空的Observable,该Observable只有onCompleted通知而没有onNext通知。

  • takeLast操作符

takeLast操作符是把源Observable产生的结果的后n项提交给订阅者


4、Observable的组合操作符

  • combineLatest操作符

combineLatest操作符把两个Observable产生的结果进行合并,合并的结果组成一个新的Observable。这两个Observable中任意一个Observable产生的结果,都和另一个Observable最后产生的结果,按照一定的规则进行合并

  • join操作符

join操作符把类似于combineLatest操作符,也是两个Observable产生的结果进行合并,合并的结果组成一个新的Observable,但是join操作符可以控制每个Observable产生结果的生命周期

  • groupJoin操作符

groupJoin操作符非常类似于join操作符,区别在于join操作符中第四个参数的传入函数不一致

  • merge操作符

merge操作符是按照两个Observable提交结果的时间顺序,对Observable进行合并

  • mergeDelayError操作符

从merge操作符的流程图可以看出,一旦合并的某一个Observable中出现错误,就会马上停止合并,并对订阅者回调执行onError方法,而mergeDelayError操作符会把错误放到所有结果都合并完成之后才执行

  • startWith操作符

startWith操作符是在源Observable提交结果之前,插入指定的某些数据

  • switchOnNext操作符

switchOnNext操作符是把一组Observable转换成一个Observable,转换规则为:对于这组Observable中的每一个Observable所产生的结果,如果在同一个时间内存在两个或多个Observable提交的结果,只取最后一个Observable提交的结果给订阅者

  • zip操作符

zip操作符是把两个observable提交的结果,严格按照顺序进行合并,其流程图如下:


5、 Observable的错误处理操作符

  • onErrorReturn操作符

onErrorReturn操作符是在Observable发生错误或异常的时候(即将回调oError方法时),拦截错误并执行指定的逻辑,返回一个跟源Observable相同类型的结果,最后回调订阅者的onComplete方法

  • onErrorResumeNext操作符

onErrorResumeNext操作符跟onErrorReturn类似,只不过onErrorReturn只能在错误或异常发生时只返回一个和源Observable相同类型的结果,而onErrorResumeNext操作符是在错误或异常发生时返回一个Observable,也就是说可以返回多个和源Observable相同类型的结果

  • onExceptionResumeNext操作符

onExceptionResumeNext操作符和onErrorResumeNext操作符类似,不同的地方在于onErrorResumeNext操作符是当Observable发生错误或异常时触发,而onExceptionResumeNext是当Observable发生异常时才触发

  • retry操作符

retry操作符是当Observable发生错误或者异常时,重新尝试执行Observable的逻辑,如果经过n次重新尝试执行后仍然出现错误或者异常,则最后回调执行onError方法;当然如果源Observable没有错误或者异常出现,则按照正常流程执行




常用操作符(举例学习)

(1)Observable.from()。

使用from( )创建Observable,遍历集合,发送每个item:

List list = new ArrayList<>();
list.add("from1");
list.add("from2");
list.add("from3");
Observable fromObservable = Observable.from(list);  //遍历list 每次发送一个

关于创建的操作符,我在前边的文章里已经总结过,这里不再列举。有兴趣 的同学可以参考本系列第一篇文章 RxJava入门与提高(1)

(2)Observable.map()

用来把一个事件转换为另一个事件。
map()操作符就是用于变换Observable对象的,map操作符返回一个Observable对象,这样就可以实现链式调用,在一个Observable对象上多次使用map操作符,最终将最简洁的数据传递给Subscriber对象。

特性:

它不必返回Observable对象返回的类型,你可以使用map操作符返回一个发出新的数据类型的observable对象。
可以对一个Observable多次使用map
用一个例子来练习:

//刚创建的Observable是String类型的
Observable.just("Hellp Map Operator")
.map(new Func1<String, Integer>() {
    @Override
    public Integer call(String s) {
        return 2015;//通过第一个map转成Integer
    }
}).map(new Func1<Integer, String>() {
    @Override
    public String call(Integer integer) {
        return String.valueOf(integer);//再通过第二个map转成String
    }
}).subscribe(new Action1<String>() {
    @Override
    public void call(String s) {
        System.out.println(s);
    }
});

Run起来输出日志: 2015

(3)Observable.flatMap()

Observable.flatMap()接收一个Observable的输出作为参数输入,同时输出另外一个Observable。这一点很类似于map()。稍后总结flatMap()与map()。

举例说明

 List<String> list = Arrays.asList("Java", "Android", "Ruby", "Ios", "Swift");

 //注意这里的Func1的参数List<String>是 .just(list)返回的Observable的输出,并且返会一个Observable<String>
Observable.just(list)
.flatMap(new Func1<List<String>, Observable<String>>() {
    @Override
    public Observable<String> call(List<String> strings) {
        //结合from处理
        return Observable.from(strings);
    }
}).subscribe(new Action1<String>() {
    @Override
    public void call(String s) {
        System.out.println("_flatMap:"+s);
    }
});



日志:

_flatMap:Java
_flatMap:Android
_flatMap:Ruby
_flatMap:Ios
_flatMap:Swift

假设这时候我们需要处理一下所获取的结果,我们加个前缀,在保证不修改subscriber的前提下我们可以这么做:
增加个函数,用来增加个前缀:

static Observable<String>addPre(String lan){
        return Observable.just("addPre_"+lan);
}
Observable.just(list)
.flatMap(new Func1<List<String>, Observable<String>>() {
    @Override
    public Observable<String> call(List<String> strings) {
        return Observable.from(strings);
    }
}).flatMap(new Func1<String, Observable<String>>() {
    @Override
    public Observable<String> call(String s) {
        //我们在这里调用`addPre`方法,就行处理
        return addPre(s);
    }
}).subscribe(new Action1<String>() {
    @Override
    public void call(String s) {
        System.out.println(s);
    }
});

输出日志

addPre_Java
addPre_Android
addPre_Ruby
addPre_Ios
addPre_Swift
  • 小结 :flatMap()与map()
    (1)flatMap()与map()变换操作完后,都是返回的Observable对象,即数据源,这样可以继续发射数据,或者调用subscribe去叫接收员接收数据。
    (2)map()中的Func类重写的的call()方法的入参是 转换前的Observable对象 发射的数据内容(可以理解为Observable对象里边包含的数据内容),返回的数据是转换后的Observable对象要发射的数据。
    flatMap()中的Func类重写的的call()方法的入参也是 转换前的Observable对象 发射的数据内容,返回的数据是Observable对象。
    代码片段
.map(new Func1<Integer, String>() {
    @Override
    public String call(Integer integer) {
        return String.valueOf(integer);//再通过第二个map转成String,返回String
    }
})
*******************************************************
.flatMap(new Func1<String, Observable<String>>() {
    @Override
    public Observable<String> call(String s) {
        //我们在这里调用`addPre`方法,返回的是Observable<String>
        return addPre(s);
    }
}

(3)flatMap()处理集合、数组等,map()处理单一对象数据。

(4)Buffer

Buffer操作符定期收集Observable的数据放进一个数据包裹,然后发射这些数据包裹,而不是一次发射一个值。

Buffer操作符将一个Observable变换为另一个,原来的Observable正常发射数据,变换产生的Observable发射这些数据的缓存集合。

       RxView.clickEvents(mButton)
                .buffer(2, TimeUnit.SECONDS)
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<List<ViewClickEvent>>() {
                    @Override
                    public void onCompleted() {}
                    @Override
                    public void onError(Throwable e) {}
                    @Override
                    public void onNext(List<ViewClickEvent> viewClickEvents) {
                        if (viewClickEvents.size() > 0) {
                            Toast.makeText(MainActivity.this, "2秒内点击了" + viewClickEvents.size() + "次", Toast.LENGTH_SHORT).show();
                        } else {

                        }
                    }
                });

如果原来的Observable发射了一个onError通知,Buffer会立即传递这个通知,而不是首先发射缓存的数据,即使在这之前缓存中包含了原始Observable发射的数据。

  • 再举个栗子
    将原发射出来的数据已count为单元打包之后在分别发射出来
Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    .buffer(3)
    .subscribe(new Action1<Object>() {
        @Override
        public void call(Object o) {
            System.out.println("onNext--> " + o);
        }
    }, new Action1<Throwable>() {
        @Override
        public void call(Throwable throwable) {
            System.out.println("onError--> " + throwable.getMessage());
        }
    }, new Action0() {
        @Override
        public void call() {
            System.out.println("onComplete");
        }
    });
日志:
onNext--> [1, 2, 3]
onNext--> [4, 5, 6]
onNext--> [7, 8, 9]
onNext--> [10]
onComplete

GroupBy

GroupBy操作符将原始Observable发射的数据按照key来拆分成一些小的Observable,然后这些小的Observable分别发射其所包含的的数据。

               Observable.just(1, 2, 3, 4, 5, 6)
                .groupBy(new Func1<Integer, Boolean>() {
                    @Override
                    public Boolean call(Integer integer) {
                        return integer % 2 == 0;
                    }
                })
                .subscribe(new Action1<GroupedObservable<Boolean, Integer>>() {
                    @Override
                    public void call(final GroupedObservable<Boolean, Integer> observable) {
                        //toList方法转换为Observable<List<T>>
                        observable.toList().subscribe(new Action1<List<Integer>>() {
                            @Override
                            public void call(List<Integer> integers) {
                                Log.d(TAG, "key=" + observable.getKey() + ",values=" + integers);
                                //key=false,values=[1, 3, 5]
                                //key=true,values=[2, 4, 6]
                            }
                        });
                    }
                });

Filter

Filter返回满足过滤条件的数据。

        Observable.from(new Integer[]{1, 2, 3, 4, 5, 6, 7, 8, 9})
                .filter(new Func1<Integer, Boolean>() {
                    @Override
                    public Boolean call(Integer integer) {
                        return integer < 5;
                    }
                })
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d(TAG, "integer=" + integer); //1,2,3,4

                    }
                });

First

First操作符返回第一条数据或者返回满足条件的第一条数据。

 Observable.from(new Integer[]{1, 2, 3, 4, 5, 6, 7, 8, 9})
                .first()
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d(TAG, "integer=" + integer); //1 返回第一条数据 

                    }
                });

Observable.from(new Integer[]{1, 2, 3, 4, 5, 6, 7, 8, 9})
                .first(new Func1<Integer, Boolean>() {
                    @Override
                    public Boolean call(Integer integer) {
                        return integer > 3;
                    }
                })
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d(TAG, "integer=" + integer); //4 返回满足条件的第一条数据

                    }
                });

Last

Last操作符返回最后一条数据或者满足条件的最后一条数据。

Skip

Skip操作符将源Observable发射的数据过滤掉前n项。

    Observable.from(new Integer[]{1, 2, 3, 4, 5, 6, 7, 8, 9})
                .skip(6)
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d(TAG, "integer=" + integer); //7,8,9
                    }
                });

Take

Take操作符只取前n项。

        Observable.from(new Integer[]{1, 2, 3, 4, 5, 6, 7, 8, 9})
                .take(2)
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d(TAG, "integer=" + integer); //1,2
                    }
                });
  • 说明:根据我的学习和研究,个人觉得filter、first、等操作符与map的用法类似,感觉都可以用map()自己封装。

本文暂时总结这些,如果以后有总结,在更新。

关于线程控制,放在下篇讲解。


欢迎继续收看:RxJava入门与提高-线程控制Scheduler篇(4)

作者:ZhangYushui
來源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,324评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,303评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,192评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,555评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,569评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,566评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,927评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,583评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,827评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,590评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,669评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,365评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,941评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,928评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,159评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,880评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,399评论 2 342

推荐阅读更多精彩内容