RxJava 便查手册

介绍

添加依赖

implementation 'io.reactivex.rxjava2:rxjava:xxx'
implementation 'io.reactivex.rxjava2:rxandroid:xxx'

最新版本号获取地址RxJava,RxAndroid

需要了解的基本概念

RxJava由三部分组成: Observable被观察者,Observer观察者,subscribe订阅

操作符

创建

  1. create()
public static <T> Observable<T> create(ObservableOnSubscribe<T> source)
  
//----
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> e) throws Exception {
        e.onNext("Hello Observer");
        e.onComplete();
    }
});
//--
Observer<String> observer = new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onNext(String s) {
        Log.d("chan","------> onNext " + s);
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {
        Log.d("chan","------> onComplete ");
    }
};
        
observable.subscribe(observer);
  1. just()
    创建一个被观察者,并发送事件,发送的事件不可以超过10个以上。
public static <T> Observable<T> just(T item) 
......
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8, T item9, T item10)

//----
Observable.just(1, 2, 3)
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "------>  onSubscribe");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "------>  onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "------>  onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "------>  onComplete ");
    }
});
  1. fromArray()
    和 just() 类似,只不过 fromArray 可以传入多于10个的变量,并且可以传入一个数组
public static <T> Observable<T> fromArray(T... items)

//---
Integer array[] = {1, 2, 3, 4};
Observable.fromArray(array)
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "------>  onSubscribe");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "------>  onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "------>  onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "------>  onComplete ");
    }
});
  1. fromCallable()
    这里的 Callable 是 java.util.concurrent 中的 Callable,Callable 和 Runnable 的用法基本一致,只是它会返回一个结果值,这个结果值就是发给观察者的。
public static <T> Observable<T> fromCallable(Callable<? extends T> supplier)

Observable.fromCallable(new Callable < Integer > () {

    @Override
    public Integer call() throws Exception {
        return 1;
    }
})
.subscribe(new Consumer < Integer > () {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.d(TAG, "------>  accept " + integer);
    }
});

  1. fromFuture()
    参数中的 Future 是 java.util.concurrent 中的 Future,Future 的作用是增加了 cancel() 等方法操作 Callable,它可以通过 get() 方法来获取 Callable 返回的值。
public static <T> Observable<T> fromFuture(Future<? extends T> future)

//----
FutureTask < String > futureTask = new FutureTask < > (new Callable < String > () {
    @Override
    public String call() throws Exception {
        Log.d(TAG, "CallableDemo is Running");
        return "返回结果";
    }
});

Observable.fromFuture(futureTask)
    .doOnSubscribe(new Consumer < Disposable > () {//doOnSubscribe() 的作用就是只有订阅时才会发送事件
    @Override
    public void accept(Disposable disposable) throws Exception {
        futureTask.run();
    }
})
.subscribe(new Consumer < String > () {
    @Override
    public void accept(String s) throws Exception {
        Log.d(TAG, "------>  accept " + s);
    }
});
  1. fromIterable()
    直接发送一个 List 集合数据给观察者
public static <T> Observable<T> fromIterable(Iterable<? extends T> source)

//----
List<Integer> list = new ArrayList<>();
list.add(0);
list.add(1);
list.add(2);
list.add(3);
Observable.fromIterable(list)
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "------>  onSubscribe");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "------>  onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "------>  onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "------>  onComplete ");
    }
});
  1. defer()
    这个方法的作用就是直到被观察者被订阅后才会创建被观察者。
public static <T> Observable<T> defer(Callable<? extends ObservableSource<? extends T>> supplier)

//----

// i 要定义为成员变量
Integer i = 100;
        
Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<? extends Integer>>() {
    @Override
    public ObservableSource<? extends Integer> call() throws Exception {
        return Observable.just(i);
    }
});

i = 200;

Observer observer = new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "------>  onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
};

observable.subscribe(observer);

i = 300; //最后返回这个值

observable.subscribe(observer);

  1. timer()
    当到指定时间后就会发送一个 0L 的值给观察者。
public static Observable<Long> timer(long delay, TimeUnit unit) 
......

//----
Observable.timer(2, TimeUnit.SECONDS)
.subscribe(new Observer < Long > () {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onNext(Long aLong) {
        Log.d(TAG, "------>  onNext " + aLong);// 输出0 只执行一次
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
});
  1. interval()
    每隔一段时间就会发送一个事件,这个事件是从0开始,不断增1的数字。
public static Observable<Long> interval(long period, TimeUnit unit)
public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit)
......

//----
Observable.interval(4, TimeUnit.SECONDS)
.subscribe(new Observer < Long > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "------> =onSubscribe ");
    }

    @Override
    public void onNext(Long aLong) {
        Log.d(TAG, "------> =onNext " + aLong);// 间隔多长时间  多次执行
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
});
  1. intervalRange()
    可以指定发送事件的开始值和数量,其他与 interval() 的功能一样
public static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit)
public static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit, Scheduler scheduler)

//----
Observable.intervalRange(2, 5, 2, 1, TimeUnit.SECONDS)
.subscribe(new Observer < Long > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "------> =onSubscribe ");
    }

    @Override
    public void onNext(Long aLong) {
        Log.d(TAG, "------> =onNext " + aLong); 
        // 结果 2,3,4,5,6 (2开始,5次)
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
});
  1. range()
    同时发送一定范围的事件序列。
public static Observable<Long> rangeLong(long start, long count)

//----
Observable.range(2, 5)
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "------> =onSubscribe ");
    }

    @Override
    public void onNext(Integer aLong) {
        Log.d(TAG, "------> =onNext " + aLong);// 23456
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
});
  1. rangeLong()
    作用与 range() 一样,只是数据类型为 Long
public static Observable<Long> rangeLong(long start, long count)
  1. empty() & never() & error()
    • empty() : 直接发送 onComplete() 事件
    • never():不发送任何事件
    • error():发送 onError() 事件
Observable.empty()
.subscribe(new Observer < Object > () {

    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "------>  =onSubscribe");
    }

    @Override
    public void onNext(Object o) {
        Log.d(TAG, "------>  =onNext");
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "------>  =onError " + e);
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "------>  =onComplete");
    }
});  

//----
empty() -->  ------>  =onSubscribe ------>  =onComplete
never() -->  ------>  =onSubscribe
error() -->   ------>  =onSubscribe ------>  =onError java.lang.NullPointerException

转换

  1. map()
    map可以将被观察者发送的数据类型转变成其他的类型
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper)

//-----
Observable.just(1, 2, 3)
.map(new Function < Integer, String > () {
    @Override
    public String apply(Integer integer) throws Exception {
        return "I'm " + integer;
    }
})
.subscribe(new Observer < String > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.e(TAG, "------>   onSubscribe");
    }

    @Override
    public void onNext(String s) {
        Log.e(TAG, "------>   onNext " + s);
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
});
  1. flatMap()
    这个方法可以将事件序列中的元素进行整合加工,返回一个新的被观察者。
    flatMap() 其实与 map() 类似,但是 flatMap() 返回的是一个 Observerable。现在用一个例子来说明 flatMap() 的用法。
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper)
......

//----
Observable.fromIterable(personList)
.flatMap(new Function < Person, ObservableSource < Plan >> () {
    @Override
    public ObservableSource < Plan > apply(Person person) {
        return Observable.fromIterable(person.getPlanList());
    }
})
.flatMap(new Function < Plan, ObservableSource < String >> () {
    @Override
    public ObservableSource < String > apply(Plan plan) throws Exception {
        return Observable.fromIterable(plan.getActionList());
    }
})
.subscribe(new Observer < String > () {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onNext(String s) {
        Log.d(TAG, "------>  =action: " + s);
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
});
  1. concatMap()
    concatMap() 和 flatMap() 基本上是一样的,只不过 concatMap() 转发出来的事件是有序的,而 flatMap() 是无序的。
public final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper)
public final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, int prefetch)

//-----
// ^^^^^  flatMap() 
  1. buffer()
    从需要发送的事件当中获取一定数量的事件,并将这些事件放到缓冲区当中一并发出。
public final Observable<List<T>> buffer(int count, int skip)
......

//-----
//buffer 有两个参数,一个是 count,另一个 skip。count 缓冲区元素的数量,skip 就代表缓冲区满了之后,发送下一次事件序列的时候要跳过多少元素。
Observable.just(1, 2, 3, 4, 5)
.buffer(2, 1)
.subscribe(new Observer < List < Integer >> () {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onNext(List < Integer > integers) {
        Log.d(TAG, "------>  缓冲区大小: " + integers.size());
        for (Integer i: integers) {
            Log.d(TAG, "------>  元素: " + i);
        }
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
});

//----结果
// 从结果可以看出,每次发送事件,指针都会往后移动一个元素再取值,直到指针移动到没有元素的时候就会停止取值。
05-21 14:09:34.015 22421-22421/com.example.rxjavademo D/chan: ------>  缓冲区大小: 2
------>  元素: 1
------>  元素: 2
------>  缓冲区大小: 2
------>  元素: 2
------>  元素: 3
------>  缓冲区大小: 2
------>  元素: 3
------>  元素: 4
------>  缓冲区大小: 2
------>  元素: 4
------>  元素: 5
------>  缓冲区大小: 1
------>  元素: 5
  1. groupBy()
    将发送的数据进行分组,每个分组都会返回一个被观察者。
public final <K> Observable<GroupedObservable<K, T>> groupBy(Function<? super T, ? extends K> keySelector)

//-------
Observable.just(5, 2, 3, 4, 1, 6, 8, 9, 7, 10)
.groupBy(new Function < Integer, Integer > () {
    @Override
    public Integer apply(Integer integer) throws Exception {
        return integer % 3;
    }
})
.subscribe(new Observer < GroupedObservable < Integer, Integer >> () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "------>   onSubscribe ");
    }

    @Override
    public void onNext(GroupedObservable < Integer, Integer > integerIntegerGroupedObservable) {
        Log.d(TAG, "------>   onNext ");
        integerIntegerGroupedObservable.subscribe(new Observer < Integer > () {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "------>   GroupedObservable onSubscribe ");
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "------>   GroupedObservable onNext  groupName: " + integerIntegerGroupedObservable.getKey() + " value: " + integer);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "------>   GroupedObservable onError ");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "------>   GroupedObservable onComplete ");
            }
        });
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "------>   onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "------>   onComplete ");
    }
});

//----结果---
05-26 14:38:02.062 21451-21451/com.example.rxjavademo D/chan: ------>   onSubscribe 
05-26 14:38:02.063 21451-21451/com.example.rxjavademo D/chan: ------>   onNext 
------>   GroupedObservable onSubscribe     ------>   GroupedObservable onNext  groupName: 2 value: 5
------>   GroupedObservable onNext  groupName: 2 value: 2
------>   onNext 
------>   GroupedObservable onSubscribe 
------>   GroupedObservable onNext  groupName: 0 value: 3
05-26 14:38:02.064 21451-21451/com.example.rxjavademo D/chan: ------>   onNext 
------>   GroupedObservable onSubscribe 
------>   GroupedObservable onNext  groupName: 1 value: 4
------>   GroupedObservable onNext  groupName: 1 value: 1
------>   GroupedObservable onNext  groupName: 0 value: 6
------>   GroupedObservable onNext  groupName: 2 value: 8
------>   GroupedObservable onNext  groupName: 0 value: 9
------>   GroupedObservable onNext  groupName: 1 value: 7
------>   GroupedObservable onNext  groupName: 1 value: 10
05-26 14:38:02.065 21451-21451/com.example.rxjavademo D/chan: ------>   GroupedObservable onComplete 
------>   GroupedObservable onComplete 
------>   GroupedObservable onComplete 
------>   onComplete 
  1. scan()
    将数据以一定的逻辑聚合起来。
public final Observable<T> scan(BiFunction<T, T, T> accumulator)

//-------

Observable.just(1, 2, 3, 4, 5)
.scan(new BiFunction < Integer, Integer, Integer > () {
    @Override
    public Integer apply(Integer integer, Integer integer2) throws Exception {
        Log.d(TAG, "------>   apply ");
        Log.d(TAG, "------>   integer " + integer);
        Log.d(TAG, "------>   integer2 " + integer2);
        return integer + integer2;
    }
})
.subscribe(new Consumer < Integer > () {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.d(TAG, "------>   accept " + integer);
    }
});
  1. window()
    发送指定数量的事件时,就将这些事件分为一组。window 中的 count 的参数就是代表指定的数量,例如将 count 指定为2,那么每发2个数据就会将这2个数据分成一组。
public final Observable<Observable<T>> window(long count)
......

//-------
//有几组   就会走几个 onComplete()

组合操作符

  1. concat()
    可以将多个观察者组合在一起,然后按照之前发送顺序发送事件。需要注意的是,concat() 最多只可以发送4个事件。
public static <T> Observable<T> concat(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2, ObservableSource<? extends T> source3, ObservableSource<? extends T> source4)
......
  1. concatArray()
    与 concat() 作用一样,不过 concatArray() 可以发送多于 4 个被观察者。
public static <T> Observable<T> concatArray(ObservableSource<? extends T>... sources)
  1. merge()
    这个方法月 concat() 作用基本一样,知识 concat() 是串行发送事件,而 merge() 并行发送事件。
    我的理解是 concat()一个完了走另个, 而merge()是好朋友一起走 数据乱了.
 public static <T> Observable<T> merge(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2, ObservableSource<? extends T> source3, ObservableSource<? extends T> source4)
......
  1. concatArrayDelayError() & mergeArrayDelayError()
    在 concatArray() 和 mergeArray() 两个方法当中,如果其中有一个被观察者发送了一个 Error 事件,那么就会停止发送事件,如果你想 onError() 事件延迟到所有被观察者都发送完事件后再执行的话,就可以使用 concatArrayDelayError() 和 mergeArrayDelayError()
public static <T> Observable<T> concatArrayDelayError(ObservableSource<? extends T>... sources)
public static <T> Observable<T> mergeArrayDelayError(ObservableSource<? extends T>... sources)
  1. zip()
    会将多个被观察者合并,根据各个被观察者发送事件的顺序一个个结合起来,最终发送的事件数量会与源 Observable 中最少事件的数量一样。
public static <T1, T2, R> Observable<R> zip(ObservableSource<? extends T1> source1, ObservableSource<? extends T2> source2, BiFunction<? super T1, ? super T2, ? extends R> zipper)
......
  1. combineLatest() & combineLatestDelayError()
    combineLatest() 的作用与 zip() 类似,但是 combineLatest() 发送事件的序列是与发送的时间线有关的,当 combineLatest() 中所有的 Observable 都发送了事件,只要其中有一个 Observable 发送事件,这个事件就会和其他 Observable 最近发送的事件结合起来发送,这样可能还是比较抽象
public static <T1, T2, R> Observable<R> combineLatest(ObservableSource<? extends T1> source1, ObservableSource<? extends T2> source2, BiFunction<? super T1, ? super T2, ? extends R> combiner)
....... 
  1. reduce()
    与 scan() 操作符的作用也是将发送数据以一定逻辑聚合起来,这两个的区别在于 scan() 每处理一次数据就会将事件发送给观察者,而 reduce() 会将所有数据聚合在一起才会发送事件给观察者。
public final Maybe<T> reduce(BiFunction<T, T, T> reducer)
  1. collect()
    将数据收集到数据结构当中。
public final <U> Single<U> collect(Callable<? extends U> initialValueSupplier, BiConsumer<? super U, ? super T> collector)

//-----
Observable.just(1, 2, 3, 4)
.collect(new Callable < ArrayList < Integer >> () {
    @Override
    public ArrayList < Integer > call() throws Exception {
        return new ArrayList < > ();
    }
},
new BiConsumer < ArrayList < Integer > , Integer > () {
    @Override
    public void accept(ArrayList < Integer > integers, Integer integer) throws Exception {
        integers.add(integer);
    }
})
.subscribe(new Consumer < ArrayList < Integer >> () {
    @Override
    public void accept(ArrayList < Integer > integers) throws Exception {
        Log.d(TAG, "------>  accept " + integers);  [1,2,3,4]
    }
});
  1. startWith() & startWithArray()
    在发送事件之前追加事件,startWith() 追加一个事件,startWithArray() 可以追加多个事件。追加的事件会先发出。
public final Observable<T> startWith(T item)
public final Observable<T> startWithArray(T... items)

//----

Observable.just(5, 6, 7)
.startWithArray(2, 3, 4)
.startWith(1)
.subscribe(new Consumer < Integer > () {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.d(TAG, "------>  accept " + integer); //分别打出 1,2,3,4,5,6,7
    }
});
  1. count()
    返回被观察者发送事件的数量。
public final Single<Long> count()  

//-----
Observable.just(1, 2, 3)
.count()
.subscribe(new Consumer < Long > () {
    @Override
    public void accept(Long aLong) throws Exception {
        Log.d(TAG, "------>    aLong " + aLong);  // 3
    }
});

功能操作符

  1. delay()
    延迟一段事件发送事件。
public final Observable<T> delay(long delay, TimeUnit unit)

// onSubscribe 不会延时  Next() 会
  1. doOnEach()
    Observable 每发送一件事件之前都会先回调这个方法。并且可以取出 onNext() 发送的值。不仅仅是onNext();
public final Observable<T> doOnEach(final Consumer<? super Notification<T>> onNotification)
  1. doOnNext()
    Observable 每发送 onNext() 之前都会先回调这个方法。
public final Observable<T> doOnNext(Consumer<? super T> onNext)
  1. doAfterNext()
    Observable 每发送 onNext() 之后都会回调这个方法。
public final Observable<T> doAfterNext(Consumer<? super T> onAfterNext)
  1. doOnComplete()
    Observable 每发送 onComplete() 之前都会回调这个方法。
public final Observable<T> doOnComplete(Action onComplete)
  1. doOnError()
    Observable 每发送 onError() 之前都会回调这个方法。
public final Observable<T> doOnError(Consumer<? super Throwable> onError)
  1. doOnSubscribe()
    Observable 每发送 onSubscribe() 之前都会回调这个方法。
public final Observable<T> doOnSubscribe(Consumer<? super Disposable> onSubscribe)
  1. doOnDispose()
    当调用 Disposable 的 dispose() 之后回调该方法。
public final Observable<T> doOnDispose(Action onDispose)

/*
* 疑问  --> Disposable 是个啥?
*/

  1. doOnLifecycle()
    在回调 onSubscribe 之前回调该方法的第一个参数的回调方法,可以使用该回调方法决定是否取消订阅。
public final Observable<T> doOnLifecycle(final Consumer<? super Disposable> onSubscribe, final Action onDispose)

/*
 * 不懂 ~ ~ 
*/
  1. doOnTerminate() & doAfterTerminate()
    doOnTerminate 是在 onError 或者 onComplete 发送之前回调,而 doAfterTerminate 则是 onError 或者 onComplete 发送之后回调。
public final Observable<T> doOnTerminate(final Action onTerminate)
public final Observable<T> doAfterTerminate(Action onFinally)
  1. doFinally()
    在所有事件发送完毕之后回调该方法。
    这里可能你会有个问题,那就是 doFinally() 和 doAfterTerminate() 到底有什么区别?区别就是在于取消订阅,如果取消订阅之后 doAfterTerminate() 就不会被回调,而 doFinally() 无论怎么样都会被回调,且都会在事件序列的最后。
public final Observable<T> doFinally(Action onFinally)
  1. onErrorReturn()
    当接受到一个 onError() 事件之后回调,返回的值会回调 onNext() 方法,并正常结束该事件序列。
public final Observable<T> onErrorReturn(Function<? super Throwable, ? extends T> valueSupplier)


//----
Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
        e.onError(new NullPointerException());
    }
})
.onErrorReturn(new Function<Throwable, Integer>() {
    @Override
    public Integer apply(Throwable throwable) throws Exception {
        Log.d(TAG, "------>  =onErrorReturn " + throwable);
        return 404;
    }
})
.subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "------>  =onSubscribe ");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "------>  =onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "------>  =onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "------>  =onComplete ");
    }
});

//----
05-23 18:35:18.175 19239-19239/? D/chan: ------>  =onSubscribe 
------>  =onNext 1
------>  =onNext 2
------>  =onNext 3
------>  =onErrorReturn java.lang.NullPointerException
------>  =onNext 404
------>  =onComplete 

  1. onErrorResumeNext()
    当接收到 onError() 事件时,返回一个新的 Observable,并正常结束事件序列。
public final Observable<T> onErrorResumeNext(Function<? super Throwable, ? extends ObservableSource<? extends T>> resumeFunction)

//----
Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
        e.onError(new NullPointerException());
    }
})
.onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Integer>>() {
    @Override
    public ObservableSource<? extends Integer> apply(Throwable throwable) throws Exception {
        Log.d(TAG, "------>  =onErrorResumeNext " + throwable);
        return Observable.just(4, 5, 6);
    }
})
.subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "------>  =onSubscribe ");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "------>  =onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "------>  =onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "------>  =onComplete ");
    }
});

//-----
05-23 18:43:10.910 26469-26469/? D/chan: ------>  =onSubscribe 
------>  =onNext 1
------>  =onNext 2
------>  =onNext 3
------>  =onErrorResumeNext java.lang.NullPointerException
------>  =onNext 4
------>  =onNext 5
------>  =onNext 6
------>  =onComplete 
  1. onExceptionResumeNext()
    与 onErrorResumeNext() 作用基本一致,但是这个方法只能捕捉 Exception。
public final Observable<T> onExceptionResumeNext(final ObservableSource<? extends T> next)
  1. retry()
    如果出现错误事件,则会重新发送所有事件序列。times 是代表重新发的次数。
public final Observable<T> retry(long times)  

//----
Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
        e.onError(new Exception("404"));
    }
})
.retry(2)
.subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "------>  =onSubscribe ");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "------>  =onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "------>  =onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "------>  =onComplete ");
    }
});

05-23 22:46:18.537 22239-22239/com.example.louder.rxjavademo D/chan: ------>  =onSubscribe 
05-23 22:46:18.538 22239-22239/com.example.louder.rxjavademo D/chan: ------>  =onNext 1
------>  =onNext 2
------>  =onNext 3
------>  =onNext 1
------>  =onNext 2
------>  =onNext 3
------>  =onNext 1
------>  =onNext 2
------>  =onNext 3
------>  =onError 

  1. retryUntil()
    出现错误事件之后,可以通过此方法判断是否继续发送事件。
public final Observable<T> retryUntil(final BooleanSupplier stop)
  1. retryWhen()
    当被观察者接收到异常或者错误事件时会回调该方法,这个方法会返回一个新的被观察者。如果返回的被观察者发送 Error 事件则之前的被观察者不会继续发送事件,如果发送正常事件则之前的被观察者会继续不断重试发送事件。
Observable.create(new ObservableOnSubscribe < String > () {
    @Override
    public void subscribe(ObservableEmitter < String > e) throws Exception {
        e.onNext("chan");
        e.onNext("ze");
        e.onNext("de");
        e.onError(new Exception("404"));
        e.onNext("haha");
    }
})
.retryWhen(new Function < Observable < Throwable > , ObservableSource <? >> () {
    @Override
    public ObservableSource <? > apply(Observable < Throwable > throwableObservable) throws Exception {
        return throwableObservable.flatMap(new Function < Throwable, ObservableSource <? >> () {
            @Override
            public ObservableSource <? > apply(Throwable throwable) throws Exception {
                if(!throwable.toString().equals("java.lang.Exception: 404")) {
                    return Observable.just("可以忽略的异常");
                } else {
                    return Observable.error(new Throwable("终止啦"));
                }
            }
        });
    }
})
.subscribe(new Observer < String > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "------>  =onSubscribe ");
    }

    @Override
    public void onNext(String s) {
        Log.d(TAG, "------>  =onNext " + s);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "------>  =onError " + e.toString());
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "------>  =onComplete ");
    }
});

//---- 
05-24 09:13:25.622 28372-28372/com.example.rxjavademo D/chan: ------>  =onSubscribe 
05-24 09:13:25.623 28372-28372/com.example.rxjavademo D/chan: ------>  =onNext chan
------>  =onNext ze
------>  =onNext de
05-24 09:13:25.624 28372-28372/com.example.rxjavademo D/chan: ------>  =onError java.lang.Throwable: 终止啦


//----
//将 onError(new Exception("404")) 改为 onError(new Exception("303")) 看看打印结果:
------>  =onNext chan
05-24 09:54:08.653 29694-29694/? D/chan: ------>  =onNext ze
------>  =onNext de
------>  =onNext chan
------>  =onNext ze
------>  =onNext de
------>  =onNext chan
------>  =onNext ze
------>  =onNext de
------>  =onNext chan
------>  =onNext ze
------>  =onNext de
------>  =onNext chan
------>  =onNext ze
------>  =onNext de
------>  =onNext chan
......
  1. repeat()
    重复发送被观察者的事件,times 为发送次数。
public final Observable<T> repeat(long times)

// 重复 一遍 onNext();
// onSubscribe()-> ...next...onComplete()
  1. repeatWhen()
    这个方法可以会返回一个新的被观察者设定一定逻辑来决定是否重复发送事件。
public final Observable<T> repeatWhen(final Function<? super Observable<Object>, ? extends ObservableSource<?>> handler)

//这里分三种情况,如果新的被观察者返回 onComplete 或者 onError 事件,则旧的被观察者不会继续发送事件。如果被观察者返回其他事件,则会重复发送事件。

  1. subscribeOn()
    指定被观察者的线程,要注意的时,如果多次调用此方法,只有第一次有效。
public final Observable<T> subscribeOn(Scheduler scheduler)
  1. observeOn()
    指定观察者的线程,每指定一次就会生效一次。
public final Observable<T> observeOn(Scheduler scheduler)

Schedulers.computation( )
用于使用计算任务,如事件循环和回调处理
Schedulers.immediate( )
当前线程
Schedulers.io( )
用于 IO 密集型任务,如果异步阻塞 IO 操作。
Schedulers.newThread( )
创建一个新的线程
AndroidSchedulers.mainThread()
Android 的 UI 线程,用于操作 UI。

过滤操作符

  1. filter()
    通过一定逻辑来过滤被观察者发送的事件,如果返回 true 则会发送事件,否则不会发送。
public final Observable<T> filter(Predicate<? super T> predicate)


//----
 Observable.just(1, 2, 3)
    .filter(new Predicate < Integer > () {
        @Override
        public boolean test(Integer integer) throws Exception {
            return integer < 2;
        }
})
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "------>  =onSubscribe ");
    }

    @Override
    public void onNext(Integer integer) {
        i += integer;
        Log.d(TAG, "------>  =onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "------>  =onError ");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "------>  =onComplete ");
    }
});


//-----
05-24 22:57:32.562 12776-12776/com.example.louder.rxjavademo D/chan: ------>  =onSubscribe 
------>  =onNext 1
------>  =onComplete 
  1. ofType()
    可以过滤不符合该类型事件
public final <U> Observable<U> ofType(final Class<U> clazz)

//---
Observable.just(1, 2, 3, "chan", "zhide")
.ofType(Integer.class)
.subscribe(new Observer < Integer > () {
  1. skip()
    跳过正序某些事件,count 代表跳过事件的数量
public final Observable<T> skip(long count)

//---
Observable.just(1, 2, 3)
.skip(2)
.subscribe(new Observer < Integer > () {

//最后结果只有3
  1. distinct()
    过滤事件序列中的重复事件。
public final Observable<T> distinct() 

//---
Observable.just(1, 2, 3, 3, 2, 1)
.distinct()
.subscribe(new Observer < Integer > () {
  1. distinctUntilChanged()
    过滤掉连续重复的事件
public final Observable<T> distinctUntilChanged()

//----
Observable.just(1, 2, 3, 3, 2, 1)
.distinctUntilChanged()
.subscribe(new Observer < Integer > () {
//--> 12321
  1. take()
    控制观察者接收的事件的数量。
public final Observable<T> take(long count)

//---
Observable.just(1, 2, 3, 4, 5)
.take(3)
.subscribe(new Observer < Integer > () {
//-> 123
  1. debounce()
    如果两件事件发送的时间间隔小于设定的时间间隔则前一件事件就不会发送给观察者。
    throttleWithTimeout() 与此方法的作用一样,这里就不再赘述了。
public final Observable<T> debounce(long timeout, TimeUnit unit)
//-> 是前一事件
  1. firstElement() && lastElement()
    firstElement() 取事件序列的第一个元素,lastElement() 取事件序列的最后一个元素。
public final Maybe<T> firstElement()
public final Maybe<T> lastElement()
  1. elementAt() & elementAtOrError()
    elementAt() 可以指定取出事件序列中事件,但是输入的 index 超出事件序列的总数的话就不会出现任何结果。这种情况下,你想发出异常信息的话就用 elementAtOrError() 。 --> elementAtOrError 会抛出异常 !!!!!!
public final Maybe<T> elementAt(long index)
public final Single<T> elementAtOrError(long index)

条件操作符

  1. all()
    判断事件序列是否全部满足某个事件,如果都满足则返回 true,反之则返回 false。
public final Observable<T> ambWith(ObservableSource<? extends T> other)

//---
Observable.just(1, 2, 3, 4)
.all(new Predicate < Integer > () {
    @Override
    public boolean test(Integer integer) throws Exception {
        return integer < 5;
    }
})
.subscribe(new Consumer < Boolean > () {

//-->   true 
  1. takeWhile()
    可以设置条件,当某个数据满足条件时就会发送该数据,反之则不发送。
public final Observable<T> takeWhile(Predicate<? super T> predicate)

//----
Observable.just(1, 2, 3, 4)
.takeWhile(new Predicate < Integer > () {
    @Override
    public boolean test(Integer integer) throws Exception {
        return integer < 3;
    }
})
.subscribe(new Consumer < Integer > () {

// -> 1,2
  1. skipWhile()
    可以设置条件,当某个数据满足条件时不发送该数据,反之则发送。
public final Observable<T> skipWhile(Predicate<? super T> predicate)

// 上面的例子  -> 3,4
  1. takeUntil()
    可以设置条件,当事件满足此条件时,下一次的事件就不会被发送了。
Observable.just(1, 2, 3, 4, 5, 6)
.takeUntil(new Predicate < Integer > () {
    @Override
    public boolean test(Integer integer) throws Exception {
        return integer > 3;
    }
})
.subscribe(new Consumer < Integer > () {

// -> 1234
// 注意是下一次事件 !!!
  1. skipUntil()
    当 skipUntil() 中的 Observable 发送事件了,原来的 Observable 才会发送事件给观察者。
public final <U> Observable<T> skipUntil(ObservableSource<U> other)

//---
Observable.intervalRange(1, 5, 0, 1, TimeUnit.SECONDS)
.skipUntil(Observable.intervalRange(6, 5, 3, 1, TimeUnit.SECONDS))
.subscribe(new Observer < Long > () {
  1. sequenceEqual()
    判断两个 Observable 发送的事件是否相同。
public static <T> Single<Boolean> sequenceEqual(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2)

//---
Observable.sequenceEqual(Observable.just(1, 2, 3),
Observable.just(1, 2, 3))
.subscribe(new Consumer < Boolean > () {

// -> true
  1. contains()
    判断事件序列中是否含有某个元素,如果有则返回 true,如果没有则返回 false。
public final Single<Boolean> contains(final Object element)

// -----
Observable.just(1, 2, 3)
.contains(3)
.subscribe(new Consumer < Boolean > () {

// -> true
  1. isEmpty()
    判断事件序列是否为空。
public final Single<Boolean> isEmpty()
  1. amb()
    amb() 要传入一个 Observable 集合,但是只会发送最先发送事件的 Observable 中的事件,其余 Observable 将会被丢弃。
public static <T> Observable<T> amb(Iterable<? extends ObservableSource<? extends T>> sources)
  1. defaultIfEmpty()
    如果观察者只发送一个 onComplete() 事件,则可以利用这个方法发送一个值。
public final Observable<T> defaultIfEmpty(T defaultItem)

//---
Observable.create(new ObservableOnSubscribe < Integer > () {

    @Override
    public void subscribe(ObservableEmitter < Integer > e) throws Exception {
        e.onComplete();
    }
})
.defaultIfEmpty(666)
.subscribe(new Consumer < Integer > () {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.d(TAG, "------> onNext " + integer);
    }
});

//---> onNext 666

Document

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,547评论 6 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,399评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,428评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,599评论 1 274
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,612评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,577评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,941评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,603评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,852评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,605评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,693评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,375评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,955评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,936评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,172评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,970评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,414评论 2 342

推荐阅读更多精彩内容

  • 本篇文章介主要绍RxJava中操作符是以函数作为基本单位,与响应式编程作为结合使用的,对什么是操作、操作符都有哪些...
    嘎啦果安卓兽阅读 2,830评论 0 10
  • 转一篇文章 原地址:http://gank.io/post/560e15be2dca930e00da1083 前言...
    jack_hong阅读 902评论 0 2
  • 我从去年开始使用 RxJava ,到现在一年多了。今年加入了 Flipboard 后,看到 Flipboard 的...
    Jason_andy阅读 5,451评论 7 62
  • 书上说 无回应之地,就是绝境 想一个不想你的人 念一个不念你的人 爱一个不爱你的人 担心一个不要你操心的人 牵挂一...
    楚生阅读 227评论 0 1
  • 知道高教时间长也不长,最早听说是自己刚来北京时(这是最近才想起来的)。大概15年九月份,梁老师给自己无意间提了一嘴...
    南瓜bu说话阅读 2,044评论 0 3