RxJava操作符总结

RxJava操作符

<h3 id="创建操作">创建操作</h3>

<h4 id="Range">Range</h4>

Range操作符根据初始值n和数目m发射一系列大于等于n的m个值

        Observable.range(5, 5).subscribe(new Action1<Integer>() {
           @Override
           public void call(Integer integer) {
               Log.d(TAG, "integer=" + integer); //5,6,7,8,9
           }
       });

<h4>Defer</h4>

Defer操作符只有当有Subscriber来订阅的时候才会创建一个新的Observable对象,也就是说每次订阅都会得到一个刚创建的最新的Observable对象,这可以确保Observable对象里的数据是最新的。

<h4> Repeat</h4>

Repeat会将一个Observable对象重复发射,我们可以指定其发射的次数。

      Observable.just(1, 2, 3).repeat(5).subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.d(TAG, "integer=" + integer); // 1,2,3,1,2,3...重复5次
            }
        });

<h4>Timer</h4>

Timer会在指定时间后发射一个数字0,该操作符运行在Computation Scheduler。

 Observable.timer(3, TimeUnit.SECONDS).observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<Long>() {
                    @Override
                    public void call(Long aLong) {
                        Log.d(TAG, "aLong=" + aLong); // 延时3s 
                    }
                });

<h4>Interval</h4>

创建一个按固定时间间隔发射整数序列的Observable.

interval默认在computation调度器上执行。你也可以传递一个可选的Scheduler参数来指定调度器。

     // 间隔时间1秒
        Observable.interval(1, TimeUnit.SECONDS, AndroidSchedulers.mainThread())
                .subscribe(new Action1<Long>() {
                    @Override
                    public void call(Long aLong) {
                       Log.d(TAG, "aLong=" + aLong); //
                    }
                });

<h3 id="变换操作">变换操作</h3>

<h4 id="Buffer">Buffer</h4>

Buffer操作符定期收集Observable的数据放进一个数据包裹,然后发射这些数据包裹,而不是一次发射一个值。

Buffer操作符将一个Observable变换为另一个,原来的Observable正常发射数据,变换产生的Observable发射这些数据的缓存集合。

       RxView.clickEvents(mButton)
                .buffer(2, TimeUnit.SECONDS)
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<List<ViewClickEvent>>() {
                    @Override
                    public void onCompleted() {}
                    @Override
                    public void onError(Throwable e) {}
                    @Override
                    public void onNext(List<ViewClickEvent> viewClickEvents) {
                        if (viewClickEvents.size() > 0) {
                            Toast.makeText(MainActivity.this, "2秒内点击了" + viewClickEvents.size() + "次", Toast.LENGTH_SHORT).show();
                        } else {

                        }
                    }
                });

如果原来的Observable发射了一个onError通知,Buffer会立即传递这个通知,而不是首先发射缓存的数据,即使在这之前缓存中包含了原始Observable发射的数据。

<h4>GroupBy</h4>

GroupBy操作符将原始Observable发射的数据按照key来拆分成一些小的Observable,然后这些小的Observable分别发射其所包含的的数据。

               Observable.just(1, 2, 3, 4, 5, 6)
                .groupBy(new Func1<Integer, Boolean>() {
                    @Override
                    public Boolean call(Integer integer) {
                        return integer % 2 == 0;
                    }
                })
                .subscribe(new Action1<GroupedObservable<Boolean, Integer>>() {
                    @Override
                    public void call(final GroupedObservable<Boolean, Integer> observable) {
                        //toList方法转换为Observable<List<T>>
                        observable.toList().subscribe(new Action1<List<Integer>>() {
                            @Override
                            public void call(List<Integer> integers) {
                                Log.d(TAG, "key=" + observable.getKey() + ",values=" + integers);
                                //key=false,values=[1, 3, 5]
                                //key=true,values=[2, 4, 6]
                            }
                        });
                    }
                });

<h4>Scan</h4>

Scan操作符对一个序列的数据应用一个函数,并将这个函数的结果发射出去作为下个数据应用这个函数时候的第一个参数使用。

        Observable.from(new Integer[]{1,2,3,4,5,6,7,8,9,10}).scan(new Func2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer x, Integer y) {
                return x+y;
            }
        }).subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.d(TAG, "integer=" + integer);// 1,3,6,10,15,21,28,36,45,55
            }
        });

<h3>过滤操作</h3>

<h4>Debounce</h4>

<h4>ThrottleWithTimeout</h4>

<h4>Distinct/DistinctUntilChanged</h4>

Distinct操作符用来除去重复数据。

  Observable.from(new Integer[]{1,2,2,3,3,3,2,2,1}).distinct().subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.d(TAG, "integer=" + integer); //1,2,3

            }
        });

DistinctUntilChanged操作符用来过滤掉连续的重复数据。

     Observable.from(new Integer[]{1,2,2,3,3,3,2,2,1}).distinctUntilChanged().subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.d(TAG, "integer=" + integer); //1,2,3,2,1

            }
        });

<h4>ElementAt</h4>

ElementAt只会返回指定位置的数据。

   Observable.from(new Integer[]{1,2,3,4,5,6,7,8,9})
                .elementAt(4)
                .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.d(TAG, "integer=" + integer); //5

            }
        });

<h4>Filter</h4>

Filter返回满足过滤条件的数据。

        Observable.from(new Integer[]{1, 2, 3, 4, 5, 6, 7, 8, 9})
                .filter(new Func1<Integer, Boolean>() {
                    @Override
                    public Boolean call(Integer integer) {
                        return integer < 5;
                    }
                })
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d(TAG, "integer=" + integer); //1,2,3,4

                    }
                });

<h4>First</h4>

First操作符返回第一条数据或者返回满足条件的第一条数据。

 Observable.from(new Integer[]{1, 2, 3, 4, 5, 6, 7, 8, 9})
                .first()
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d(TAG, "integer=" + integer); //1 返回第一条数据 

                    }
                });
                
Observable.from(new Integer[]{1, 2, 3, 4, 5, 6, 7, 8, 9})
                .first(new Func1<Integer, Boolean>() {
                    @Override
                    public Boolean call(Integer integer) {
                        return integer > 3;
                    }
                })
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d(TAG, "integer=" + integer); //4 返回满足条件的第一条数据

                    }
                });

<h4>Last</h4>

Last操作符返回最后一条数据或者满足条件的最后一条数据。

<h4>Skip</h4>

Skip操作符将源Observable发射的数据过滤掉前n项。

    Observable.from(new Integer[]{1, 2, 3, 4, 5, 6, 7, 8, 9})
                .skip(6)
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d(TAG, "integer=" + integer); //7,8,9
                    }
                });

<h4>Take</h4>

Take操作符只取前n项。

        Observable.from(new Integer[]{1, 2, 3, 4, 5, 6, 7, 8, 9})
                .take(2)
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d(TAG, "integer=" + integer); //1,2
                    }
                });

<h4>Sample</h4>

Sample操作符会定时地发射源Observable最近发射的数据,其他的都会被过滤掉。



<h4>ThrottleLast</h4>

<h3 id="结合操作">结合操作</h3>

<h4 id="CombineLatest">CombineLatest</h4>

当两个Observables中的任何一个发射了数据时,使用一个函数结合每个Observable发射的最近数据项,并且基于这个函数的结果发射数据。

//          (200)(400)(600) (800) (1000)
        // ---0--- 5---10----15----20
        //    (300)   (600)  (900) (1200)(1500)
        //------0------5------10---  15----20
        Observable<Long> observable1=Observable.interval(200,TimeUnit.MILLISECONDS)
                .map(new Func1<Long, Long>() {
                    @Override
                    public Long call(Long aLong) {
                        return aLong*5;
                    }
                }).take(5);
        Observable<Long> observable2=Observable.interval(300,TimeUnit.MILLISECONDS)
                .map(new Func1<Long, Long>() {
                    @Override
                    public Long call(Long aLong) {
                        return aLong*5;
                    }
                }).take(5);
        Observable.combineLatest(observable1, observable2, new Func2<Long, Long, Long>() {
            @Override
            public Long call(Long aLong, Long aLong2) {
                return aLong+aLong2;
            }
        }).subscribe(new Action1<Long>() {
            @Override
            public void call(Long aLong) {
                Log.d(TAG, "aLong=" + aLong);
                //0(300) ,5+0(400) ,10+0(600),10+5(600),15+5(800),15+10(900)
                //20+10(1000),20+15(1200),20+20(1500)
            }
        });

在实际的开发中我们可以利用该操作符结合RxBinding来实现表单提交的校验。


        final Observable<TextViewTextChangeEvent> usernameChangeObservable = RxTextView.textChangeEvents(mUsernameEditText);
        final Observable<TextViewTextChangeEvent> passwordChangeObservable = RxTextView.textChangeEvents(mPasswordEditText);
        submitButton.setEnabled(false);
        Observable.combineLatest(usernameChangeObservable, passwordChangeObservable,
                new Func2<TextViewTextChangeEvent, TextViewTextChangeEvent, Boolean>() {
                    @Override
                    public Boolean call(TextViewTextChangeEvent event1, TextViewTextChangeEvent event2) {
                        boolean emailCheck = event1.text().length() >= 3;
                        boolean passwordCheck = event2.text().length() >= 3;
                        return emailCheck && passwordCheck;
                    }
                })
                .subscribe(new Action1<Boolean>() {
                    @Override
                    public void call(Boolean aBoolean) {
                        submitButton.setEnabled(aBoolean);
                    }
                });

<h4 id="Join">Join</h4>

<h4 id=”Merge“>Merge</h4>

Merge操作符将多个Observable发射的数据整合起来发射,就如同是一个Observable发射的数据一样。
当某一个Observable发出onError的时候,merge的过程会被停止并将错误分发给Subscriber,如果不想让错误终止merge的过程,可以使用MeregeDelayError操作符,会将错误在merge结束后再分发。

       Observable<Long> observable1 = Observable.interval(200, TimeUnit.MILLISECONDS)
                .map(new Func1<Long, Long>() {
                    @Override
                    public Long call(Long aLong) {
                        return aLong * 5;
                    }
                }).take(5);
        Observable<Long> observable2 = Observable.interval(300, TimeUnit.MILLISECONDS)
                .map(new Func1<Long, Long>() {
                    @Override
                    public Long call(Long aLong) {
                        return aLong * 5;
                    }
                }).take(5);
        Observable.merge(observable1, observable2).subscribe(new Action1<Long>() {
            @Override
            public void call(Long aLong) {
                Log.d(TAG, "aLong=" + aLong); //0,0,5,10,5,15,10,20,15,20
            }
        });

<h4 id="StartWith">StartWith</h4>

StartWith操作符会在源Observable发射的数据前面插上一些数据。


        Observable.just(1, 2, 3, 4).startWith(-1, 0)
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d(TAG,"integer="+integer); // -1,0,1,2,3,4
                    }
                });
        Observable.just(1,2,3,4).startWith(Observable.just(-1,0))
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d(TAG,"integer="+integer); // -1,0,1,2,3,4
                    }
                });

<h4 id="Zip">Zip</h4>

Zip操作符将多个Observable发射的数据按顺序组合起来,每个数据只能组合一次,而且都是有序的。最终组合的数据的数量由发射数据最少的Observable来决定。

        //  (200)(400)(600) (800) (1000)
        // ---0--- 5---10----15----20
        //    (300)   (600)  (900) (1200)(1500)
        //------0------5------10---  15----20
        Observable<Long> observable1=Observable.interval(200,TimeUnit.MILLISECONDS)
                .map(new Func1<Long, Long>() {
                    @Override
                    public Long call(Long aLong) {
                        return aLong*5;
                    }
                }).take(5);
        Observable<Long> observable2=Observable.interval(300,TimeUnit.MILLISECONDS)
                .map(new Func1<Long, Long>() {
                    @Override
                    public Long call(Long aLong) {
                        return aLong*5;
                    }
                }).take(5);
        Observable.zip(observable1, observable2, new Func2<Long, Long, Long>() {
            @Override
            public Long call(Long aLong, Long aLong2) {
                return aLong+aLong2;
            }
        }).subscribe(new Action1<Long>() {
            @Override
            public void call(Long aLong) {
                Log.d(TAG, "aLong=" + aLong); //0,10,20,30,40
            }
        });

<h4>Delay操作符</h4>

Delay操作符让发射数据的时机延后一段时间,这样所有的数据都会依次延后一段时间发射。在Rxjava中将其实现为Delay和DelaySubscription。不同之处在于Delay是延时数据的发射,而DelaySubscription是延时注册Subscriber。

<h4>Do操作符</h4>

Do操作符就是给Observable的生命周期的各个阶段加上一系列的回调监听,当Observable执行到这个阶段的时候,这些回调就会被触发。在Rxjava实现了很多的doXxx操作符。
doOnEach可以给Observable加上这样的样一个回调:Observable每发射一个数据的时候就会触发这个回调,不仅包括onNext还包括onError和onCompleted。
DoOnNext则只有onNext的时候才会被触发。

doOnNext则只有onNext的时候才会被触发。
doOnError会在OnError发生的时候触发回调,并将Throwable对象作为参数传进回调函数里
doOnComplete会在OnCompleted发生的时候触发回调。
doOnTerminate会在Observable结束前触发回调,无论是正常还是异常终止;finallyDo会在Observable结束后触发回调,无论是正常还是异常终止。

doOnSubscribe和doOnUnSubscribe则会在Subscriber进行订阅和反订阅的时候触发回调。当一个Observable通过OnError或者OnCompleted结束的时候,会反订阅所有的Subscriber。

  Observable.just(1, 2, 3, 4)
                .doOnEach(new Action1<Notification<? super Integer>>() {
                    @Override
                    public void call(Notification<? super Integer> notification) {
                        Log.d(TAG, "doOnEach" + notification.getKind().name()); // onNext,onNext,onNext,onNext,onCompleted
                    }
                })
                .doOnSubscribe(new Action0() {
                    @Override
                    public void call() {
                        Log.d(TAG, "doOnSubscribe");
                    }
                })
                .doOnUnsubscribe(new Action0() {
                    @Override
                    public void call() {
                        Log.d(TAG, "doOnUnsubscribe");
                    }
                })
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d(TAG, "integer = " + integer);
                    }
                });

<h4>Meterialize操作符</h4>

Meterialize操作符将OnNext/OnError/OnComplete都转化为一个Notification对象并按照原来的顺序发射出来,而DeMeterialize则是执行相反的过程。

     Observable.just(1,2,3)
                .materialize()
                .subscribe(new Action1<Notification<Integer>>() {
                    @Override
                    public void call(Notification<Integer> integerNotification) {
                        Log.d(TAG,"kind="+integerNotification.getKind().name()+"value="+integerNotification.getValue());
                    }
                });

<h4>SubscribOn/ObserverOn</h4>
SubscribOn用来指定Observable在哪个线程上运行。
ObserverOn用来指定观察者所运行的线程。
<h4>TimeInterval/TimeStamp</h4>

TimeInterval会拦截发射出来的数据,然后发射两个发射数据的间隔时间。对于第一个发射的数据,其时间间隔为订阅后到首次发射的间隔。

 Observable.interval(1,TimeUnit.SECONDS,AndroidSchedulers.mainThread()).
        timeInterval().subscribe(new Action1<TimeInterval<Long>>() {
            @Override
            public void call(TimeInterval<Long> longTimeInterval) {
                Log.d(TAG,"value = "+longTimeInterval.getIntervalInMilliseconds());//
            }
        });

TimeStamp会将每个数据项给重新包装一下,加上了一个时间戳来标明每次发射的时间。

<h4>Timeout</h4>
Timeout操作符给Observable加上超时时间,每发射一个数据后就重置计时器,当超过预定的时间还没有发射下一个数据,就抛出一个超时的异常。

<h4>All操作符</h4>

All操作符根据一个函数对源Observable发射的所有数据进行判断,最终返回的结果就是这个判断结果。这个函数使用发射的数据作为参数,内部判断所有的数据是否满足我们定义好的判断条件,如果全部都满足则返回true,否则就返回false。

      Observable.from(new Integer[]{1,2,3,4,5,6,7,8,9,10}).all(new Func1<Integer, Boolean>() {
            @Override
            public Boolean call(Integer integer) {
                return integer<=10;
            }
        }).subscribe(new Action1<Boolean>() {
            @Override
            public void call(Boolean aBoolean) {
                Log.d(TAG,"result is "+ aBoolean); //result is true
            }
        });

<h4>Amb操作符</h4>

Amb操作符可以将至多9个Observable结合起来,让他们竞争。哪个Observable首先发射了数据(包括onError和onComplete)就会继续发射这个Observable的数据,其他的Observable所发射的数据都会别丢弃。

      Observable<Integer> delay3 = Observable.just(1, 2, 3).delay(3000, TimeUnit.MILLISECONDS);
        Observable<Integer> delay2 = Observable.just(4, 5, 6).delay(2000, TimeUnit.MILLISECONDS);
        Observable<Integer> delay1 = Observable.just(7, 8, 9).delay(1000, TimeUnit.MILLISECONDS);
        Observable.amb(delay1, delay2, delay3).subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.d(TAG, "integer=" + integer); // 7,8,9
            }
        });

<h4>Contains操作符</h4>

Contains操作符用来判断源Observable所发射的数据是否包含某一个数据,如果包含会返回true,如果源Observable已经结束了却还没有发射这个数据则返回false。


  Observable.from(new Integer[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10})
                .contains(11)
                .subscribe(new Action1<Boolean>() {
                    @Override
                    public void call(Boolean aBoolean) {
                        Log.d(TAG, "result is " + aBoolean);//result is false
                    }
                });
                

<h4>IsEmpty操作符</h4>

IsEmpty操作符用来判断源Observable是否发射过数据,如果发射过就会返回false,如果源Observable已经结束了却还没有发射这个数据则返回true。

    Observable.create(new Observable.OnSubscribe<Object>() {
           @Override
           public void call(Subscriber<? super Object> subscriber) {
               subscriber.onCompleted();
           }
       }).isEmpty().subscribe(new Action1<Boolean>() {
           @Override
           public void call(Boolean aBoolean) {
               Log.d(TAG, "result is " + aBoolean);//result is true
           }
       });

<h4>DefaultIfEmpty</h4>

DefaultIfEmpty操作符会判断源Observable是否发射数据,如果源Observable发射了数据则正常发射这些数据,如果没有则发射一个默认的数据。

      Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onCompleted();
            }
        }).defaultIfEmpty(100).subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.d(TAG, "integer= " + integer); // 100
            }
        });

<h4>SequenceEqual操作符</h4>

SequenceEqual操作符用来判断两个Observable发射的数据序列是否相同(发射的数据相同,数据的序列相同,结束的状态相同),如果相同返回true,否则返回false。

    Observable.sequenceEqual(Observable.just(1,2,3),Observable.just(1,2,3)).subscribe(new Action1<Boolean>() {
            @Override
            public void call(Boolean aBoolean) {
                Log.d(TAG,"result is "+ aBoolean);//result is true
            }
        });
        Observable.sequenceEqual(Observable.just(1,2),Observable.just(1,2,3)).subscribe(new Action1<Boolean>() {
            @Override
            public void call(Boolean aBoolean) {
                Log.d(TAG,"result is "+ aBoolean);//result is false
            }
        });

<h4>SkipUntil操作符</h4>

SkipUnitl根据一个标志Observable来跳过一些数据,当这个标志Observable没有发射数据的时候,所有源Observable发射的数据都会被跳过;当标志Observable发射了一个数据,则开始正常地发射数据。

    Observable.interval(1, TimeUnit.SECONDS)
                .skipUntil(Observable.timer(3, TimeUnit.SECONDS)) //延迟3s
                .subscribe(new Action1<Long>() {
                    @Override
                    public void call(Long aLong) {
                        Log.d(TAG, "aLong = " + aLong); //2,3,4...
                    }
                });

<h4>SkipWhile操作符</h4>

SkipWhile根据一个函数来判断是否跳过数据,当函数返回值为true的时候则一直跳过源Observable发射的数据;当函数返回false的时候则开始正常发射数据。

     Observable.from(new Integer[]{1, 2, 3, 4, 5, 6, 5, 4,3,2,1})
                .skipWhile(new Func1<Integer, Boolean>() {
                    @Override
                    public Boolean call(Integer integer) { //1,2,3,4,5
                        Log.d(TAG,"integer -> "+integer ); //如果首次为true后面的将不进行判断
                        return integer<5; //
                    }
                }).subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.d(TAG, "integer = " + integer); //5, 6, 5, 4,3,2,1
            }
        });

<h4>TakeUntil操作符</h4>

TakeUntil使用一个标志Observable是否发射数据来判断,当标志Observable没有发射数据时,正常发射数据,而一旦标志Observable发射过了数据则后面的数据都会被丢弃。


   Observable.interval(1, TimeUnit.SECONDS)
                .takeUntil(Observable.timer(3, TimeUnit.SECONDS)) //延迟3s
                .subscribe(new Action1<Long>() {
                    @Override
                    public void call(Long aLong) {
                        Log.d(TAG, "aLong = " + aLong); //0,1
                    }
                });

<h4>TakeWhile操作符</h4>

TakeWhile则是根据一个函数来判断是否发射数据,当函数返回值为true的时候正常发射数据;当函数返回false的时候丢弃所有后面的数据。

       Observable.from(new Integer[]{1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1})
                .takeWhile(new Func1<Integer, Boolean>() {
                    @Override
                    public Boolean call(Integer integer) { //1,2,3,4,5
                        Log.d(TAG, "integer -> " + integer); //如果首次为false后面的将不进行判断
                        return integer < 5; //
                    }
                }).subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.d(TAG, "integer = " + integer); //1,2,3,4,5
            }
        });

<h4>Concat操作符</h4>

Concat操作符将多个Observable结合成一个Observable并发射数据,并且严格按照先后顺序发射数据,前一个Observable的数据没有发射完,是不能发射后面Observable的数据的。

       Observable<Integer> observable1 = Observable.just(1,2,3);
        Observable<Integer> observable2 = Observable.just(4,5,6);
        Observable<Integer> observable3 = Observable.just(7,8,9);
        Observable.concat(observable1,observable2,observable3).subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {}
            @Override
            public void onError(Throwable e) {}
            @Override
            public void onNext(Integer integer) {
                Log.d(TAG,"integer="+integer);// 1,2,3,4,5,6,7,8,9
            }
        });

当一个Observable发生错误的时候,发射会终止。

    Observable<Integer> observable1 = Observable.just(1, 2, 3);
        Observable<Integer> observable2 = Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onError(new Throwable("error"));
            }

        });
        Observable<Integer> observable3 = Observable.just(7, 8, 9);
        Observable.concat(observable1, observable2, observable3).subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {}
            @Override
            public void onError(Throwable e) {
                Log.e(TAG, e.getMessage());
            }
            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "integer=" + integer);// 1,2,3,error
            }
        });

<h4>Count操作符</h4>

Count操作符用来统计源Observable发射了多少个数据,最后将数目给发射出来;如果源Observable发射错误,则会将错误直接报出来;在源Observable没有终止前,count是不会发射统计数据的。

        Observable.just(1, 2, 3).count().subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {}
            @Override
            public void onError(Throwable e) {}
            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "integer=" + integer); // integer=3
            }
        });

<h4>Reduce</h4>

Reduce操作符接收Observable发射的数据并利用提供的函数的计算结果作为下次计算的参数,输出最后的结果。首次没有计算结果传入前两个参数。


    Observable.from(new Integer[]{1,2,3,4,5,6,7,8,9,10}).reduce(new Func2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer x, Integer y) {
                return x+y; // 1+2+3+4+5+6+7+8+9+10
            }
        }).subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.d(TAG,"result="+ integer); // result = 55
            }
        });

<h3>参考</h3>

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,324评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,303评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,192评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,555评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,569评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,566评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,927评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,583评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,827评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,590评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,669评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,365评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,941评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,928评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,159评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,880评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,399评论 2 342

推荐阅读更多精彩内容