上一篇中讲了Rxjava的基本使用,这篇来讲一下Rxjava的一些高级操作
一.Rxjava中的变换
RxJava 提供了对事件序列进行变换的支持,这是它的核心功能之一,也是大多数人说 “RxJava 真是太好用了” 的最大原因。所谓变换,就是将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或事件序列。
首先看一个 map() 的例子:
Observable.just("images/logo.png") // 输入类型 String
.map(new Func1<String, Bitmap>() {
@Override
public Bitmap call(String filePath) { // 参数类型 String
return getBitmapFromPath(filePath); // 返回类型 Bitmap
}
})
.subscribe(new Action1<Bitmap>() {
@Override
public void call(Bitmap bitmap) { // 参数类型 Bitmap
showBitmap(bitmap);
}
});
可以看到,map() 方法将参数中的 String 对象转换成一个 Bitmap 对象后返回,而在经过 map() 方法后,事件的参数类型也由 String 转为了 Bitmap。这种直接变换对象并返回的,是最常见的也最容易理解的变换。不过 RxJava 的变换远不止这样,它不仅可以针对事件对象,还可以针对整个事件队列,这使得 RxJava 变得非常灵活。
再看一个 flatMap() 的例子:
String[] filePaths = ...;
Subscriber<Bitmap> subscriber = new Subscriber<Bitmap>() {
@Override
public void onNext(Bitmap bitmap) {
...
}
...
};
Observable.from(filePaths)
.flatMap(new Func1<String, Observable<Bitmap>>() {
@Override
public Observable<Bitmap> call(String filePath) {
return Observable.from(getAllPhoto(filePath));
}
})
.subscribe(subscriber);
从上面的代码可以看出, flatMap() 和 map() 有一个相同点:它也是把传入的参数转化之后返回另一个对象。但需要注意,和 map() 不同的是, flatMap() 中返回的是个 Observable 对象,并且这个 Observable 对象并不是被直接发送到了 Subscriber 的回调方法中。
flatMap() 的原理是这样的:1. 使用传入的事件对象创建一个 Observable 对象;2. 并不发送这个 Observable, 而是将它激活,于是它开始发送事件;3. 每一个创建出来的 Observable 发送的事件,都被汇入同一个 Observable ,而这个 Observable 负责将这些事件统一交给 Subscriber 的回调方法。这三个步骤,把事件拆成了两级,通过一组新创建的 Observable 将初始的对象『铺平』之后通过统一路径分发了下去。而这个『铺平』就是 flatMap() 所谓的 flat
二.Rxjava中的Subject
关于Subject,官方文档的解释是这样的:Subject可以看成是一个桥梁或者代理,在某些ReactiveX实现中(如RxJava),它同时充当了Observer和Observable的角色。因为它是一个Observer,它可以订阅一个或多个Observable;又因为它是一个Observable,它可以转发它收到(Observe)的数据,也可以发射新的数据。从官方解释中,我提取出三个要点:
它可以充当Observable;
它可以充当Observer;
它是Observable和Observer之间的桥梁;
Subject的分类解析
** AsyncSubject**
Observer会接收AsyncSubject的onComplete()
之前的最后一个数据,如果因异常而终止,AsyncSubject将不会释放任何数据,但是会向Observer传递一个异常通知。** BehaviorSubject**
Observer会接收到BehaviorSubject被订阅之前的最后一个数据,再接收其他发射过来的数据,如果BehaviorSubject被订阅之前没有发送任何数据,则会发送一个默认数据。** PublishSubject**
PublishSubject比较容易理解,相对比其他Subject常用,它的Observer只会接收到PublishSubject被订阅之后发送的数据。** ReplaySubject**
ReplaySubject会发射所有数据给观察者,无论它们是何时订阅的。也有其它版本的ReplaySubject,在重放缓存增长到一定大小的时候或过了一段时间后会丢弃旧的数据。
接下来用代码说明它如何充当Observable,Observer以及Observable和Observer之间的桥梁:
创建Observable并发射数据:
PublishSubject<String> publishSubject = PublishSubject.create();
publishSubject.onNext("as Observable");
publishSubject.onCompleted();```
创建Observer订阅Observable并接收数据:
publishSubject.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
}
});```
借用Subject来连接Observable和Observer:
PublishSubject<String> publishSubject = PublishSubject.create();
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("as Bridge");
subscriber.onCompleted();
}
}).subscribe(publishSubject);
publishSubject.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
...
}
});```