前言
这一节主要讲解Zip操作符的用法
1. Zip操作符
Zip是通过一个方法将多个上游(多个水管、多个Observable)发射的事件结合到一起,然后发射这个组合事件,是严格按照顺序的,它只发射与发射数据量最少的上游(最少的水管、最少的Observable)一样多的数据。
如下图所示:
由上图可知:
上游有两根水管,一根用于发射圆形事件,另一根用于发射三角形事件,通过Zip操作符,把圆形事件和三角形事件合并为一个矩形事件
分解动作如下:
由分解图可知:
- 组合过程是分别从两根水管取出各取出一个事件,且一个事件只能被使用一次,且严格按照事件发送顺序进行,也就是说不会出现圆形1、三角形B合并,也不会出现圆形2、三角形A合并;
- 最终下游收到事件的数量 和上游 发送事件最少的那一根水管的事件数量相同。可以这样理解:因为是从每一根水管中取事件进行合并,最少的那根肯定先取完,这个时候,尽管其他水管尽管还有事件,但是没有其他事件与之进行合并,因此下游不会收到剩余的事件。
示例代码如下:
/**
* zip操作符:
* 组合过程是分别从两根水管中各取一个事件进行组合,并且一个事件只能被使用一次,且严格按照事件发送的顺序,
* 也就是说不会出现1与B、2与A进行组合
*/
public static void demo1(){
Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.e("TAG" , "emit 1") ;
emitter.onNext(1);
Thread.sleep(1000);
Log.e("TAG" , "emit 2") ;
emitter.onNext(2);
Thread.sleep(1000);
Log.e("TAG" , "emit 3") ;
emitter.onNext(3);
Thread.sleep(1000);
Log.e("TAG" , "emit 4") ;
emitter.onNext(4);
Thread.sleep(1000);
Log.e("TAG" , "emit complete1") ;
emitter.onComplete();
}
}).subscribeOn(Schedulers.io()) ; // 让上游1(第一个水管)在子线程中执行
Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.e("TAG" , "emit A") ;
emitter.onNext("A");
Thread.sleep(1000);
Log.e("TAG" , "emit B") ;
emitter.onNext("B");
Thread.sleep(1000);
Log.e("TAG" , "emit C") ;
emitter.onNext("C");
Thread.sleep(1000);
Log.e("TAG" , "emit complete2") ;
emitter.onComplete();
}
}).subscribeOn(Schedulers.io()) ; // 让上游2(第二个水管)在子线程中执行
Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
@Override
public String apply(Integer integer, String s) throws Exception {
return integer + s;
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("TAG" , "subscribe") ;
}
@Override
public void onNext(String value) {
Log.e("TAG" , "next -> " + value) ;
}
@Override
public void onError(Throwable e) {
Log.e("TAG" , "error") ;
}
@Override
public void onComplete() {
Log.e("TAG" , "complete") ;
}
});
}
2. 具体应用场景
比如一个界面需要展示用户信息,而该用户信息必须从两个接口中获取,只有当这两个接口都调用成功后,才可以获取用户信息进行展示,这个时候就可以使用zip操作符;
/**
* zip具体示例实践
*/
public static void pritice(){
final Api api = RetrofitProvider.get().create(Api.class) ;
// 创建上游1,在子线程中请求用户基本信息接口
Observable<UserBaseInfoResponse> observable1 =
api.getUserBaseInfo(new UserBaseInfoRequest())
.subscribeOn(Schedulers.io()) ;
// 创建上游2:在子线程中请求用户额外信息接口
Observable<UserExtraInfoResponse> observable2 =
api.getUserExtraInfo(new UserExtraInfoRequest())
.subscribeOn(Schedulers.io()) ;
// 使用 zip操作符,进行组合事件 把 UserBaseInfoResponse、UserExtraInfoResponse组合成 UserInfo
Observable.zip(observable1, observable2, new BiFunction<UserBaseInfoResponse, UserExtraInfoResponse, UserInfo>() {
@Override
public UserInfo apply(UserBaseInfoResponse userBaseInfoResponse, UserExtraInfoResponse userExtraInfoResponse) throws Exception {
return new UserInfo(userBaseInfoResponse , userExtraInfoResponse);
}
}).observeOn(AndroidSchedulers.mainThread()) // 切换到主线程中,进行UI更新
.subscribe(new Consumer<UserInfo>() {
@Override
public void accept(UserInfo userInfo) throws Exception {
// 在这两个接口请求成功后,然后切换到主线程中,进行UI更新
}
});
}