Rxjava2 方法讲解

前言

关于RxJava介绍的文章已经很多,但是关于实战的教程却不尽人意,今天就从代码的角度分析一下RxJava

    implementation 'io.reactivex.rxjava2:rxjava:2.1.0'
    implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'

Observable.create(ObservableOnSubscribe<T> )

最常见的操作符,用于生产一个发射对象,在ObservableOnSubscribe的接口方法中有一个ObservableEmitter<T>对象用于发送数据,泛型T代表发送数据的类型
ObservableEmitter继承与Emitter

public interface Emitter<T> {

    /**
     * Signal a normal value.
     * @param value the value to signal, not null
     */
    void onNext(@NonNull T value);

    /**
     * Signal a Throwable exception.
     * @param error the Throwable to signal, not null
     */
    void onError(@NonNull Throwable error);

    /**
     * Signal a completion.
     */
    void onComplete();
}

Observable可以用subscribe(Observer<? super T> observer)方法订阅一个或多个观察者,subscribe返回一个Disposable 对象

 Observable.create(new ObservableOnSubscribe<Integer>(){
            @Override
            public void subscribe(@NonNull 
   ObservableEmitter<Integer> e) throws Exception {  
  e.onNext(1);
  e.onNext(2);
  e.onNext(3);
  e.onComplete();//onComplete继续发送就接收不到了
 e.onNext(4);
  }
}).subscribe(new Observer<Integer>() {
            private int i;
            private Disposable mDisposable;

            @Override
            public void onSubscribe(@NonNull Disposable d) {
                mDisposable = d;
            }

            @Override
            public void onNext(@NonNull Integer integer) {
                 i++;
                if (i == 2) {
             //isposable可以做到切断的操作,让Observer观察者不再接收上游事件
                    mDisposable.dispose();
                }
            }

            @Override
            public void onError(@NonNull Throwable e) {
         
            }

            @Override
            public void onComplete() {
     
            }
        });

Observer有4个方法,onSubscribe,被订阅的时候调用,回传一个Disposable对象,可用于终止接受信号,onNext是在ObservableEmitter调用onNext时候调用,同理onComplete也是

注意:subscribe可以直接订阅Consumer,只有在上游onNext方法是会调用

subscribe(new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integers) throws Exception {
              
            }
        });

Observable.just(T t1,Tt2 ....)

依次发送数据

Observable.just("1", "2", "3")
             .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(@NonNull String s) throws Exception {
                     }
                });

Observable.zip( ObservableSource<? extends T1> ob1,ObservableSource<? extends T2> ob2, BiFunction<? super T1, ? super T2, ? extends R> bif)

注:Observable<T> implements ObservableSource<T>

合并事件专用,分别从两个上游事件中各取出一个组合,一个事件只能被使用一次,顺序严格按照事件发送的顺序,最终下游事件收到的是和上游事件最少的数目相同(必须两两配对,多余的舍弃)

在zip的第三个参数中,BiFunction对象有三个泛型,第一个泛型是ob1的泛型参数,第二个表示ob2的泛型参数,第三个表示返回值的类型


Observable ob1=Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
                if (!e.isDisposed()) {
                    e.onNext("A");
                    e.onNext("B");
                    e.onNext("C");
                }
            }
        });

Observable ob2=Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                if (!e.isDisposed()) {
                    e.onNext(1);
                    e.onNext(2);
                    e.onNext(3);
                  //4,5会被舍弃
                    e.onNext(4);
                    e.onNext(5);
                }
            }
        });
Observable.zip(ob1, ob2, new BiFunction<String, Integer, String>(){
            @Override
            public String apply(@NonNull String s, @NonNull Integer integer) throws Exception {
                return s + integer;
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {

            }
        });

Observable<R> map(Function<? super T, ? extends R> mapper)

对上游发送的每一个事件应用一个函数,使得每一个事件都按照指定的函数去变化,对参数进行加工,相当于添加了一个中间层

Function有两个泛型,第一个是要加工参数类型,第二个是apply返回类型

 Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
            }
        }).map(new Function<Integer, String>() {
            @Override
            public String apply(@NonNull Integer integer) throws 
  Exception{
                return "This is result " + integer;
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {

            }
        });

Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper)

FlatMap将一个发送事件的上游Observable变换成多个发送事件的Observables,然后将它们发射的时间合并后放进一个单独的Observable里
flatMap和map相似,参数也是一个Function对象,不同的是Function对象的第二个泛型参数不一样,flatmap的第二个泛型参数是一个ObservableSource<T> ,即在apply方法中,返回一Observable<T>对象
可用于实现多个网络请求依次依赖数据打平

  Observable.create(new ObservableOnSubscribe<ArrayList<String>>(){
            @Override
            public void subscribe(@NonNull ObservableEmitter<ArrayList<String>> e) throws Exception {
                List<String> list=new ArrayList<>();
                for (int i = 0; i < 3; i++) {
                    list.add("I am value " +i);
                }
            }
        }).flatMap(new Function<ArrayList<String>, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(@NonNull ArrayList<String> list) throws Exception{
                int delayTime = (int) (1 + Math.random() * 10);
                return Observable.fromIterable(list).delay(delayTime, TimeUnit.MILLISECONDS);
            }
        }).subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(@NonNull String s) throws Exception{

                    }
                });

注意:flatMap并不保证事件的顺序,与此类似的操作符还有concatMapconcatMap作用和flatMap几乎一模一样,唯一的区别是它能保证事件的顺序

Observable<T> doOnNext(Consumer<? super T> onNext)

Observable.just(1, 2, 3, 4)
                .doOnNext(new Consumer<Integer>() {
                    @Override
                    public void accept(@NonNull Integer integer) throws Exception {
                        Log.d("do someThing");
                    }
                }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer integer) throws Exception {
              
            }
        });

doOnNext和map有些类似,都会改变下游数据,但是map可以改变下游数据的类型,而doOnNext不能

Observable<T> filter(Predicate<? super T> predicate)

用于过滤数据

Observable.just(1, 20, 65, -5, 7, 19)
                .filter(new Predicate<Integer>() {
                    @Override
                    public boolean test(@NonNull Integer integer) throws Exception {
                        return integer >= 10;
                    }
                }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer integer) throws Exception {
               
            }
        });

fliter传入一个Predicate对象,泛型参数类型为test方法的类型,test方法返回值为固定的布尔类型,返回true才会向下游传递

Observable<T> skip(long count)

接受一个long型参数,代表跳过多少个数目的事件再开始接收

 Observable.just(1,2,3,4,5)
                .skip(2)
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(@NonNull Integer integer) throws Exception {
                      
                    }
                });

Observable<T> take(long count)

指定订阅者最多收到多少数据

Observable.fromArray(1,2,3,4,5)
                .take(2)
                .subscribe(new Consumer<Integer>(){
                    @Override
                    public void accept(@NonNull Integer integer) throws Exception {
                       
                    }
                });

Observable<Long> timer(long delay, TimeUnit unit)

timer 操作符可以延迟执行一段逻辑,默认执行在一个新线程上。

 Observable.timer(2, TimeUnit.SECONDS).create(new ObservableOnSubscribe<String>() {

            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {

            }
        }).observeOn(AndroidSchedulers.mainThread()) // 由于timer默认在新线程,所以我们应该切回主线程.subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {

            }
        });

Observable<Long> interval(long initialDelay, long period, TimeUnit unit)

间隔执行操作,默认在新线程

Observable.interval(3, 2, TimeUnit.SECONDS)
                .observeOn(AndroidSchedulers.mainThread()) // 由于interval默认在新线程,所以我们应该切回主线程
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(@NonNull Long aLong) throws Exception {
                    
                    }
                });

Single和SingleObserver

SingleObserver与普通的Observer相同,但是只接受上游的一个参数,并且没有onComplete方法

 Single.just(new Random().nextInt())
                .subscribe(new SingleObserver<Integer>() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {

                    }
                    @Override
                    public void onSuccess(@NonNull Integer integer) {
                     
                    }

                    @Override
                    public void onError(@NonNull Throwable e) {

                    }
                });

Observable<T> concat(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2)

连接操作符,可接受Observable的可变参数,或者Observable的集合,注意两个 Observable 的泛型应当保持一致

Observable.concat(Observable.just(1,2,3), Observable.just(4,5,6))
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(@NonNull Integer integer) throws Exception {
                                    
  }});

可用于多个连续的操作,例如三级缓存,先从内存缓存区,没有再从磁盘缓存取,最后请求网络。

Observable<T> distinct()

去重操作符,其实就是简单的去重

Observable.just(1, 1, 1, 2, 2, 3, 4, 5)
                .distinct()
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(@NonNull Integer integer) throws Exception {
                   
                    }
                });

Observable<List<T>> buffer(int count, int skip)

将 observable 中的数据按 skip(步长)分成最长不超过 count 的 buffer,然后生成一个 observable

Observable.just(1, 2, 3, 4, 5)
                .buffer(3, 2)
                .subscribe(new Consumer<List<Integer>>() {
                    @Override
                    public void accept(@NonNull List<Integer> integers) throws Exception {
                  
                        Log.e(TAG, "buffer value : " );
                        for (Integer i : integers) {
                          
                            Log.e(TAG, i + "");
                        }
                        Log.e(TAG, "\n");
                    }
                });

这里有没有很熟悉,我们之前用flatMap可以把一个数组打平,而这里则是将数据生成一个数组,最后都是生成一个Observer,所以buffer可以看作是flatMap的反操作

Observable<T> debounce(long timeout, TimeUnit unit)

过滤掉发射速率过快的数据项

Observable.create(new ObservableOnSubscribe<Integer>(){
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
                // send events with simulated time wait
                emitter.onNext(1); // skip
                Thread.sleep(400);
                emitter.onNext(2); // deliver
                Thread.sleep(505);
                emitter.onNext(3); // skip
                Thread.sleep(100);
                emitter.onNext(4); // deliver
                Thread.sleep(605);
                emitter.onNext(5); // deliver
                Thread.sleep(510);
                emitter.onComplete();
            }
        }).debounce(500, TimeUnit.MILLISECONDS)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(@NonNull Integer integer) throws Exception {
                   
                    }
                });

Observable<T> defer(Callable<? extends ObservableSource<? extends T>> supplier)

次订阅都会创建一个新的Observable,并且如果该Observable没有被订阅,就不会生成新的Observable

Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<Integer>>() {
            @Override
            public ObservableSource<Integer> call() throws Exception {
                return Observable.just(1, 2, 3);
            }
        });

Single<T> last(T defaultItem)

取出最后一个值,参数是没有值的时候的默认值

Observable.just(1, 2, 3)
                .last(4)
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(@NonNull Integer integer) throws Exception {
                   
                    }
                });

需要注意last返回Single对象,所以在subscribe方法如果订阅一个Observer只能是SingleObserver

Observable<T> merge(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2 ...)

多个Observable合起来,接受可变参数,也支持使用迭代器集合

Observable observable1=Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
            }
        }).delay(500,TimeUnit.MILLISECONDS);
        Observable observable2=Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(4);
                e.onNext(5);
                e.onNext(6);
            }
        });
  Observable.merge(observable1,observable2)
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(@NonNull final Integer integer) throws Exception {
                Log.e(TAG, "accept: merge :" + integer + "\n" );
                    }
                });

我们之前学过一个相似的效果concat操作符,也是合并多个Observable对象,merge和concat有什么区别吗?重点在于是否按照顺序执行,concat会按照参数的顺序执行,即使你设置了delay,它也会先等待后执行第一个Observable,而merge不会,例如上面代码会先打印456,另外delay是另外开了一个新的线程,如果我们要操作UI,需要切换回主线程

Single<R> reduce(R seed, BiFunction<R, ? super T, R> reducer)

就是一次用一个方法处理一个值,可以有一个seed作为初始值

Observable.just(1, 2, 3)
                .reduce(1,new BiFunction<Integer, Integer, Integer>() {
                    @Override
                    public Integer apply(@NonNull Integer integer, 
              @NonNull Integer integer2) throws Exception {
                        return integer + integer2;
                    }
                }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer integer) throws Exception {
             
                Log.e(TAG, "accept: reduce : " + integer + "\n");
            }
        });

1+1+2+3,所以最后打印是7

Observable<R> scan(final R initialValue, BiFunction<R, ? super T, R> accumulator)

和上面的reduce差不多,区别在于reduce()只输出结果,而scan()会将过程中每一个结果输出

Observable.just(1, 2, 3)
                .scan(1,new BiFunction<Integer, Integer, Integer>() {
                    @Override
                    public Integer apply(@NonNull Integer integer, @NonNull Integer integer2) throws Exception{
                        return integer + integer2;
                    }
                }).subscribe(new Consumer<Integer>(){
            @Override
            public void accept(@NonNull Integer integer) throws Exception {
           
                Log.e(TAG, "accept: scan " + integer + "\n");
            }
        });

Observable<Observable<T>> window(long timespan, TimeUnit unit)

按照时间划分窗口,将数据发送给不同的Observable

  Observable.interval(1, TimeUnit.SECONDS).create(new ObservableOnSubscribe<Long>() {
            @Override
            public void subscribe(ObservableEmitter<Long> e) throws Exception {
                        e.onNext(1l);
            }
        }).take(15) // 最多接收15个
                .window(3, TimeUnit.SECONDS)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Observable<Long>>(){
                    @Override
                    public void accept(@NonNull Observable<Long> longObservable) throws Exception {

                        longObservable.subscribeOn(Schedulers.io())
                                .observeOn(AndroidSchedulers.mainThread())
                                .subscribe(new Consumer<Long>() {
                                    @Override
                                    public void accept(@NonNull Long aLong) throws Exception {

                                    }
                                });
                    }
                });

PublishSubject

onNext() 会通知每个观察者,仅此而已

  PublishSubject<Integer> publishSubject = PublishSubject.create();

        publishSubject.subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {

            }
        });
        publishSubject.subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {

            }
        });
        publishSubject.onNext(1);
        publishSubject.onNext(2);
        publishSubject.onNext(3);

会通知订阅的每一个观察者而不会产生覆盖

AsyncSubject

在 调用 onComplete() 之前,除了 subscribe() 其它的操作都会被缓存, 在调用 onComplete() 之后只有最后一个 onNext() 会生效

  AsyncSubject<Integer> asyncSubject = AsyncSubject.create();
        asyncSubject.subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {

            }
        });
        asyncSubject.onNext(1);
        asyncSubject.onNext(2);
        asyncSubject.onNext(3);
        asyncSubject.onComplete();

只有3会被发送

BehaviorSubject

  BehaviorSubject<Integer> behaviorSubject = BehaviorSubject.create();
        behaviorSubject.subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {

            }
        });
        behaviorSubject.onNext(1);
        behaviorSubject.onNext(2);
        behaviorSubject.onNext(3);
        behaviorSubject.subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {

            }
        });
        behaviorSubject.onNext(4);
        behaviorSubject.onNext(5);
        behaviorSubject.onComplete();

3会被缓存,在订阅新的观察者时候,会被发送到新的观察者,并且behaviorSubject也可以订阅多个观察者,即4,5会发送给两个观察者

Completable

只关心结果,也就是说 Completable 是没有 onNext 的,要么成功要么出错,不关心过程,在 subscribe 后的某个时间点返回结果

Completable.timer(1, TimeUnit.SECONDS)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new CompletableObserver() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {

                    }

                    @Override
                    public void onComplete() {
  
                    }

                    @Override
                    public void onError(@NonNull Throwable e) {

                    }
                });

subscribeOn(Schedulers.io())/observeOn(AndroidSchedulers.mainThread())

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,904评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,581评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,527评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,463评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,546评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,572评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,582评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,330评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,776评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,087评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,257评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,923评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,571评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,192评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,436评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,145评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,127评论 2 352

推荐阅读更多精彩内容