目录
一、Is what 是什么
二、Concept 概念
三、Basic realization 基本实现
四、Scheduler 线程控制(上)
五、Scheduler 线程控制(下)
六、变换
因个人学习需要,故文章内容均为网上摘抄整理,感谢创作者的辛勤,源文章地址请看文末。
变换
将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或事件序列。
RxJava核心功能之一:提供了对事件序列进行变换的支持。
分类
变换有多种,即可以针对事件对象,也可以针对整个事件队列。
基类
lift(Operator)
:对事件项和事件序列变换compose(Transformer)
:对Observable
自身变换
拓展类######
-
map()
:事件对象的直接变换,变换中常用的一种。
-
flatMap()
:很有用,但很难理解的变换,以下详解。
相同点:把传入的参数转化后返回另一个对象。
不同点:flatMap()
中返回的是个 Observable
对象,该对象并不是被直接发送到 Subscriber
的回调方法中。
API
//示例1 map()
Observable.just("images/logo.png") // 输入类型 String
.map(new Func1<String, Bitmap>() {
@Override
public Bitmap call(String filePath) { // 参数类型 String
return getBitmapFromPath(filePath); // 返回类型 Bitmap
}
})
.subscribe(new Action1<Bitmap>() {
@Override
public void call(Bitmap bitmap) { // 参数类型 Bitmap
showBitmap(bitmap);
}
});
其中,Func1
类是RxJava的一个接口,用于包装含有一个参数的方法。
FuncX 和 ActionX 相似,拥有多个,用于不同参数个数的方法。
区别:FuncX包装的是有返回值的方法
map()
map()
方法将参数中的 String
对象转换成 Bitmap
对象后返回,经过 map()
方法后,事件的参数类型由 String
转为 Bitmap
。
flatMap()
原理
- 使用传入的事件对象创建一个
Observable
对象; - 并不发送这个
Observable
,而是将它激活,于是它开始发送事件; - 每个创建出来的
Observable
发送的事件,都被汇入同一个Observable
,而这个Observable
负责将这些事件同意交给Subscriber
的回调方法。
以上三个步骤,把事件拆成两级,通过一组新建的 Observable
将初始的对象『铺平』之后通过统一路径分发下去,而这个『铺平』就是 flatMap()
所谓的flat
。
示例
现有一个数据结构(学生)
//1. 打印出一组学生的名字
Student[] students = ...;
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onNext(String name) {
Log.d(tag, name);
}
...
};
Observable.from(students)
.map(new Func1<Student, String>() {
@Override
public String call(Student student) {
return student.getName();
}
})
.subscribe(subscriber);
//2. 打印出每个学生所需要修的所有课程的名称
//需求区别:每个学生只有一个名字,但却有多个课程
//第一种实现
Student[] students = ...;
Subscriber<Student> subscriber = new Subscriber<Student>() {
@Override
public void onNext(Student student) {
List<Course> courses = student.getCourses();
for (int i = 0; i < courses.size(); i++) {
Course course = courses.get(i);
Log.d(tag, course.getName());
}
}
...
};
Observable.from(students)
.subscribe(subscriber);
//第二种实现
Student[] students = ...;
Subscriber<Course> subscriber = new Subscriber<Course>() {
@Override
public void onNext(Course course) {
Log.d(tag, course.getName());
}
...
};
Observable.from(students)
.flatMap(new Func1<Student, Observable<Course>>() {
@Override
public Observable<Course> call(Student student) {
return Observable.from(student.getCourses());
}
})
.subscribe(subscriber);
扩展
可以在嵌套的 Observable
中添加异步代码,flatMap()
也常用于嵌套的异步操作。
例:嵌套的网络请求。代码(Retrofit + RxJava):
networkClient.token() // 返回 Observable<String>,在订阅时请求 token,并在响应后发送 token
.flatMap(new Func1<String, Observable<Messages>>() {
@Override
public Observable<Messages> call(String token) {
// 返回 Observable<Messages>,在订阅时请求消息列表,并在响应后发送请求到的消息列表
return networkClient.messages();
}
})
.subscribe(new Action1<Messages>() {
@Override
public void call(Messages messages) {
// 处理显示消息列表
showMessages(messages);
}
});
传统的嵌套请求需要使用嵌套的 Callback
来实现。通过 flatMap()
,把嵌套的请求写在一条链中,从而保持程序逻辑的清晰。
throttleFirst(): 在每次事件触发后的一定时间间隔内丢弃新的事件。常用作去抖动过滤。
//按钮的点击监听器
RxView.clickEvents(button) // RxBinding 代码,后面的文章有解释
.throttleFirst(500, TimeUnit.MILLISECONDS) // 设置防抖间隔为 500ms
.subscribe(subscriber); 妈妈再也不怕我的用户手抖点开两个重复的界面啦。
变换的原理: lift()
各种变换虽然功能不同,但实质上都是针对事件序列的处理和再发送。而在RxJava
的内部,它们是基于同一个基础变换方法:lift(Operator)
。
lift()
的内部实现(仅核心代码):
// 注意:这不是 lift() 的源码,而是将源码中与性能、兼容性、扩展性有关的代码剔除后的核心代码。
// 如果需要看源码,可以去 RxJava 的 GitHub 仓库下载。
public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) {
return Observable.create(new OnSubscribe<R>() {
@Override
public void call(Subscriber subscriber) {
Subscriber newSubscriber = operator.call(subscriber);
newSubscriber.onStart();
onSubscribe.call(newSubscriber);
}
});
}
其中,生成了一个新的Observable
并返回,而且创建新Observale
所用的参数OnSubscribe
的回调方法call()
中的实现和Observable.subscribe()
类似,但是并不一样,关键在于第二行onSubscribe.call(subscriber)
中的onSubscribe
所指代的对象不同:
-
subscribe()
中这句话的onSubscribe
指的是Observable
中的onSubscribe
对象,这个没有问题,但是list()
之后的情况就复杂了。 - 当含有
list()
时:
-
lift()
创建了一个Observable
后,加上之前原始的Obervable
,已经有两个Observable
了; - 同样的,新
Observable
里的新OnSubscribe
加上原始Observable
中的原始OnSubscribe
,也就有了两个OnSubscribe
; - 当用户调用经过
lift()
后的Observable
的subscribe()
的时候,使用的是lift()
所返回的新的Observable
,于是它所触发的onSubscribe.call(subscriber)
,也是用的新Observable
中的新OnSubscribe
,即在lift()
中生成的那个OnSubscribe
; - 而这个新
OnSubscribe
的call()
方法中的onSubscribe
,就是指的原始Observable
中的原始OnSubscribe
,在这个call()
方法里,新OnSubscribe
利用operator.call(subscriber)
生成了一个新的Subscriber
(Operator 就是在这里,通过自己的 call() 方法将新 Subscriber 和原始 Subscriber 进行关联,并插入自己的『变换』代码以实现变换),然后利用这个新Subscriber
向原始Observable
进行订阅。
这样实现了lift()
过程,像一种代理机制,通过事件拦截和处理实现事件序列的变换。
简而言之:在 Observable
执行了 lift(Operator)
方法之后,会返回一个新的 Observable
,这个新的 Observable
会像一个代理一样,负责接收原始的 Observable
发出的事件,并在处理后发送给 Subscriber
。
示意图:
示例
//具体的 Operator 的实现
//将事件中的 Integer 对象转换成 String
observable.lift(new Observable.Operator<String, Integer>() {
@Override
public Subscriber<? super Integer> call(final Subscriber<? super String> subscriber) {
// 将事件序列中的 Integer 对象转换为 String 对象
return new Subscriber<Integer>() {
@Override
public void onNext(Integer integer) {
subscriber.onNext("" + integer);
}
@Override
public void onCompleted() {
subscriber.onCompleted();
}
@Override
public void onError(Throwable e) {
subscriber.onError(e);
}
};
}
});
注意:讲述
lift()
的原理是为了更好地了解 RxJava ,从而更好地使用它。然而不管是否理解了lift()
的原理,RxJava
都不建议开发者自定义Operator
来直接使用lift()
,而是建议尽量使用已有的lift()
包装方法(如map()
、flatMap()
等)进行组合来实现需求,因为直接使用lift()
非常容易发生一些难以发现的错误。
compose:对Observable整体的变换
//有多个 Observable ,都需要应用一组相同的 lift() 变换
//第一种方法
observable1
.lift1()
.lift2()
.lift3()
.lift4()
.subscribe(subscriber1);
observable2
.lift1()
.lift2()
.lift3()
.lift4()
.subscribe(subscriber2);
observable3
.lift1()
.lift2()
.lift3()
.lift4()
.subscribe(subscriber3);
observable4
.lift1()
.lift2()
.lift3()
.lift4()
.subscribe(subscriber1);
//第二种方法
private Observable liftAll(Observable observable) {
return observable
.lift1()
.lift2()
.lift3()
.lift4();
}
...
liftAll(observable1).subscribe(subscriber1);
liftAll(observable2).subscribe(subscriber2);
liftAll(observable3).subscribe(subscriber3);
liftAll(observable4).subscribe(subscriber4);
第二种方法可读性、可维护性提高了。可是被方法包起来,这种方式对于 Observale
的灵活性似乎增添了那么点限制。
//使用 compose() 解决
//第三种方法
public class LiftAllTransformer implements Observable.Transformer<Integer, String> {
@Override
public Observable<String> call(Observable<Integer> observable) {
return observable
.lift1()
.lift2()
.lift3()
.lift4();
}
}
...
Transformer liftAll = new LiftAllTransformer();
observable1.compose(liftAll).subscribe(subscriber1);
observable2.compose(liftAll).subscribe(subscriber2);
observable3.compose(liftAll).subscribe(subscriber3);
observable4.compose(liftAll).subscribe(subscriber4);
第三种方法,Observable
可以利用传入的 Transformer
对象的 call
方法直接对自身进行处理,也就不必包在方法里。