常用过滤操作符
filter、elementAt、distinct、skip、take、ignoreElements、throttleFirst、throttleWithTimeOut等
实例与功能介绍
1. filter操作符
代码实例
Observable observable = Observable.just(1,2,3,4);
observable = observable.filter(new Func1<Integer,Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer > 2;
}
});
结果
结果: 3
结果: 4
结束
功能:对observable产生的结果自定义规则进行过滤,只有满足条件的结果才会提交给订阅者。
2. elementAt操作符
代码实例
Observable observable = Observable.just(1,2,3,4);
observable = observable.elementAt(2);
结果
结果: 3
结束
功能:用来返回指定位置的数据
3. distinct操作符
代码实例
Observable observable = Observable.just(1,2,2,3,3,4);
observable = observable.distinct();
结果
结果: 1
结果: 2
结果: 3
结果: 4
结束
功能:去重,只允许还没有发送过的数据项通过。
4. skip操作符
代码实例
Observable observable = Observable.just(1,2,2,3,3,4);
observable = observable.skip(3);
结果
结果: 3
结果: 3
结果: 4
结束
功能:将observable发送的数据过滤掉前n项
5. take操作符
代码实例
Observable observable = Observable.just(1,2,2,3,3,4);
observable = observable.take(3);
结果
结果: 1
结果: 2
结果: 2
结束
功能: o将observable发送的数据只取前n项
6. ignoreElements操作符
代码实例
Observable observable = Observable.just(1,2,2,3,3,4);
observable = observable.ignoreElements();
结果
结束
功能:忽略所有observable发送的数据项
7. throttleFirst操作符
代码实例
Observable observable = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 0; i < 10; i++){
subscriber.onNext(i);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
subscriber.onCompleted();
}
});
observable = observable.throttleFirst(200, TimeUnit.MILLISECONDS);
结果
结果: 0
结果: 2
结果: 4
结果: 6
结果: 8
结束
功能:定期发射throttleFirst设置的时间段内源Observable发送的第一个数据。
8. throttleWithTimeOut操作符
代码实例
Observable observable = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 0; i < 10; i++){
subscriber.onNext(i);
int sleep = 100;
if (i%3 == 0){
sleep = 300;
}
try {
Thread.sleep(sleep);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
subscriber.onCompleted();
}
});
observable = observable.throttleWithTimeout(250, TimeUnit.MILLISECONDS);
结果
结果: 0
结果: 3
结果: 6
结果: 9
结束
功能:源Observable每次发送出一个数据后就会进行计时,如果在设定好的时间结束前,有新的数据发送,这个数据就会被丢弃;同时,throttleWithTimeOut重新开始计时。