rxjava 的操作符可是一大亮点啊,之前就算是有相应式变成的库也没有像 rxjava 这样提供这样可以变换的操作来。rxjava 操作符神神奇的地方在于他可以把一大堆无关的,单独的异步操作编制成一条执行的链子出来,让这些异步操作按照指定的顺序执行,其中我们可以执行大量的无法想想的操作,可以上上一个异步返回的数据,加工变成一个新的数据交给下一额个异步操作;可以把几个异步做关联,一个执行不完,下一个不能执行,一个执行不过去,之后的也不能执行;可以把异步并行执行,知道最后一个异步返回数据,然后结合这几个数据,返回一个集合数据,等等这些操作再 rxjava 之前都是无法想象的,都是不敢想的,这决然都能封装出来,但是 rxjava 还就是做到了,究其原因害的归功于响应式编程的思路啊。
废话不来了,rxjava 的操作符根据目的是分为几种的,我总结分类一下,便于大家记忆:
创建操作符
使用这类操作符可以不费力的创建出 Observable 对象
基本创建
* create-
快速创建,发送数据
-
延迟创建
变换操作符
可以把一个异步获取的数据进行操作,改变成另一个类型的数据发射,或是把一个 Observable 异步操作变换成另一个 Observable 异步操作
组合操作符
组合操作符可以把多个异步操作合并或是链接起来,这是日常我们最常用的场景了
-
多个 Observable 操作
- concat - 多个 Observable 按顺序执行
- concatArray -
- merge - 多个 Observable 并行,同时执行,不保证数据顺序
- mergeArray
- concatDelayError - 错误兼容处理
- mergeDelayError
- zip - 多个 Observable 并行,合并多个 Observable 的数据,然后返回一个总体的数据
- combineLatest
- combineLatestDelayError
- reduce
- collect
-
添加预处理
- startWith - 添加预处理数据
- startWithArray
-
统计
- count - 计数
功能操作符
-
条件操作符:
判断操作符
* all - 只有全部满足的时候才会返回 true
* contains - 是否包含指定参数
* isEmpty - 是否为空刷选操作符
* filter - 只发射符合条件的数据,数据从头遍历到尾,碰到不符合的数据也不会中断发射
* ofType - 只发射符合类型的数据,数据从头遍历到尾,碰到不符合的数据也不会中断发射
* elementAt - 选择指定位置的元素,下标准许越界,如果越界,可以指定默认值
* firstElement / lastElement - 获得第一个数据 / 获得最后一个数据
* distinct - 过滤重复,只要出现过得就不会再出现
* distinctUntilChanged - 过滤连续重复数据,注意必须是连续重复的才有有效
* take / takeLast - 从头开始 / 最后开始 获取指定数目的数据,如果数据量小于指定数目,则进入 error
* skip / skipLast - 正序 / 倒序 跳过指定 count,如果不足,则会进入 onError
* throttleFirst / throttleLast - 指定时间间隔内取第一个 / 最后一个 数据
* throttleWithTimeout / debounce - 2次数据间隔超过指定时间才有效,若一直没有合适的数据,默认取最后一个数据
- 其他:
- delay - 延迟
- onErrorReturn - 错误处理,不阻断操作
- retry - 重试,内容很多
- repeat - 重复发射
创建操作符
just
Observable.just("AA","BB")
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d("AA", "resylt:" + s);
}
});
just 可以接受一个长度最多为 10 的可变参数
注意 just 只创建出 1 个 Observable 对象,复数的数据都是挨个发送出去,而不是创建多个 Observable 对象
public static <T> Observable<T> just(T item1, T item2) {
ObjectHelper.requireNonNull(item1, "The first item is null");
ObjectHelper.requireNonNull(item2, "The second item is null");
return fromArray(item1, item2);
}
看源码就是返回一个 Observable 对象
public static <T> Observable<T> fromArray(T... items) {
ObjectHelper.requireNonNull(items, "items is null");
if (items.length == 0) {
return empty();
} else
if (items.length == 1) {
return just(items[0]);
}
return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));
}
public final class ObservableFromArray<T> extends Observable<T> {
final T[] array;
public ObservableFromArray(T[] array) {
this.array = array;
}
这时最终返回一个可以保存列数数据的 Observable 类型对象 ObservableFromArray
fromArray / fromIterable
fromArray / fromIterable 和 just 什么什么区别,区别就是 fromArray / fromIterable 的数据不限数量,fromArray 和 fromIterable 接受的数据集合类不同罢了
但是必须注意,他们都是只创建一个 Observable 对象,然后遍历集合,挨个发射数据
empty / error / never
<-- empty() -->
// 该方法创建的被观察者对象发送事件的特点:仅发送Complete事件,直接通知完成
Observable observable1=Observable.empty();
// 即观察者接收后会直接调用onCompleted()
<-- error() -->
// 该方法创建的被观察者对象发送事件的特点:仅发送Error事件,直接通知异常
// 可自定义异常
Observable observable2=Observable.error(new RuntimeException())
// 即观察者接收后会直接调用onError()
<-- never() -->
// 该方法创建的被观察者对象发送事件的特点:不发送任何事件
Observable observable3=Observable.never();
// 即观察者接收后什么都不调用
defer 延迟创建
通过 Observable工厂方法创建被观察者对象Observable,每次订阅后,都会得到一个刚创建的最新的Observable对象,这可以确保Observable对象里的数据是最新的
举个例子,一个 int 参数,我们在创建 Observable 之后修改这个 int 的值,看看可能发射的数据是修改之前的还是之后的
index = 10;
Observable<Integer> just = Observable.just(index);
index = 20;
just.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer index) throws Exception {
Log.d(tag, "index:" + index);
}
});
看结果是修改之前的数值,那我们用 defer 来看看
index = 10;
Observable<Integer> defer = Observable.defer(new Callable<ObservableSource<Integer>>() {
@Override
public ObservableSource<Integer> call() throws Exception {
return Observable.just(index);
}
});
index = 20;
defer.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer index) throws Exception {
Log.d(tag, "index:" + index);
}
});
这次打印的是修改之后的值,哈哈,大家不必担心的,因为基础数据类型是值传递才会这样,有前后不一致的情况,引用类型就没事啊,大家放心的修改,我做过测试修改的引用类型参数可以正常显示的哦
mBook.setName("AAA");
Observable<Book> just = Observable.just(mBook);
mBook.setName("BBB");
just.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Book>() {
@Override
public void accept(Book book) throws Exception {
Log.d(tag, "book-name:" + book.getName());
}
});
说实话这个 defer 有些像 AAC 的 liveData ,只不过没有 liveData 这么友好。
timer / interval
timer 和 interval 都是延迟操作,区别是 timer 只执行一次,interval 会一直执行
需要注意的是第一次发送都是在指定的延迟时间之后进行的。
//
Observable.timer(3, TimeUnit.SECONDS) / interval(3, TimeUnit.SECONDS)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.d(tag, "time:" + aLong);
}
});
range / rangeLong
计数发射,指定开始值,指定数据发射次数,每一次数据++,range 支持 int 类型,rangeLong 支持 long 类型
Observable.range(0, 5)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(tag, "range:" + integer);
}
});
intervalRange
指定开始数值,指定发射次数,指定首次执行延迟时间
// 参数说明:
// 参数1 = 事件序列起始点;
// 参数2 = 事件数量;
// 参数3 = 第1次事件延迟发送时间;
// 参数4 = 间隔时间数字;
// 参数5 = 时间单位
Observable.intervalRange(3,10,2, 1, TimeUnit.SECONDS)
// 该例子发送的事件序列特点:
// 1. 从3开始,一共发送10个事件;
// 2. 第1次延迟2s发送,之后每隔2秒产生1个数字(从0开始递增1,无限个)
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "开始采用subscribe连接");
}
// 默认最先调用复写的 onSubscribe()
@Override
public void onNext(Long value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
repeat 重复发射
repeat 顾名思义,就是重复发射,一般结合其他的操作符来使用,参数就是我们想要重复发射的次数
Observable
.range(0, 5)
.repeat(2)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(tag, "range:" + integer);
}
});
变换操作符
map
Observable
.just(11)
.map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return "AA";
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
}
});
map 把一个数据类型的 Observable 转成另一个类型的 Observable
flatmap
Observable
.just(11)
.flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
return Observable.just("AA");
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String o) throws Exception {
}
});
flatmap 就比 map 要复杂一些了,map 不管 数据有多少,只是一个数据类型的 Observable 转成另一个类型的 Observable
而 flatmap 会把没每一个数据都转成成一个 新的 Observable ,然后最总汇总所有的 Observable 统一生成一个最终的 Observable 。
原理:
- 为事件序列中每个事件都创建一个 Observable 对象;
- 将对每个 原始事件 转换后的 新事件 都放入到对应 Observable对象;
- 将新建的每个Observable 都合并到一个 新建的、总的Observable 对象;
- 新建的、总的Observable 对象 将 新合并的事件序列 发送给观察者(Observer)
所以 flatmap 获得的最终的 Observable 里面数据是无序的,这点是需要注意的。flatmap 过程中有大量的 Observable 对象产生销毁,很消耗资源的,我们能用 map 就尽量不要用 flatmap
ConcatMap
ConcatMap 和 flatmap 一样,flatmap 处理过的数据是无序的,ConcatMap 是有序的,就这点区别
Buffer
隔指定补偿,从缓存里循环取出指定数据,感觉这个没什么用啊,至少 android 里体会不到
// 被观察者 需要发送5个数字
Observable.just(1, 2, 3, 4, 5)
.buffer(3, 1) // 设置缓存区大小 & 步长
// 缓存区大小 = 每次从被观察者中获取的事件数量
// 步长 = 每次获取新事件的数量
.subscribe(new Observer<List<Integer>>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(List<Integer> stringList) {
//
Log.d(TAG, " 缓存区里的事件数量 = " + stringList.size());
for (Integer value : stringList) {
Log.d(TAG, " 事件 = " + value);
}
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应" );
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
组合操作符
concat 、 concatArray
observable 顺序操作符,把多个 observable 按照前后顺序串行执行,执行完一个才能执行下一个,一个 error 了直接中断,不在往下走了,另外 前一个 observable 若有多个数据,只有等前一个 observable 的复数数据全部发射完毕,才能执行下一个 observable 的数据发射任务
concat 只能就收最多4个数据,concatArray 没有数量限制
Observable.concat(Observable.just(11, 22), Observable.just(33, 44))
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d("CC", "result:" + integer);
}
});
merge / mergeArray
observable 同事执行操作符,把多个 observable 并行执行,所有的 observable 均同时执行,也没有前后之分了
Observable.merge(
Observable.intervalRange(20, 10, 2, 1, TimeUnit.SECONDS)
, Observable.intervalRange(1, 10, 2, 1, TimeUnit.SECONDS))
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long integer) throws Exception {
Log.d("CC", "result:" + integer);
}
});
concatDelayError / mergeDelayError
用来处理 concat 、 merge 中 error 的, concat 、 merge 中若有一个数据 error 了,就会结束整个操作,这可能不是我们想要的,那么这个相应的 DelayError 就可以让 error 不阻断操作,在数据都发射后在执行 error 操作
Observable.concatArrayDelayError(
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onError(new NullPointerException()); // 发送Error事件,因为使用了concatDelayError,所以第2个Observable将会发送事件,等发送完毕后,再发送错误事件
emitter.onComplete();
}
}),
Observable.just(4, 5, 6))
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
zip
数据合并,多 Observable 并发执行,组合多个 Observable 的所有单次数据返回一个综合的数据
Observable
.zip(
Observable.intervalRange(100, 5, 2, 5, TimeUnit.SECONDS),
Observable.intervalRange(200, 5, 2, 5, TimeUnit.SECONDS),
new BiFunction<Long, Long, String>() {
@Override
public String apply(Long aLong, Long aLong2) throws Exception {
return aLong + " / " + aLong2;
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d("CC", "result: " + s);
}
});
看上面的例子,每一次返回的数据都是 5秒 而不是 10秒,所以证明 zip 组合的 observable 是并发执行的,时间间隔由单次执行最慢的 observable 决定
reduce
把前2个数据交给第三个数据,后面的以此类推
Observable.just(1,2,3,4)
.reduce(new BiFunction<Integer, Integer, Integer>() {
// 在该复写方法中复写聚合的逻辑
@Override
public Integer apply(@NonNull Integer s1, @NonNull Integer s2) throws Exception {
Log.e(TAG, "本次计算的数据是: "+s1 +" 乘 "+ s2);
return s1 * s2;
// 本次聚合的逻辑是:全部数据相乘起来
// 原理:第1次取前2个数据相乘,之后每次获取到的数据 = 返回的数据x原始下1个数据每
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer s) throws Exception {
Log.e(TAG, "最终计算的结果是: "+s);
}
});
collect
把所有发送的数据最终收集到一个集合里统一发送数据
Observable.just(1, 2, 3 ,4, 5, 6)
.collect(
// 1. 创建数据结构(容器),用于收集被观察者发送的数据
new Callable<ArrayList<Integer>>() {
@Override
public ArrayList<Integer> call() throws Exception {
return new ArrayList<>();
}
// 2. 对发送的数据进行收集
}, new BiConsumer<ArrayList<Integer>, Integer>() {
@Override
public void accept(ArrayList<Integer> list, Integer integer)
throws Exception {
// 参数说明:list = 容器,integer = 后者数据
list.add(integer);
// 对发送的数据进行收集
}
}).subscribe(new Consumer<ArrayList<Integer>>() {
@Override
public void accept(@NonNull ArrayList<Integer> s) throws Exception {
Log.e(TAG, "本次发送的数据是: "+s);
}
});
startWith / startWithArray
在正式发送数据前先发送指定数据
Observable
.just(1, 2, 3, 4, 5, 6)
.startWith(0)
.startWithArray(-1, -2)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer s) throws Exception {
Log.d("CC", "result: " + s);
}
});
count
统计发送次数,这个不知道实际没有没用
// 注:返回结果 = Long类型
Observable.just(1, 2, 3, 4)
.count()
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.e(TAG, "发送的事件数量 = "+aLong);
}
});
功能操作符
delay
延迟操作,和 time 不同的是 delay 不能创建出 observable 对象来
Observable
.just(1)
.delay(2, TimeUnit.SECONDS)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
}
});
do
do 操作符包含游很多方法,都是在相应的方法之前执行
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onError(new Throwable("发生错误了"));
}
})
// 1. 当Observable每发送1次数据事件就会调用1次
.doOnEach(new Consumer<Notification<Integer>>() {
@Override
public void accept(Notification<Integer> integerNotification) throws Exception {
Log.d(TAG, "doOnEach: " + integerNotification.getValue());
}
})
// 2. 执行Next事件前调用
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "doOnNext: " + integer);
}
})
// 3. 执行Next事件后调用
.doAfterNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "doAfterNext: " + integer);
}
})
// 4. Observable正常发送事件完毕后调用
.doOnComplete(new Action() {
@Override
public void run() throws Exception {
Log.e(TAG, "doOnComplete: ");
}
})
// 5. Observable发送错误事件时调用
.doOnError(new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.d(TAG, "doOnError: " + throwable.getMessage());
}
})
// 6. 观察者订阅时调用
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(@NonNull Disposable disposable) throws Exception {
Log.e(TAG, "doOnSubscribe: ");
}
})
// 7. Observable发送事件完毕后调用,无论正常发送完毕 / 异常终止
.doAfterTerminate(new Action() {
@Override
public void run() throws Exception {
Log.e(TAG, "doAfterTerminate: ");
}
})
// 8. 最后执行
.doFinally(new Action() {
@Override
public void run() throws Exception {
Log.e(TAG, "doFinally: ");
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
onErrorReturn
错误处理,可以先于 onError 接受错误信息,然后返回一个正常信息,不至于中断整个操作
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onError(new Throwable("发生错误了"));
}
})
.onErrorReturn(new Function<Throwable, Integer>() {
@Override
public Integer apply(@NonNull Throwable throwable) throws Exception {
// 捕捉错误异常
Log.e(TAG, "在onErrorReturn处理了错误: "+throwable.toString() );
return 666;
// 发生错误事件后,发送一个"666"事件,最终正常结束
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
onErrorResumeNext / onExceptionResumeNext
和 onErrorReturn 差不多,遇到错误会抛出一个新的 Observable 来,这2个对象一个接受 Throwable 类型的错误,一个接受 Exception 的错误
retry 、retryWhen
重试,这个比较重要了,当收到 error 决定是重新几次执行几次,什么条件执行
retry 共有5种重载方法
<-- 1. retry() -->
// 作用:出现错误时,让被观察者重新发送数据
// 注:若一直错误,则一直重新发送
<-- 2. retry(long time) -->
// 作用:出现错误时,让被观察者重新发送数据(具备重试次数限制
// 参数 = 重试次数
<-- 3. retry(Predicate predicate) -->
// 作用:出现错误后,判断是否需要重新发送数据(若需要重新发送& 持续遇到错误,则持续重试)
// 参数 = 判断逻辑
<-- 4. retry(new BiPredicate<Integer, Throwable>) -->
// 作用:出现错误后,判断是否需要重新发送数据(若需要重新发送 & 持续遇到错误,则持续重试
// 参数 = 判断逻辑(传入当前重试次数 & 异常错误信息)
<-- 5. retry(long time,Predicate predicate) -->
// 作用:出现错误后,判断是否需要重新发送数据(具备重试次数限制
// 参数 = 设置重试次数 & 判断逻辑
<-- 1. retry() -->
// 作用:出现错误时,让被观察者重新发送数据
// 注:若一直错误,则一直重新发送
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onError(new Exception("发生错误了"));
e.onNext(3);
}
})
.retry() // 遇到错误时,让被观察者重新发射数据(若一直错误,则一直重新发送
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
<-- 2. retry(long time) -->
// 作用:出现错误时,让被观察者重新发送数据(具备重试次数限制
// 参数 = 重试次数
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onError(new Exception("发生错误了"));
e.onNext(3);
}
})
.retry(3) // 设置重试次数 = 3次
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
<-- 3. retry(Predicate predicate) -->
// 作用:出现错误后,判断是否需要重新发送数据(若需要重新发送& 持续遇到错误,则持续重试)
// 参数 = 判断逻辑
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onError(new Exception("发生错误了"));
e.onNext(3);
}
})
// 拦截错误后,判断是否需要重新发送请求
.retry(new Predicate<Throwable>() {
@Override
public boolean test(@NonNull Throwable throwable) throws Exception {
// 捕获异常
Log.e(TAG, "retry错误: "+throwable.toString());
//返回false = 不重新重新发送数据 & 调用观察者的onError结束
//返回true = 重新发送请求(若持续遇到错误,就持续重新发送)
return true;
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
<-- 4. retry(new BiPredicate<Integer, Throwable>) -->
// 作用:出现错误后,判断是否需要重新发送数据(若需要重新发送 & 持续遇到错误,则持续重试
// 参数 = 判断逻辑(传入当前重试次数 & 异常错误信息)
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onError(new Exception("发生错误了"));
e.onNext(3);
}
})
// 拦截错误后,判断是否需要重新发送请求
.retry(new BiPredicate<Integer, Throwable>() {
@Override
public boolean test(@NonNull Integer integer, @NonNull Throwable throwable) throws Exception {
// 捕获异常
Log.e(TAG, "异常错误 = "+throwable.toString());
// 获取当前重试次数
Log.e(TAG, "当前重试次数 = "+integer);
//返回false = 不重新重新发送数据 & 调用观察者的onError结束
//返回true = 重新发送请求(若持续遇到错误,就持续重新发送)
return true;
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
<-- 5. retry(long time,Predicate predicate) -->
// 作用:出现错误后,判断是否需要重新发送数据(具备重试次数限制
// 参数 = 设置重试次数 & 判断逻辑
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onError(new Exception("发生错误了"));
e.onNext(3);
}
})
// 拦截错误后,判断是否需要重新发送请求
.retry(3, new Predicate<Throwable>() {
@Override
public boolean test(@NonNull Throwable throwable) throws Exception {
// 捕获异常
Log.e(TAG, "retry错误: "+throwable.toString());
//返回false = 不重新重新发送数据 & 调用观察者的onError()结束
//返回true = 重新发送请求(最多重新发送3次)
return true;
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onError(new Exception("发生错误了"));
e.onNext(3);
}
})
// 遇到error事件才会回调
.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(@NonNull Observable<Throwable> throwableObservable) throws Exception {
// 参数Observable<Throwable>中的泛型 = 上游操作符抛出的异常,可通过该条件来判断异常的类型
// 返回Observable<?> = 新的被观察者 Observable(任意类型)
// 此处有两种情况:
// 1. 若 新的被观察者 Observable发送的事件 = Error事件,那么 原始Observable则不重新发送事件:
// 2. 若 新的被观察者 Observable发送的事件 = Next事件 ,那么原始的Observable则重新发送事件:
return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(@NonNull Throwable throwable) throws Exception {
// 1. 若返回的Observable发送的事件 = Error事件,则原始的Observable不重新发送事件
// 该异常错误信息可在观察者中的onError()中获得
return Observable.error(new Throwable("retryWhen终止啦"));
// 2. 若返回的Observable发送的事件 = Next事件,则原始的Observable重新发送事件(若持续遇到错误,则持续重试)
// return Observable.just(1);
}
});
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应" + e.toString());
// 获取异常错误信息
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
repeat / repeatWhen
重复发射,这个也是可以决定从复发射几次,什么条件从复发射
Observable.just(1,2,4).repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
@Override
// 在Function函数中,必须对输入的 Observable<Object>进行处理,这里我们使用的是flatMap操作符接收上游的数据
public ObservableSource<?> apply(@NonNull Observable<Object> objectObservable) throws Exception {
// 将原始 Observable 停止发送事件的标识(Complete() / Error())转换成1个 Object 类型数据传递给1个新被观察者(Observable)
// 以此决定是否重新订阅 & 发送原来的 Observable
// 此处有2种情况:
// 1. 若新被观察者(Observable)返回1个Complete() / Error()事件,则不重新订阅 & 发送原来的 Observable
// 2. 若新被观察者(Observable)返回其余事件,则重新订阅 & 发送原来的 Observable
return objectObservable.flatMap(new Function<Object, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(@NonNull Object throwable) throws Exception {
// 情况1:若新被观察者(Observable)返回1个Complete() / Error()事件,则不重新订阅 & 发送原来的 Observable
return Observable.empty();
// Observable.empty() = 发送Complete事件,但不会回调观察者的onComplete()
// return Observable.error(new Throwable("不再重新订阅事件"));
// 返回Error事件 = 回调onError()事件,并接收传过去的错误信息。
// 情况2:若新被观察者(Observable)返回其余事件,则重新订阅 & 发送原来的 Observable
// return Observable.just(1);
// 仅仅是作为1个触发重新订阅被观察者的通知,发送的是什么数据并不重要,只要不是Complete() / Error()事件
}
});
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "开始采用subscribe连接");
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件" + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应:" + e.toString());
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
takeWhile
只发送满足条件的,但一旦碰到不满足的,后面的均不会发送,complete 结束
//takeWhile
Observable.just(1, 2, 3, 4, 5)
.takeWhile(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer != 4;
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onNext(Integer integer) {
Log.d(TAG, "integer:" + integer);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
02-11 16:37:18.896 10620-10620/... D/SplashActivity: integer:1
02-11 16:37:18.896 10620-10620/... D/SplashActivity: integer:2
02-11 16:37:18.896 10620-10620/... D/SplashActivity: integer:3
02-11 16:37:18.896 10620-10620/... D/SplashActivity: onComplete
takeUntil
一直发送,直到满足条件时结束,并且,满足条件的那一个也会发送,之后发complete 结束
Observable.just(1, 2, 3, 4, 5, 6)
.takeUntil(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer == 4;
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onNext(Integer integer) {
Log.d(TAG, "integer:" + integer);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
02-11 17:03:11.396 12703-12703/... D/SplashActivity: integer:1
02-11 17:03:11.396 12703-12703/... D/SplashActivity: integer:2
02-11 17:03:11.396 12703-12703/... D/SplashActivity: integer:3
02-11 17:03:11.396 12703-12703/... D/SplashActivity: integer:4
02-11 17:03:11.396 12703-12703/... D/SplashActivity: onComplete
all
只有全部满足的时候才会返回 true
Disposable subscribe = Observable.just(1, 2, 3)
.all(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer>2;
}
})
.subscribe(aBoolean -> Log.d(TAG, "aBoolean:" + aBoolean));
02-11 16:33:45.936 10199-10199/... D/SplashActivity: aBoolean:false
contains
是否包含指定参数
Disposable subscribe = Observable.just(1, 2, 3, 4, 5)
.contains(1)
.subscribe(aBoolean -> Log.d(TAG, "aBoolean:" + aBoolean));
02-11 17:28:06.486 15538-15538/... D/SplashActivity: aBoolean:true
isEmpty
是否为空
Disposable subscribe = Observable.just(1, 2, 3, 4, 5)
.isEmpty()
.subscribe(aBoolean -> Log.d(TAG, "aBoolean:" + aBoolean));
02-11 17:31:04.306 15894-15894/... D/SplashActivity: aBoolean:false
filter
只发射符合条件的数据,数据从头遍历到尾,碰到不符合的数据也不会中断发射
Observable.just(1, 2, 3, 4, 5, 6).filter(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer != 4;
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onNext(Integer integer) {
Log.d(TAG, "integer:" + integer);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
02-11 17:46:46.206 17513-17513/... D/SplashActivity: integer:1
02-11 17:46:46.206 17513-17513/... D/SplashActivity: integer:2
02-11 17:46:46.206 17513-17513/... D/SplashActivity: integer:3
02-11 17:46:46.206 17513-17513/... D/SplashActivity: integer:5
02-11 17:46:46.206 17513-17513/... D/SplashActivity: integer:6
02-11 17:46:46.206 17513-17513/... D/SplashActivity: onComplete
ofType
只发射符合类型的数据,数据从头遍历到尾,碰到不符合的数据也不会中断发射
Observable.just(1, "one", 2, "tow", 3, "three")
.ofType(Integer.class)
.subscribe(new Observer<Integer>() {
@Override
public void onNext(Integer integer) {
Log.d(TAG, "integer:" + integer);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
02-11 17:54:15.436 19570-19570/... D/SplashActivity: integer:1
02-11 17:54:15.436 19570-19570/... D/SplashActivity: integer:2
02-11 17:54:15.436 19570-19570/... D/SplashActivity: integer:3
02-11 17:54:15.436 19570-19570/... D/SplashActivity: onComplete
elementAt
选择指定位置的元素,下标准许越界,如果越界,可以指定默认值
有2个参数:
- value1 - 指定数据下标
- value2 - 默认数据
Disposable subscribe = Observable.just(1, 2, 3, 4)
.elementAt(5,10)
.subscribe(integer ->
Log.d(TAG, "integer:" + integer));
02-12 09:37:34.093 13153-13153/... D/SplashActivity: integer:10
firstElement / lastElement
- firstElement - 获得第一个数据
- lastElement - 获得最后一个数据
Disposable subscribe = Observable.just(1, 2, 3, 4)
.firstElement()
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "integer:" + integer);
}
});
02-12 09:28:26.753 10462-10462/... D/SplashActivity: integer:1
distinct
过滤重复,只要出现过得就不会再出现
Observable.just(1, 2,3,1,3)
.distinct()
.subscribe(new Observer<Integer>() {
@Override
public void onNext(Integer integer) {
Log.d(TAG, "integer:" + integer);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
02-11 18:00:08.836 20056-20056/... D/SplashActivity: integer:1
02-11 18:00:08.836 20056-20056/... D/SplashActivity: integer:2
02-11 18:00:08.836 20056-20056/... D/SplashActivity: integer:3
02-11 18:00:08.836 20056-20056/... D/SplashActivity: onComplete
distinctUntilChanged
过滤连续重复数据,注意必须是连续重复的才有有效
Observable.just(1, 2, 3, 1, 3, 3, 4, 4)
.distinctUntilChanged()
.subscribe(new Observer<Integer>() {
@Override
public void onNext(Integer integer) {
Log.d(TAG, "integer:" + integer);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
02-11 18:02:59.656 20334-20334/... D/SplashActivity: integer:1
02-11 18:02:59.656 20334-20334/... D/SplashActivity: integer:2
02-11 18:02:59.656 20334-20334/... D/SplashActivity: integer:3
02-11 18:02:59.656 20334-20334/... D/SplashActivity: integer:1
02-11 18:02:59.656 20334-20334/... D/SplashActivity: integer:3
02-11 18:02:59.656 20334-20334/... D/SplashActivity: integer:4
02-11 18:02:59.656 20334-20334/... D/SplashActivity: onComplete
take / takeLast
- take - 从头开始获取指定数目的数据,如果数据量小于指定数目,则进入 error
- takeLast - 反向从最后开始获取数据
Observable.just(1, 2, 3, 4, 5, 6)
.take(2)
.subscribe(new Observer<Integer>() {
@Override
public void onNext(Integer integer) {
Log.d(TAG, "integer:" + integer);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
02-11 18:35:23.196 21766-21766/... D/SplashActivity: integer:1
02-11 18:35:23.196 21766-21766/... D/SplashActivity: integer:2
02-11 18:35:23.196 21766-21766/... D/SplashActivity: onComplete
skip
- skip - 正序跳过指定 count,如果不足,则会进入 onError
- skipLast - 倒序跳过指定 count,如果不足,则会进入 onError
Observable.just(1, 2, 3, 4).skip(2).subscribe(new Observer<Integer>() {
@Override
public void onNext(Integer integer) {
Log.d(TAG, "integer:" + integer);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
02-12 09:19:47.243 9691-9691/... D/SplashActivity: integer:3
02-12 09:19:47.243 9691-9691/... D/SplashActivity: integer:4
02-12 09:19:47.243 9691-9691/... D/SplashActivity: onComplete
throttleFirst / throttleLast
- throttleFirst - 指定时间间隔内取第一个数据
- throttleLast - 指定时间间隔内取最后一个数据
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
Thread.sleep(500);
e.onNext(2);
Thread.sleep(1600);
e.onNext(3);
Thread.sleep(2100);
e.onNext(4);
e.onComplete();
}
}).throttleFirst(2, TimeUnit.SECONDS)
.subscribe(new Observer<Integer>() {
@Override
public void onNext(Integer integer) {
Log.d(TAG, "integer:" + integer);
}
@Override
public void onComplete() {
Log.d(TAG, "complete");
}
});
``log
02-12 10:01:16.243 14629-14629/... D/SplashActivity: integer:1
02-12 10:01:18.343 14629-14629/... D/SplashActivity: integer:3
02-12 10:01:20.443 14629-14629/... D/SplashActivity: integer:4
02-12 10:01:20.443 14629-14629/... D/SplashActivity: complete
##### throttleWithTimeout / debounce
throttleWithTimeout 同 debounce
2次数据间隔超过指定时间才有效,若一直没有合适的数据,默认取最后一个数据
```java
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
Thread.sleep(500);
e.onNext(2);
Thread.sleep(500);
e.onNext(3);
Thread.sleep(500);
e.onNext(4);
Thread.sleep(500);
e.onNext(5);
Thread.sleep(500);
e.onNext(6);
e.onComplete();
}
}).throttleWithTimeout(1, TimeUnit.SECONDS)
.subscribe(new Observer<Integer>() {
@Override
public void onNext(Integer integer) {
Log.d(TAG, "integer:" + integer);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
02-12 10:15:56.573 16357-16357/... D/SplashActivity: integer:6
02-12 10:15:56.573 16357-16357/... D/SplashActivity: onComplete