RxJava操作符系列传送门
RxJava操作符源码
RxJava操作符系列一
RxJava操作符系列二
RxJava操作符系列三
RxJava操作符系列四
RxJava操作符系列五
前言
在上几篇文章我们介绍了一些RxJava创建,转换,过滤,组合,辅助的一些操作符,今天我们继续去学习RxJava的一些条件和布尔操作符,在此感谢那些阅读文章并提出建议的同学,其实写技术文章主要是提升自己的知识面以及对知识的理解程度,也有人说有些地方描述还不够清晰,文笔有待提高,是的,确实是这样,描述不清晰我感觉不仅是文笔有待提高(提升空间很大),对技术的理解也需要提高(技术渣蜕变中...)。不过我相信,只要我能写出来,这就是一种进步,就是一个积累的过程,我会努力,加油。好了,不扯了,进入正题。
All
该操作符是判断Observable发射的所有数据是否都满足某一条件,它接收一个Fun1方法参数,此方法接收原始数据,并返回一个布尔类型值,如果原始Observable正常终止并且每一项数据都满足条件,就返回true;如果原始Observable的任何一项数据不满足条件就返回False。当某数据不满足条件(返回false)时之后的数据不再发射。如下示例代码
Observable.just(1,2,3,4).all(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
Log.e(TAG, "call: "+integer );
return integer<2;
}
}).subscribe(new Subscriber<Boolean>() {
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted: ");
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: " );
}
@Override
public void onNext(Boolean aBoolean) {
Log.e(TAG, "onNext: "+aBoolean );
}
});
输出日志信息
call: 1
call: 2
onNext: false
onCompleted:
Amb
当我们传递多个Observable(可接收2到9个Observable)给amb时,它只发射其中一个Observable的数据和通知:首先发送通知给amb的那个,不管发射的是一项数据还是一个onError或onCompleted通知。amb将忽略和丢弃其它所有Observables的发射物。
Observable observable= Observable.just(1,2,3).delay(500,TimeUnit.MILLISECONDS).subscribeOn(Schedulers.newThread());
Observable observable1= Observable.just(4,5,6).delay(100,TimeUnit.MILLISECONDS).subscribeOn(Schedulers.newThread());
Observable.amb(observable,observable1).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted: ");
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: ");
}
@Override
public void onNext(Integer integer) {
Log.e(TAG, " onNext: "+integer);
}
});
输出日志信息
onNext: 4
onNext: 5
onNext: 6
onCompleted:
有一个类似的对象方法ambWith。如上示例中Observable.amb(observable,observable1)改为observable.ambWith(observable1)是等价的。
Contains
我们可以给Contains传一个指定的值,如果原始Observable发射了那个值,它返回的Observable将发射true,否则发射false,即Observable发射的数据是否包含某一对象。
示例代码
Observable.just(1,2,3,4).contains(2)
.subscribe(new Subscriber<Boolean>() {
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted: " );
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: " );
}
@Override
public void onNext(Boolean aBoolean) {
Log.e(TAG, "onNext: "+aBoolean);
}
});
输出日志信息
onNext: true
onCompleted:
isEmpty也是一种判断是否包含的操作符,不同的是它判断原始Observable是否没有发射任何数据。
exists操作符与contains操作符作用相同,但是它接收的是一个Func1函数,可以指定一个判断条件,当发射的数据有满足判断条件(返回true)就发射true,否则为false。
DefaultIfEmpty
该操作符简单的精确地发射原始Observable的值,如果原始Observable没有发射任何数据正常终止(以onCompletedd的形式),DefaultIfEmpty返回的Observable就发射一个我们提供的默认值。
Observable.empty().defaultIfEmpty(1).subscribe(new Subscriber<Object>() {
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted: " );
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: ");
}
@Override
public void onNext(Object object) {
Log.e(TAG, "onNext: "+object);
}
});
输出日志信息
onNext: 1
onCompleted:
defaultIfEmpty只有在没有发射数据时才会有效果,若发射的有数据,和不使用此操作符效果一样。
SequenceEqual
该操作符会比较两个Observable的发射物,如果两个序列是相同的(相同的数据,相同的顺序,相同的终止状态),它就发射true,否则发射false
Observable observable=Observable.just(1,2,3);
Observable observable1=Observable.just(1,3,2);
Observable.sequenceEqual(observable,observable1)
.subscribe(new Subscriber<Boolean>() {
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted: " );
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: " );
}
@Override
public void onNext(Boolean aBoolean) {
Log.e(TAG, "onNext: " +aBoolean);
}
});
执行后输出
onNext: false
onCompleted:
SkipUntil
SkipUntil订阅原始的Observable,但是忽略它的发射物,直到第二个Observable发射了一项数据那一刻,它开始发射原始Observable。
Observable observable = Observable.just(1, 2, 3,4,5,6).delay(100, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.newThread());
Observable observable1 = Observable.just(20, 21, 22).delay(130, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.newThread());
observable.skipUntil(observable1)
.subscribe(new Subscriber() {
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted: " );
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: " );
}
@Override
public void onNext(Object o) {
Log.e(TAG, "onNext: "+o );
}
});
输出日志信息
onNext: 4
onNext: 5
onNext: 6
onCompleted:
由于上面两个Observable都是在一个新线程中,时间不可控,每次运行的结果一般不会相同。但是都会符合上面所述规则,可参考上图。
SkipWhile
该操作符也是忽略它的发射物,直到我们指定的某个条件变为false的那一刻,它开始发射原始Observable。切记是判断条件返回false时开始发射数据。
示例代码
Observable.range(1,5).skipWhile(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
Log.e(TAG, "call: "+integer);
return integer<3;
}
}).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted: " );
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: " );
}
@Override
public void onNext(Integer integer) {
Log.e(TAG, "onNext: " +integer);
}
});
输出日志信息
call: 1
call: 2
call: 3
onNext: 3
onNext: 4
onNext: 5
onCompleted:
TakeUntil
如果我们理解了SkipUntil操作符了,那么这个操作符也就很好理解了,该操作符与SkipUntil有点相反的意思。通过上图你应该也看出来和SkipUntil的区别,当第二个Observable发射了一项数据或者终止时,丢弃原始Observable发射的任何数据(SkipUntil则是丢弃之前数据,发射之后的数据)。
Observable observable = Observable.just(1, 2, 3,4,5,6).delay(100, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.newThread());
Observable observable1 = Observable.just(20, 21, 22).delay(120, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.newThread());
observable.takeUntil(observable1)
.subscribe(new Subscriber() {
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted: " );
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: " );
}
@Override
public void onNext(Object o) {
Log.e(TAG, "onNext: "+o );
}
});
输出日志信息(每次执行的输出结果一般不相同)
onNext: 1
onNext: 2
onNext: 3
onNext: 4
onCompleted:
TakeWhile
该操作符发射原始Observable,直到我们指定的某个条件不成立的那一刻,它停止发射原始Observable(skipWhile此时开始发射),并终止自己的Observable。
Observable.range(1,5).takeWhile(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
Log.e(TAG, "call: "+integer);
return integer<3;
}
}).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted: " );
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: " );
}
@Override
public void onNext(Integer integer) {
Log.e(TAG, "onNext: " +integer);
}
});
输出日志信息
call: 1
onNext: 1
call: 2
onNext: 2
call: 3
onCompleted:
今天的这篇文章到这里就结束了,若文中有错误的地方,欢迎指正。谢谢。