RxJava的使用之简单用法

之前在使用Retrofit进行网络请求时,里面用到了RxJava,但是对其却不甚理解,只是照搬网上所查,对其进行简单的使用。现在在空余时间进行重新学习,并记录下来,期望能帮助后学者。
RxJava是什么呢?它是一个基于事件流,实现异步操作的库,顾名思义我们用它来进行异步操作。它被推崇的原因在与简洁,即使在程序逻辑变的越来越复杂的情况下,依然能够保持简洁。RxJava的异步实现是通过观察者模式实现的。
RxJava 有四个基本概念:Observable(可观察者,即被观察者)、 Observer (观察者)、 subscribe (订阅)、事件。ObservableObserver 通过 subscribe()方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知Observer。下面我们来进行简单的使用。
添加依赖

    compile 'io.reactivex:rxandroid:1.2.1'
    compile 'io.reactivex:rxjava:1.2.0'

代码所在网址:
https://github.com/ReactiveX/RxJava
https://github.com/ReactiveX/RxAndroid

首先,创建被观察者 Observable,它决定什么时候触发事件以及触发怎样的事件。
创建:create(OnSubscribe)

Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("Hello RxJava!");
                subscriber.onCompleted();
            }
        });

创建:just(T...),将传入的参数依次发送出来

Observable<String> Observable1 = Observable.just("Hello RxJava!");

创建:from(T[]) / from(Iterable<? extends T>),将传入的数组或者集合拆分成具体对象后,依次发送出来

        String[] strs = new String[]{"Hello", "RxJava", "!"};
        Observable<String> Observable2 = Observable.from(strs);
        List<String> strList = Arrays.asList(strs);
        Observable<String> Observable3 = Observable.from(strList);
注意:上面创建Observable的三种方法是等价的。

其次,创建观察者 Observer,它决定触发的事件将会有什么行为。
创建:Observer

Observer<String> observer = new Observer<String>() {
            @Override
            public void onCompleted() {//事件队列完结,RxJava 规定,当不会再有新的 onNext() 发出时,需要触发 onCompleted() 方法作为标志。
                Log.e("LHC", "RxJava-->onCompleted");
            }

            @Override
            public void onError(Throwable e) {//事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。
                e.printStackTrace();
                Log.e("LHC", "RxJava-->onError");
            }

            @Override
            public void onNext(String s) {//发送事件
                Log.e("LHC", "RxJava-->onNext:" + s);
                tvRxJava.setText(s);
            }
        };
注意:在一个正确运行的事件序列中, onCompleted() 和 onError() 有且只有一个,并且是事件序列中的最后一个。需要注意的是,onCompleted() 和 onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。

创建:Subscriber,Subscriber是实现了Observer的抽象类,对Observer进行了一点扩展

        /**
         * 创建观察者二
         * 除了使用Observer来创建观察者外,还可以使用Subscriber,Subscriber是实现了Observer的抽象类,对Observer进行了一点扩展
         * 但是他们的使用方法是一样的。额外增加了onStart()和unsubscribe()方法。
         * unsubscribe():取消订阅
         * onStart():它会在 subscribe 刚开始,而事件还未发送之前被调用,可以用于做一些准备工作
         */
        subscriber = new Subscriber<String>() {
            @Override
            public void onStart() {
                super.onStart();
                //这是Subscriber增加的方法。它会在 subscribe 刚开始,而事件还未发送之前被调用,可以用于做一些准备工作
            }

            @Override
            public void onCompleted() {
                Log.e("LHC", "RxJava-->onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                e.printStackTrace();
                Log.e("LHC", "RxJava-->onError");
            }

            @Override
            public void onNext(String s) {
                Log.e("LHC", "RxJava-->onNext:" + s);
                tvRxJava.setText(s);
            }
        };

最后,订阅subscribe

        /**
         * observable.subscribe(observer) 这种关联也是将观察者observer转换成Subscription,然后下面的处理和调用
         * observable.subscribe(subscriber) 的处理是一样的
         */
        observable.subscribe(observer);//订阅,将观察者和被观察者关联
        observable.subscribe(subscriber);//订阅,将观察者和被观察者关联

除了上面两种订阅方式外,subscribe 还支持不完整定义的回调,RxJava 会自动根据定义创建出 Subscriber,代码如下:

        Observable<String> observable = Observable.just("Hello", ", RxJava!!");

        Action1<String> onNextAction = new Action1<String>() {
            @Override
            public void call(String s) {
                Log.e("LHC", "Action1-->onNextAction:"+ s);
            }
        };

        Action1<Throwable> onErrorAction = new Action1<Throwable>() {
            @Override
            public void call(Throwable e) {
                Log.e("LHC", "Action1-->onErrorAction");
                e.printStackTrace();
            }
        };

        Action0 onCompleted = new Action0() {
            @Override
            public void call() {
                Log.e("LHC", "Action1-->onCompleted");
            }
        };

        observable.subscribe(onNextAction);// 自动创建 Subscriber ,并使用 onNextAction 来定义 onNext()
        observable.subscribe(onNextAction, onErrorAction);// 自动创建 Subscriber ,并使用 onNextAction 和 onErrorAction 来定义 onNext() 和 onError()
        observable.subscribe(onNextAction, onErrorAction, onCompleted);// 自动创建 Subscriber ,并使用 onNextAction、 onErrorAction 和 onCompletedAction 来定义 onNext()、 onError() 和 onCompleted()
    
注意:
1.Action0,它只有一个方法 call(),这个方法是无参无返回值的;由于 onCompleted() 方法也是无参无返回值的,因此 Action0 可以被当成一个包装对象,将 onCompleted() 的内容打包起来将自己作为一个参数传入 subscribe() 以实现不完整定义的回调。
2.Action1, 也是一个接口,它同样只有一个方法 call(T param),这个方法也无返回值,但有一个参数;与 Action0 同理,由于 onNext(T obj) 和 onError(Throwable error) 也是单参数无返回值的,因此 Action1 可以将 onNext(obj) 和 onError(error) 打包起来传入 subscribe() 以实现不完整定义的回调。

这样RxJava的简单使用流程就完成了。上面为了介绍我们是分步骤实现的,下面我们使用链式调用来实现:

        Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("Hello RxJava!");
                subscriber.onCompleted();
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onCompleted() {
                Log.e("LHC", "RxJava-->onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                e.printStackTrace();
                Log.e("LHC", "RxJava-->onError");
            }

            @Override
            public void onNext(String s) {
                Log.e("LHC", "RxJava-->onNext:" + s);
                tvRxJava.setText(s);
            }
        });

当然也可以写成:

Observable.just("Hello", ", RxJava!!").subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                Log.e("LHC", "Action1:"+ s);
            }
        });

这样代码足够简单了吧...

RxJava的使用之变换
RxJava的使用之Scheduler

参考:http://gank.io/post/560e15be2dca930e00da1083
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,406评论 6 503
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,732评论 3 393
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,711评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,380评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,432评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,301评论 1 301
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,145评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,008评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,443评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,649评论 3 334
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,795评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,501评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,119评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,731评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,865评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,899评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,724评论 2 354

推荐阅读更多精彩内容