学习资料
1.过滤操作符
作用:对Observable发射的 数据序列 进行 过滤或选择
1.1 first
只发射第一个或者第一个满足某个条件的数据项
1.1.1 first()第一项
简单使用:
public class FirstDemo {
public static void main(String[] args) {
first();
}
/**
* 只发送第一项数据
*/
private static void first() {
Observable
.just(1,2,3,4)
.first()
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println(integer);
}
});
}
}
运行结果:
1
1.1.2 first(Func1)
只发送第一个满足条件的数据
简单使用:
/**
* 第一个偶数
*/
private static void firstTrue() {
Observable
.just(1,3,4,5)
.first(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer % 2 == 0;
}
})
.subscribe((i)->System.out.println("第一个偶数:" + i));
}
运行结果:
第一个偶数:4
last
正好与first
相反,是只发送最后一个或者最后一个满足条件的数据项
1.2 take
只发送前N项数据
1.2.1 take(int)
只发送前int
项数据项,默认不任何特定的调度器上执行
简单使用:
/**
* 只发送前5项数据
*/
private static void takeInt() {
Observable
.interval(500, TimeUnit.MILLISECONDS, Schedulers.immediate())
.take(5)
.subscribe((i) -> System.out.print(i + " ,"));
}
运行结果:
0 ,1 ,2 ,3 ,4 ,
1.2.2 take(long,TimeUnit,Scheduler)
在写定的时间段内,会发送Observable
发出的数据项,默认在computation
运算调度器上执行
简单使用:
/**
* 发送1s以内的数据项
*/
private static void takeTime() {
Observable
.interval(100, TimeUnit.MILLISECONDS, Schedulers.immediate())
.take(1,TimeUnit.SECONDS ,Schedulers.newThread())
.subscribe((i) -> System.out.print(i + " ,"));
}
运行结果:
0 ,1 ,2 ,3 ,4 ,5 ,6 ,7 ,8 ,
发送的数据项只是1s以内的,并不包括1s时的
takeLast
是只发送后n
项数据
1.3 Skip
跳过数据项
1.3.1 skip(int)
跳过Observable
发送的前n项数据项,默认不在任何特定的调度器上执行
简单使用:
/**
* 跳过前3项数据
*/
private static void skipInt() {
Observable
.range(1,10)
.skip(3)
.subscribe((i) -> System.out.print(i + " ,"));
}
运行结果:
4 ,5 ,6 ,7 ,8 ,9 ,10 ,
1.3.2 skip(long,TimeUnit,Scheduler)
跳过给定的时间段内Obsvable
发送过来的数据项,默认在computation
运算调度器上执行
简单使用:
/**
* 发送前5个,500毫秒之后的数据项
*/
private static void skipTime() {
Observable
.interval(100, TimeUnit.MILLISECONDS, Schedulers.immediate())
.skip(500,TimeUnit.MILLISECONDS)
.take(5)
.subscribe((i) -> System.out.print(i + " ,"));
}
运行结果:
4 ,5 ,6 ,7 ,8 ,
SkipLast
就是跳过后n个数据项
1.4 Sample
定期发射Observable
最近发射的数据项
1.4.1 sample(long,TimeUnit)
定时查看一个Observable
,然后将自上次采样后,Observable
最近一次发送的数据发送出去,默认在默认在computation
调度器上执行
注意:如果从上次采样后,原始的Observable
没有发出数据项,sample
操作返回的新的Observable
在监测期时间内也不会发射任何数据
简单使用:
/**
* 每隔100毫秒,将Observable最近一个发送的数据项发送出去
*/
private static void sampleTime() {
Observable
.interval(30,TimeUnit.MILLISECONDS, Schedulers.immediate())
.sample(100, TimeUnit.MILLISECONDS)
.take(5)
.subscribe((i) -> System.out.print(i + " ,"));
}
运行结果:
1 ,5 ,8 ,11 ,15 ,
1.4.2 sample(Observable)
- sample(signal)
当监测到名字为signal
的Obaervable
发过来一个信号或者终止时,就对原始Observable
发送的数据进行采样,然后将自从上次采样以来最近一次发送的数据发送出去
默认不在任何特定的调度器上执行
简单使用:
/**
* 每当收到信号时,将最近发送的一个数据项发送出去
*/
private static void sameSignal() {
Observable
.interval(30,TimeUnit.MILLISECONDS, Schedulers.immediate())
.sample(Observable.interval(100,TimeUnit.MILLISECONDS))
.take(5)
.subscribe((i) -> System.out.print(i + " ,"));
}
运行结果:
2 ,5 ,8 ,12 ,15 ,
第一个输出的数字是2,因为是从0开始的,每次输出的数字中间都会间隔2个
1.5 Debounce
两次发送数据项间隔大于一段指定的时间,才发射一个数据
注意:最后的onCompleted
信号,会紧随着最后一项原始Observable
数据项,即使是小于时间间隔,一旦结束到onCompleted
信号,整个操作也就结束了,onCompleted
通知不会触发限流
1.5.1 debounce(long,TimeUnit)
在指定的时间long
间隔进行限流,个人理解,过滤两次数据小于指定间隔的数据项,与上次发送的时间差大于间隔的数据项才进行发送
默认在computation
调度器上执行
简单使用:
/**
* 输出两次间隔大于150秒的数据项
*/
private static void deBounceTime() {
Observable
.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
try {
for (int i = 0; i < 5; i++) {
//产生在100到200间随机时间间隔
TimeUnit.MILLISECONDS.sleep((int) (Math.random() * 100 + 100));
subscriber.onNext(i);
}
//延迟结束信号 否则最后一次一定不会发送
TimeUnit.MILLISECONDS.sleep(100);
subscriber.onCompleted();
} catch (InterruptedException e) {
subscriber.onError(e);
}
}
})
.debounce(150, TimeUnit.MILLISECONDS, Schedulers.newThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println(" --> onCompleted");
}
@Override
public void onError(Throwable e) {
System.out.println(e.getMessage());
}
@Override
public void onNext(Integer integer) {
System.out.print(integer + " ,");
}
});
}
运行结果:
2 ,4 , --> onCompleted
注意subscriber.onCompleted()
发送结束通知信号的时机
1.5.2 debounce(Func1)
对原始Observable
的每一个数据项应用一个函数进行限流,这个函数返回一个Observable
。接到通知前,原始Observable发送的数据项将会被抑制
默认不在任何特定的调度器上执行
简单使用:
/**
* 在没有接到通知的150毫秒内,原始Observable发送的数据项将会被抑制
*/
private static void deBounceSignal() {
Observable
.create((subscriber) -> {
try {
for (int i = 0; i < 5; i++) {
//产生在100到200间随机时间间隔
TimeUnit.MILLISECONDS.sleep((int) (Math.random() * 100 + 100));
subscriber.onNext(i);
}
//延迟结束信号 否则最后一次一定不会发送
TimeUnit.MILLISECONDS.sleep(100);
subscriber.onCompleted();
} catch (InterruptedException e) {
subscriber.onError(e);
}
})
.debounce(new Func1<Object, Observable<Long>>() {
@Override
public Observable<Long> call(Object o) {
//每隔150毫秒发出一个通知
return Observable.interval(150, TimeUnit.MILLISECONDS);
}
})
.subscribe(new Subscriber<Object>() {
@Override
public void onCompleted() {
System.out.println(" --> onCompleted");
}
@Override
public void onError(Throwable e) {
System.out.println(e.getMessage());
}
@Override
public void onNext(Object o) {
System.out.print(o + " ,");
}
});
}
运行结果:
0 ,2 ,4 , --> onCompleted
注意:最后一个数据项一定会被发送,即使在没有接到通知的150毫秒内
1.6 Distinct
过滤掉重复的数据项,默认不在任何特定的调度器上执行
1.6.1 distinct()
简单使用:
/**
* 去除重复项
*/
private static void distinct() {
Observable
.just(1,2,2,3,4,4,5,6,6)
.distinct()
.subscribe((i) -> System.out.print(i+" ,"));
}
运行结果:
1 ,2 ,3 ,4 ,5 ,6 ,
1.6.2 distinct(Func1)
将原始Observable
发送的数据项应用一个函数,根据这个函数产生不同的key
,之后的数据项便是比较key
,而不再管数据项
简单使用:
/**
* 根据条件指定过滤的key ,将之后出现 key为"1","2",全部过滤
*/
private static void distinctKey() {
Observable
.just(1,2,2,3,3,4,5,6,6)
.distinct(new Func1<Integer, String>() {
@Override
public String call(Integer integer) {
//设置key
return integer / 2 == 0 ? "1" : "2" ;
}
})
.subscribe((i) -> System.out.print(i+" ,"));
}
运行结果:
1 ,2 ,
1.7 ElementAt
只发射索引值为N的数据项,索引从0开始
如果传递的索引为负数,或者索引不小于数据项个数,将会抛出一个IndexOutOfBoundsException
异常
简单使用:
public class ElementAtDemo {
/**
* 输出索引值为5的数据项,从0开始
*/
public static void main(String[] args) {
Observable
.range(1,10)
.elementAt(5)
.subscribe(System.out::println);
}
}
运行结果:
6
1.8 IgnoreElements
不发射任何数据,只发射Observable的终止通知onError
或onCompleted
若不关心Obsvable
发送的数据项,只想在完成时,或者遇到错误终止时收到通知,可以使用,这个操作符永远不会调用观察者的onNext()
方法
默认不在任何特定的调度器上执行
2. 最后
感觉过滤操作符比变换操作符理解起来要容易一些
本人很菜,有错误请指出
共鸣 :)