模拟多张图片并行上传业务,所有的都成功后,才允许点击保存
--------单线程-----
12-25 15:59:22.737 24988 25035 D SpringItemViewActivity: testRxjavaToList request RxCachedThreadScheduler-2 tokenfile://a
12-25 15:59:23.164 932 950 I ActivityManager: Displayed com.pitaya.findviewbyiddemo/.SpringItemViewActivity: +1s985ms
12-25 15:59:24.738 24988 25035 D SpringItemViewActivity: testRxjavaToList request RxCachedThreadScheduler-2 tokenfile://b
12-25 15:59:24.738 24988 24988 D SpringItemViewActivity: testRxjavaToList subscribe main request done:tokenfile://a
12-25 15:59:26.738 24988 25035 D SpringItemViewActivity: testRxjavaToList request RxCachedThreadScheduler-2 tokenfile://c
12-25 15:59:26.738 24988 24988 D SpringItemViewActivity: testRxjavaToList subscribe main request done:tokenfile://b
12-25 15:59:28.739 24988 24988 D SpringItemViewActivity: testRxjavaToList subscribe main request done:tokenfile://c
Observable.just("token").flatMap(new Function<String, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(String token) throws Exception {
return Observable.fromIterable(stringList).map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
return token + s;
}
}).observeOn(Schedulers.io()).map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
Log.d(TAG, "testRxjavaToList request " + getCurrentName() + " " + s);
Thread.sleep(2000);
return "request done:" + s;
}
});
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "testRxjavaToList subscribe " + getCurrentName() + " " + s);
}
});
-----支持流并行执行-----
12-25 16:02:05.474 25972 26018 D SpringItemViewActivity: testRxjavaToList request RxCachedThreadScheduler-2 tokenfile://a
12-25 16:02:05.477 25972 26019 D SpringItemViewActivity: testRxjavaToList request RxCachedThreadScheduler-3 tokenfile://b
12-25 16:02:05.485 25972 26020 D SpringItemViewActivity: testRxjavaToList request RxCachedThreadScheduler-4 tokenfile://c
12-25 16:02:05.893 932 950 I ActivityManager: Displayed com.pitaya.findviewbyiddemo/.SpringItemViewActivity: +1s972ms
12-25 16:02:07.475 25972 25972 D SpringItemViewActivity: testRxjavaToList subscribe main request done:tokenfile://a
12-25 16:02:07.478 25972 25972 D SpringItemViewActivity: testRxjavaToList subscribe main request done:tokenfile://b
12-25 16:02:07.490 25972 25972 D SpringItemViewActivity: testRxjavaToList subscribe main request done:tokenfile://c
//支持多线程并行,并行的最小单元是一条流。如何识别存在多条流的唯一方式是看是否存在flatMap(),并且FlatMap前有多少条流,就是多少能产生多少天新流,每个流
//通过设置observeOn(Schedulers.io()可以并发执行
Observable.just("token").flatMap(new Function<String, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(String token) throws Exception {
return Observable.fromIterable(stringList).map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
return token + s;
}
}).flatMap(new Function<String, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(String s) throws Exception {
return Observable.just(s).observeOn(Schedulers.io()).map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
Log.d(TAG, "testRxjavaToList request " + getCurrentName() + " " + s);
Thread.sleep(2000);
return "request done:" + s;
}
});
}
});
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "testRxjavaToList subscribe " + getCurrentName() + " " + s);
}
});
-----------代码 支持线程并发、支持每个流更新UI、支持最后合并toList给UI一个最终结果、支持流过程异常降级处理-------
12-25 15:56:55.965 932 1661 I ActivityManager: Start proc 23904:com.pitaya.findviewbyiddemo/u0a71 for activity com.pitaya.findviewbyiddemo/.SpringItemViewActivity
12-25 15:56:57.519 23904 23949 D SpringItemViewActivity: testRxjavaToList merge token RxCachedThreadScheduler-1
12-25 15:56:57.521 23904 23949 D SpringItemViewActivity: testRxjavaToList merge token RxCachedThreadScheduler-1
12-25 15:56:57.522 23904 23951 D SpringItemViewActivity: testRxjavaToList request RxCachedThreadScheduler-2 tokenfile://a
12-25 15:56:57.523 23904 23949 D SpringItemViewActivity: testRxjavaToList merge token RxCachedThreadScheduler-1
12-25 15:56:57.526 23904 23952 D SpringItemViewActivity: testRxjavaToList request RxCachedThreadScheduler-3 tokenfile://b
12-25 15:56:57.526 23904 23953 D SpringItemViewActivity: testRxjavaToList request RxCachedThreadScheduler-4 tokenfile://c
12-25 15:56:57.956 932 950 I ActivityManager: Displayed com.pitaya.findviewbyiddemo/.SpringItemViewActivity: +2s27ms
12-25 15:56:59.523 23904 23904 D SpringItemViewActivity: testRxjavaToList apply map main request done:tokenfile://a
12-25 15:56:59.530 23904 23904 D SpringItemViewActivity: testRxjavaToList apply map main request done:tokenfile://c
12-25 15:56:59.536 23904 23904 D SpringItemViewActivity: testRxjavaToList apply map main null
12-25 15:56:59.548 23904 23904 D SpringItemViewActivity: testRxjavaToList subscribe main [request done:tokenfile://a, request done:tokenfile://c, null ]
//线程切换、异常处理onErrorReturnItem 的最小执行单位都是,而不是流内部含有for循环的操作符,比如Just、From等。
ArrayList<String> stringList = new ArrayList<>();
stringList.add("file://a");
stringList.add("file://b");
stringList.add("file://c");
Observable.just("token").flatMap(new Function<String, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(String token) throws Exception {
return Observable.fromIterable(stringList).map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
Log.d(TAG, "testRxjavaToList merge token " + getCurrentName());
return token + s;
}
}).flatMap(new Function<String, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(String s) throws Exception {
return Observable.just(s).observeOn(Schedulers.io()).map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
Log.d(TAG, "testRxjavaToList request " + getCurrentName() + " " + s);
Thread.sleep(2000);
if (s.contains("file://b")) {
throw new NullPointerException("服务器异常"); //服务器异常
}
return "request done:" + s;
}
}).onErrorReturnItem(" null "); //当前流出错后,降级处理
}
}).observeOn(AndroidSchedulers.mainThread()).map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
Log.d(TAG, "testRxjavaToList apply map " + getCurrentName() + " " + s);
Toast.makeText(getApplicationContext(), s, LENGTH_SHORT).show();//更新图标状态
return s;
}
}).observeOn(Schedulers.io());
}
}).toList().toObservable().subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<List<String>>() {
@Override
public void accept(List<String> strings) throws Exception {
Log.d(TAG, "testRxjavaToList subscribe " + getCurrentName() + " " + strings);
}
});