常用辅助操作符
delay、do系列、subscribeOn、observeOn、timeout等
实例与功能介绍
1. delay操作符
代码实例
Observable observable = Observable.just(1, 2, 3);
observable = observable.delay(5, TimeUnit.SECONDS);
功能:可以让源Observable在发送每项数据之前都暂停一段指定的时间段。
2. do系列操作符
代码实例
Observable observable = Observable.just(1, 2, 3);
observable = observable.doOnNext(new Action1<Integer>() {
@Override
public void call(Integer integer) {
LogShowUtil.addLog("RxJava","开始执行: "+integer.intValue(),true);
}
});
结果
开始执行: 1
结果: 1
开始执行: 2
结果: 2
开始执行: 3
结果: 3
功能:Do系列操作符就是为原始的Observable的生命周期注册一个回调,当Observable的某个时间发生时就会调用这些回调。
doOnEach:
Observable调用onNext()、onError()和onCompleted()时都会回调这个方法doOnNext:
只在执行onNext()时会回调doOnSubscribe:
订阅时会回调doOnUnsubscribe:
取消订阅时会回调doOnCompleted:
Observable正常终止时会回调doOnError:
Observable异常终止时会回调doOnTerminate:
Observable终止(包含正常和异常终止)之前时会回调finallyDo:
Observable终止(包含正常和异常终止)之后时会回调
3. subscribeOn操作符
代码实例
Observable observable = Observable.just(1, 2, 3);
observable = observable.subscribeOn(Schedulers.newThread());
功能:指定Observable在哪个线程上执行
4. observeOn操作符
代码实例
Observable Aobservable = Observable.create(new Observable.OnSubscribe<String>(){
@Override
public void call(Subscriber<? super String> subscriber) {
LogShowUtil.addLog("RxJava","发送线程: "+Thread.currentThread().getName(),true);
subscriber.onNext("杨");
subscriber.onCompleted();
}
});
Observable Bobservable = Aobservable.subscribeOn(Schedulers.newThread());
Observable Cobservable = Bobservable.observeOn(AndroidSchedulers.mainThread());
功能:指定Observer在哪个线程上执行
5. timeout操作符
代码实例 1
Observable observable = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i=0; i<10;i++){
try {
Thread.sleep(i*100);
} catch (InterruptedException e) {
e.printStackTrace();
}
subscriber.onNext(i);
}
subscriber.onCompleted();
}
}).subscribeOn(Schedulers.newThread());
observable = observable.timeout(200,TimeUnit.MILLISECONDS);
结果
结果: 0
结果: 1
结果: 2
异常
代码实例 2
Observable observable = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i=0; i<10;i++){
try {
Thread.sleep(i*100);
} catch (InterruptedException e) {
e.printStackTrace();
}
subscriber.onNext(i);
}
subscriber.onCompleted();
}
}).subscribeOn(Schedulers.newThread());
observable = observable.timeout(200,TimeUnit.MILLISECONDS,Observable.just(10,11));
结果
结果: 0
结果: 1
结果: 2
结果: 11
结果: 12
功能:设置Observable过了指定的一段时间仍然没有发送任何数据就异常跳转。实例1是直接抛出异常,实例2是跳转执行其他Observable。