变换操作符
-
作用
对事件序列中的事件 / 整个事件序列 进行加工处理(即变换),使得其转变成不同的事件 / 整个事件序列。
-
常见类型
-
应用场景 & 对应操作符介绍
1.Map()
作用
对被观察者发送的每一个事件都通过指定的函数处理,从而变换成另外一种事件。数据类型转换,即将被观察者发送的事件转换为任意的类型事件。
事例
public void MapOperators(View view){
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
}
}).map(new Function<Integer, String>() {
// 使用Map变换操作符中的Function函数对被观察者发送的事件进行统一变换:整型变换成字符串类型
@Override
public String apply(Integer integer) throws Exception {
return "使用Map变换操作符,将事件"+"的参数从 整型"+integer + " 变换成 字符串类型" + integer*6;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e("sss",s);
}
});
}
2.FlatMap()
作用
将被观察者发送的事件序列进行 拆分 & 单独转换,再合并成一个新的事件序列,最后再进行发送。
为事件序列中每个事件都创建一个 Observable 对象,将对每个原始事件转换后的新事件 都放入到对应 Observable对象,将新建的每个Observable 都合并到一个新建的、总的Observable 对象,新建的、总的Observable 对象将新合并的事件序列发送给观察者(Observer)。
注:新合并生成的事件序列顺序是无序的,即与旧序列发送事件的顺序无关
事例
public void FlatMapOperators(View view){
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
}
}).flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
List<String> list=new ArrayList<>();
for (int i = 0; i <3 ; i++) {
list.add("我是事件 " + integer + "拆分后的子事件" + i);
// 通过flatMap中将被观察者生产的事件序列先进行拆分,再将每个事件转换为一个新的发送三个String事件
// 最终合并,再发送给被观察者
}
return Observable.fromIterable(list);
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e("sss", s);
}
});
}
3.ConcatMap()
作用
类似FlatMap()操作符,区别在于拆分 & 重新合并生成的事件序列的顺序 = 被观察者旧序列生产的顺序。
事例
public void ConcatMapOperators(View view){
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
}
}).concatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
List<String> list=new ArrayList<>();
for (int i = 0; i <3 ; i++) {
list.add("我是事件 " + integer + "拆分后的子事件" + i);
// 通过concatMap中将被观察者生产的事件序列先进行拆分,再将每个事件转换为一个新的发送三个String事件
// 最终合并,再发送给被观察者
}
return Observable.fromIterable(list);
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e("sss", s);
}
});
}
4.Buffer()
作用
定期从被观察者(Obervable)需要发送的事件中 获取一定数量的事件 & 放到缓存区中,最终发送,即缓存被观察者发送的事件。
事例
public void BufferOperators(View view){
Observable.just("1,","2","3","3")
.buffer(3,1)// 设置缓存区大小 & 步长
// 缓存区大小 = 每次从被观察者中获取的事件数量
// 步长 = 每次获取新事件的数量
.subscribe(new Observer<List<String>>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(List<String> strings) {
Log.e("sss", " 缓存区里的事件数量 = " + strings.size());
for (String value : strings) {
Log.e("sss", " 事件 = " + value);
}
}
@Override
public void onError(Throwable e) {
Log.e("sss", "对Error事件作出响应" );
}
@Override
public void onComplete() {
Log.e("sss", "对Complete事件作出响应");
}
});
}
-
实际开发中的应用
1.网络请求嵌套回调
模拟先请求注册然后登陆
public void RxJavaNest(View view){
Retrofit retrofit=new Retrofit.Builder()
.baseUrl("http://fy.iciba.com/")
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.build();
GetRequest_Interface request=retrofit.create(GetRequest_Interface.class);
Observable<Translation> observable1=request.getCall_1();
final Observable<Translation> observable2=request.getCall_2();
observable1.subscribeOn(Schedulers.io())
.subscribeOn(AndroidSchedulers.mainThread())
.doOnNext(new Consumer<Translation>() {
@Override
public void accept(Translation translation) throws Exception {
Log.e("sss","第1次网络请求成功");
Log.e("sss",translation.getContent().getOut());
}
})
.observeOn(Schedulers.io())
.flatMap(new Function<Translation, ObservableSource<Translation>>() {
@Override
public ObservableSource<Translation> apply(Translation translation) throws Exception {
return observable2;
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Translation>() {
@Override
public void accept(Translation translation) throws Exception {
Log.e("sss", "第2次网络请求成功");
Log.e("sss",translation.getContent().getOut());
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.e("sss", "登录失败");
}
});
}