一、reduce语法
public Observable<Integer> getRxJavaCreateExampleData() {
return Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
LogUtils.debug(TAG, "getRxJavaCreateExampleData---:" + Thread.currentThread().getName() + "--:" + 1);
emitter.onNext(1);
LogUtils.debug(TAG, "getRxJavaCreateExampleData---:" + Thread.currentThread().getName() + "--:" + 2);
emitter.onNext(2);
// Thread.sleep(5000);
LogUtils.debug(TAG, "getRxJavaCreateExampleData---:" + Thread.currentThread().getName() + "--:" + 3);
emitter.onNext(3);
emitter.onComplete();
LogUtils.debug(TAG, "getRxJavaCreateExampleData---:" + Thread.currentThread().getName() + "--:" + 4);
emitter.onNext(4);
}
});
}
public void rxJavaReduceExample() {
Disposable disposable = model.getRxJavaCreateExampleData()
.subscribeOn(Schedulers.io())
.unsubscribeOn(Schedulers.io())
.reduce(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
LogUtils.error(TAG, "rxJavaReduceExample--:" + Thread.currentThread().getName() + "-consumer-:" + integer+"---"+integer2);
return integer+integer2;
}
}).observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
LogUtils.error(TAG, "rxJavaReduceExample--:" + Thread.currentThread().getName() + "-consumer-:" + integer);
}
});
compositeDisposable.add(disposable);
}
日志
08-19 15:11:38.636 12439-12483/com.example.zhang E/MainPresenter: rxJavaReduceExample--:RxCachedThreadScheduler-1-reduce-:1---2
rxJavaReduceExample--:RxCachedThreadScheduler-1-reduce-:3---3
08-19 15:11:38.641 12439-12439/com.example.zhang E/MainPresenter: rxJavaReduceExample--:main-consumer-:6
二、reduce方法2
public void rxJavaReduceExample() {
Disposable disposable = model.getRxJavaCreateExampleData()
.subscribeOn(Schedulers.io())
.unsubscribeOn(Schedulers.io())
.reduce(10, new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
LogUtils.error(TAG, "rxJavaReduceExample--:" + Thread.currentThread().getName() + "-reduce-:" + integer + "---" + integer2);
return integer + integer2;
}
}).observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
LogUtils.error(TAG, "rxJavaReduceExample--:" + Thread.currentThread().getName() + "-consumer-:" + integer);
}
});
compositeDisposable.add(disposable);
}
日志
08-19 15:08:37.078 12228-12282/com.example.zhang E/MainPresenter: rxJavaReduceExample--:RxCachedThreadScheduler-1-reduce-:10---1
rxJavaReduceExample--:RxCachedThreadScheduler-1-reduce-:11---2
rxJavaReduceExample--:RxCachedThreadScheduler-1-reduce-:13---3
08-19 15:08:37.084 12228-12228/com.example.zhang E/MainPresenter: rxJavaReduceExample--:main-consumer-:16
总结
1、reduce(BiFunction<T, T, T> reducer) 把接受到的数据叠加起来,返回一个结果
2、reduce(R seed, BiFunction<R, ? super T, R> reducer) seed初始值