介绍
添加依赖
implementation 'io.reactivex.rxjava2:rxjava:xxx'
implementation 'io.reactivex.rxjava2:rxandroid:xxx'
需要了解的基本概念
RxJava由三部分组成: Observable被观察者,Observer观察者,subscribe订阅
操作符
创建
- create()
public static <T> Observable<T> create(ObservableOnSubscribe<T> source)
//----
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("Hello Observer");
e.onComplete();
}
});
//--
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
Log.d("chan","------> onNext " + s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.d("chan","------> onComplete ");
}
};
observable.subscribe(observer);
- just()
创建一个被观察者,并发送事件,发送的事件不可以超过10个以上。
public static <T> Observable<T> just(T item)
......
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8, T item9, T item10)
//----
Observable.just(1, 2, 3)
.subscribe(new Observer < Integer > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "------> onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "------> onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "------> onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "------> onComplete ");
}
});
- fromArray()
和 just() 类似,只不过 fromArray 可以传入多于10个的变量,并且可以传入一个数组
public static <T> Observable<T> fromArray(T... items)
//---
Integer array[] = {1, 2, 3, 4};
Observable.fromArray(array)
.subscribe(new Observer < Integer > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "------> onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "------> onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "------> onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "------> onComplete ");
}
});
- fromCallable()
这里的 Callable 是 java.util.concurrent 中的 Callable,Callable 和 Runnable 的用法基本一致,只是它会返回一个结果值,这个结果值就是发给观察者的。
public static <T> Observable<T> fromCallable(Callable<? extends T> supplier)
Observable.fromCallable(new Callable < Integer > () {
@Override
public Integer call() throws Exception {
return 1;
}
})
.subscribe(new Consumer < Integer > () {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "------> accept " + integer);
}
});
- fromFuture()
参数中的 Future 是 java.util.concurrent 中的 Future,Future 的作用是增加了 cancel() 等方法操作 Callable,它可以通过 get() 方法来获取 Callable 返回的值。
public static <T> Observable<T> fromFuture(Future<? extends T> future)
//----
FutureTask < String > futureTask = new FutureTask < > (new Callable < String > () {
@Override
public String call() throws Exception {
Log.d(TAG, "CallableDemo is Running");
return "返回结果";
}
});
Observable.fromFuture(futureTask)
.doOnSubscribe(new Consumer < Disposable > () {//doOnSubscribe() 的作用就是只有订阅时才会发送事件
@Override
public void accept(Disposable disposable) throws Exception {
futureTask.run();
}
})
.subscribe(new Consumer < String > () {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "------> accept " + s);
}
});
- fromIterable()
直接发送一个 List 集合数据给观察者
public static <T> Observable<T> fromIterable(Iterable<? extends T> source)
//----
List<Integer> list = new ArrayList<>();
list.add(0);
list.add(1);
list.add(2);
list.add(3);
Observable.fromIterable(list)
.subscribe(new Observer < Integer > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "------> onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "------> onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "------> onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "------> onComplete ");
}
});
- defer()
这个方法的作用就是直到被观察者被订阅后才会创建被观察者。
public static <T> Observable<T> defer(Callable<? extends ObservableSource<? extends T>> supplier)
//----
// i 要定义为成员变量
Integer i = 100;
Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<? extends Integer>>() {
@Override
public ObservableSource<? extends Integer> call() throws Exception {
return Observable.just(i);
}
});
i = 200;
Observer observer = new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "------> onNext " + integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
observable.subscribe(observer);
i = 300; //最后返回这个值
observable.subscribe(observer);
- timer()
当到指定时间后就会发送一个 0L 的值给观察者。
public static Observable<Long> timer(long delay, TimeUnit unit)
......
//----
Observable.timer(2, TimeUnit.SECONDS)
.subscribe(new Observer < Long > () {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Long aLong) {
Log.d(TAG, "------> onNext " + aLong);// 输出0 只执行一次
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
- interval()
每隔一段时间就会发送一个事件,这个事件是从0开始,不断增1的数字。
public static Observable<Long> interval(long period, TimeUnit unit)
public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit)
......
//----
Observable.interval(4, TimeUnit.SECONDS)
.subscribe(new Observer < Long > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "------> =onSubscribe ");
}
@Override
public void onNext(Long aLong) {
Log.d(TAG, "------> =onNext " + aLong);// 间隔多长时间 多次执行
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
- intervalRange()
可以指定发送事件的开始值和数量,其他与 interval() 的功能一样
public static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit)
public static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit, Scheduler scheduler)
//----
Observable.intervalRange(2, 5, 2, 1, TimeUnit.SECONDS)
.subscribe(new Observer < Long > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "------> =onSubscribe ");
}
@Override
public void onNext(Long aLong) {
Log.d(TAG, "------> =onNext " + aLong);
// 结果 2,3,4,5,6 (2开始,5次)
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
- range()
同时发送一定范围的事件序列。
public static Observable<Long> rangeLong(long start, long count)
//----
Observable.range(2, 5)
.subscribe(new Observer < Integer > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "------> =onSubscribe ");
}
@Override
public void onNext(Integer aLong) {
Log.d(TAG, "------> =onNext " + aLong);// 23456
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
- rangeLong()
作用与 range() 一样,只是数据类型为 Long
public static Observable<Long> rangeLong(long start, long count)
- empty() & never() & error()
- empty() : 直接发送 onComplete() 事件
- never():不发送任何事件
- error():发送 onError() 事件
Observable.empty()
.subscribe(new Observer < Object > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "------> =onSubscribe");
}
@Override
public void onNext(Object o) {
Log.d(TAG, "------> =onNext");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "------> =onError " + e);
}
@Override
public void onComplete() {
Log.d(TAG, "------> =onComplete");
}
});
//----
empty() --> ------> =onSubscribe ------> =onComplete
never() --> ------> =onSubscribe
error() --> ------> =onSubscribe ------> =onError java.lang.NullPointerException
转换
- map()
map可以将被观察者发送的数据类型转变成其他的类型
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper)
//-----
Observable.just(1, 2, 3)
.map(new Function < Integer, String > () {
@Override
public String apply(Integer integer) throws Exception {
return "I'm " + integer;
}
})
.subscribe(new Observer < String > () {
@Override
public void onSubscribe(Disposable d) {
Log.e(TAG, "------> onSubscribe");
}
@Override
public void onNext(String s) {
Log.e(TAG, "------> onNext " + s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
- flatMap()
这个方法可以将事件序列中的元素进行整合加工,返回一个新的被观察者。
flatMap() 其实与 map() 类似,但是 flatMap() 返回的是一个 Observerable。现在用一个例子来说明 flatMap() 的用法。
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper)
......
//----
Observable.fromIterable(personList)
.flatMap(new Function < Person, ObservableSource < Plan >> () {
@Override
public ObservableSource < Plan > apply(Person person) {
return Observable.fromIterable(person.getPlanList());
}
})
.flatMap(new Function < Plan, ObservableSource < String >> () {
@Override
public ObservableSource < String > apply(Plan plan) throws Exception {
return Observable.fromIterable(plan.getActionList());
}
})
.subscribe(new Observer < String > () {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
Log.d(TAG, "------> =action: " + s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
- concatMap()
concatMap() 和 flatMap() 基本上是一样的,只不过 concatMap() 转发出来的事件是有序的,而 flatMap() 是无序的。
public final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper)
public final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, int prefetch)
//-----
// ^^^^^ flatMap()
- buffer()
从需要发送的事件当中获取一定数量的事件,并将这些事件放到缓冲区当中一并发出。
public final Observable<List<T>> buffer(int count, int skip)
......
//-----
//buffer 有两个参数,一个是 count,另一个 skip。count 缓冲区元素的数量,skip 就代表缓冲区满了之后,发送下一次事件序列的时候要跳过多少元素。
Observable.just(1, 2, 3, 4, 5)
.buffer(2, 1)
.subscribe(new Observer < List < Integer >> () {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(List < Integer > integers) {
Log.d(TAG, "------> 缓冲区大小: " + integers.size());
for (Integer i: integers) {
Log.d(TAG, "------> 元素: " + i);
}
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
//----结果
// 从结果可以看出,每次发送事件,指针都会往后移动一个元素再取值,直到指针移动到没有元素的时候就会停止取值。
05-21 14:09:34.015 22421-22421/com.example.rxjavademo D/chan: ------> 缓冲区大小: 2
------> 元素: 1
------> 元素: 2
------> 缓冲区大小: 2
------> 元素: 2
------> 元素: 3
------> 缓冲区大小: 2
------> 元素: 3
------> 元素: 4
------> 缓冲区大小: 2
------> 元素: 4
------> 元素: 5
------> 缓冲区大小: 1
------> 元素: 5
- groupBy()
将发送的数据进行分组,每个分组都会返回一个被观察者。
public final <K> Observable<GroupedObservable<K, T>> groupBy(Function<? super T, ? extends K> keySelector)
//-------
Observable.just(5, 2, 3, 4, 1, 6, 8, 9, 7, 10)
.groupBy(new Function < Integer, Integer > () {
@Override
public Integer apply(Integer integer) throws Exception {
return integer % 3;
}
})
.subscribe(new Observer < GroupedObservable < Integer, Integer >> () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "------> onSubscribe ");
}
@Override
public void onNext(GroupedObservable < Integer, Integer > integerIntegerGroupedObservable) {
Log.d(TAG, "------> onNext ");
integerIntegerGroupedObservable.subscribe(new Observer < Integer > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "------> GroupedObservable onSubscribe ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "------> GroupedObservable onNext groupName: " + integerIntegerGroupedObservable.getKey() + " value: " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "------> GroupedObservable onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "------> GroupedObservable onComplete ");
}
});
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "------> onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "------> onComplete ");
}
});
//----结果---
05-26 14:38:02.062 21451-21451/com.example.rxjavademo D/chan: ------> onSubscribe
05-26 14:38:02.063 21451-21451/com.example.rxjavademo D/chan: ------> onNext
------> GroupedObservable onSubscribe ------> GroupedObservable onNext groupName: 2 value: 5
------> GroupedObservable onNext groupName: 2 value: 2
------> onNext
------> GroupedObservable onSubscribe
------> GroupedObservable onNext groupName: 0 value: 3
05-26 14:38:02.064 21451-21451/com.example.rxjavademo D/chan: ------> onNext
------> GroupedObservable onSubscribe
------> GroupedObservable onNext groupName: 1 value: 4
------> GroupedObservable onNext groupName: 1 value: 1
------> GroupedObservable onNext groupName: 0 value: 6
------> GroupedObservable onNext groupName: 2 value: 8
------> GroupedObservable onNext groupName: 0 value: 9
------> GroupedObservable onNext groupName: 1 value: 7
------> GroupedObservable onNext groupName: 1 value: 10
05-26 14:38:02.065 21451-21451/com.example.rxjavademo D/chan: ------> GroupedObservable onComplete
------> GroupedObservable onComplete
------> GroupedObservable onComplete
------> onComplete
- scan()
将数据以一定的逻辑聚合起来。
public final Observable<T> scan(BiFunction<T, T, T> accumulator)
//-------
Observable.just(1, 2, 3, 4, 5)
.scan(new BiFunction < Integer, Integer, Integer > () {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
Log.d(TAG, "------> apply ");
Log.d(TAG, "------> integer " + integer);
Log.d(TAG, "------> integer2 " + integer2);
return integer + integer2;
}
})
.subscribe(new Consumer < Integer > () {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "------> accept " + integer);
}
});
- window()
发送指定数量的事件时,就将这些事件分为一组。window 中的 count 的参数就是代表指定的数量,例如将 count 指定为2,那么每发2个数据就会将这2个数据分成一组。
public final Observable<Observable<T>> window(long count)
......
//-------
//有几组 就会走几个 onComplete()
组合操作符
- concat()
可以将多个观察者组合在一起,然后按照之前发送顺序发送事件。需要注意的是,concat() 最多只可以发送4个事件。
public static <T> Observable<T> concat(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2, ObservableSource<? extends T> source3, ObservableSource<? extends T> source4)
......
- concatArray()
与 concat() 作用一样,不过 concatArray() 可以发送多于 4 个被观察者。
public static <T> Observable<T> concatArray(ObservableSource<? extends T>... sources)
- merge()
这个方法月 concat() 作用基本一样,知识 concat() 是串行发送事件,而 merge() 并行发送事件。
我的理解是 concat()一个完了走另个, 而merge()是好朋友一起走 数据乱了.
public static <T> Observable<T> merge(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2, ObservableSource<? extends T> source3, ObservableSource<? extends T> source4)
......
- concatArrayDelayError() & mergeArrayDelayError()
在 concatArray() 和 mergeArray() 两个方法当中,如果其中有一个被观察者发送了一个 Error 事件,那么就会停止发送事件,如果你想 onError() 事件延迟到所有被观察者都发送完事件后再执行的话,就可以使用 concatArrayDelayError() 和 mergeArrayDelayError()
public static <T> Observable<T> concatArrayDelayError(ObservableSource<? extends T>... sources)
public static <T> Observable<T> mergeArrayDelayError(ObservableSource<? extends T>... sources)
- zip()
会将多个被观察者合并,根据各个被观察者发送事件的顺序一个个结合起来,最终发送的事件数量会与源 Observable 中最少事件的数量一样。
public static <T1, T2, R> Observable<R> zip(ObservableSource<? extends T1> source1, ObservableSource<? extends T2> source2, BiFunction<? super T1, ? super T2, ? extends R> zipper)
......
- combineLatest() & combineLatestDelayError()
combineLatest() 的作用与 zip() 类似,但是 combineLatest() 发送事件的序列是与发送的时间线有关的,当 combineLatest() 中所有的 Observable 都发送了事件,只要其中有一个 Observable 发送事件,这个事件就会和其他 Observable 最近发送的事件结合起来发送,这样可能还是比较抽象
public static <T1, T2, R> Observable<R> combineLatest(ObservableSource<? extends T1> source1, ObservableSource<? extends T2> source2, BiFunction<? super T1, ? super T2, ? extends R> combiner)
.......
- reduce()
与 scan() 操作符的作用也是将发送数据以一定逻辑聚合起来,这两个的区别在于 scan() 每处理一次数据就会将事件发送给观察者,而 reduce() 会将所有数据聚合在一起才会发送事件给观察者。
public final Maybe<T> reduce(BiFunction<T, T, T> reducer)
- collect()
将数据收集到数据结构当中。
public final <U> Single<U> collect(Callable<? extends U> initialValueSupplier, BiConsumer<? super U, ? super T> collector)
//-----
Observable.just(1, 2, 3, 4)
.collect(new Callable < ArrayList < Integer >> () {
@Override
public ArrayList < Integer > call() throws Exception {
return new ArrayList < > ();
}
},
new BiConsumer < ArrayList < Integer > , Integer > () {
@Override
public void accept(ArrayList < Integer > integers, Integer integer) throws Exception {
integers.add(integer);
}
})
.subscribe(new Consumer < ArrayList < Integer >> () {
@Override
public void accept(ArrayList < Integer > integers) throws Exception {
Log.d(TAG, "------> accept " + integers); [1,2,3,4]
}
});
- startWith() & startWithArray()
在发送事件之前追加事件,startWith() 追加一个事件,startWithArray() 可以追加多个事件。追加的事件会先发出。
public final Observable<T> startWith(T item)
public final Observable<T> startWithArray(T... items)
//----
Observable.just(5, 6, 7)
.startWithArray(2, 3, 4)
.startWith(1)
.subscribe(new Consumer < Integer > () {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "------> accept " + integer); //分别打出 1,2,3,4,5,6,7
}
});
- count()
返回被观察者发送事件的数量。
public final Single<Long> count()
//-----
Observable.just(1, 2, 3)
.count()
.subscribe(new Consumer < Long > () {
@Override
public void accept(Long aLong) throws Exception {
Log.d(TAG, "------> aLong " + aLong); // 3
}
});
功能操作符
- delay()
延迟一段事件发送事件。
public final Observable<T> delay(long delay, TimeUnit unit)
// onSubscribe 不会延时 Next() 会
- doOnEach()
Observable 每发送一件事件之前都会先回调这个方法。并且可以取出 onNext() 发送的值。不仅仅是onNext();
public final Observable<T> doOnEach(final Consumer<? super Notification<T>> onNotification)
- doOnNext()
Observable 每发送 onNext() 之前都会先回调这个方法。
public final Observable<T> doOnNext(Consumer<? super T> onNext)
- doAfterNext()
Observable 每发送 onNext() 之后都会回调这个方法。
public final Observable<T> doAfterNext(Consumer<? super T> onAfterNext)
- doOnComplete()
Observable 每发送 onComplete() 之前都会回调这个方法。
public final Observable<T> doOnComplete(Action onComplete)
- doOnError()
Observable 每发送 onError() 之前都会回调这个方法。
public final Observable<T> doOnError(Consumer<? super Throwable> onError)
- doOnSubscribe()
Observable 每发送 onSubscribe() 之前都会回调这个方法。
public final Observable<T> doOnSubscribe(Consumer<? super Disposable> onSubscribe)
- doOnDispose()
当调用 Disposable 的 dispose() 之后回调该方法。
public final Observable<T> doOnDispose(Action onDispose)
/*
* 疑问 --> Disposable 是个啥?
*/
- doOnLifecycle()
在回调 onSubscribe 之前回调该方法的第一个参数的回调方法,可以使用该回调方法决定是否取消订阅。
public final Observable<T> doOnLifecycle(final Consumer<? super Disposable> onSubscribe, final Action onDispose)
/*
* 不懂 ~ ~
*/
- doOnTerminate() & doAfterTerminate()
doOnTerminate 是在 onError 或者 onComplete 发送之前回调,而 doAfterTerminate 则是 onError 或者 onComplete 发送之后回调。
public final Observable<T> doOnTerminate(final Action onTerminate)
public final Observable<T> doAfterTerminate(Action onFinally)
- doFinally()
在所有事件发送完毕之后回调该方法。
这里可能你会有个问题,那就是 doFinally() 和 doAfterTerminate() 到底有什么区别?区别就是在于取消订阅,如果取消订阅之后 doAfterTerminate() 就不会被回调,而 doFinally() 无论怎么样都会被回调,且都会在事件序列的最后。
public final Observable<T> doFinally(Action onFinally)
- onErrorReturn()
当接受到一个 onError() 事件之后回调,返回的值会回调 onNext() 方法,并正常结束该事件序列。
public final Observable<T> onErrorReturn(Function<? super Throwable, ? extends T> valueSupplier)
//----
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onError(new NullPointerException());
}
})
.onErrorReturn(new Function<Throwable, Integer>() {
@Override
public Integer apply(Throwable throwable) throws Exception {
Log.d(TAG, "------> =onErrorReturn " + throwable);
return 404;
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "------> =onSubscribe ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "------> =onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "------> =onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "------> =onComplete ");
}
});
//----
05-23 18:35:18.175 19239-19239/? D/chan: ------> =onSubscribe
------> =onNext 1
------> =onNext 2
------> =onNext 3
------> =onErrorReturn java.lang.NullPointerException
------> =onNext 404
------> =onComplete
- onErrorResumeNext()
当接收到 onError() 事件时,返回一个新的 Observable,并正常结束事件序列。
public final Observable<T> onErrorResumeNext(Function<? super Throwable, ? extends ObservableSource<? extends T>> resumeFunction)
//----
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onError(new NullPointerException());
}
})
.onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Integer>>() {
@Override
public ObservableSource<? extends Integer> apply(Throwable throwable) throws Exception {
Log.d(TAG, "------> =onErrorResumeNext " + throwable);
return Observable.just(4, 5, 6);
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "------> =onSubscribe ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "------> =onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "------> =onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "------> =onComplete ");
}
});
//-----
05-23 18:43:10.910 26469-26469/? D/chan: ------> =onSubscribe
------> =onNext 1
------> =onNext 2
------> =onNext 3
------> =onErrorResumeNext java.lang.NullPointerException
------> =onNext 4
------> =onNext 5
------> =onNext 6
------> =onComplete
- onExceptionResumeNext()
与 onErrorResumeNext() 作用基本一致,但是这个方法只能捕捉 Exception。
public final Observable<T> onExceptionResumeNext(final ObservableSource<? extends T> next)
- retry()
如果出现错误事件,则会重新发送所有事件序列。times 是代表重新发的次数。
public final Observable<T> retry(long times)
//----
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onError(new Exception("404"));
}
})
.retry(2)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "------> =onSubscribe ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "------> =onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "------> =onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "------> =onComplete ");
}
});
05-23 22:46:18.537 22239-22239/com.example.louder.rxjavademo D/chan: ------> =onSubscribe
05-23 22:46:18.538 22239-22239/com.example.louder.rxjavademo D/chan: ------> =onNext 1
------> =onNext 2
------> =onNext 3
------> =onNext 1
------> =onNext 2
------> =onNext 3
------> =onNext 1
------> =onNext 2
------> =onNext 3
------> =onError
- retryUntil()
出现错误事件之后,可以通过此方法判断是否继续发送事件。
public final Observable<T> retryUntil(final BooleanSupplier stop)
- retryWhen()
当被观察者接收到异常或者错误事件时会回调该方法,这个方法会返回一个新的被观察者。如果返回的被观察者发送 Error 事件则之前的被观察者不会继续发送事件,如果发送正常事件则之前的被观察者会继续不断重试发送事件。
Observable.create(new ObservableOnSubscribe < String > () {
@Override
public void subscribe(ObservableEmitter < String > e) throws Exception {
e.onNext("chan");
e.onNext("ze");
e.onNext("de");
e.onError(new Exception("404"));
e.onNext("haha");
}
})
.retryWhen(new Function < Observable < Throwable > , ObservableSource <? >> () {
@Override
public ObservableSource <? > apply(Observable < Throwable > throwableObservable) throws Exception {
return throwableObservable.flatMap(new Function < Throwable, ObservableSource <? >> () {
@Override
public ObservableSource <? > apply(Throwable throwable) throws Exception {
if(!throwable.toString().equals("java.lang.Exception: 404")) {
return Observable.just("可以忽略的异常");
} else {
return Observable.error(new Throwable("终止啦"));
}
}
});
}
})
.subscribe(new Observer < String > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "------> =onSubscribe ");
}
@Override
public void onNext(String s) {
Log.d(TAG, "------> =onNext " + s);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "------> =onError " + e.toString());
}
@Override
public void onComplete() {
Log.d(TAG, "------> =onComplete ");
}
});
//----
05-24 09:13:25.622 28372-28372/com.example.rxjavademo D/chan: ------> =onSubscribe
05-24 09:13:25.623 28372-28372/com.example.rxjavademo D/chan: ------> =onNext chan
------> =onNext ze
------> =onNext de
05-24 09:13:25.624 28372-28372/com.example.rxjavademo D/chan: ------> =onError java.lang.Throwable: 终止啦
//----
//将 onError(new Exception("404")) 改为 onError(new Exception("303")) 看看打印结果:
------> =onNext chan
05-24 09:54:08.653 29694-29694/? D/chan: ------> =onNext ze
------> =onNext de
------> =onNext chan
------> =onNext ze
------> =onNext de
------> =onNext chan
------> =onNext ze
------> =onNext de
------> =onNext chan
------> =onNext ze
------> =onNext de
------> =onNext chan
------> =onNext ze
------> =onNext de
------> =onNext chan
......
- repeat()
重复发送被观察者的事件,times 为发送次数。
public final Observable<T> repeat(long times)
// 重复 一遍 onNext();
// onSubscribe()-> ...next...onComplete()
- repeatWhen()
这个方法可以会返回一个新的被观察者设定一定逻辑来决定是否重复发送事件。
public final Observable<T> repeatWhen(final Function<? super Observable<Object>, ? extends ObservableSource<?>> handler)
//这里分三种情况,如果新的被观察者返回 onComplete 或者 onError 事件,则旧的被观察者不会继续发送事件。如果被观察者返回其他事件,则会重复发送事件。
- subscribeOn()
指定被观察者的线程,要注意的时,如果多次调用此方法,只有第一次有效。
public final Observable<T> subscribeOn(Scheduler scheduler)
- observeOn()
指定观察者的线程,每指定一次就会生效一次。
public final Observable<T> observeOn(Scheduler scheduler)
Schedulers.computation( )
用于使用计算任务,如事件循环和回调处理
Schedulers.immediate( )
当前线程
Schedulers.io( )
用于 IO 密集型任务,如果异步阻塞 IO 操作。
Schedulers.newThread( )
创建一个新的线程
AndroidSchedulers.mainThread()
Android 的 UI 线程,用于操作 UI。
过滤操作符
- filter()
通过一定逻辑来过滤被观察者发送的事件,如果返回 true 则会发送事件,否则不会发送。
public final Observable<T> filter(Predicate<? super T> predicate)
//----
Observable.just(1, 2, 3)
.filter(new Predicate < Integer > () {
@Override
public boolean test(Integer integer) throws Exception {
return integer < 2;
}
})
.subscribe(new Observer < Integer > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "------> =onSubscribe ");
}
@Override
public void onNext(Integer integer) {
i += integer;
Log.d(TAG, "------> =onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "------> =onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "------> =onComplete ");
}
});
//-----
05-24 22:57:32.562 12776-12776/com.example.louder.rxjavademo D/chan: ------> =onSubscribe
------> =onNext 1
------> =onComplete
- ofType()
可以过滤不符合该类型事件
public final <U> Observable<U> ofType(final Class<U> clazz)
//---
Observable.just(1, 2, 3, "chan", "zhide")
.ofType(Integer.class)
.subscribe(new Observer < Integer > () {
- skip()
跳过正序某些事件,count 代表跳过事件的数量
public final Observable<T> skip(long count)
//---
Observable.just(1, 2, 3)
.skip(2)
.subscribe(new Observer < Integer > () {
//最后结果只有3
- distinct()
过滤事件序列中的重复事件。
public final Observable<T> distinct()
//---
Observable.just(1, 2, 3, 3, 2, 1)
.distinct()
.subscribe(new Observer < Integer > () {
- distinctUntilChanged()
过滤掉连续重复的事件
public final Observable<T> distinctUntilChanged()
//----
Observable.just(1, 2, 3, 3, 2, 1)
.distinctUntilChanged()
.subscribe(new Observer < Integer > () {
//--> 12321
- take()
控制观察者接收的事件的数量。
public final Observable<T> take(long count)
//---
Observable.just(1, 2, 3, 4, 5)
.take(3)
.subscribe(new Observer < Integer > () {
//-> 123
- debounce()
如果两件事件发送的时间间隔小于设定的时间间隔则前一件事件就不会发送给观察者。
throttleWithTimeout() 与此方法的作用一样,这里就不再赘述了。
public final Observable<T> debounce(long timeout, TimeUnit unit)
//-> 是前一事件
- firstElement() && lastElement()
firstElement() 取事件序列的第一个元素,lastElement() 取事件序列的最后一个元素。
public final Maybe<T> firstElement()
public final Maybe<T> lastElement()
- elementAt() & elementAtOrError()
elementAt() 可以指定取出事件序列中事件,但是输入的 index 超出事件序列的总数的话就不会出现任何结果。这种情况下,你想发出异常信息的话就用 elementAtOrError() 。 --> elementAtOrError 会抛出异常 !!!!!!
public final Maybe<T> elementAt(long index)
public final Single<T> elementAtOrError(long index)
条件操作符
- all()
判断事件序列是否全部满足某个事件,如果都满足则返回 true,反之则返回 false。
public final Observable<T> ambWith(ObservableSource<? extends T> other)
//---
Observable.just(1, 2, 3, 4)
.all(new Predicate < Integer > () {
@Override
public boolean test(Integer integer) throws Exception {
return integer < 5;
}
})
.subscribe(new Consumer < Boolean > () {
//--> true
- takeWhile()
可以设置条件,当某个数据满足条件时就会发送该数据,反之则不发送。
public final Observable<T> takeWhile(Predicate<? super T> predicate)
//----
Observable.just(1, 2, 3, 4)
.takeWhile(new Predicate < Integer > () {
@Override
public boolean test(Integer integer) throws Exception {
return integer < 3;
}
})
.subscribe(new Consumer < Integer > () {
// -> 1,2
- skipWhile()
可以设置条件,当某个数据满足条件时不发送该数据,反之则发送。
public final Observable<T> skipWhile(Predicate<? super T> predicate)
// 上面的例子 -> 3,4
- takeUntil()
可以设置条件,当事件满足此条件时,下一次的事件就不会被发送了。
Observable.just(1, 2, 3, 4, 5, 6)
.takeUntil(new Predicate < Integer > () {
@Override
public boolean test(Integer integer) throws Exception {
return integer > 3;
}
})
.subscribe(new Consumer < Integer > () {
// -> 1234
// 注意是下一次事件 !!!
- skipUntil()
当 skipUntil() 中的 Observable 发送事件了,原来的 Observable 才会发送事件给观察者。
public final <U> Observable<T> skipUntil(ObservableSource<U> other)
//---
Observable.intervalRange(1, 5, 0, 1, TimeUnit.SECONDS)
.skipUntil(Observable.intervalRange(6, 5, 3, 1, TimeUnit.SECONDS))
.subscribe(new Observer < Long > () {
- sequenceEqual()
判断两个 Observable 发送的事件是否相同。
public static <T> Single<Boolean> sequenceEqual(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2)
//---
Observable.sequenceEqual(Observable.just(1, 2, 3),
Observable.just(1, 2, 3))
.subscribe(new Consumer < Boolean > () {
// -> true
- contains()
判断事件序列中是否含有某个元素,如果有则返回 true,如果没有则返回 false。
public final Single<Boolean> contains(final Object element)
// -----
Observable.just(1, 2, 3)
.contains(3)
.subscribe(new Consumer < Boolean > () {
// -> true
- isEmpty()
判断事件序列是否为空。
public final Single<Boolean> isEmpty()
- amb()
amb() 要传入一个 Observable 集合,但是只会发送最先发送事件的 Observable 中的事件,其余 Observable 将会被丢弃。
public static <T> Observable<T> amb(Iterable<? extends ObservableSource<? extends T>> sources)
- defaultIfEmpty()
如果观察者只发送一个 onComplete() 事件,则可以利用这个方法发送一个值。
public final Observable<T> defaultIfEmpty(T defaultItem)
//---
Observable.create(new ObservableOnSubscribe < Integer > () {
@Override
public void subscribe(ObservableEmitter < Integer > e) throws Exception {
e.onComplete();
}
})
.defaultIfEmpty(666)
.subscribe(new Consumer < Integer > () {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "------> onNext " + integer);
}
});
//---> onNext 666