本页展示的操作符用于对整个序列执行算法操作或其它操作,由于这些操作必须等待数据发射完成(通常也必须缓存这些数据),它们对于非常长或者无限的序列来说是危险的,不推荐使用。
8.1 Concat
不交错的发射两个或多个 Observable 的发射物。
Concat 操作符连接多个 Observable 的输出,就好像它们是一个 Observable,第一个 Observable 发射的所有数据在第二个 Observable 发射的任何数据前面,以此类推。
直到前面一个 Observable 终止,Concat 才会订阅额外的一个 Observable。注意:因此,如果你尝试连接一个"热" Observable(这种 Observable 在创建后立即开始发射数据,即使没有订阅者),Concat 将不会看到也不会发射它之前发射的任何数据。
在 Rxjava2.0 中实现了多种 Concat 的操作符。
8.1.1 Concat / ConcatWith
顺序连接多个 Observables,并且严格按照发射顺序,前一个没有发射完,是不能发射后面的。
两者等价
Observable.concat(ob1,ob2);
ob1.concatWith(ob2)。
示例代码:
Observable<Integer> ob1 = Observable.just(10,1,11);
Observable<Integer> ob2 = Observable.just(3, 8);
Observable<Integer> ob3 = Observable.just(5, 4);
Observable.concat(ob1, ob2,ob3).subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.e(TAG, "accept:" + integer);
}
});
输出结果:
accept:10
accept:1
accept:11
accept:3
accept:8
accept:5
accept:4
8.1.2 concatArray
连接可变数量的 Observable 源。
concat 操作符内部其实是调用 ConcatArray 的方法。由于与concat(ObservableSource)的重载冲突,以这种方式命名。
示例代码:
Observable<Integer> ob1 = Observable.just(10,1,11);
Observable<Integer> ob2 = Observable.just(3, 8);
Observable<Integer> ob3 = Observable.just(5, 4);
Observable.concatArray(ob1, ob2, ob3).subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.e(TAG, "accept:" + integer);
}
});
输出结果:
accept:10
accept:1
accept:11
accept:3
accept:8
accept:5
accept:4
8.1.2 concatArrayDelayError / ConcatDelayError
顺序连接多个 Observables,并且严格按照发射顺序,如果其中有发送 OnError(),此延迟其发送 onError (),直到所有发射结束 ;
两者等价
Observable.concatDelayError(Observable.fromArray(ob1,ob2,ob3));
Observable.concatArrayDelayError(ob1, ob2, ob3);
示例代码:
Observable<Integer> ob1 = Observable.just(1, 11);
Observable<Integer> ob2 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(3);
emitter.onError(new Throwable("throwable 2"));
emitter.onNext(8);
}
});
Observable<Integer> ob3 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(5);
emitter.onError(new Throwable("throwable 3"));
emitter.onComplete();
}
});
Observable.concatArrayDelayError(ob1, ob2, ob3).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Integer integer) {
Log.e(TAG, "onNext:"+integer);
}
@Override
public void onError(@NonNull Throwable e) {
Log.e(TAG, "onError:" + e.getMessage());
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete");
}
});
输出结果:
onNext:1
onNext:11
onNext:3
onNext:5
onError:2 exceptions occurred.
8.1.3 ConcatArrayEager / ConcatEager
将 Observable 序列紧紧连接到单个值流中。该级联一旦订阅,此运营商订阅所有源 Observable。 操作员缓冲这些 Observable 发出的值,然后依次排列它们,每一个都在前一个完成之后。
//两者等价
Observable.ConcatEager(Observable.fromArray(ob1,ob2,ob3));
Observable.ConcatArrayEager(ob1, ob2, ob3);
8.2 Reduce
按顺序对 Observable 发射的每项数据应用一个函数并发射最终的值。
在 Rxjava2.0 中实现了多种 Reduce 的操作符:
8.2.1 collect / collectInto
将源 Observable 发送的项目收集到单个可变数据结构中,并返回发出此结构的 Single。
这是一个简化版本的 reduce,不需要在每次通过时返回状态。
//两者等价
collectInto("beauty",collector);
collect(Functions.justCallable("beauty"), collector);
示例代码 1:
Observable.just(2,7,11).collect(new Callable<String>() {
@Override
public String call() throws Exception {
return "beauty";
}
}, new BiConsumer<String, Integer>() {
@Override
public void accept(String s, Integer i2) throws Exception {
Log.e(TAG, "i1 = " + s + ",i2 = " + i2);
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
Log.e(TAG, "accept:" + s);
}
});
示例代码 2:
Observable.just(2,7,11).collectInto("beauty", new BiConsumer<String, Integer>() {
@Override
public void accept(String s, Integer i2) throws Exception {
Log.e(TAG, "i1 = " + s + ",i2 = " + i2);
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
Log.e(TAG, "accept:" + s);
}
});
输出结果:
i1 = beauty,i2 = 2
i1 = beauty,i2 = 7
i1 = beauty,i2 = 11
accept:beauty
8.2.2 reduce
Reduce 操作符对原始 Observable 发射数据的第一项应用一个函数,然后再将这个函数的返回值与第二项数据一起传递给函数,以此类推,持续这个过程直到原始 Observable 发射它的最后一项数据并终止,此时 Reduce 返回的 Observable 发射这个函数返回的最终值。
注意:如果原始 Observable 没有发射任何数据,reduce 抛出异常 IllegalArgumentException。
示例代码 1:
Observable.just(2,7,11).reduce(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(@NonNull Integer i1, @NonNull Integer i2) throws Exception {
//i1 为前面几项计算得,i2 为当前发射的数据
Log.e(TAG, "i1 = " + i1 + ",i2 = " + i2);
return i1 * i2;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.e(TAG, "accept:" + integer);
}
});
输出结果:
i1 = 2,i2 = 7
i1 = 14,i2 = 11
accept:154
示例代码 2:
Observable.just(2, 7, 11).reduce(10, new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(@NonNull Integer i1, @NonNull Integer i2) throws Exception {
//i1 为前面几项计算得,i2 为当前发射的数据
Log.e(TAG, "i1 = " + i1 + ",i2 = " + i2);
return i1 * i2;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.e(TAG, "accept:" + integer);
}
});
输出结果:
i1 = 10,i2 = 2
i1 = 20,i2 = 7
i1 = 140,i2 = 11
accept:1540
8.2.3 reduceWith
类似于 reduce(); 但可以聚合不同种类的数据。
示例代码:
Observable.just(2,7,11).reduceWith(new Callable<String>() {
@Override
public String call() throws Exception {
return "beauty";
}
}, new BiFunction<String, Integer, String>() {
@Override
public String apply(@NonNull String s, @NonNull Integer integer) throws Exception {
Log.e(TAG, "sep1:" + s + " ,sep2 = " + integer);
return s + " - " + integer;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
Log.e(TAG, "accept:" + s);
}
});
输出结果:
sep1:beauty ,sep2 = 2
sep1:beauty - 2 ,sep2 = 7
sep1:beauty - 2 - 7 ,sep2 = 11
accept:beauty - 2 - 7 - 11