Concurrency with flatMap
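// Assumes RxJava 2.x on Android (Function.apply declares "throws Exception")
import android.util.Log;
import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.schedulers.Schedulers;

Observable.fromIterable(list)
        .flatMap(new Function<Integer, ObservableSource<Integer>>() {
            @Override
            public ObservableSource<Integer> apply(Integer integer) throws Exception {
                // flatMap provides the concurrency: the source is split into list.size()
                // inner Observables, each subscribed on its own io() thread
                return Observable.just(integer).subscribeOn(Schedulers.io());
            }
        })
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e("thread", Thread.currentThread().getName() + " number: " + integer);
            }
        });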
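The code is simple, but Schedulers.io() is backed by an unbounded cached thread pool, so up to list.size() threads can end up running at once. That raises a question:
How can the number of concurrent threads be controlled precisely?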
Method 1: split the data into groups with groupBy
Observable.fromIterable(list)
        .groupBy(new Function<Integer, Integer>() {
            int i = 0;

            @Override
            public Integer apply(Integer integer) throws Exception {
                // Round-robin key in [0, 5): exactly 5 groups, so at most 5 run concurrently.
                // A Boolean key such as (i < 5) would only ever produce two groups.
                return (i++) % 5;
            }
        })
        .flatMap(new Function<GroupedObservable<Integer, Integer>, ObservableSource<Integer>>() {
            @Override
            public ObservableSource<Integer> apply(GroupedObservable<Integer, Integer> group) throws Exception {
                // flatMap merges the groups; observeOn gives each group its own worker thread.
                // subscribeOn does not reliably move the emissions here, because the items
                // of a group are pushed by the upstream groupBy, not produced on subscribe.
                return group.observeOn(Schedulers.io());
            }
        })
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e("thread", Thread.currentThread().getName() + " number: " + integer);
            }
        });
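Why the group key decides the concurrency level can be seen without RxJava. The following is a minimal JDK-only sketch, not RxJava code: the class `RoundRobinBuckets` and its `bucket` method are hypothetical names introduced for illustration. It assigns the n-th item to bucket `n % bucketCount`, which is the key a groupBy-based limit would rely on, so the number of buckets (and therefore of groups) is exactly `bucketCount`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of RxJava: buckets items by a round-robin key.
class RoundRobinBuckets {

    // Assigns the n-th item (0-based) to bucket n % bucketCount.
    static Map<Integer, List<Integer>> bucket(List<Integer> items, int bucketCount) {
        Map<Integer, List<Integer>> buckets = new LinkedHashMap<>();
        int i = 0;
        for (Integer item : items) {
            int key = i++ % bucketCount;
            buckets.computeIfAbsent(key, k -> new ArrayList<>()).add(item);
        }
        return buckets;
    }

    public static void main(String[] args) {
        List<Integer> items = new ArrayList<>();
        for (int n = 1; n <= 12; n++) {
            items.add(n);
        }
        // 12 items over 5 buckets
        // prints {0=[1, 6, 11], 1=[2, 7, 12], 2=[3, 8], 3=[4, 9], 4=[5, 10]}
        System.out.println(bucket(items, 5));
    }
}
```

A key with only two possible values, such as a Boolean, can never yield more than two buckets, no matter how long the list is.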
Method 2: a custom thread pool
Observable.fromIterable(list)
        .flatMap(new Function<Integer, ObservableSource<Integer>>() {
            // A fixed pool of 5 threads caps the concurrency at 5.
            // The executor is never shut down here; shut it down once it is no longer needed.
            Scheduler scheduler = Schedulers.from(Executors.newFixedThreadPool(5));

            @Override
            public ObservableSource<Integer> apply(Integer integer) throws Exception {
                // flatMap provides the concurrency; the scheduler limits it
                return Observable.just(integer).subscribeOn(scheduler);
            }
        })
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e("thread", Thread.currentThread().getName() + " number: " + integer);
            }
        });
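The cap that a fixed pool imposes can be checked without RxJava. The sketch below is a JDK-only illustration under stated assumptions: `FixedPoolPeak` and `peakConcurrency` are hypothetical names, and the 20 ms sleep merely stands in for blocking I/O. It submits many tasks to `Executors.newFixedThreadPool` and records the highest number of tasks that were running at the same moment.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper, not part of RxJava.
class FixedPoolPeak {

    // Runs taskCount short tasks on a fixed pool and returns the highest number
    // of tasks that were observed running at the same time.
    static int peakConcurrency(int poolSize, int taskCount) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        for (int t = 0; t < taskCount; t++) {
            pool.execute(() -> {
                int now = running.incrementAndGet();
                peak.accumulateAndGet(now, Math::max);
                try {
                    Thread.sleep(20); // stand-in for blocking I/O
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    running.decrementAndGet();
                }
            });
        }
        pool.shutdown(); // accept no new tasks, let the queued ones finish
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        // The peak can never exceed the pool size of 5
        System.out.println("peak = " + peakConcurrency(5, 50));
    }
}
```

Because the pool owns only `poolSize` threads, the remaining tasks wait in its queue, which is the same effect `Schedulers.from(...)` relies on when it wraps such an executor.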
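When flatMap is used for concurrency, the order in which the downstream observer receives items is not guaranteed to match the order of the source. If order matters, use concatMap or concatMapEager: concatMap subscribes to one inner Observable at a time, so it preserves order but gives up concurrency, while concatMapEager subscribes to the inner Observables eagerly and still emits their results in source order.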
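The "run eagerly, emit in source order" idea can be sketched with plain JDK futures. This is only an analogy, not how RxJava implements concatMapEager; `OrderedEager` and `squareInOrder` are hypothetical names, and the sleep times are chosen so that later items tend to finish first.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical helper, not part of RxJava.
class OrderedEager {

    // Starts all tasks eagerly on a pool, then reads the results in submission order.
    static List<Integer> squareInOrder(List<Integer> input, int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (Integer n : input) {
                futures.add(pool.submit(() -> {
                    // Later items sleep less, so they tend to complete earlier
                    Thread.sleep(Math.max(0, 30 - n * 5));
                    return n * n;
                }));
            }
            List<Integer> results = new ArrayList<>();
            for (Future<Integer> f : futures) {
                results.add(f.get()); // blocks until this particular task is done
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Integer> input = new ArrayList<>();
        for (int n = 1; n <= 5; n++) {
            input.add(n);
        }
        System.out.println(squareInOrder(input, 5)); // prints [1, 4, 9, 16, 25]
    }
}
```

The tasks complete in whatever order the threads finish, but reading the futures in submission order restores the source order, at the cost of buffering results that arrive early.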
Concurrency with Flowable.parallel
Flowable.fromIterable(list)
        .parallel(5)               // split the flow into 5 parallel rails
        .runOn(Schedulers.io())    // run each rail on its own io() worker
        .sequential()              // convert the ParallelFlowable back into a Flowable
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e("thread", Thread.currentThread().getName());
            }
        });
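If parallel() is called without an argument, the parallelism defaults to Runtime.getRuntime().availableProcessors() rails. The threads themselves come from the Scheduler passed to runOn.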
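The rail model can be pictured with a small JDK-only sketch. It is an illustration under assumptions, not RxJava's implementation: `RailsSketch` and `threadsUsed` are hypothetical names, each "rail" is modeled as a single-threaded executor, and items are handed out round-robin.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of RxJava.
class RailsSketch {

    // Dispatches items round-robin to `rails` single-threaded executors and
    // returns the names of the threads that processed them.
    static Set<String> threadsUsed(List<Integer> items, int rails) {
        List<ExecutorService> executors = new ArrayList<>();
        for (int r = 0; r < rails; r++) {
            executors.add(Executors.newSingleThreadExecutor());
        }
        Set<String> names = ConcurrentHashMap.newKeySet();
        for (int i = 0; i < items.size(); i++) {
            // The item itself is ignored; only the processing thread is recorded
            executors.get(i % rails).execute(() -> names.add(Thread.currentThread().getName()));
        }
        for (ExecutorService e : executors) {
            e.shutdown();
        }
        try {
            for (ExecutorService e : executors) {
                e.awaitTermination(10, TimeUnit.SECONDS);
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return names;
    }

    public static void main(String[] args) {
        // Same default that parallel() uses when no argument is passed
        int rails = Runtime.getRuntime().availableProcessors();
        List<Integer> items = new ArrayList<>();
        for (int n = 0; n < rails * 4; n++) {
            items.add(n);
        }
        System.out.println(threadsUsed(items, rails).size() + " threads for " + rails + " rails");
    }
}
```

As long as there are at least as many items as rails, the number of distinct threads equals the number of rails, which is why the argument to parallel bounds the concurrency.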