RxJava是什么?
一个基于观察者模式(事件流)的异步任务库。可以很简洁地完成一个异步任务,当任务复杂时也能清晰地表达逻辑。GitHub地址。,具体的一些理论可以查看抛物线
这边文章《给 Android 开发者的 RxJava 详解》,很好的入门教程。
基本使用
在RxJava2.0中,把背压和非背压分两种观察者模式。
背压:事件产生的速度远远快于事件消费的速度,最终导致数据积累越来越多,从而导致OOM等异常。
1、非背压
/**
* 非背压
* Observable对应Observer
*/
private void createObservable() {
//被观察者
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("This");
e.onNext("is");
e.onNext("RxJava");
e.onComplete();
}
});
//观察者
Observer<String> observer = new Observer<String>() {
Disposable disposable;
@Override
public void onSubscribe(Disposable d) {
disposable = d;
}
@Override
public void onNext(String s) {
Log.i(TAG, "onNext: " + s);
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError: " + e.getLocalizedMessage());
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete");
//取消订阅
if (!disposable.isDisposed()) {
disposable.dispose();
}
}
};
observable.subscribe(observer);
}
2、背压
/**
* 背压(在异步过程中,由于被观察者发射数据过快,而观察者处理数据不及时,
* 导致内存里堆积了太多数据,从而OOM,可以选择不同的策略处理该问题)
* Flowable对应subscriber
*/
private void createFlowable() {
Flowable<String> flowable = Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> e) throws Exception {
if (!e.isCancelled()) {
e.onNext("This");
e.onNext("is");
e.onNext("RxJava");
e.onComplete();
}
}
//抛弃策略
}, BackpressureStrategy.DROP);
Subscriber<String> subscriber = new Subscriber<String>() {
Subscription subscription;
@Override
public void onSubscribe(Subscription s) {
subscription = s;
//请求一个数据
subscription.request(1);
}
@Override
public void onNext(String s) {
Log.i(TAG, "onNext: " + s);
//处理完后,再请求一个数据
subscription.request(1);
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError: " + e.getLocalizedMessage());
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete");
//取消订阅
subscription.cancel();
}
};
flowable.subscribe(subscriber);
}