Rxjava操作符

创建型

  1. create

create操作符是所有创建型操作符的“根”,也就是说其他创建型操作符最后都是通过create操作符来创建Observable的.

Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                try {
                    if (!subscriber.isUnsubscribed()) {
                        for (int i = 1; i < 5; i++) {
                            subscriber.onNext(i);
                        }
                        subscriber.onCompleted();
                    }
                } catch (Exception e) {
                    subscriber.onError(e);
                }
            }
        }).subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
                Log.e(TAG,"onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG,e.getMessage());
            }

            @Override
            public void onNext(Integer integer) {
                Log.e(TAG,"onNext : "+integer);
            }
        });
    }
  1. from

from操作符是把其他类型的对象和数据类型转化成Observable

 String[] strs={"aa","bb","cc","dd"};
        Observable.from(strs).subscribe(new Subscriber<String>() {
            @Override
            public void onCompleted() {
                Log.e(TAG,"onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG,e.getMessage());
            }

            @Override
            public void onNext(String str) {
                Log.e(TAG,"onNext : "+str);
            }
        });
  1. just

just操作符也是把其他类型的对象和数据类型转化成Observable,它和from操作符很像,只是方法的参数有所差别

 Observable.just(1,"a",'b',"%").subscribe(new Subscriber<Serializable>() {
            @Override
            public void onCompleted() {
                Log.e(TAG,"onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG,e.getMessage());
            }

            @Override
            public void onNext(Serializable serializable) {
                Log.e(TAG,"onNext : "+serializable);
            }
        });
  1. defer

defer操作符是直到有订阅者订阅时,才通过Observable的工厂方法创建Observable并执行,defer操作符能够保证Observable的状态是最新的.

    i=10;
    Observable justObservable = Observable.just(i);
    i=12;
    Observable deferObservable = Observable.defer(new Func0<Observable<Integer>>() {
        @Override
        public Observable<Integer> call() {
            return Observable.just(i);
        }
    });
    i=15;

    justObservable.subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer o) {
            Log.e(TAG,"just i="+o);
        }
    });

    deferObservable.subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer o) {
            Log.e(TAG,"defer i="+o);
        }
    });
    
    
=====>: ----------defer-----------
=====>: just i=10
=====>: defer i=15
  1. timer

timer操作符是隔一段时间产生一个数字,然后就结束,可以理解为延迟产生数字,也可以用来延迟执行动作

timer操作符默认情况下是运行在一个新线程上的,当然你可以通过传入参数来修改其运行的线程.

 Observable.timer(2,TimeUnit.SECONDS).subscribe(new Action1<Long>() {
        @Override
        public void call(Long aLong) {
            Log.e(TAG,"timer : "+aLong);
        }
    });
  1. interval

interval操作符是每隔一段时间就产生一个数字,这些数字从0开始,一次递增1直至无穷大,也可以通过改变参数使其在一段时间内递增。默认情况下是运行在一个新线程上的,当然你可以通过传入参数来修改其运行的线程.

Observable.interval(2,TimeUnit.SECONDS).subscribe(new Action1<Long>() {
            @Override
            public void call(Long aLong) {
                Log.e(TAG,"interval : "+aLong);
            }
        });
  1. range

range操作符是创建一组在从n开始,个数为m的连续数字,比如range(3,10),就是创建3、4、5…12的一组数字

Observable.range(4,5).subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
                Log.e(TAG,"onCompleted");
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(Integer integer) {
                Log.e(TAG,"range : "+integer );
            }
        });
  1. repeat

repeat操作符是对某一个Observable,重复产生多次结果

和其他操作符一起使用的

//连续产生两组(3,4,5)的数字
        Observable.range(3,3).repeat(2).subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
                Log.e(TAG,"onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("error:" + e.getMessage());
            }

            @Override
            public void onNext(Integer i) {
                Log.e(TAG,"repeat : "+i );
            }
        });

变换

  1. buffer

buffer操作符周期性地收集源Observable产生的结果到列表中,并把这个列表提交给订阅者,订阅者处理后,清空buffer列表,同时接收下一次收集的结果并提交给订阅者,周而复始。

一旦源Observable在产生结果的过程中出现异常,即使buffer已经存在收集到的结果,订阅者也会马上收到这个异常,并结束整个过程。

 final String[] mails = new String[]{"Here is an email!", "Another email!", "Yet another email!"};
    //每隔1秒就随机发布一封邮件
    Observable<String> endlessMail = Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            try {
                if (subscriber.isUnsubscribed()) return;
                Random random = new Random();
                while (true) {
                    String mail = mails[random.nextInt(mails.length)];
                    subscriber.onNext(mail);
                    Thread.sleep(1000);
                }
            } catch (Exception ex) {
                subscriber.onError(ex);
            }
        }
    }).subscribeOn(Schedulers.io());
    //把上面产生的邮件内容缓存到列表中,并每隔3秒通知订阅者
    endlessMail.buffer(3, TimeUnit.SECONDS).subscribe(new Action1<List<String>>() {
        @Override
        public void call(List<String> list) {
            Log.e(TAG,String.format("You've got %d new messages!  Here they are!", list.size()));
            for (int i = 0; i < list.size(); i++)
                Log.e(TAG,"**" + list.get(i).toString());
        }
    });
  1. map

map操作符是把源Observable产生的结果,通过映射规则转换成另一个结果集,并提交给订阅者进行处理。

 Observable.just(1,2,3,4,5,6).map(new Func1<Integer, Integer>() {
        @Override
        public Integer call(Integer integer) {
            //对源Observable产生的结果,都统一乘以3处理
            return integer*3;
        }
    }).subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
            Log.e(TAG,"next:" + integer);
        }
    });
  1. faltMap

flatMap操作符是把Observable产生的结果转换成多个Observable,然后把这多个Observable“扁平化”成一个Observable,并依次提交产生的结果给订阅者。

flatMap操作符通过传入一个函数作为参数转换源Observable,在这个函数中,你可以自定义转换规则,最后在这个函数中返回一个新的Observable,然后flatMap操作符通过合并这些Observable结果成一个Observable,并依次提交结果给订阅者。

flatMap操作符在合并Observable结果时,有可能存在交叉的情况

  1. contactMap

cancatMap操作符与flatMap操作符类似,都是把Observable产生的结果转换成多个Observable,然后把这多个Observable“扁平化”成一个Observable,并依次提交产生的结果给订阅者。

与flatMap操作符不同的是,concatMap操作符在处理产生的Observable时,采用的是“连接(concat)”的方式,而不是“合并(merge)”的方式,这就能保证产生结果的顺序性,也就是说提交给订阅者的结果是按照顺序提交的,不会存在交叉的情况。

  1. switchMap

switchMap操作符与flatMap操作符类似,都是把Observable产生的结果转换成多个Observable,然后把这多个Observable“扁平化”成一个Observable,并依次提交产生的结果给订阅者。

与flatMap操作符不同的是,switchMap操作符会保存最新的Observable产生的结果而舍弃旧的结果,举个例子来说,比如源Observable产生A、B、C三个结果,通过switchMap的自定义映射规则,映射后应该会产生A1、A2、B1、B2、C1、C2,但是在产生B2的同时,C1已经产生了,这样最后的结果就变成A1、A2、B1、C1、C2,B2被舍弃掉了!

//flatMap操作符的运行结果
    Observable.just(10, 20, 30).flatMap(new Func1<Integer, Observable<Integer>>() {
        @Override
        public Observable<Integer> call(Integer integer) {
            //10的延迟执行时间为200毫秒、20和30的延迟执行时间为180毫秒
            int delay = 200;
            if (integer > 10)
                delay = 180;
            return Observable.from(new Integer[]{integer, integer / 2}).delay(delay, TimeUnit.MILLISECONDS);
        }
    }).observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
            Log.e(TAG,"flatMap Next:" + integer);
        }
    });

    //concatMap操作符的运行结果
    Observable.just(10, 20, 30).concatMap(new Func1<Integer, Observable<Integer>>() {
        @Override
        public Observable<Integer> call(Integer integer) {
            //10的延迟执行时间为200毫秒、20和30的延迟执行时间为180毫秒
            int delay = 200;
            if (integer > 10)
                delay = 180;

            return Observable.from(new Integer[]{integer, integer / 2}).delay(delay, TimeUnit.MILLISECONDS);
        }
    }).observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
            Log.e(TAG,"concatMap Next:" + integer);
        }
    });

    //switchMap操作符的运行结果
    Observable.just(10, 20, 30).switchMap(new Func1<Integer, Observable<Integer>>() {
        @Override
        public Observable<Integer> call(Integer integer) {
            //10的延迟执行时间为200毫秒、20和30的延迟执行时间为180毫秒
            int delay = 200;
            if (integer > 10)
                delay = 180;

            return Observable.from(new Integer[]{integer, integer / 2}).delay(delay, TimeUnit.MILLISECONDS);
        }
    }).observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
            Log.e(TAG,"switchMap Next:" + integer);
        }
    });
    
    

结果

=====>: ------flatMap、concatMap、switchMap-----
=====>: flatMap Next:30
=====>: flatMap Next:20
=====>: flatMap Next:10
=====>: flatMap Next:15
=====>: flatMap Next:10
=====>: flatMap Next:5

=====>: switchMap Next:30
=====>: switchMap Next:15

=====>: concatMap Next:10
=====>: concatMap Next:5
=====>: concatMap Next:20
=====>: concatMap Next:10
=====>: concatMap Next:30
=====>: concatMap Next:15
  1. GroupBy

groupBy操作符是对源Observable产生的结果进行分组,形成一个类型为GroupedObservable的结果集,GroupedObservable中存在一个方法为getKey(),可以通过该方法获取结果集的Key值(类似于HashMap的key)。

值得注意的是,由于结果集中的GroupedObservable是把分组结果缓存起来,如果对每一个GroupedObservable不进行处理(既不订阅执行也不对其进行别的操作符运算),就有可能出现内存泄露。因此,如果你对某个GroupedObservable不进行处理,最好是对其使用操作符take(0)处理。

  1. cast

cast操作符类似于map操作符,不同的地方在于map操作符可以通过自定义规则,把一个值A1变成另一个值A2,A1和A2的类型可以一样也可以不一样;而cast操作符主要是做类型转换的,传入参数为类型class,如果源Observable产生的结果不能转成指定的class,则会抛出ClassCastException运行时异常。

  1. scan

scan操作符通过遍历源Observable产生的结果,依次对每一个结果项按照指定规则进行运算,计算后的结果作为下一个迭代项参数,每一次迭代项都会把计算结果输出给订阅者。

  1. window

window操作符非常类似于buffer操作符,区别在于buffer操作符产生的结果是一个List缓存,而window操作符产生的结果是一个Observable,订阅者可以对这个结果Observable重新进行订阅处理。

过滤

  1. debounce
  1. distinct
  2. elementAt
  3. filter
  4. ofType
  5. first
  6. single
  7. last
  8. ignoreElements
  9. sample
  10. skip
  11. skipLast
  12. take
  13. takeFirst
  14. takeLast

组合操作符

  1. merge
  2. zip
  3. join
  4. combineLatest
  5. and/when/then
  6. switch
  7. startSwitch

本文为作者学习用,内容抄自RxJava操作符
更多操作符看RxJava docs

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 228,983评论 6 537
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 98,772评论 3 422
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 176,947评论 0 381
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 63,201评论 1 315
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 71,960评论 6 410
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 55,350评论 1 324
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 43,406评论 3 444
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 42,549评论 0 289
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 49,104评论 1 335
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 40,914评论 3 356
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 43,089评论 1 371
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 38,647评论 5 362
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 44,340评论 3 347
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 34,753评论 0 28
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 36,007评论 1 289
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 51,834评论 3 395
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 48,106评论 2 375

推荐阅读更多精彩内容

  • 创建操作 用于创建Observable的操作符Create通过调用观察者的方法从头创建一个ObservableEm...
    rkua阅读 1,850评论 0 1
  • 作者: maplejaw本篇只解析标准包中的操作符。对于扩展包,由于使用率较低,如有需求,请读者自行查阅文档。 创...
    maplejaw_阅读 45,747评论 8 93
  • 本篇文章介主要绍RxJava中操作符是以函数作为基本单位,与响应式编程作为结合使用的,对什么是操作、操作符都有哪些...
    嘎啦果安卓兽阅读 2,886评论 0 10
  • 注:只包含标准包中的操作符,用于个人学习及备忘参考博客:http://blog.csdn.net/maplejaw...
    小白要超神阅读 954评论 0 3
  • 注:只包含标准包中的操作符,用于个人学习及备忘参考博客:http://blog.csdn.net/maplejaw...
    小白要超神阅读 2,212评论 2 8