Rxjava2并发实现

flatMap实现并发

 Observable.fromIterable(list)
                .flatMap(new Function<Integer, ObservableSource<Integer>>() {
                    @Override
                    public ObservableSource<Integer> apply(Integer integer) throws Exception {
                        //利用flatMap实现了并发,将一个Observable拆分成了list.size()个
                        return Observable.just(integer).subscribeOn(Schedulers.io());
                    }
                })

                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.e("线程", Thread.currentThread().getName() + "数字:" + integer);
                    }
                });

代码很简单,但是存在一个问题:
如何精确控制线程并发数?

方法一:利用groupBy将数据分组
Observable.fromIterable(list)
                .groupBy(new Function<Integer, Boolean>() {
                    int i = 0;
                    @Override
                    public Boolean apply(Integer integer) throws Exception {
                        i++;
                        return (i<5);
                    }
                })
                .flatMap(new Function<GroupedObservable<Boolean, Integer>, ObservableSource<Integer>>() {
                    @Override
                    public ObservableSource<Integer> apply(GroupedObservable<Boolean, Integer> objectIntegerGroupedObservable) throws Exception {
                        //利用flatMap实现了并发
                        return
                                objectIntegerGroupedObservable.subscribeOn(Schedulers.io());
                    }
                })
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.e("线程", Thread.currentThread().getName() + "数字:" + integer);
                    }
                });
方法二:自定义线程池
Observable.fromIterable(list)
                .flatMap(new Function<Integer, ObservableSource<Integer>>() {
                    Scheduler scheduler = Schedulers.from(Executors.newFixedThreadPool(5));
                    @Override
                    public ObservableSource<Integer> apply(Integer integer) throws Exception {
                        //利用flatMap实现了并发
                        return Observable.just(integer).subscribeOn(scheduler);
                    }
                })

                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.e("线程", Thread.currentThread().getName() + "数字:" + integer);
                    }
                });

flatMap实现并发点,不能保证下游观察者收到的数据序列与源数据序列一致,若要保证顺序,可以使用concatMap或concatMapEager

Flowable.parallel实现并发

 Flowable.fromIterable(list)
                .parallel(5)
                .runOn(Schedulers.io())
                .sequential()//把ParallelFlowable转回Flowable
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.e("线程",Thread.currentThread().getName());
                    }
                });

parallel不传参数,则默认创建Runtime.getRuntime().availableProcessors()个线程

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容