转载
在RXJava中一种比较nice的思想是能够通过一系列的操作符看到数据是如何转换的:
Observable.from(someSource)
.map(data -> manipulate(data))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(data -> doSomething(data));
假设想要给多个流重复利用一系列操作符该怎么办呢?比如,我想在工作线程中处理数据,在主线程中订阅,所以必须频繁使用subscribeOn() 和observeOn()。如果我能将这逻辑通过连续的、可重复的方式应用到我的所有流上,那简直太棒了。
坏方法
下面这个方法是我用过数月的“反模式”方法。首先,创建一个方法,该方法适用于调度器
<T> Observable<T> applySchedulers(Observable<T> observable) {
return observable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
然后调用applySchedulers(),包装你的Observable链:
applySchedulers(
Observable.from(someSource)
.map(data -> manipulate(data))
)
.subscribe(data -> doSomething(data));
虽然这样写能运行但是很丑很乱——因为applySchedulers()导致了它不再是操作符,因此很难在其后面追加其他操作符。现在,当你多用几次这种“反模式”在单一流上之后你就会觉得它有多坏了。
Transformers登场
在RXJava背后有一群聪明的人意识到了这是一个问题并且提供了解决方案:Transformer ,它和 Observable.compose() 一起使用。
Transformer实际上就是Func1<Observable , Observable
,换句话说就是提供给他一个Observable它会返回给你另一个Observable,这和内联一系列操作符有着同等功效。
实际操作下,写个方法,创建一个Transformer调度器:
<T> Transformer<T, T> applySchedulers() {
return new Transformer<T, T>() {
@Override
public Observable<T> call(Observable<T> observable) {
return observable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
};
}
用lambda表达式看上去会好看些:
<T> Transformer<T, T> applySchedulers() {
return observable -> observable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
现在调用这个方法会是怎么样的呢:
Observable.from(someSource)
.map(data -> manipulate(data))
.compose(applySchedulers())
.subscribe(data -> doSomething(data));
是不是好很多!我们既重用了代码又保护了链式。如果你用的是JDK 7以下版本,你不得不利用compose() 做一些多余的工作。比如说,你得告诉编译器返回的类型,像这样:
Observable.from(someSource)
.map(data -> manipulate(data))
.compose(this.<YourType>applySchedulers())
.subscribe(data -> doSomething(data));
复用Transformers
如果你经常做从一个具体类型转换到另一个类型来创建一个实例:
Transformer<String, String> myTransformer = new Transformer<String, String>() {
// 干一些事情
};
用Transformer会怎么样呢?它压根就不会考虑类型问题,但你不能定义一个通用的实例。
你可以把它改成Transformer<Object, Object>,但是返回的Observable会丢失它的类型信息。
解决这个问题我是从 Collections 中得到了灵感,Collections是一堆类型安全、不可变的空集合的生成方法(比如 Collections.emptyList() )。本质上它采用了非泛型实例,然后通过方法包裹附加泛型信息。
final Transformer schedulersTransformer =
observable -> observable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
@SuppressWarnings("unchecked")
<T> Transformer<T, T> applySchedulers() {
return (Transformer<T, T>) schedulersTransformer;
}
警告:无论如何你都有可能陷入类型转换异常的坑。确保你的Transformer真的是类型无关的。另一方面,你可能会在运行时抛出ClassCastException异常,即使你的代码编译通过了。在这个例子里面,因为调度器没有和发射的项目发生互动,所以它是类型安全的。
和flatMap()有啥区别?
compose()和flatMap()有啥区别呢。他们都是发射出Observable
,是不是就是说他们都可以复用一系列操作符呢?
区别在于compose()是高等级的抽象,他操作的是整个流,而不是单一发射出的项目,这里有更多的解释:
- compose()
是唯一一个能从流中获取原生Observable 的方法,因此,影响整个流的操作符(像subscribeOn()和observeOn())需要使用compose(),相对的,如果你在flatMap()中使用subscribeOn()/observeOn(),它只影响你创建的flatMap()中的Observable,而不是整个流。 - 当你创建一个Observable流并且内联了一堆操作符以后,compose()会立即执行,flatMap()则是在onNext()被调用以后才会执行,换句话说,flatMap()转换的是每个项目,而compose()转换的是整个流。
- flatMap()一定是低效率的,因为他每次调用onNext()之后都需要创建一个新的Observable,compose()是操作在整个流上的。
如果你想用可重用的代码替换一些操作符,可以利用compose()和flatMap(),但他们不是唯一的解决办法。