目录
一、Is what 是什么
二、Concept 概念
三、Basic realization 基本实现
四、Scheduler 线程控制(上)
五、Scheduler 线程控制(下)
六、变换
因个人学习需要,故文章内容均为网上摘抄整理,感谢创作者的辛勤,源文章地址请看文末。
变换
将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或事件序列。
RxJava核心功能之一:提供了对事件序列进行变换的支持。
分类
变换有多种,即可以针对事件对象,也可以针对整个事件队列。
基类
lift(Operator):对事件项和事件序列变换compose(Transformer):对Observable自身变换
拓展类######
-
map():事件对象的直接变换,变换中常用的一种。
map() 示意图 -
flatMap():很有用,但很难理解的变换,以下详解。
相同点:把传入的参数转化后返回另一个对象。
不同点:flatMap() 中返回的是个 Observable 对象,该对象并不是被直接发送到 Subscriber 的回调方法中。
API
//示例1 map()
Observable.just("images/logo.png") // 输入类型 String
.map(new Func1<String, Bitmap>() {
@Override
public Bitmap call(String filePath) { // 参数类型 String
return getBitmapFromPath(filePath); // 返回类型 Bitmap
}
})
.subscribe(new Action1<Bitmap>() {
@Override
public void call(Bitmap bitmap) { // 参数类型 Bitmap
showBitmap(bitmap);
}
});
其中,Func1 类是RxJava的一个接口,用于包装含有一个参数的方法。
FuncX 和 ActionX 相似,拥有多个,用于不同参数个数的方法。
区别:FuncX包装的是有返回值的方法
map()
map() 方法将参数中的 String 对象转换成 Bitmap 对象后返回,经过 map() 方法后,事件的参数类型由 String 转为 Bitmap。
flatMap()
原理
- 使用传入的事件对象创建一个
Observable对象; - 并不发送这个
Observable,而是将它激活,于是它开始发送事件; - 每个创建出来的
Observable发送的事件,都被汇入同一个Observable,而这个Observable负责将这些事件同意交给Subscriber的回调方法。
以上三个步骤,把事件拆成两级,通过一组新建的 Observable 将初始的对象『铺平』之后通过统一路径分发下去,而这个『铺平』就是 flatMap() 所谓的flat。

示例
现有一个数据结构(学生)
//1. 打印出一组学生的名字
Student[] students = ...;
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onNext(String name) {
Log.d(tag, name);
}
...
};
Observable.from(students)
.map(new Func1<Student, String>() {
@Override
public String call(Student student) {
return student.getName();
}
})
.subscribe(subscriber);
//2. 打印出每个学生所需要修的所有课程的名称
//需求区别:每个学生只有一个名字,但却有多个课程
//第一种实现
Student[] students = ...;
Subscriber<Student> subscriber = new Subscriber<Student>() {
@Override
public void onNext(Student student) {
List<Course> courses = student.getCourses();
for (int i = 0; i < courses.size(); i++) {
Course course = courses.get(i);
Log.d(tag, course.getName());
}
}
...
};
Observable.from(students)
.subscribe(subscriber);
//第二种实现
Student[] students = ...;
Subscriber<Course> subscriber = new Subscriber<Course>() {
@Override
public void onNext(Course course) {
Log.d(tag, course.getName());
}
...
};
Observable.from(students)
.flatMap(new Func1<Student, Observable<Course>>() {
@Override
public Observable<Course> call(Student student) {
return Observable.from(student.getCourses());
}
})
.subscribe(subscriber);
扩展
可以在嵌套的 Observable 中添加异步代码,flatMap() 也常用于嵌套的异步操作。
例:嵌套的网络请求。代码(Retrofit + RxJava):
networkClient.token() // 返回 Observable<String>,在订阅时请求 token,并在响应后发送 token
.flatMap(new Func1<String, Observable<Messages>>() {
@Override
public Observable<Messages> call(String token) {
// 返回 Observable<Messages>,在订阅时请求消息列表,并在响应后发送请求到的消息列表
return networkClient.messages();
}
})
.subscribe(new Action1<Messages>() {
@Override
public void call(Messages messages) {
// 处理显示消息列表
showMessages(messages);
}
});
传统的嵌套请求需要使用嵌套的 Callback 来实现。通过 flatMap() ,把嵌套的请求写在一条链中,从而保持程序逻辑的清晰。
throttleFirst(): 在每次事件触发后的一定时间间隔内丢弃新的事件。常用作去抖动过滤。
//按钮的点击监听器
RxView.clickEvents(button) // RxBinding 代码,后面的文章有解释
.throttleFirst(500, TimeUnit.MILLISECONDS) // 设置防抖间隔为 500ms
.subscribe(subscriber); 妈妈再也不怕我的用户手抖点开两个重复的界面啦。
变换的原理: lift()
各种变换虽然功能不同,但实质上都是针对事件序列的处理和再发送。而在RxJava的内部,它们是基于同一个基础变换方法:lift(Operator)。
lift()的内部实现(仅核心代码):
// 注意:这不是 lift() 的源码,而是将源码中与性能、兼容性、扩展性有关的代码剔除后的核心代码。
// 如果需要看源码,可以去 RxJava 的 GitHub 仓库下载。
public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) {
return Observable.create(new OnSubscribe<R>() {
@Override
public void call(Subscriber subscriber) {
Subscriber newSubscriber = operator.call(subscriber);
newSubscriber.onStart();
onSubscribe.call(newSubscriber);
}
});
}
其中,生成了一个新的Observable并返回,而且创建新Observale所用的参数OnSubscribe的回调方法call()中的实现和Observable.subscribe()类似,但是并不一样,关键在于第二行onSubscribe.call(subscriber)中的onSubscribe所指代的对象不同:
-
subscribe()中这句话的onSubscribe指的是Observable中的onSubscribe对象,这个没有问题,但是list()之后的情况就复杂了。 - 当含有
list()时:
-
lift()创建了一个Observable后,加上之前原始的Obervable,已经有两个Observable了; - 同样的,新
Observable里的新OnSubscribe加上原始Observable中的原始OnSubscribe,也就有了两个OnSubscribe; - 当用户调用经过
lift()后的Observable的subscribe()的时候,使用的是lift()所返回的新的Observable,于是它所触发的onSubscribe.call(subscriber),也是用的新Observable中的新OnSubscribe,即在lift()中生成的那个OnSubscribe; - 而这个新
OnSubscribe的call()方法中的onSubscribe,就是指的原始Observable中的原始OnSubscribe,在这个call()方法里,新OnSubscribe利用operator.call(subscriber)生成了一个新的Subscriber(Operator 就是在这里,通过自己的 call() 方法将新 Subscriber 和原始 Subscriber 进行关联,并插入自己的『变换』代码以实现变换),然后利用这个新Subscriber向原始Observable进行订阅。
这样实现了lift()过程,像一种代理机制,通过事件拦截和处理实现事件序列的变换。
简而言之:在 Observable 执行了 lift(Operator) 方法之后,会返回一个新的 Observable,这个新的 Observable 会像一个代理一样,负责接收原始的 Observable 发出的事件,并在处理后发送给 Subscriber。
示意图:



示例
//具体的 Operator 的实现
//将事件中的 Integer 对象转换成 String
observable.lift(new Observable.Operator<String, Integer>() {
@Override
public Subscriber<? super Integer> call(final Subscriber<? super String> subscriber) {
// 将事件序列中的 Integer 对象转换为 String 对象
return new Subscriber<Integer>() {
@Override
public void onNext(Integer integer) {
subscriber.onNext("" + integer);
}
@Override
public void onCompleted() {
subscriber.onCompleted();
}
@Override
public void onError(Throwable e) {
subscriber.onError(e);
}
};
}
});
注意:讲述
lift()的原理是为了更好地了解 RxJava ,从而更好地使用它。然而不管是否理解了lift()的原理,RxJava都不建议开发者自定义Operator来直接使用lift(),而是建议尽量使用已有的lift()包装方法(如map()、flatMap()等)进行组合来实现需求,因为直接使用lift()非常容易发生一些难以发现的错误。
compose:对Observable整体的变换
//有多个 Observable ,都需要应用一组相同的 lift() 变换
//第一种方法
observable1
.lift1()
.lift2()
.lift3()
.lift4()
.subscribe(subscriber1);
observable2
.lift1()
.lift2()
.lift3()
.lift4()
.subscribe(subscriber2);
observable3
.lift1()
.lift2()
.lift3()
.lift4()
.subscribe(subscriber3);
observable4
.lift1()
.lift2()
.lift3()
.lift4()
.subscribe(subscriber1);
//第二种方法
private Observable liftAll(Observable observable) {
return observable
.lift1()
.lift2()
.lift3()
.lift4();
}
...
liftAll(observable1).subscribe(subscriber1);
liftAll(observable2).subscribe(subscriber2);
liftAll(observable3).subscribe(subscriber3);
liftAll(observable4).subscribe(subscriber4);
第二种方法可读性、可维护性提高了。可是被方法包起来,这种方式对于 Observale 的灵活性似乎增添了那么点限制。
//使用 compose() 解决
//第三种方法
public class LiftAllTransformer implements Observable.Transformer<Integer, String> {
@Override
public Observable<String> call(Observable<Integer> observable) {
return observable
.lift1()
.lift2()
.lift3()
.lift4();
}
}
...
Transformer liftAll = new LiftAllTransformer();
observable1.compose(liftAll).subscribe(subscriber1);
observable2.compose(liftAll).subscribe(subscriber2);
observable3.compose(liftAll).subscribe(subscriber3);
observable4.compose(liftAll).subscribe(subscriber4);
第三种方法,Observable 可以利用传入的 Transformer 对象的 call 方法直接对自身进行处理,也就不必包在方法里。
