classpath 'me.tatarka:gradle-retrolambda:3.2.0'
compile 'io.reactivex:rxjava:1.0.14'
compile 'io.reactivex:rxandroid:1.0.1'
Observable & Observer
- 观察者(Observer)对可观察对象(Observable)发射的数据或者数据序列作出响应。
- 这种模式极大的简化了并发操作,创建一个处于待命状态的观察者哨兵,在未来某个时刻响应Observable的通知,不需要阻塞等待Observable发射数据
public class Observable<T> {
final OnSubscribe<T> onSubscribe;
protected Observable(OnSubscribe<T> f) {
this.onSubscribe = f;
}
}
Observable的具体工作都是在OnSubscribe中完成的
public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {
void call(Subscriber<? super T> subscriber);
}
Observable调用onNext()
发射数据,方法的参数就是发射的数据,可能会被调用零次或多次;最后会有一次onCompleted()
、onError()
调用(不会同时)
public interface Observer<T> {
void onCompleted() {}
void onError(Throwable e) {}
void onNext(T t) {}
}
创建Observable
-
Observable.create();
Observable.create(new Observable.OnSubscribe<String>() { public void call(Subscriber<? super String> subscriber) { // TODO 做自己的事情 if (subscriber.isUnsubscribed()) return; subscriber.onNext("Hello"); if (!subscriber.isUnsubscribed()) { subscriber.onCompleted(); } } });
-
Observable.just(xxx()); 将传统的java方法转变为Observable
T t = xxx(); Observable.create((subscriber) -> { subscriber.onNext(t); subscriber.onCompleted(); });
Observable.just(T, T); // 等价于Observable.from(T[]);
Observable.from(Iterable); // 从集合中一个接一个的发出每一个对象
-
Observable.empty();
Observable.create((subscriber) -> { subscriber.onCompleted(); });
-
Observable.never();
Observable.create((subscriber) -> { // do nothing });
subscribe()
真正开始发射数据
-
没有传Oberver,仅仅是为了开启Observable,而不用管发出的任何值
observable.subscribe();
-
传入Observer:内部new Subscriber(),将回调委托给传入的Observer
observable.subscribe(observer);
-
传入Action1<T>:内部new Subscriber(),action作为onNext()的回调
observable.subscribe(new Action1<T>() { public void call(T t) {} });
操作符
用于在Observable和最终的Subscriber之间修改Observable发出的数据;通过代理将subscriber层层组合
public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>> {
Subscriber<? super T> call(Subscriber<? super R> s);
}
-
filter():过滤不为空,且以org开头的
return Observable.from(getAllApps()) .filter(new Func1<AppInfo, Boolean>() { public Boolean call(AppInfo appInfo) { return appInfo != null && appInfo.getPackName().startsWith("org"); } });
skip() / skipLast():跳过前两个 / 跳过后两个
take() / takeLast():获取前三个 / 获取后三个
repate(3):重复执行
-
distinct():记录每一个发射数据,过滤掉重复的数据项
Observable.just(1, 2, 1, 1, 2, 3).distinct(); // 1, 2, 3
distinctUntilChanged():只判定一个数据和它的直接前驱是否不同
sample():
timeout():
doOnNext():类似切面
doOnCompleted():
-
map():把一个事件转换为另一个事件
Observable.from(getAllApps()) .map(new Func1<AppInfo, AppInfo>() { public AppInfo call(AppInfo appInfo) { String curPackName = appInfo.getPackName(); appInfo.setPackName(curPackName.toUpperCase()); return appInfo; } });
flatMap():接收一个Observable的输出作为输入,同时输出另外一个Observable
Subject
既是Observable可以发出数据,也是Observer接收数据;可以作为桥梁
PublishSubject
PublishSubject<String> subject = PublishSubject.create();
subject.subscribe(new Observer<String>() {
public void onCompleted() {}
public void onError(Throwable e) {}
public void onNext(String s) {}
});
subject.onNext("Hello World");
Scheduler
- Schedulers.immediate(); // 默认,立即在当前线程执行指定的工作
- Schedulers.newThread()
- Schedulers.io()
- Schedulers.computation()
- Schedulers.tram
Schedulers.io().createWorker().schedule(() -> {});
不同的操作符对应不同的调度器
observable
.subscribeOn(AndroidSchedulers.mainThread()) // 指定观察者代码运行的线程
.observeOn(Schedulers.computation()) // 指定订阅者运行的线
Android
-
Android的调度器
- AndroidSchedulers.mainThread()
- HandlerScheduler.from(handler)
-
当在Activity中订阅一个Observable的结果时,必须在onDestory里取消订阅
private Subscription subscription; protected void onCreate(Bundle savedInstanceState) { this.subscription = observable.subscribe(this); } protected void onDestory() { super.onDestory(); this.subscription.unsubscribe(); }