过滤型操作符
filter
过滤符
// 上游
Observable.just("三鹿", "合生元", "飞鹤")
.filter(new Predicate<String>() {
@Override
public boolean test(String s) throws Exception {
// return true; // 不去过滤,默认全部都会打印
// return false; // 如果是false 就全部都不会打印
// 过滤掉 哪些不合格的奶粉,输出哪些合格的奶粉
if ("三鹿".equals(s)) {
return false; // 不合格
}
return true;
}
})
// 订阅
.subscribe(new Consumer<String>() { // 下游
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "accept: " + s);
}
});
take
过滤操作符
Observable.interval(2, TimeUnit.SECONDS)
// 增加过滤操作符,停止定时器
.take(8) // 执行次数达到8 停止下来
.subscribe(new Consumer<Long>() { // 下游
@Override
public void accept(Long aLong) throws Exception {
Log.d(TAG, "accept: " + aLong);
}
});
Observable.interval(2, TimeUnit.SECONDS)
是定时器,take
用于过滤执行次数,一般配合定时器一起使用。
distinct
过滤重复数据操作符
// 上游
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onNext(4);
e.onNext(4);
e.onComplete();
}
})
.distinct() // 过滤重复 发射的事件
.subscribe(new Consumer<Integer>() { // 下游 观察者
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "accept: " + integer); // 事件不重复
}
});
elementAt
输出指定下标过滤操作符
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("张三");
emitter.onNext("李四");
emitter.onNext("王五");
emitter.onComplete();
}
}).elementAt(4,"rc") //指定下游只接收上游的第四个发出的值,如果没有的话,用默认值“rc”代替
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d("elementAt", "" + s);
}
});