Rxjava常用apiDemo

创建类 ##转换类 ##过滤类##时间类##截取类。

create
from fromArray(数组) fromIterator(集合)
just(1,2,3...)("1","2","3"...)
补充:

compose 去除重复代码。
onTerminateDetach()防止内存泄漏。
关闭轮询timeUntil(Observable.timer(delay long,TimeUnit.xxx))

1.map 变换

 //1.map,变换,讲事件序列中的事件转换为另外一个事件。
    private void map(){

        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1111);
                e.onNext(2222);
                e.onNext(3333);
            }
        }).map(new Function<Integer,String>() {
            @Override
            public String apply(Integer integer) throws Exception {
                return "This is a map demo,id= "+integer;
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println(s);
            }
        });
    }

2.zip 合并 可变参数,和最短的相同。

 //2.zip 合并。
    private static void zip(){

        Observable.zip(getIntegerObservable(), getStringObservable(), new BiFunction<Integer, String, String>() {
            @Override
            public String apply(Integer integer, String s) throws Exception {
                return integer+s;
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println(s);
            }
        });
    }

private static Observable<Integer> getIntegerObservable(){

       return  Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1000001);
                e.onNext(1000002);
                e.onNext(1000003);
                e.onNext(1000004);
            }
        });
    }

    private static Observable<String> getStringObservable(){
        return Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("第一个String");
                e.onNext("第二个String");
                e.onNext("第三个String");
            }
        });

    }
image.png

3.concat 连接

private static void concat(){

        Observable.concat(Observable.just(1,2,3,4),Observable.just(6,7,8))
                .map(new Function<Integer, String>() {
                    @Override
                    public String apply(Integer integer) throws Exception {
                        return "this is concat demo,id = "+integer;
                    }
                })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        System.out.println(s);
                    }
                });
    }
image.png

4.flatmap flat:平摊。将observable分摊成多个observable,再装入一个observable中,激活这个observable。

他是无序的

    private static void flatmap(){

        String[] strs = {"尖子生","语文","英语","数学"};
        String[] strs2 = {"体育生","长跑","短跑","跳远","田径"};
        String[] strs3 = {"学渣","睡觉"};
        String[] strs4 = {"学神","睡觉","学习","玩"};
        Student[] students = {new Student("小明",15,strs),new Student("仍物线",40,strs2)
                ,new Student("冬冬",26,strs3),new Student("宝贝",5,strs4)};
        Observable.fromArray(students).flatMap(new Function<Student, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(Student student) throws Exception {
                return Observable.fromArray(student.getSource());
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.print(s);
            }
        });
    }

image.png

5.concatmap。功能与flatmap相同,唯一不同是保证了顺序。代码就不贴了。

6distinct 过滤 过滤重复,底层hashSet(),没有调用onNext,有不添加不调用。

//distinct 过滤
    private static void distinct(){
        Observable.just("a","b","c","d","a","e","d","d")
                .distinct()
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        System.out.print(s);
                    }
                });
    }
//abcde

7.filter 过滤 自定义规则。

//过滤
    private static void filter(){
        Observable.just(1,2,3,4,12,3,4123,12,3423,1).filter(new Predicate<Integer>() {
            @Override
            public boolean test(Integer integer) throws Exception {
                return integer>=5;
            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                System.out.println(integer+"");
            }
        });
    }
//结果
12
4123
12
3423

8.buffer buffer(count,skip),一次事件中有count个参数,按照skip进行选区。

private static void buffer(){
        Observable.just(1,2,3,4,5,6)
                .buffer(3,1)
                .subscribe(new Consumer<List<Integer>>() {
                    @Override
                    public void accept(List<Integer> integers) throws Exception {
                        System.out.println(integers.size());
                        for(Integer size:integers){
                            System.out.print(String.valueOf(size));
                        }
                    }
                });
    }
//打印结果。
3
1233
2343
3453
4562
561
6

9.timer 计时器 默认就是在新的线程操作,在android记得切回来,会自己创建一个Observable

private static void timer(){
        Observable.timer(2, TimeUnit.SECONDS)
                .subscribeOn(Schedulers.io())
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {

                        long time = date.getTime();
                        System.out.println(String.valueOf(time));
                    }
                });
    }

interval 创建一个Observable,延迟多久后发送消息,每隔几秒发送一次。

注意:比如在activity中做,和handler一样,页面销毁了,他还没停止,记得手动关闭,

private static void interval(){
        Disposable subscribe = Observable.interval(2,2,TimeUnit.SECONDS)
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        
                    }
                });
    }
  
if(subscribe!= null &&subscribe.isDisposed()){
            subscribe.dispose();
        }

doOnNext 在发射后consumer消费之前做的事。传入Consumer()。不能对数据修改。

skip,传入count,跳过count个事件。

take,传入count,只接受count个事件。

Single。类似observable类。接受一个参数 .subscribe(singleObserver) singleObserver只会调用

onError或onComplete

  private static void single(){
        Single.just(1)
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        
                    }
                });
    }

debounce 去抖,一个事件规定时间内有第二次传入,第一次取消,执行第二次。可以做搜索,防止button短时间点击多次。

和sample的区别,debounce是针对一个一个的事件,sample针对的一个一个的时间段。

 private static void debounce(){
       Observable.create(new ObservableOnSubscribe<String>() {
           @Override
           public void subscribe(ObservableEmitter<String> e) throws Exception {
            e.onNext("11111");
            Thread.sleep(300);
            e.onNext("22222");
            Thread.sleep(502);
            e.onNext("33333");
            Thread.sleep(502);
            e.onNext("44444");
            Thread.sleep(499);
           }
       }).debounce(500,TimeUnit.MILLISECONDS)
               .subscribe(new Consumer<String>() {
                   @Override
                   public void accept(String s) throws Exception {
                       System.out.println(s);
                   }
               });
    }

defer 创建类操作符,只有被订阅才创建observable。有一个延时效果在里面。

public static void defer(){

        Observable<Integer> defer = Observable.defer(new Callable<ObservableSource<Integer>>() {
            @Override
            public ObservableSource<Integer> call() throws Exception {
                return Observable.just(1, 2, 3);
            }
        });
        defer.subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer o) throws Exception {
                System.out.println(o+"");
            }
        });
    }
1
2
3

last 只发出最后一个,或者符合条件的最后一个


merge 和concat类似,不过是不按顺序的。

reduce 操作事件,统一返回一个结果。

public static void reduce(){
        Observable.just(1,2,3,5)
                .reduce(new BiFunction<Integer, Integer, Integer>() {
                    @Override
                    public Integer apply(Integer integer, Integer integer2) throws Exception {
                        return integer-integer2;
                    }
                })
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        System.out.println(integer+"");
                    }
                });

    }
-9

scan 与reduce类似,不过每次都会调用onNext 把每一步的结果返回。

window 将一个事件序列,按照参数分成若干个事件序列。

mRxOperatorsText.append("window\n");
       Log.e(TAG, "window\n");
       Observable.interval(1, TimeUnit.SECONDS) // 间隔一秒发一次
               .take(15) // 最多接收15个
               .window(3, TimeUnit.SECONDS)
               .subscribeOn(Schedulers.io())
               .observeOn(AndroidSchedulers.mainThread())
               .subscribe(new Consumer<Observable<Long>>() {
                   @Override
                   public void accept(@NonNull Observable<Long> longObservable) throws Exception {
                       mRxOperatorsText.append("Sub Divide begin...\n");
                       Log.e(TAG, "Sub Divide begin...\n");
                       longObservable.subscribeOn(Schedulers.io())
                               .observeOn(AndroidSchedulers.mainThread())
                               .subscribe(new Consumer<Long>() {
                                   @Override
                                   public void accept(@NonNull Long aLong) throws Exception {
                                       mRxOperatorsText.append("Next:" + aLong + "\n");
                                       Log.e(TAG, "Next:" + aLong + "\n");
                                   }
                               });
                   }
               });

repeat 发送多少次

public static void repeat(){
        Observable.range(5,10).repeat(20)
                .map(new Function<Integer, String>() {
                    @Override
                    public String apply(Integer integer) throws Exception {
                        return String.valueOf(integer).intern();
                    }
                }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println(s);
            }
        });

    }
//从5开始发送到14 重复20次。

startWith 事件前加入指定事件

public static void startWith(){

        Observable.range(5,5).startWith(0)
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        System.out.print(integer+"");
                    }
                });

    }
056789

delay 延迟几秒发射。#timeout,超时后发送自己的。

do系列

doOnNext()
、doAfterNext()
、doFinally()completed或者error时触发
,doOnSubscribe()在订阅时触发,一般用来初始化。

错误操作符

catch

onErrorReturn 在错误触发时,返回一个特殊的项替代错误,不会传递给观察者。
onErrorResumeNext 发生错误发送备用observable给观察者。

public static void catch1(){
        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onError(null);
                e.onNext("111");
            }
        }).onErrorReturn(new Function<Throwable, String>() {
            @Override
            public String apply(Throwable throwable) throws Exception {
                return "发生错误了,而你不知道";
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.print(s);
            }
        });
    }
发生错误了,而你不知道
#retry(long time) 重试  指定次数。

#toFlowable 将observable转成Flowable




最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,254评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,875评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,682评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,896评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,015评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,152评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,208评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,962评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,388评论 1 304
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,700评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,867评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,551评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,186评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,901评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,142评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,689评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,757评论 2 351

推荐阅读更多精彩内容