创建类 ##转换类 ##过滤类##时间类##截取类。
create
from fromArray(数组) fromIterator(集合)
just(1,2,3...)("1","2","3"...)
补充:
compose 去除重复代码。
onTerminateDetach()防止内存泄漏。
关闭轮询timeUntil(Observable.timer(delay long,TimeUnit.xxx))
1.map 变换
//1.map,变换,讲事件序列中的事件转换为另外一个事件。
private void map(){
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1111);
e.onNext(2222);
e.onNext(3333);
}
}).map(new Function<Integer,String>() {
@Override
public String apply(Integer integer) throws Exception {
return "This is a map demo,id= "+integer;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});
}
2.zip 合并 可变参数,和最短的相同。
//2.zip 合并。
private static void zip(){
Observable.zip(getIntegerObservable(), getStringObservable(), new BiFunction<Integer, String, String>() {
@Override
public String apply(Integer integer, String s) throws Exception {
return integer+s;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});
}
private static Observable<Integer> getIntegerObservable(){
return Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1000001);
e.onNext(1000002);
e.onNext(1000003);
e.onNext(1000004);
}
});
}
private static Observable<String> getStringObservable(){
return Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("第一个String");
e.onNext("第二个String");
e.onNext("第三个String");
}
});
}
3.concat 连接
private static void concat(){
Observable.concat(Observable.just(1,2,3,4),Observable.just(6,7,8))
.map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return "this is concat demo,id = "+integer;
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});
}
4.flatmap flat:平摊。将observable分摊成多个observable,再装入一个observable中,激活这个observable。
他是无序的
private static void flatmap(){
String[] strs = {"尖子生","语文","英语","数学"};
String[] strs2 = {"体育生","长跑","短跑","跳远","田径"};
String[] strs3 = {"学渣","睡觉"};
String[] strs4 = {"学神","睡觉","学习","玩"};
Student[] students = {new Student("小明",15,strs),new Student("仍物线",40,strs2)
,new Student("冬冬",26,strs3),new Student("宝贝",5,strs4)};
Observable.fromArray(students).flatMap(new Function<Student, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Student student) throws Exception {
return Observable.fromArray(student.getSource());
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.print(s);
}
});
}
5.concatmap。功能与flatmap相同,唯一不同是保证了顺序。代码就不贴了。
6distinct 过滤 过滤重复,底层hashSet(),没有调用onNext,有不添加不调用。
//distinct 过滤
private static void distinct(){
Observable.just("a","b","c","d","a","e","d","d")
.distinct()
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.print(s);
}
});
}
//abcde
7.filter 过滤 自定义规则。
//过滤
private static void filter(){
Observable.just(1,2,3,4,12,3,4123,12,3423,1).filter(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer>=5;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer+"");
}
});
}
//结果
12
4123
12
3423
8.buffer buffer(count,skip),一次事件中有count个参数,按照skip进行选区。
private static void buffer(){
Observable.just(1,2,3,4,5,6)
.buffer(3,1)
.subscribe(new Consumer<List<Integer>>() {
@Override
public void accept(List<Integer> integers) throws Exception {
System.out.println(integers.size());
for(Integer size:integers){
System.out.print(String.valueOf(size));
}
}
});
}
//打印结果。
3
1233
2343
3453
4562
561
6
9.timer 计时器 默认就是在新的线程操作,在android记得切回来,会自己创建一个Observable
private static void timer(){
Observable.timer(2, TimeUnit.SECONDS)
.subscribeOn(Schedulers.io())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
long time = date.getTime();
System.out.println(String.valueOf(time));
}
});
}
interval 创建一个Observable,延迟多久后发送消息,每隔几秒发送一次。
注意:比如在activity中做,和handler一样,页面销毁了,他还没停止,记得手动关闭,
private static void interval(){
Disposable subscribe = Observable.interval(2,2,TimeUnit.SECONDS)
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
}
});
}
if(subscribe!= null &&subscribe.isDisposed()){
subscribe.dispose();
}
doOnNext 在发射后consumer消费之前做的事。传入Consumer()。不能对数据修改。
skip,传入count,跳过count个事件。
take,传入count,只接受count个事件。
Single。类似observable类。接受一个参数 .subscribe(singleObserver) singleObserver只会调用
onError或onComplete
private static void single(){
Single.just(1)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
}
});
}
debounce 去抖,一个事件规定时间内有第二次传入,第一次取消,执行第二次。可以做搜索,防止button短时间点击多次。
和sample的区别,debounce是针对一个一个的事件,sample针对的一个一个的时间段。
private static void debounce(){
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("11111");
Thread.sleep(300);
e.onNext("22222");
Thread.sleep(502);
e.onNext("33333");
Thread.sleep(502);
e.onNext("44444");
Thread.sleep(499);
}
}).debounce(500,TimeUnit.MILLISECONDS)
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});
}
defer 创建类操作符,只有被订阅才创建observable。有一个延时效果在里面。
public static void defer(){
Observable<Integer> defer = Observable.defer(new Callable<ObservableSource<Integer>>() {
@Override
public ObservableSource<Integer> call() throws Exception {
return Observable.just(1, 2, 3);
}
});
defer.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer o) throws Exception {
System.out.println(o+"");
}
});
}
1
2
3
last 只发出最后一个,或者符合条件的最后一个
merge 和concat类似,不过是不按顺序的。
reduce 操作事件,统一返回一个结果。
public static void reduce(){
Observable.just(1,2,3,5)
.reduce(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
return integer-integer2;
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer+"");
}
});
}
-9
scan 与reduce类似,不过每次都会调用onNext 把每一步的结果返回。
window 将一个事件序列,按照参数分成若干个事件序列。
mRxOperatorsText.append("window\n");
Log.e(TAG, "window\n");
Observable.interval(1, TimeUnit.SECONDS) // 间隔一秒发一次
.take(15) // 最多接收15个
.window(3, TimeUnit.SECONDS)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Observable<Long>>() {
@Override
public void accept(@NonNull Observable<Long> longObservable) throws Exception {
mRxOperatorsText.append("Sub Divide begin...\n");
Log.e(TAG, "Sub Divide begin...\n");
longObservable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
mRxOperatorsText.append("Next:" + aLong + "\n");
Log.e(TAG, "Next:" + aLong + "\n");
}
});
}
});
repeat 发送多少次
public static void repeat(){
Observable.range(5,10).repeat(20)
.map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return String.valueOf(integer).intern();
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});
}
//从5开始发送到14 重复20次。
startWith 事件前加入指定事件
public static void startWith(){
Observable.range(5,5).startWith(0)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.print(integer+"");
}
});
}
056789
delay 延迟几秒发射。#timeout,超时后发送自己的。
do系列
doOnNext()
、doAfterNext()
、doFinally()completed或者error时触发
,doOnSubscribe()在订阅时触发,一般用来初始化。
错误操作符
catch
onErrorReturn 在错误触发时,返回一个特殊的项替代错误,不会传递给观察者。
onErrorResumeNext 发生错误发送备用observable给观察者。
public static void catch1(){
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onError(null);
e.onNext("111");
}
}).onErrorReturn(new Function<Throwable, String>() {
@Override
public String apply(Throwable throwable) throws Exception {
return "发生错误了,而你不知道";
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.print(s);
}
});
}
发生错误了,而你不知道
#retry(long time) 重试 指定次数。
#toFlowable 将observable转成Flowable