1. 使用interval
进行轮询操作,类似于请求用户消息(之前一直使用handler)
/**
* 轮询查询接口-使用操作符interval
* 此处主要展示无限次轮询,若要实现有限次轮询,仅需将interval()改成intervalRange()即可
*/
private void init() {
/**
* 参数说明:
* 参数1==第一次延迟时间,1秒后发送查询请求
* 参数2==间隔时间
* 参数3==实践单位
* 该例子发送的事件特点:延迟2s后发送事件,每隔1秒产生1个数字(从0开始递增1,无限个)
*/
Observable.interval(2, 1, TimeUnit.SECONDS)
/**
* 步骤2:每次发送数字前发送1次网络请求(doOnNext()在执行Next事件前调用)
* 即每隔1秒产生1个数字前,就发送1次网络请求,从而实现轮询需求
*/
.doOnNext(aLong -> {
KLog.d(TTAG, "第" + aLong + "次查询");
retrofitApi.getCall()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Translation>() {
@Override
public void onSubscribe(Disposable d) {
//切断
d.dispose();
}
@Override
public void onNext(Translation translation) {
translation.show();
}
@Override
public void onError(Throwable e) {
KLog.d(TTAG, "请求失败了:失败原因是:" + e.getMessage());
}
@Override
public void onComplete() {
KLog.d(TTAG, "本次请求结束了");
}
});
}).subscribe(aLong -> {
KLog.d(TTAG, "接收到请求,这是第" + aLong + "次");
});
}
2. 变换操作符,对时间序列进行加工处理,使其转变成不同的事件/序列
常用变换操作符有:
- map()
- flatMap()
- concatMap()
- buffer()
2.1 Map()
作用:对 被观察者发送的每1个事件都通过 指定的函数 处理,从而变换成另外一种事件,
即, 将被观察者发送的事件转换为任意的类型事件。
例子如下:使用map()操作符使事件参数从整形变换成字符串类型
Observable.create((ObservableOnSubscribe<Integer>) emitter -> {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
}).map(integer ->
"这是发送的第" + integer + "条消息")
.subscribe(s ->
KLog.d(TTAG, "接收事件::" + s));
2.2 flatMap()
作用:将被观察者发送的事件序列进行 拆分 & 单独转换,再合并成一个新的事件序列,最后再进行发送
无序的将被观察者发送的整个事件序列进行变换
private void flatMap() {
Observable.create((ObservableOnSubscribe<Integer>) emitter -> {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
}).flatMap(integer -> {
final ArrayList<String> strings = new ArrayList<>();
for (int i = 0; i < 3; i++) {
strings.add("我是事件"+integer+"拆分后的子事件"+i);
}
return Observable.fromIterable(strings);
}).subscribe(s -> {
KLog.d(TTAG, s);
});
}
2.3 ConcatMap()
作用:
类似于flatMap(),不过是有序的
新合并生成的事件序列顺序是有序的,即 严格按照旧序列发送事件的顺序
日志如下:
(Main3Activity.java:61)#lambda$flatMap$2$Main3Activity ] 我是事件1拆分后的子事件0
(Main3Activity.java:61)#lambda$flatMap$2$Main3Activity ] 我是事件1拆分后的子事件1
(Main3Activity.java:61)#lambda$flatMap$2$Main3Activity ] 我是事件1拆分后的子事件2
(Main3Activity.java:61)#lambda$flatMap$2$Main3Activity ] 我是事件2拆分后的子事件0
(Main3Activity.java:61)#lambda$flatMap$2$Main3Activity ] 我是事件2拆分后的子事件1
(Main3Activity.java:61)#lambda$flatMap$2$Main3Activity ] 我是事件2拆分后的子事件2
(Main3Activity.java:61)#lambda$flatMap$2$Main3Activity ] 我是事件3拆分后的子事件0
(Main3Activity.java:61)#lambda$flatMap$2$Main3Activity ] 我是事件3拆分后的子事件1
(Main3Activity.java:61)#lambda$flatMap$2$Main3Activity ] 我是事件3拆分后的子事件2
实例:接口嵌套
/**
* 接口合并,实例,注册登录
*/
private void concatMap() {
retrofitApi.getCall().subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.doOnNext(translation -> {
translation.show();
}).observeOn(Schedulers.io())//注册线程结束,作为新的观察者,切换到io此线程(理应为设置subscribeOn(Schedulers.io()))
//作为观察者,下面又有新的观察者,他就作为老的观察者,也就是新的被观察者,所以调控线程用observeOn(Schedulers.io())
.concatMap(translation ->
//添加注册失败是的判断返回空对象
null != translation ? retrofitApi.getCall() : Observable.empty())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(translation -> {
translation.show();
}, throwable -> {
KLog.d(TTAG, throwable.getMessage());
});
}
2.4 buffer()
作用:定期从 被观察者(Obervable)需要发送的事件中 获取一定数量的事件 & 放到缓存区中,最终发送
/**
* buffer 操作符接受两个参数,buffer(count,skip),
* 作用是将 Observable 中的数据按 skip (步长) 分成最大不超过 count 的 buffer ,然后生成一个 Observable 。
* <p>
* 意思就是取count个,发射之后,重头开始跳过skip个,在选count个发射,一直到最后一个
*/
private void buffer() {
Observable.just(1,2,3,4,5,6,7)
.buffer(3,1)//设置缓存区大小==每次从被观察者中获取的事件数量
//步长:每次获取新事件数量
.subscribe(integers -> {
KLog.d(TTAG, "缓存区数量"+integers.size());
for (Integer integer : integers) {
KLog.d(TTAG, "事件"+integer);
}
});
}
3. 组合操作符
3.1 concat()以及concatArray()
作用:组合多个被观察者一起发送数据,合并后 按发送顺序串行执行
二者区别:组合被观察者的数量,即concat()组合被观察者数量≤4个,而concatArray()则可>4个
/**
* 该类型的操作符的作用 = 组合多个被观察者
* 组合多个被观察者一起发送数据,合并后 按发送顺序串行执行
*concat()
* concatArray()
*/
private void concat() {
Observable.concat(Observable.just(1,2)//发射者数量不超过4个
,Observable.just(3,4)
,Observable.just(7,8))
.subscribe(integer -> {
});
Observable.concatArray(Observable.just(1,2)//被观察者数量不受限制
,Observable.just(4,5)
,Observable.just(7,8)
,Observable.just(3,6))
.subscribe(integer -> {
});
}
3.2 merge()以及mergeArray()
作用:组合多个被观察者一起发送数据,合并后 按时间线并行执行
区别为:merge()组合被观察者数量小于等于4,合并后按时间线执行
/**
* 合并发射者,按时间线执行
*/
private void merge() {
Observable.merge(
//延迟发送操作符
//从0开始发送,工发送3个数据,第一次发件延迟时间1秒。间隔时间1s
//
Observable.intervalRange(0,3,1,1,TimeUnit.SECONDS),
Observable.intervalRange(2,3,1,1,TimeUnit.SECONDS)
).subscribe(aLong -> {
});
}
3.3 concatDelayError()以及mergeDelayError()
作用:使用conat以及merge操作符时,如果某个发射者发出error()时间,则会总结整个流程,我们希望onError()事件推迟到其他发射者都发送完时间之后后才会触发,即可使用
concatDelayError()
以及mergeDelayError()
/**
* 使用conat以及merge操作符时,如果某个发射者发出error()时间,则会总结整个流程,
* 我们希望onError()事件推迟到其他发射者都发送完时间之后后才会触发,
* 即可使用` concatDelayError()`以及`mergeDelayError()`
*/
private void concatArrayDelayErrorTest() {
Observable.concatArrayDelayError(Observable.create(emitter -> {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
// 发送Error事件,因为使用了concatDelayError,所以第2个Observable将会发送事件,等发送完毕后,再发送错误事件
emitter.onError(new NullPointerException());
emitter.onComplete();
}),Observable.just(4,5,6))
.subscribe(integer -> {
});
}
4. 合并多个事件
作用:该类型的操作符主要是对多个发射者中的事件进行合并处理
4.1 zip()操作符
作用:合并 多个被观察者(Observable)发送的事件,生成一个新的事件序列(即组合过后的事件序列),并最终发送
/**
* zip,可以用于接口合并
* 操作符使用例子
* zip 专用于合并事件,该合并不是连接(连接操作符后面会说),
* 而是两两配对,也就意味着,最终配对出的 Observable 发射事件数目只和少的那个相同。
* <p>
* zip 组合事件的过程就是分别从发射器 A 和发射器 B 各取出一个事件来组合,并且一个事件只能被使用一次,
* 组合的顺序是严格按照事件发送的顺序来进行的,所以上面截图中,可以看到,1 永远是和 A 结合的,2 永远是和 B 结合的
*/
public static void useZip() {
Observable.zip(getStringObservable(), getIntegerObservable(),
(s, integer) -> s + integer).subscribe(s -> KLog.d(TTAG, "新的消息字段是" + s));
}
private static Observable<String> getStringObservable() {
return Observable.create((ObservableOnSubscribe<String>) e -> {
if (!e.isDisposed()) {
aaa.append("asd");
e.onNext("A");
aaa.append("asd");
e.onNext("B");
aaa.append("asd");
aaa.append("zxczxc");
e.onNext("C");
}
}).subscribeOn(Schedulers.io());
}
private static Observable<Integer> getIntegerObservable() {
return Observable.create((ObservableOnSubscribe<Integer>) e -> {
if (!e.isDisposed()) {
e.onNext(1);
aaa.append("--" + 1);
e.onNext(2);
aaa.append("--" + 2);
e.onNext(3);
aaa.append("--" + 3);
e.onNext(4);
aaa.append("--" + 4);
e.onNext(5);
aaa.append("--" + 5);
}
}).subscribeOn(Schedulers.io());
}
4.2 combineLatest()
作用:当两个Observables中的任何一个发送了数据后,将先发送了数据的Observables 的最新(最后)一个数据 与 另外一个Observable发送的每个数据结合,最终基于该函数的结果发送数据
/**
* 当两个Observables中的任何一个发送了数据后,
* 将先发送了数据的Observables 的最新(最后)一个数据 与
* 另外一个Observable发送的每个数据结合,最终基于该函数的结果发送数据
*/
private void combineLatest() {
Observable.combineLatest(
Observable.just(1L, 2L, 3L, 4L, 5L),
Observable.intervalRange(0, 3, 1, 1, TimeUnit.SECONDS),
(aLong, aLong2) -> {
KLog.d(TTAG, aLong);
KLog.d(TTAG, aLong2);
return aLong+aLong2;
}
).subscribe(aLong -> {
KLog.d(TTAG, aLong);
});
}
事件接收结果是:
3 0
3
3 1
4
3 2
5
4.3 reduce()
作用:把被观察者需要发送的事件聚合成1个事件 & 发送
/**
* 每次用一个方法处理一个值,可以有一个 seed 作为初始值
*/
public static void useReduce() {
Observable.just(1, 2, 3, 4)
.reduce((integer, integer2) -> {
KLog.d(TTAG, integer + "");
KLog.d(TTAG, integer2 + "");
//是所有事件相加
return integer + integer2;
}).subscribe(integer -> KLog.d(TTAG, integer + ""));
}
4.4 collect()
作用:将被观察者
Observable
发送的时间收集到一个数据结构里面
/**
* 将被观察者Observable发送的数据事件收集到一个数据结构里
*/
private void collect() {
Observable.just(1,2,3,4,5,6,7,8)
.collect((Callable<ArrayList<Integer>>) () ->
new ArrayList<>(),
(integers, integer) -> {
integers.add(integer);
}).subscribe(integers ->
KLog.d(TTAG, integers.toString()));
}
5. 发送事件前追加发送事件
5.1 startWIth()以及 startWithArray()
作用: 在一个被观察者发送事件前,追加发送一些数据 / 一个新的被观察者
/**
* 在一个被观察者发送事件前,追加发送一些数据
* 后追加,先调用,组合模式
*/
private void startWith() {
Observable.just(2,3,4,5)
.startWith(0)
.startWith(Observable.just(7,8))
.startWithArray(1)
.subscribe(integer -> {
});
}
6. 统计发送事件数量
6.1 count()
作用:统计被观察者发送事件的数量
/**
* 统计被观察者发送事件的数量
*/
private void count() {
Observable.just(1,2,3,4)
.count()
.subscribe(aLong -> {
KLog.d(TTAG, "发送事件数量是:"+aLong);
});
}
7. 在事件的生命周期中操作
在事件发送以及接收的整个生命周期中进行操作,如发送时间前的初始化,发送事件后的回调请求等
7.1 操作符 do( )
作用:在某个时间的生命周期中调用
类型分为:
-
当Observable每发送一次数据事件就会调用1次
- doOnEach()
- 含onNext()、onError()和onCompleted()
-
Next事件
- 执行Next事件前调用- doOnNext( )
- 执行Next事件后调用- doAfterNext()
-
发送事件完毕后调用
- 发送错误事件后- doOnError()
- 正常发送事件完毕后- doOnCompleted()
- 无论正常发送完毕或者异常终止-doOnTerminate()
- 最后执行: doFinally()
-
订阅相关:
- 观察者订阅时调用-doOnSubscribe()
- 观察者取消订阅时调用- doOnUnsubscribe()
具体代码如下:
/**
* do操作符
*/
private void useDo() {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onError(new Throwable("发送错误"));
}
//1. 当Observable每发送1次数据事件就会调用1次
}).doOnEach(new Consumer<Notification<Integer>>() {
@Override
public void accept(Notification<Integer> integerNotification) throws Exception {
KLog.d(TTAG, "doOnEach:" + integerNotification);
}
// 2. 执行Next事件前调用
}).doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
KLog.d(TTAG, "doOnNext:" + integer);
}
//3.执行Next事件后调用
}).doAfterNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
}
}).doOnComplete(new Action() {
@Override
public void run() throws Exception {
KLog.d(TTAG, "doOnCompleted:");
}
}).doOnError(new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
KLog.d(TTAG, "doOnError:" + throwable.getMessage());
}
}).doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
KLog.d(TTAG, "doOnSubscribe:");
}
}).doAfterTerminate(new Action() {
@Override
public void run() throws Exception {
KLog.d(TTAG, "doAfterTerminate");
}
}).doFinally(new Action() {
@Override
public void run() throws Exception {
KLog.d(TTAG, "doFinally");
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
KLog.d(TTAG, "开始发射了");
}
@Override
public void onNext(Integer integer) {
KLog.d(TTAG, "接收到事件:" + integer);
}
@Override
public void onError(Throwable e) {
KLog.d(TTAG, "发生错误了:" + e.getMessage());
}
@Override
public void onComplete() {
KLog.d(TTAG, "处理完成了");
}
});
}
(Main3Activity.java:116)#accept ] doOnSubscribe:
(Main3Activity.java:131)#onSubscribe ] 开始发射了
(Main3Activity.java:88)#accept ] doOnEach:OnNextNotification[1]
(Main3Activity.java:94)#accept ] doOnNext:1
(Main3Activity.java:136)#onNext ] 接收到事件:1
(Main3Activity.java:88)#accept ] doOnEach:OnNextNotification[2]
(Main3Activity.java:94)#accept ] doOnNext:2
(Main3Activity.java:136)#onNext ] 接收到事件:2
(Main3Activity.java:88)#accept ] doOnEach:OnNextNotification[3]
(Main3Activity.java:94)#accept ] doOnNext:3
(Main3Activity.java:136)#onNext ] 接收到事件:3
(Main3Activity.java:88)#accept ] doOnEach:OnErrorNotification[java.lang.Throwable: 发送错误]
(Main3Activity.java:111)#accept ] doOnError:发送错误
(Main3Activity.java:141)#onError ] 发生错误了:发送错误
(Main3Activity.java:126)#run ] doFinally
(Main3Activity.java:121)#run ] doAfterTerminate
8. 关于错误处理:
- onErrorReturn( )
- onErrorResumeNext( )
- onExceptionResumeNext( )
- retry( )
- retryUntil( )
- retryWhen( )
解决方案:
- 发送数据:
发送一个特殊事件&正常终止- onErrorReturn( )
-
发送一个新的Observable,有如下两种:
- onErrorResumeNext( )
- onExceptionResumeNext( )
- 重试:
-直接重试 retry()
让Observable重新订阅 - retryUntil( )
将错误传递给另一个Observable来决定是否要重新订阅改Observable- retryWhen( )
具体使用见下面代码:
8.1 onErrorReturn
/**
* 关于错误的解决方案
*/
private void onErrorReturn() {
/**
* 方案1
* 发送一个特殊书剑,正常结束
*/
Observable.create((ObservableOnSubscribe<Integer>) emitter -> {
emitter.onComplete();
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onNext(4);
emitter.onError(new Throwable("发生错误了"));
}).onErrorReturn(throwable -> {
KLog.d(TTAG, "在onErrorReturn处理了错误::" + throwable.getMessage());
//发生错误时发送一个事件,正常结束
return 666;
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
KLog.d(TTAG, "接收到事件:" + integer);
}
@Override
public void onError(Throwable e) {
KLog.d(TTAG, "失败了");
}
@Override
public void onComplete() {
KLog.d(TTAG, "结束了");
}
});
}
8.2 onErrorResumeNext
/**
* 方案2
* 发送新的eObservable
* 两种方式
* onErrorResumeNext( )拦截的错误=Throwable;需要拦截Exception使用下面的方式
* <p>
* onExceptionResumeNext( )如果拦截的错误=Exception,则会发送新的Observable,不会走onerror()方法
* 如果拦截到Throwable错误,会将错误传递给观察者的onError方法,不在发送新的Observable
*/
private void onErrorResumeNext() {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onNext(4);
emitter.onError(new Throwable("发生错误了呢"));
}
}).onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Integer>>() {
@Override
public ObservableSource<? extends Integer> apply(Throwable throwable) throws Exception {
KLog.d(TTAG, "onErrorResumeNext:" + throwable.getMessage());
return Observable.just(7, 3, 6, 8);
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
KLog.d(TTAG, "接收到事件:" + integer);
}
@Override
public void onError(Throwable e) {
KLog.d(TTAG, "失败了");
}
@Override
public void onComplete() {
KLog.d(TTAG, "结束了");
}
});
}
8.3 onExceptionResumeNext
private void onExceptionResumeNext() {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onNext(4);
emitter.onError(new Exception("发生错误了呢"));
}
}).onExceptionResumeNext(new Observable<Integer>() {
@Override
protected void subscribeActual(Observer<? super Integer> observer) {
observer.onNext(11);
observer.onNext(22);
observer.onComplete();
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
KLog.d(TTAG, "接收到事件:" + integer);
}
@Override
public void onError(Throwable e) {
KLog.d(TTAG, "失败了");
}
@Override
public void onComplete() {
KLog.d(TTAG, "结束了");
}
});
}
8.4 retry
/**
* 当出现错误时,让被观察者(Observable)重新发射数据
* Throwable 和 Exception都可拦截
* <p>
* 1. retry()
* 作用:出现错误时,让被观察者重新发送数据
* 注:若一直错误,则一直重新发送
* <p>
* 2. retry(long time)
* 作用:出现错误时,让被观察者重新发送数据(具备重试次数限制
* 参数 = 重试次数
* <p>
* 3. retry(Predicate predicate)
* 作用:出现错误后,判断是否需要重新发送数据(若需要重新发送& 持续遇到错误,则持续重试)
* 参数 = 判断逻辑
* <p>
* 4. retry(new BiPredicate<Integer, Throwable>)
* 作用:出现错误后,判断是否需要重新发送数据(若需要重新发送 & 持续遇到错误,则持续重试
* 参数 = 判断逻辑(传入当前重试次数 & 异常错误信息)
* <p>
* 5. retry(long time,Predicate predicate)
* 作用:出现错误后,判断是否需要重新发送数据(具备重试次数限制
* 参数 = 设置重试次数 & 判断逻辑
*/
private void retry() {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onError(new Throwable("发送错误了"));
emitter.onNext(5);
emitter.onNext(6);
}
//遇到错误时,让被观察者重新发射数据(若一直错误,则一直重新发送
}).retry()
//遇到错误时,重试3次
.retry(3)
//拦截错误后,判断是否需要重新发送请求
.retry(new Predicate<Throwable>() {
@Override
public boolean test(Throwable throwable) throws Exception {
KLog.d(TTAG, "错误是:" + throwable.getMessage());
//返回false = 不重新重新发送数据 & 调用观察者的onError结束
//返回true = 重新发送请求(若持续遇到错误,就持续重新发送)
return throwable.getMessage().equals("我是判定错误");
}
//出现错误后,判断是否需要重新发送数据(若需要重新发送 & 持续遇到错误,则持续重试
// 参数 = 判断逻辑(传入当前重试次数 & 异常错误信息)
}).retry(new BiPredicate<Integer, Throwable>() {
@Override
public boolean test(Integer integer, Throwable throwable) throws Exception {
KLog.d(TTAG, "错误是:" + throwable.getMessage());
KLog.d(TTAG, "重试次数是:" + integer);
return true;
}
// 作用:出现错误后,判断是否需要重新发送数据(具备重试次数限制
// 参数 = 设置重试次数 & 判断逻辑
}).retry(3, new Predicate<Throwable>() {
@Override
public boolean test(Throwable throwable) throws Exception {
KLog.d(TTAG, "错误是:" + throwable.getMessage());
//返回false = 不重新重新发送数据 & 调用观察者的onError()结束
//返回true = 重新发送请求(最多重新发送3次)
return true;
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
8.5 retryUntil
/**
* 出现错误后,判断是否需要重新发送数据
* <p>
* 若需要重新发送 & 持续遇到错误,则持续重试
* 作用类似于retry(Predicate predicate)
* 返回false就一直重试
* 返回true结束
*/
private void retryUntil() {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onError(new Exception("发生错误了"));
emitter.onNext(5);
emitter.onNext(8);
}
}).retryUntil(new BooleanSupplier() {
@Override
public boolean getAsBoolean() throws Exception {
//返回false就一直重试
//返回true结束
return true;
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
KLog.d(TTAG, "onNext:" + integer);
}
@Override
public void onError(Throwable e) {
KLog.d(TTAG, "错误是:" + e.getMessage());
}
@Override
public void onComplete() {
}
});
}
8.6 retryWhen
/**
* 出现错误后,判断是否需要重新发送数据
* <p>
* 若需要重新发送 & 持续遇到错误,则持续重试
* 作用类似于retry(Predicate predicate)
* 返回false就一直重试
* 返回true结束
*/
private void retryUntil() {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onError(new Exception("发生错误了"));
emitter.onNext(5);
emitter.onNext(8);
}
}).retryUntil(new BooleanSupplier() {
@Override
public boolean getAsBoolean() throws Exception {
//返回false就一直重试
//返回true结束
return true;
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
KLog.d(TTAG, "onNext:" + integer);
}
@Override
public void onError(Throwable e) {
KLog.d(TTAG, "错误是:" + e.getMessage());
}
@Override
public void onComplete() {
}
});
}
9. 重复发送操作:
9.1 repeat()
作用:无条件地、重复发送 被观察者事件,具备重载方法,可设置重复创建次数
/**
* 无条件地、重复发送 被观察者事件
*/
private void repeat() {
Observable.just(1, 2, 3, 4)
//设置重复发送次数3次
.repeat(3)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
KLog.d(TTAG, "开始了链接");
}
@Override
public void onNext(Integer integer) {
KLog.d(TTAG, "接收事件是:" + integer);
}
@Override
public void onError(Throwable e) {
KLog.d(TTAG, "收到错误了");
}
@Override
public void onComplete() {
KLog.d(TTAG, "完成");
}
});
}
9.2 repeartWhen()
作用:有条件地、重复发送 被观察者事件,将原始 Observable 停止发送事件的标识(Complete() / Error()) 转换成1个 Object 类型数据传递给1个新被观察者(Observable),以此决定是否重新订阅 & 发送原来的 Observable
**
* 有条件地、重复发送 被观察者事件
* 将原始 Observable 停止发送事件的标识(Complete() / Error())
* 转换成1个 Object 类型数据传递给1个新被观察者(Observable),以此决定是否重新订阅 & 发送原来的 Observable
* <p>
* 返回结果分为两种情况:
* 1.若新被观察者(Observable)返回1个Complete / Error事件,则不重新订阅 & 发送原来的 Observable
* 2.若新被观察者(Observable)返回其余事件时,则重新订阅 & 发送原来的 Observable
*/
private void repeatWhen() {
Observable.just(1,2,3)
.repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Observable<Object> objectObservable) throws Exception {
// 在Function函数中,必须对输入的 Observable<Object>进行处理,这里使用的是flatMap操作符接收上游的数据
return objectObservable.flatMap(new Function<Object, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Object o) throws Exception {
//情况1:若新被观察者(Observable)返回1个Complete() / Error()事件,则不重新订阅 & 发送原来的 Observable
// Observable.empty() = 发送Complete事件,但不会回调观察者的onComplete()
// return Observable.error(new Throwable("不再重新订阅事件"));
// 返回Error事件 = 回调onError()事件,并接收传过去的错误信息。
// 情况2:若新被观察者(Observable)返回其余事件,则重新订阅 & 发送原来的 Observable
return Observable.just(1);
// 仅仅是作为1个触发重新订阅被观察者的通知,发送的是什么数据并不重要,只要不是Complete() / Error()事件
}
});
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
KLog.d(TTAG, "开始连接");
}
@Override
public void onNext(Integer integer) {
KLog.d(TTAG, "收到事件:" + integer);
}
@Override
public void onError(Throwable e) {
KLog.d(TTAG, "收到错误是:"+e.getMessage());
}
@Override
public void onComplete() {
KLog.d(TTAG, "完成");
}
});
}
10. 线程调控
注意:Observable.subscribeOn()可以多次指定,但是只有第一次有效,Observable.observeOn()每次都会生效
类型 | 含义 | 应用场景 |
---|---|---|
Schedulers.immediate() | 当前线程 == 不指定线程 | 默认线程 |
AndroidSchedulers.mainThread() | Android主线程 | 操作UI |
Schedulers.newThread() | 常规新线程 | 进行耗时操作 |
Schedulers.io() | IO操作线程 | 网络请求,读写文件等IO密集型操作 |
Schedulers.computation() | CPU计算操作线程 | 大量计算操作 |
11. 取消订阅
使用Disposable.dispose()
多个Disposable时,可采用RxJava内置容器CompositeDisposable进行统一管理
CompositeDisposable compositeDisposable = new CompositeDisposable();
compositeDisposable.add();
compositeDisposable.clear();