一. 引言
RxJava ,简单来说,一个实现异步操作的库。代替了我以前使用Thread,AsyncTask实现异步操作,ScheduledExecutorService,ExecutorService线程池,定时器Timer,TaskTask等来实现任务调度器,定时器。因为Android异步操作以后常常要切换到主线程,RxAndroid这个库就很好的实现了即使在很复杂的情况下,也可以简单的切换到主线程,还可以使你的代码逻辑简洁清晰 。
而一开始接触的时候,觉得RxJava不好用,主要原因是观察者模式的理解,它为了实现它的链式操作,牺牲了它的可读性,导致刚开始学的时候快要被各种它的命名,各种观察者,被观察者绕晕了。
二. RxJava 的观察者模式概要
RxJava的观察者模式跟传统理解上的观察者模式不同,例如OnClickListener和View。View绑定这个事件后,相应的动作发生,就会立刻被View捕获到,观察者已有变化就会被观察者捕获到。即观察者使用注册register(),绑定bind(),订阅subscribe()的方式,告诉被观察者,当你有变化的时候通知我。
RxJava的观察者模式可以说是一种扩展版的观察者模式,被观察者Observable 决定事件(数据)的产生以及如何产生,即决定事件源。观察者Observer 决定事件源的特定行为。事件何时发生则由订阅事件subscribe 来决定。
我们可以将被观察者Observable当做一个厨师,Observer当做顾客,当顾客进入餐厅,点单以后,订阅事件发生。 Observable厨师则立刻搭配这道菜的食材并且完成这道菜,当厨师将做好的菜给顾客的时候,这道菜如何吃,在哪里吃则都由顾客决定。
三. RxJava 的观察者情景演示
1. 情景一
顾客下单---西红柿鸡蛋炒面
Observable.just("noodles","egg","vegetable")
.map(new Function<String, String>() {
@Override
public String apply(@NonNull final String s) throws Exception {
//to do some prepare,such as:打鸡蛋 wipe egg,洗青菜 wash vegetable,..
Log.d("duanyl", "Observable 厨师: to do some prepare "+s );
return s;
}
})//using reduce to add all the String
.reduce(new BiFunction<String, String, String>() {
@Override
public String apply(@NonNull final String s, @NonNull final String s2) throws Exception {
//cook
Log.d("duanyl", "Observable 厨师: to cook" +s+"," +s2);
return s+"," +s2;
}
})
.flatMapObservable(new Function<String, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(@NonNull String s) throws Exception {
return Observable.just(s+" +盘子---鸡蛋西红柿炒面)");
}
})
.doAfterTerminate(new Action() {
@Override
public void run() throws Exception {
//上菜
Log.d("duanyl", " 上菜" );
}
})
.subscribeOn(Schedulers.io())//在厨房
.observeOn(AndroidSchedulers.mainThread())//在餐厅
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.d("duanyl", " 下单" );
}
@Override
public void onNext(@NonNull final String s) {
Log.d("duanyl", "Observer 顾客 onSuccess: eat "+s );
}
@Override
public void onError(@NonNull final Throwable e) {
Log.d("duanyl", "Observer 顾客 onError: 知道"+e.getMessage() );
}
@Override
public void onComplete() {
Log.d("duanyl", "Observer 顾客 onComplete: 吃完付钱" );
}
});
输出结果:
2. 情景二
顾客下单---薯条,鸡排,西瓜汁---但是中途厨师烫伤,顾客知道后,退款离开
private void doAct2() {
tx_console.setText("doAct2 Current Thread:"+Thread.currentThread().getName());
Observable.just("土豆","鸡肉","西瓜")
.map(new Function<String, String>() {
@Override
public String apply(@NonNull String s) throws Exception {
//to do some prepare,such as:打鸡蛋 wipe egg,洗青菜 wash vegetable,..
if(s.equals("土豆")){
s = "cook土豆-----薯条";
}else if(s.equals("鸡肉")){
s = "cook鸡肉-----炸鸡排";
}else{
s = "cook西瓜-----西瓜汁";
}
final String msg = "Observable 厨师:"+s;
Log.d("duanyl", msg);
return s;
}
})
.flatMap(new Function<String, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(@NonNull final String s) throws Exception {
return Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext(s.substring(s.indexOf("-----")));
if(s.contains("鸡肉")){
e.onError(new Throwable("厨师烫伤"));
}
e.onComplete();
}
});
}
})
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(@NonNull Disposable disposable) throws Exception {
Log.d("duanyl", " 接单 收款" );
}
})
.doOnError(new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
//上菜
Log.d("duanyl", " 退款" );
}
})
.doOnComplete(new Action() {
@Override
public void run() throws Exception {
Log.d("duanyl", " 上菜" );
}
})
.subscribeOn(Schedulers.io())//在厨房
.observeOn(AndroidSchedulers.mainThread())//在餐厅
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull final String s) throws Exception {
Log.d("duanyl", "Observer 顾客 onSuccess: eat " + s);
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull final Throwable throwable) throws Exception {
Log.d("duanyl", "Observer 顾客 onError: 知道" + throwable.getMessage());
}
}, new Action() {
@Override
public void run() throws Exception {
Log.d("duanyl", "Observer 顾客 onError: 吃完" );
}
});
}
输出结果:
3. 情景三
顾客下单---薯条,鸡排,西瓜汁---但是中途顾客不满意,没有退款就离开,但是厨师不知道依然做完三道菜
tx_console.setText("doAct3 Current Thread:"+Thread.currentThread().getName());
Observable.just("土豆","鸡肉","西瓜")
.map(new Function<String, String>() {
@Override
public String apply(@NonNull String s) throws Exception {
//to do some prepare,such as:打鸡蛋 wipe egg,洗青菜 wash vegetable,..
if(s.equals("土豆")){
s = "cook土豆-----薯条";
}else if(s.equals("鸡肉")){
s = "cook鸡肉-----炸鸡排";
}else{
s = "cook西瓜-----西瓜汁";
}
final String msg = "Observable 厨师:"+s;
Log.d("duanyl", msg);
return s;
}
})
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(@NonNull Disposable disposable) throws Exception {
Log.d("duanyl", " 接单 收款" );
}
})
.doOnError(new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
//上菜
Log.d("duanyl", " 退款" );
}
})
.doOnComplete(new Action() {
@Override
public void run() throws Exception {
Log.d("duanyl", " 上菜" );
}
})
.flatMap(new Function<String, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(@NonNull final String s) throws Exception {
return Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext(s.substring(s.indexOf("-----")));
e.onComplete();
}
});
}
})
.subscribeOn(Schedulers.io())//在厨房
.observeOn(AndroidSchedulers.mainThread())//在餐厅
.subscribe(new Observer<String>() {
Disposable disposable;
@Override
public void onSubscribe(@NonNull Disposable d) {
disposable = d;
}
@Override
public void onNext(@NonNull final String s) {
Log.d("duanyl", "Observer 顾客 onSuccess: eat " + s);
Log.d("duanyl", "Observer 顾客 离开");
disposable.dispose();
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
Log.d("duanyl", "Observer 顾客 离开");
}
});
输出结果:
4. 情景四
顾客下单---"薯条","炸鸡排","西瓜汁","紫米粥"----并且顾客要求吃完一道在上一道菜
Flowable.just("土豆","鸡肉","紫米","西瓜")
.map(new Function<String, String>() {
@Override
public String apply(@NonNull String s) throws Exception {
//to do some prepare,such as:打鸡蛋 wipe egg,洗青菜 wash vegetable,..
if(s.equals("土豆")){
s = "cook土豆-----薯条";
}else if(s.equals("鸡肉")){
s = "cook鸡肉-----炸鸡排";
}else if(s.equals("西瓜")){
s = "cook西瓜-----西瓜汁";
}else{
s = "cook紫米-----紫米粥";
}
final String msg = "Observable 厨师:"+s;
Log.d("duanyl", msg);
return s;
}
})
.doOnSubscribe(new Consumer<Subscription>() {
@Override
public void accept(@NonNull Subscription subscription) throws Exception {
Log.d("duanyl", " 接单 收款" );
}
})
.doOnError(new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
//上菜
Log.d("duanyl", " 退款" );
}
})
.doOnComplete(new Action() {
@Override
public void run() throws Exception {
Log.d("duanyl", " 上菜" );
}
})
.onBackpressureBuffer()
.subscribeOn(Schedulers.io())//在厨房
.observeOn(AndroidSchedulers.mainThread())//在餐厅
.subscribe(new Subscriber<String>() {
Subscription subscription;
@Override
public void onSubscribe(Subscription s) {
subscription = s;
subscription.request(1);
}
@Override
public void onNext(final String s) {
Log.d("duanyl", "Observer 顾客 onSuccess: eat " + s);
new Handler().postDelayed(new Runnable() {
@Override
public void run() {
Log.d("duanyl", "Observer 顾客 onSuccess: 吃完了 " + s);
subscription.request(1);
}
},5000);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
Log.d("duanyl", "Observer 顾客 onError: 全部吃完" );
tx_console.post(new Runnable() {
@Override
public void run() {
tx_console.append("\nObserver 顾客 onError: 全部吃完" );
}
});
}
});
输出结果: