concat/concatDelayError/concatArray/concatArrayDelayError
将多个被观察者按先后顺序串联起来。
// 前面有 2-4 个 ObservableSource 参数,内部调用的都是 concatArray
public static <T> Observable<T> concat(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2)
// 数组和集合,意思一样
public static <T> Observable<T> concat(Iterable<? extends ObservableSource<? extends T>> sources)
public static <T> Observable<T> concatArray(ObservableSource<? extends T>... sources)
Observable.concat(observableStr, observableInt, observableBoolean)
.subscribe(Consumer<Any> {
textView.text = "${textView.text}\nonNext $it"
})
将依次连续发送 observableStr,observableInt,observableBoolean 里的 12 个数据。
// 区别在于有个参数 prefetch,和 combineLatest 的 buffersize 作用一样
public static <T> Observable<T> concat(ObservableSource<? extends ObservableSource<? extends T>> sources)
public static <T> Observable<T> concat(ObservableSource<? extends ObservableSource<? extends T>> sources, int prefetch)
这两个重载方法,不知道怎么用,试验了若干次,终于不崩溃了。
Java 语言版本:
Observable observable = Observable.create(new ObservableOnSubscribe<ObservableSource>() {
@Override
public void subscribe(ObservableEmitter emitter) {
emitter.onNext(observableInt);
emitter.onNext(observableStr);
}
});
Observable.concat(observable).subscribe(new Consumer<Object>() {
@Override
public void accept(Object o) {
Log.e("RX", o.toString());
}
});
Kotlin 语言版本
val observable = Observable.create<ObservableSource<*>> { emitter ->
emitter.onNext(observableInt)
emitter.onNext(observableStr)
}
Observable.concat<Any>(observable)
.subscribe({ o -> Log.e("RX", o!!.toString()) })
最终打出的日志按顺序是 1,2,3,4,5,a,b,c
concatDelayError 和 concatArrayDelayError 是推迟发射 onError。
concatEager/concatArrayEager
public static <T> Observable<T> concatArrayEager(ObservableSource<? extends T>... sources)
public static <T> Observable<T> concatArrayEager(int maxConcurrency, int prefetch, ObservableSource<? extends T>... sources)
// 集合和上面的可变参数可以看成一样的
public static <T> Observable<T> concatEager(Iterable<? extends ObservableSource<? extends T>> sources)
public static <T> Observable<T> concatEager(Iterable<? extends ObservableSource<? extends T>> sources, int maxConcurrency, int prefetch)
public static <T> Observable<T> concatEager(ObservableSource<? extends ObservableSource<? extends T>> sources)
public static <T> Observable<T> concatEager(ObservableSource<? extends ObservableSource<? extends T>> sources, int maxConcurrency, int prefetch)
注释:
Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
source ObservableSources. The operator buffers the values emitted by these ObservableSources and then drains them
in order, each one after the previous one completes.
意思大概是说,一旦有观察者订阅了之后,会先将被观察者发射的数据缓存起来,然后将缓存的数据一个接一个的发射出去。
对于外部调用来说,结果和 concat 作用没什么区别。它是并发处理多个 Observable,而不像 concat 那样串行处理,但是又能够保证最终的顺序。
Observable.concatArrayEager(observableStr, observableInt, observableBoolean).subscribe({
Log.e("RX", it.toString())
})
依次发射三个 Observable 的数据。
有重载方法的参数是 ObservableSource<? extends ObservableSource<? extends T>> sources
应该和 concat 里用这个参数的重载方法差不多,就不写代码测试了。
concatWith
public final Observable<T> concatWith(ObservableSource<? extends T> other) {
ObjectHelper.requireNonNull(other, "other is null");
return concat(this, other);
}
非静态,自己和别人 concat。
collect/collectInto
收集发射的数据到一个数据结构里,然后将这个结构作为一个整体发射出去。
Observable.just(18, "China", "Ma")
.collect(Callable<MutableList<Any>> {
arrayListOf()
}, BiConsumer<MutableList<Any>, Any> {
t1, t2 -> t1.add(t2)
}).subscribe(Consumer<MutableList<Any>> {
Log.e("RX", "$it")
})
将三个零散的数据收集到一个列表里,最后收到 [18, China, Ma]
。
collectInto 是将数据结构直接作为第一个参数传进去,而不需要通过回调提供一个数据结构。
Observable.just(18, "China", "Ma")
.collectInto(arrayListOf()
, BiConsumer<MutableList<Any>, Any> {
t1, t2 -> t1.add(t2)
}).subscribe(Consumer<MutableList<Any>> {
Log.e("RX", "$it")
})
count
返回发射的数目,并且将这个数目作为一个 64 位的 long 型值以 Single 发射出来。
Observable.just(20,30,40).count()
.subscribe(object : SingleObserver<Long> {
override fun onSuccess(t: Long) {
textView.text = "${textView.text}\n $t"
}
override fun onSubscribe(d: Disposable) {
}
override fun onError(e: Throwable) {
}
})
reduce/reduceWith
// 返回 Maybe
public final Maybe<T> reduce(BiFunction<T, T, T> reducer)
// 有初始值,返回 Single
public final <R> Single<R> reduce(R seed, BiFunction<R, ? super T, R> reducer)
// 通过一个回调获取初始值
public final <R> Single<R> reduceWith(Callable<R> seedSupplier, BiFunction<R, ? super T, R> reducer)
一种累计运算。
// 算乘积,结果是 6
Observable.just(1, 2, 3)
.reduce({ t1, t2 -> t1 * t2 }).subscribe {
textView.text = "${textView.text}\n $it"
}
// 有初始值 10,结果是 60
Observable.just(1, 2, 3)
.reduce(10, { t1, t2 -> t1 * t2 })
.subscribe(object: SingleObserver<Int> {
override fun onSuccess(t: Int) {textView.text = "${textView.text}\n $t"}
override fun onSubscribe(d: Disposable) {}
override fun onError(e: Throwable) {}
})
// 有初始值 8,结果是 48
Observable.just(1, 2, 3)
.reduceWith({ 8 }, { t1, t2 -> t1 * t2 })
.subscribe(object: SingleObserver<Int> {
override fun onSuccess(t: Int) {textView.text = "${textView.text}\n $t"}
override fun onSubscribe(d: Disposable) {}
override fun onError(e: Throwable) {}
})