RxJava 2.0----观察者模式之厨师与顾客

一. 引言
RxJava ,简单来说,一个实现异步操作的库。代替了我以前使用Thread,AsyncTask实现异步操作,ScheduledExecutorService,ExecutorService线程池,定时器Timer,TaskTask等来实现任务调度器,定时器。因为Android异步操作以后常常要切换到主线程,RxAndroid这个库就很好的实现了即使在很复杂的情况下,也可以简单的切换到主线程,还可以使你的代码逻辑简洁清晰 。
而一开始接触的时候,觉得RxJava不好用,主要原因是观察者模式的理解,它为了实现它的链式操作,牺牲了它的可读性,导致刚开始学的时候快要被各种它的命名,各种观察者,被观察者绕晕了。

二. RxJava 的观察者模式概要

RxJava的观察者模式跟传统理解上的观察者模式不同,例如OnClickListener和View。View绑定这个事件后,相应的动作发生,就会立刻被View捕获到,观察者已有变化就会被观察者捕获到。即观察者使用注册register(),绑定bind(),订阅subscribe()的方式,告诉被观察者,当你有变化的时候通知我。
RxJava的观察者模式可以说是一种扩展版的观察者模式,被观察者Observable 决定事件(数据)的产生以及如何产生,即决定事件源。观察者Observer 决定事件源的特定行为。事件何时发生则由订阅事件subscribe 来决定。
我们可以将被观察者Observable当做一个厨师,Observer当做顾客,当顾客进入餐厅,点单以后,订阅事件发生。 Observable厨师则立刻搭配这道菜的食材并且完成这道菜,当厨师将做好的菜给顾客的时候,这道菜如何吃,在哪里吃则都由顾客决定。

三. RxJava 的观察者情景演示

1. 情景一
顾客下单---西红柿鸡蛋炒面

 Observable.just("noodles","egg","vegetable")
                .map(new Function<String, String>() {
                    @Override
                    public String apply(@NonNull final String s) throws Exception {
                        //to do some prepare,such as:打鸡蛋 wipe egg,洗青菜 wash vegetable,..

                        Log.d("duanyl", "Observable 厨师: to do some prepare "+s );
                      
                        return s;
                    }
                })//using reduce to add all the String
                .reduce(new BiFunction<String, String, String>() {
                    @Override
                    public String apply(@NonNull final String s, @NonNull final String s2) throws Exception {
                        //cook

                        Log.d("duanyl", "Observable 厨师: to cook" +s+"," +s2);
                   
                        return s+"," +s2;
                    }
               })
                .flatMapObservable(new Function<String, ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(@NonNull String s) throws Exception {

                        return Observable.just(s+" +盘子---鸡蛋西红柿炒面)");
                    }
                })
                .doAfterTerminate(new Action() {
                    @Override
                    public void run() throws Exception {
                        //上菜
                        Log.d("duanyl", " 上菜" );
                    }
                })
                .subscribeOn(Schedulers.io())//在厨房
                .observeOn(AndroidSchedulers.mainThread())//在餐厅
                 .subscribe(new Observer<String>() {
                     @Override
                     public void onSubscribe(@NonNull Disposable d) {
                         Log.d("duanyl", " 下单" );
                        
                     }

                     @Override
                     public void onNext(@NonNull final String s) {
                         Log.d("duanyl", "Observer 顾客 onSuccess:  eat "+s );
                        
                     }

                     @Override
                     public void onError(@NonNull final Throwable e) {
                         Log.d("duanyl", "Observer 顾客 onError: 知道"+e.getMessage() );
                       
                     }

                     @Override
                     public void onComplete() {
                         Log.d("duanyl", "Observer 顾客 onComplete: 吃完付钱" );
                       
                     }
                 });
 
 

输出结果:

情景一

2. 情景二
顾客下单---薯条,鸡排,西瓜汁---但是中途厨师烫伤,顾客知道后,退款离开

       private void doAct2() {
        tx_console.setText("doAct2 Current Thread:"+Thread.currentThread().getName());
        Observable.just("土豆","鸡肉","西瓜")
                .map(new Function<String, String>() {
                    @Override
                    public String apply(@NonNull   String s) throws Exception {
                        //to do some prepare,such as:打鸡蛋 wipe egg,洗青菜 wash vegetable,..

                        if(s.equals("土豆")){
                            s = "cook土豆-----薯条";
                        }else if(s.equals("鸡肉")){
                            s = "cook鸡肉-----炸鸡排";
                        }else{
                            s = "cook西瓜-----西瓜汁";
                        }

                        final String msg = "Observable 厨师:"+s;
                        Log.d("duanyl", msg);
                      
                        return s;
                    }
                })
                .flatMap(new Function<String, ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(@NonNull final  String s) throws Exception {
                        return Observable.create(new ObservableOnSubscribe<String>() {
                            @Override
                            public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
                                e.onNext(s.substring(s.indexOf("-----")));
                                if(s.contains("鸡肉")){
                                    e.onError(new Throwable("厨师烫伤"));
                                }
                                e.onComplete();
                            }
                        });
                    }
                })
                .doOnSubscribe(new Consumer<Disposable>() {
                    @Override
                    public void accept(@NonNull Disposable disposable) throws Exception {
                        Log.d("duanyl", " 接单 收款" );
                       
                    }
                })
                .doOnError(new Consumer<Throwable>() {
                    @Override
                    public void accept(@NonNull Throwable throwable) throws Exception {
                        //上菜
                        Log.d("duanyl", " 退款" );
                       
                    }
                })
                .doOnComplete(new Action() {
                    @Override
                    public void run() throws Exception {
                        Log.d("duanyl", " 上菜" );
                       
                    }
                })
                .subscribeOn(Schedulers.io())//在厨房
                .observeOn(AndroidSchedulers.mainThread())//在餐厅
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(@NonNull final String s) throws Exception {
                        Log.d("duanyl", "Observer 顾客 onSuccess:  eat " + s);
                     
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(@NonNull final Throwable throwable) throws Exception {
                        Log.d("duanyl", "Observer 顾客 onError: 知道" + throwable.getMessage());
                        
                    }
                }, new Action() {
                    @Override
                    public void run() throws Exception {
                        Log.d("duanyl", "Observer 顾客 onError: 吃完" );
                       
                    }
                });
    }

输出结果:

情景二

3. 情景三
顾客下单---薯条,鸡排,西瓜汁---但是中途顾客不满意,没有退款就离开,但是厨师不知道依然做完三道菜

   tx_console.setText("doAct3 Current Thread:"+Thread.currentThread().getName());
        Observable.just("土豆","鸡肉","西瓜")
                .map(new Function<String, String>() {
                    @Override
                    public String apply(@NonNull   String s) throws Exception {
                        //to do some prepare,such as:打鸡蛋 wipe egg,洗青菜 wash vegetable,..

                        if(s.equals("土豆")){
                            s = "cook土豆-----薯条";
                        }else if(s.equals("鸡肉")){
                            s = "cook鸡肉-----炸鸡排";
                        }else{
                            s = "cook西瓜-----西瓜汁";
                        }
                        final String msg = "Observable 厨师:"+s;
                        Log.d("duanyl", msg);
                      
                        return s;
                    }
                })
                .doOnSubscribe(new Consumer<Disposable>() {
                    @Override
                    public void accept(@NonNull Disposable disposable) throws Exception {
                        Log.d("duanyl", " 接单 收款" );
                    
                    }
                })
                .doOnError(new Consumer<Throwable>() {
                    @Override
                    public void accept(@NonNull Throwable throwable) throws Exception {
                        //上菜
                        Log.d("duanyl", " 退款" );
                      
                    }
                })
                .doOnComplete(new Action() {
                    @Override
                    public void run() throws Exception {
                        Log.d("duanyl", " 上菜" );
                    
                    }
                })
                .flatMap(new Function<String, ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(@NonNull final  String s) throws Exception {
                        return Observable.create(new ObservableOnSubscribe<String>() {
                            @Override
                            public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
                                e.onNext(s.substring(s.indexOf("-----")));
                                e.onComplete();
                            }
                        });
                    }
                })
                .subscribeOn(Schedulers.io())//在厨房
                .observeOn(AndroidSchedulers.mainThread())//在餐厅
                .subscribe(new Observer<String>() {
                    Disposable disposable;
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {
                        disposable = d;
                    }

                    @Override
                    public void onNext(@NonNull final String s) {
                        Log.d("duanyl", "Observer 顾客 onSuccess:  eat " + s);
                      
                        Log.d("duanyl", "Observer 顾客 离开");
                       
                        disposable.dispose();
                    }

                    @Override
                    public void onError(@NonNull Throwable e) {

                    }

                    @Override
                    public void onComplete() {
                        Log.d("duanyl", "Observer 顾客 离开");
                       
                    }
                });

输出结果:

情景三

4. 情景四
顾客下单---"薯条","炸鸡排","西瓜汁","紫米粥"----并且顾客要求吃完一道在上一道菜

   Flowable.just("土豆","鸡肉","紫米","西瓜")
                .map(new Function<String, String>() {
                    @Override
                    public String apply(@NonNull   String s) throws Exception {
                        //to do some prepare,such as:打鸡蛋 wipe egg,洗青菜 wash vegetable,..

                        if(s.equals("土豆")){
                            s = "cook土豆-----薯条";
                        }else if(s.equals("鸡肉")){
                            s = "cook鸡肉-----炸鸡排";
                        }else if(s.equals("西瓜")){
                            s = "cook西瓜-----西瓜汁";
                        }else{
                            s = "cook紫米-----紫米粥";
                        }

                        final String msg = "Observable 厨师:"+s;
                        Log.d("duanyl", msg);
                         
                        return s;
                    }
                })

                .doOnSubscribe(new Consumer<Subscription>() {
                    @Override
                    public void accept(@NonNull Subscription subscription) throws Exception {
                        Log.d("duanyl", " 接单 收款" );
                       
                    }
                })
                .doOnError(new Consumer<Throwable>() {
                    @Override
                    public void accept(@NonNull Throwable throwable) throws Exception {
                        //上菜
                        Log.d("duanyl", " 退款" );
                        
                    }
                })
                .doOnComplete(new Action() {
                    @Override
                    public void run() throws Exception {
                        Log.d("duanyl", " 上菜" );
                      
                    }
                })
                .onBackpressureBuffer()
                .subscribeOn(Schedulers.io())//在厨房
                .observeOn(AndroidSchedulers.mainThread())//在餐厅
                .subscribe(new Subscriber<String>() {
                    Subscription subscription;
                    @Override
                    public void onSubscribe(Subscription s) {
                        subscription = s;
                        subscription.request(1);
                    }

                    @Override
                    public void onNext(final String s) {
                        Log.d("duanyl", "Observer 顾客 onSuccess:  eat " + s);
                     
                        new Handler().postDelayed(new Runnable() {
                            @Override
                            public void run() {
                                Log.d("duanyl", "Observer 顾客 onSuccess:  吃完了 " + s);
                              
                                subscription.request(1);
                            }
                        },5000);
                    }

                    @Override
                    public void onError(Throwable t) {

                    }

                    @Override
                    public void onComplete() {
                        Log.d("duanyl", "Observer 顾客 onError: 全部吃完" );
                        tx_console.post(new Runnable() {
                            @Override
                            public void run() {
                                tx_console.append("\nObserver 顾客 onError: 全部吃完"  );
                            }
                        });
                    }
                });

输出结果:

情景四
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,271评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,275评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,151评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,550评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,553评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,559评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,924评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,580评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,826评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,578评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,661评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,363评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,940评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,926评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,156评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,872评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,391评论 2 342

推荐阅读更多精彩内容