Rxjava常用apiDemo

创建类 ##转换类 ##过滤类##时间类##截取类。

create
from fromArray(数组) fromIterator(集合)
just(1,2,3...)("1","2","3"...)
补充:

compose 去除重复代码。
onTerminateDetach()防止内存泄漏。
关闭轮询timeUntil(Observable.timer(delay long,TimeUnit.xxx))

1.map 变换

 //1.map,变换,讲事件序列中的事件转换为另外一个事件。
    private void map(){

        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1111);
                e.onNext(2222);
                e.onNext(3333);
            }
        }).map(new Function<Integer,String>() {
            @Override
            public String apply(Integer integer) throws Exception {
                return "This is a map demo,id= "+integer;
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println(s);
            }
        });
    }

2.zip 合并 可变参数,和最短的相同。

 //2.zip 合并。
    private static void zip(){

        Observable.zip(getIntegerObservable(), getStringObservable(), new BiFunction<Integer, String, String>() {
            @Override
            public String apply(Integer integer, String s) throws Exception {
                return integer+s;
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println(s);
            }
        });
    }

private static Observable<Integer> getIntegerObservable(){

       return  Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1000001);
                e.onNext(1000002);
                e.onNext(1000003);
                e.onNext(1000004);
            }
        });
    }

    private static Observable<String> getStringObservable(){
        return Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("第一个String");
                e.onNext("第二个String");
                e.onNext("第三个String");
            }
        });

    }
image.png

3.concat 连接

private static void concat(){

        Observable.concat(Observable.just(1,2,3,4),Observable.just(6,7,8))
                .map(new Function<Integer, String>() {
                    @Override
                    public String apply(Integer integer) throws Exception {
                        return "this is concat demo,id = "+integer;
                    }
                })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        System.out.println(s);
                    }
                });
    }
image.png

4.flatmap flat:平摊。将observable分摊成多个observable,再装入一个observable中,激活这个observable。

他是无序的

    private static void flatmap(){

        String[] strs = {"尖子生","语文","英语","数学"};
        String[] strs2 = {"体育生","长跑","短跑","跳远","田径"};
        String[] strs3 = {"学渣","睡觉"};
        String[] strs4 = {"学神","睡觉","学习","玩"};
        Student[] students = {new Student("小明",15,strs),new Student("仍物线",40,strs2)
                ,new Student("冬冬",26,strs3),new Student("宝贝",5,strs4)};
        Observable.fromArray(students).flatMap(new Function<Student, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(Student student) throws Exception {
                return Observable.fromArray(student.getSource());
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.print(s);
            }
        });
    }

image.png

5.concatmap。功能与flatmap相同,唯一不同是保证了顺序。代码就不贴了。

6distinct 过滤 过滤重复,底层hashSet(),没有调用onNext,有不添加不调用。

//distinct 过滤
    private static void distinct(){
        Observable.just("a","b","c","d","a","e","d","d")
                .distinct()
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        System.out.print(s);
                    }
                });
    }
//abcde

7.filter 过滤 自定义规则。

//过滤
    private static void filter(){
        Observable.just(1,2,3,4,12,3,4123,12,3423,1).filter(new Predicate<Integer>() {
            @Override
            public boolean test(Integer integer) throws Exception {
                return integer>=5;
            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                System.out.println(integer+"");
            }
        });
    }
//结果
12
4123
12
3423

8.buffer buffer(count,skip),一次事件中有count个参数,按照skip进行选区。

private static void buffer(){
        Observable.just(1,2,3,4,5,6)
                .buffer(3,1)
                .subscribe(new Consumer<List<Integer>>() {
                    @Override
                    public void accept(List<Integer> integers) throws Exception {
                        System.out.println(integers.size());
                        for(Integer size:integers){
                            System.out.print(String.valueOf(size));
                        }
                    }
                });
    }
//打印结果。
3
1233
2343
3453
4562
561
6

9.timer 计时器 默认就是在新的线程操作,在android记得切回来,会自己创建一个Observable

private static void timer(){
        Observable.timer(2, TimeUnit.SECONDS)
                .subscribeOn(Schedulers.io())
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {

                        long time = date.getTime();
                        System.out.println(String.valueOf(time));
                    }
                });
    }

interval 创建一个Observable,延迟多久后发送消息,每隔几秒发送一次。

注意:比如在activity中做,和handler一样,页面销毁了,他还没停止,记得手动关闭,

private static void interval(){
        Disposable subscribe = Observable.interval(2,2,TimeUnit.SECONDS)
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        
                    }
                });
    }
  
if(subscribe!= null &&subscribe.isDisposed()){
            subscribe.dispose();
        }

doOnNext 在发射后consumer消费之前做的事。传入Consumer()。不能对数据修改。

skip,传入count,跳过count个事件。

take,传入count,只接受count个事件。

Single。类似observable类。接受一个参数 .subscribe(singleObserver) singleObserver只会调用

onError或onComplete

  private static void single(){
        Single.just(1)
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        
                    }
                });
    }

debounce 去抖,一个事件规定时间内有第二次传入,第一次取消,执行第二次。可以做搜索,防止button短时间点击多次。

和sample的区别,debounce是针对一个一个的事件,sample针对的一个一个的时间段。

 private static void debounce(){
       Observable.create(new ObservableOnSubscribe<String>() {
           @Override
           public void subscribe(ObservableEmitter<String> e) throws Exception {
            e.onNext("11111");
            Thread.sleep(300);
            e.onNext("22222");
            Thread.sleep(502);
            e.onNext("33333");
            Thread.sleep(502);
            e.onNext("44444");
            Thread.sleep(499);
           }
       }).debounce(500,TimeUnit.MILLISECONDS)
               .subscribe(new Consumer<String>() {
                   @Override
                   public void accept(String s) throws Exception {
                       System.out.println(s);
                   }
               });
    }

defer 创建类操作符,只有被订阅才创建observable。有一个延时效果在里面。

public static void defer(){

        Observable<Integer> defer = Observable.defer(new Callable<ObservableSource<Integer>>() {
            @Override
            public ObservableSource<Integer> call() throws Exception {
                return Observable.just(1, 2, 3);
            }
        });
        defer.subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer o) throws Exception {
                System.out.println(o+"");
            }
        });
    }
1
2
3

last 只发出最后一个,或者符合条件的最后一个


merge 和concat类似,不过是不按顺序的。

reduce 操作事件,统一返回一个结果。

public static void reduce(){
        Observable.just(1,2,3,5)
                .reduce(new BiFunction<Integer, Integer, Integer>() {
                    @Override
                    public Integer apply(Integer integer, Integer integer2) throws Exception {
                        return integer-integer2;
                    }
                })
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        System.out.println(integer+"");
                    }
                });

    }
-9

scan 与reduce类似,不过每次都会调用onNext 把每一步的结果返回。

window 将一个事件序列,按照参数分成若干个事件序列。

mRxOperatorsText.append("window\n");
       Log.e(TAG, "window\n");
       Observable.interval(1, TimeUnit.SECONDS) // 间隔一秒发一次
               .take(15) // 最多接收15个
               .window(3, TimeUnit.SECONDS)
               .subscribeOn(Schedulers.io())
               .observeOn(AndroidSchedulers.mainThread())
               .subscribe(new Consumer<Observable<Long>>() {
                   @Override
                   public void accept(@NonNull Observable<Long> longObservable) throws Exception {
                       mRxOperatorsText.append("Sub Divide begin...\n");
                       Log.e(TAG, "Sub Divide begin...\n");
                       longObservable.subscribeOn(Schedulers.io())
                               .observeOn(AndroidSchedulers.mainThread())
                               .subscribe(new Consumer<Long>() {
                                   @Override
                                   public void accept(@NonNull Long aLong) throws Exception {
                                       mRxOperatorsText.append("Next:" + aLong + "\n");
                                       Log.e(TAG, "Next:" + aLong + "\n");
                                   }
                               });
                   }
               });

repeat 发送多少次

public static void repeat(){
        Observable.range(5,10).repeat(20)
                .map(new Function<Integer, String>() {
                    @Override
                    public String apply(Integer integer) throws Exception {
                        return String.valueOf(integer).intern();
                    }
                }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println(s);
            }
        });

    }
//从5开始发送到14 重复20次。

startWith 事件前加入指定事件

public static void startWith(){

        Observable.range(5,5).startWith(0)
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        System.out.print(integer+"");
                    }
                });

    }
056789

delay 延迟几秒发射。#timeout,超时后发送自己的。

do系列

doOnNext()
、doAfterNext()
、doFinally()completed或者error时触发
,doOnSubscribe()在订阅时触发,一般用来初始化。

错误操作符

catch

onErrorReturn 在错误触发时,返回一个特殊的项替代错误,不会传递给观察者。
onErrorResumeNext 发生错误发送备用observable给观察者。

public static void catch1(){
        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onError(null);
                e.onNext("111");
            }
        }).onErrorReturn(new Function<Throwable, String>() {
            @Override
            public String apply(Throwable throwable) throws Exception {
                return "发生错误了,而你不知道";
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.print(s);
            }
        });
    }
发生错误了,而你不知道
#retry(long time) 重试  指定次数。

#toFlowable 将observable转成Flowable




最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容