在RxJava2(一)教程中,已经跟着大神们学习了RxJava2的基本使用,现在我们来学习一下RxJava2很强大的操作符
Android RxJava2操作符
Map
- Map是RxJava中的一个变换操作符,它的作用就是对上游发送的每一个事件应用一个函数,使得每一个事件都按照指定的函数去变化。通过Map可以将上游发来的事件转换为任意的类型,可以是一个Object也可以是一个集合,图示表示如下:
- 代码表示:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
Log.d(TAG, "subscribe: ");
e.onNext(1);
e.onNext(2);
e.onNext(3);
}
}).map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
String mapStr = String.valueOf(integer + 1);
return mapStr;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "accept: " + s);
}
});
FlatMap
flatMap是一个非常强大的操作符,flatMap将一个发送事件的上游Observable变换为多个发送事件的Observables,然后将它们发射的事件合并后放进一个单独的Observable里。
- 图示:
上游发送三个事件,分别是1,2,3注意它们的颜色,中间flatMap的作用是将圆形的事件转换为一个发送矩形事件和三角形事件的新的上游Observable
上游每发送一个事件,flatMap都将创建一个新的水管,然后发送转换之后的新的事件,下游接收到的就是这些新的水管发送的数据。flatMap不能保证事件的顺序
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
}
}).flatMap(new Function<Integer, Observable<String>>() {
@Override
public Observable<String> apply(Integer integer) throws Exception {
ArrayList<String> arrayList = new ArrayList<>();
for (int i = 0; i < 5; i++) {
String iStr = "flatMap value" + integer;
arrayList.add(iStr);
}
return Observable.fromIterable(arrayList).delay(10, TimeUnit.MICROSECONDS);
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "accept: " + s);
}
});
concatMap
concatMap和flatMap的作用是一样的,它的结果是严格按照上游发送的顺序来发送的。
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(11);
e.onNext(111);
}
}).concatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
ArrayList<String> arrayList = new ArrayList<>();
for (int i = 0; i < 3; i++) {
arrayList.add("concatMap value" + i);
}
return Observable.fromIterable(arrayList).delay(5, TimeUnit.MILLISECONDS);
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "accept: " + s);
}
});
- 运行结果
09-11 03:21:32.970 25661-25678/? D/Rxjava2Lesson: accept: concatMap value0
09-11 03:21:32.970 25661-25678/? D/Rxjava2Lesson: accept: concatMap value1
09-11 03:21:32.970 25661-25678/? D/Rxjava2Lesson: accept: concatMap value2
09-11 03:21:32.981 25661-25680/? D/Rxjava2Lesson: accept: concatMap value0
09-11 03:21:32.981 25661-25680/? D/Rxjava2Lesson: accept: concatMap value1
09-11 03:21:32.981 25661-25680/? D/Rxjava2Lesson: accept: concatMap value2
09-11 03:21:32.988 25661-25681/? D/Rxjava2Lesson: accept: concatMap value0
09-11 03:21:32.988 25661-25681/? D/Rxjava2Lesson: accept: concatMap value1
09-11 03:21:32.988 25661-25681/? D/Rxjava2Lesson: accept: concatMap value2
Buffer
Buffer操作符会定期收集Observable的数据放进一个数据包裹,然后发射这些包裹,并不是一次发射一个值
Buffer操作符将一个Observable变换为另一个,原来的Observable正常发射数据,变换产生的Observable发射这些数据的缓存集合。如果原来的Observable发射了一个onError通知,Buffer会立即传递这个通知,而不是首先发射缓存的数据。
Buffer变体
- Buffer(count) 以列表List的形式发射非重叠的缓存,每一个缓存至多包含来自原始Observable的count项数据
- Buffer(count,skip) 从原始Observable的第一项数据开始创建新的缓存。每当接收到skip数据,用count项数据来填充‘
Scan
Scan连续地对数据序列的每一项应用一个函数,然后连续发射结果
Scan操作符对原始Observable发射的第一项数据应用一个函数,然后将这个函数的结果作为自己的第一项数据发射。将函数的结果同第二项数据一起填充给这个函数来产生自己的第二项数据。持续进行这个过程来产生剩余的数据序列。
Observable.just(1, 2, 3, 4, 5)
.scan(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
return integer + integer2;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "accept: " + integer);
}
});
Window
Window定期将来自原始Observable的数据分解为一个Observable窗口,发射这些窗口而不是每次发射一项数据
window和Buffer类似,但不是发射来自原始Observable的数据包,发射的是Observables,这些Observables中的每一个都发射原始Observable数据的一个子集,最后发射一个onComplete通知。
Observable.range(1, 10).window(new Observable<Integer>() {
@Override
protected void subscribeActual(Observer<? super Integer> observer) {
Log.d(TAG, "subscribeActual: ");
observer.onNext(1);
observer.onNext(1);
observer.onNext(1);
}
}).subscribe(new Consumer<Observable<Integer>>() {
@Override
public void accept(Observable<Integer> integerObservable) throws Exception {
integerObservable.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "accept: " + integer);
}
});
}
});
ZIP操作符
ZIP通过一个函数将多个Observable发送的事件结合到一起,然后发送这些组合到一起的事件。按照严格的顺序应用这个函数,只发射与发射项最少的那个Observable一样多的数据
从图中看到,有两个上游的水管,通过ZIP操作符,使得两个事件合并为了一个事件
- 分解动作
组合的过程是分别从 两根水管里各取出一个事件 来进行组合, 并且一个事件只能被使用一次, 组合的顺序是严格按照事件发送的顺利 来进行的, 也就是说不会出现圆形1 事件和三角形B 事件进行合并, 也不可能出现圆形2 和三角形A 进行合并的情况.
最终下游收到的事件数量 是和上游中发送事件最少的那一根水管的事件数量 相同. 这个也很好理解, 因为是从每一根水管 里取一个事件来进行合并, 最少的 那个肯定就最先取完 , 这个时候其他的水管尽管还有事件 , 但是已经没有足够的事件来组合了, 因此下游就不会收到剩余的事件了.
//上游水管第一个事件
Observable<Integer> observable1 = Observable.range(1, 5);
//上游水管第二个事件
Observable<Integer> observable2 = Observable.range(6, 10);
//合并事件
Observable.zip(observable1, observable2, new BiFunction<Integer, Integer, String>() {
@Override
public String apply(Integer integer, Integer integer2) throws Exception {
return String.valueOf(integer + integer2);
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "accept: " + s);
}
});
实践
- zip在Android中的使用,可以适用于如下场景,一个界面需要展示用户的一些信息,这些信息分别要从两个服务器接口中获取,只有当两个数据都获取后才能进行展示。这类同时的信息请求比较适用zip
public interface Api {
@GET
Observable<UserBaseInfoResponse> getUserBaseInfo(@Body UserBaseInfoRequest request);
@GET
Observable<UserExtraInfoResponse> getUserExtraInfo(@Body UserExtraInfoRequest request);
}
zip打包
Observable<UserBaseInfoResponse> observable1 =
api.getUserBaseInfo(new UserBaseInfoRequest()).subscribeOn(Schedulers.io());
Observable<UserExtraInfoResponse> observable2 =
api.getUserExtraInfo(new UserExtraInfoRequest()).subscribeOn(Schedulers.io());
Observable.zip(observable1, observable2,
new BiFunction<UserBaseInfoResponse, UserExtraInfoResponse, UserInfo>() {
@Override
public UserInfo apply(UserBaseInfoResponse baseInfo,
UserExtraInfoResponse extraInfo) throws Exception {
return new UserInfo(baseInfo, extraInfo);
}
}).observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<UserInfo>() {
@Override
public void accept(UserInfo userInfo) throws Exception {
//do something;
}
});
基本的操作符就是这些了,以后再学习到其它的运算符再继续补充