RxJava--入门

一、ReactiveX简介

在学习RxJava前首先需要了解ReactiveX,因为RxJava是ReactiveX的一种Java的实现形式

ReactiveX的官网地址为:

http://reactivex.io/

ReactiveX官网对于自身的介绍是:

An APIforasynchronous programmingwithobservable streams

实质上我们可以对其解读为三部分:

ReactiveX的解读

①An API: 首先它是个编程接口,不同语言提供不同实现。例如Java中的RxJava。

②For asynchronous programming: 在异步编程设计中使用。例如开启子线程处理耗时的网络请求。

③With observable streams: 基于可观察的事件流。例如观察者模式中观察者对被观察者的监听。

而ReactiveX结合了如下三部分内容:

观察者模式,即定义对象间一种一对多的依赖关系,当一个对象改变状态时,则所有依赖它的对象都会被改变。

Iterator模式,即迭代流式编程模式。

函数式编程模式,即提供一系列函数样式的方法供快速开发。

Reactive的模式图如下:

图1.1 ReactiveX的模式图

二、RxJava的使用

1、RxJava的优势

在Android的SDK中,给开发者提供的用于异步操作的原生内容有AsyncTask和Handler。对于简单的异步请求来说,使用Android原生的AsyncTask和Handler即可满足需求,但是对于复杂的业务逻辑而言,依然使用AsyncTask和Handler会导致代码结构混乱,代码的可读性非常差。

但是RxJava的异步操作是基于观察者模式实现的,在越来越复杂的业务逻辑中,RxJava依旧可以保持简洁

2、RxJava的配置

首先,在Android Studio中配置Module的build.gradle,在这里我们使用的版本是1.2版本,并且导入RxAndroid,辅助RxJava完成线程调度:

implementation"io.reactivex:rxjava:1.2.0"implementation"io.reactivex:rxandroid:1.2.0"

然后,RxJava基于观察者设计模式,其中的关键性三个步骤如下:

(1)Observable被观察者

Observable被观察者创建的代码如下:

Observable observable = Observable.create(newObservable.OnSubscribe() 

{@Overridepublicvoidcall(Subscriber subscriber){          

      subscriber.onNext("bean");               

 subscriber.onCompleted();      

      }      

  });

在这里,要强调的是Observable被观察者是类类型,其中有诸多方法,我们关注其构造函数与创建Observable对象的方法,查看如下图对应的视图结构:

图2.2.1 Observable被观察者对象的视图结构

查看源码:

protectedObservable(OnSubscribe f){this.onSubscribe = f;        }

publicinterfaceOnSubscribeextendsAction1>{                }

publicstaticObservablecreate(OnSubscribe f){returnnewObservable(RxJavaHooks.onCreate(f));      

  }

publicstaticObservablecreate(SyncOnSubscribe syncOnSubscribe){returncreate((OnSubscribe)syncOnSubscribe);        }

publicstaticObservablecreate(AsyncOnSubscribe asyncOnSubscribe){returncreate((OnSubscribe)asyncOnSubscribe);        }

通过源码分析,可知Observable提供了create()方法来获取Observable实例对象。

此外,除了基本的创建的方法,Observable还提供了便捷的创建Observable序列的两种方式,代码如下:

第一种,会将参数逐个发送

Observable observable1 = Observable.just("xxxx","xxxx");

第二种,会将数组元素逐个转换完毕后逐个发送

String[] observableArr = {"xxxx","xxxxx"};        

Observable observable2 = Observable.from(observableArr);

其中Observable.just()方法会调用from()方法,详情可查看源码。

(2)Observer观察者

Observer观察者创建的代码如下:

Observer observer =newObserver() {

@OverridepublicvoidonCompleted(){         

       Log.e(TAG,"onCompleted");        

    }

@OverridepublicvoidonError(Throwable e){         

       Log.e(TAG,"onError,Error Info is:"+ e.getMessage());      

      }

@OverridepublicvoidonNext(String s){             

   Log.e(TAG, s);    

        }    

    };

Observer是接口,其中包含的方法有onCompleted()、onNext()、onError()。查看如下图所示Observer的视图结构:

图2.2.2 Observer观察者对象的视图结构

那么在RxJava中,Observer有其接口实现类对象Subscriber,它们的使用onNext、onCompleted、onError方法是一样的,但是Subscriber对于Observer接口进行了拓展,在 RxJava 的 subscribe 过程中,Observer 也总是会先被转换成一个 Subscriber 再使用,代码如下:

Subscriber subscriber =newSubscriber() {@OverridepublicvoidonStart(){        

        Log.e(TAG,"onStart");            }

@OverridepublicvoidonCompleted(){   

             Log.e(TAG,"onCompleted");            }

@OverridepublicvoidonError(Throwable e){       

         Log.e(TAG,"onError,Error Info is:"+ e.getMessage());            }

@OverridepublicvoidonNext(String s){              

  Log.e(TAG, s);            }    

    };

其中,onStart()方法会在事件未发送前被调用,可以用于订阅关系建立前的准备工作,例如将数据清空或者重置,在Subscriber中默认是空实现,我们可以在该方法中调用自己的业务逻辑代码。在如下的视图结构中我们可以看到Subscriber的拓展内容,重点是add()、unsubscribe()方法以及名为subscription的Subscription队列

图2.2.3 Subscriber对象视图结构

(3)Subscribe订阅关系

Observable与observer形成订阅关系代码如下:

observable.subscribe(observer);//或者observable.subscribe(subscriber);

那么我们以observable.subscribe(observer)为例在这里继续查看源码,查看subscribe()方法到底做了什么:

图2.3.1 Observable调用Subscribe将Observer转换为Subscriber对象

Observer转换为Subscriber对象在这里得到印证。

在之后的内容中统一以Subscriber作为订阅观察者对象

继续深入,我们可以看到订阅关系中的关键步骤(仅核心代码):

subscriber.onStart();RxJavaHooks.onObservableStart(observable,observable.onSubscribe).call(subscriber);returnRxJavaHooks.onObservableReturn(subscriber);

在这里RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber)等价于OnSubscribe.call(subscriber),见下图2.3.2:

图2.3.2 RxJavaHooks.onObservableStart()转换为OnSubscribe

在return RxJavaHooks.onObservableReturn(subscriber)这里等价于return subscription,见下图2.3.3:

图2.3.3 RxJavaHooks.onObservableReturn()转换为Subscrition

可以看到,subscriber() 做了3件事:

调用 Subscriber.onStart() 。该方法用于在订阅关系建立之前的准备。

调用 Observable 中的 OnSubscribe.call(Subscriber) 。OnSubscribe是Observable的内部接口,而事件发送的逻辑在这里开始运行。从这也可以看出,在 RxJava 中当 subscribe() 方法执行的时候订阅关系确立,Observable 开始发送事件。

将传入的 Subscriber 作为 Subscription 返回。这是为了方便后续调用unsubscribe()。

三、RxJava的不完整回调

1、不完整回调的代码示例

Observable observable = Observable.just("xxxx","xxxx");    

    Action1 onNextAction =newAction1() {@Overridepublicvoidcall(String s){       

         Log.e(TAG, s);          

  }        };  

      Action1 onErrorAction =newAction1() {

@Overridepublicvoidcall(Throwable throwable){          

      Log.e(TAG,"onError,Error Info is:"+ throwable.getMessage());        

    }    

    };      

  Action0 onCompletedAction =newAction0() {

@Overridepublicvoidcall(){       

         Log.e(TAG,"onCompleted");  

          }       

 }

;// 根据onNextAction 来定义 onNext()observable.subscribe(onNextAction);

// 根据onNextAction 来定义 onNext()、根据onErrorAction 来定义 onError()observable.subscribe(onNextAction, onErrorAction);

// 根据onNextAction 来定义 onNext()、根据onErrorAction 来定义 onError()、onCompletedAction 来定义 onCompleted()observable.subscribe(onNextAction, onErrorAction, onCompletedAction);

2、不完整回调的原理分析

在这里我们可以看到:

Action0无参数泛型无返回值类型,而Subscriber中的onCompleted()方法也没有参数泛型

Action1有1个参数泛型无返回值类型,onNextAction设置的参数泛型为String,而Subscriber中的onNext()方法参数泛型也是String(和本文中观察者对象中的OnNext方法对比)

Action1有1个参数泛型无返回值类型,onErrorAction设置的参数泛型为Throwable,而Subscriber中的onError()方法参数泛型也是Throwable

那么,我们来查看observable.subscribe(onNextAction)的源码,在这里, Action1可以被当成一个包装对象,将onNext()方法进行包装作为不完整的回调传入到observable.subscribe()中

图3.2.1 传入的onNextAction最终被包装成ActionSubscriber

我们来看看Action1有何玄机,Action1的源码如下图所示:

图3.2.2  Action1接口源码

实质上,这种根据参数泛型的个数且无返回值类型的包装在RxJava中有多种如下图所示的体现,例如Action0的参数个数为0,Action1的参数个数为1以此类推:

图3.2.3 根据参数泛型的个数且无返回值类型的包装

四、RxJava的线程切换

1、Scheduler线程调度器

如果不指定线程,默认是在调用subscribe方法的线程上进行回调,那么如果子线程中调用subscibe方法,而想在主线程中进行UI更新,则会抛出异常。当然了RxJava已经帮我们考虑到了这一点,所以提供了Scheduler线程调度器帮助我们进行线程之间的切换。

实质上,Scheduler线程调度器和RxJava的操作符有紧密关联,我将在下一篇文章中进行详细介绍。

RxJava内置了如下所示几个的线程调度器:

Schedulers.immediate():在当前线程中执行

Schedulers.newThread():启动新线程,在新线程中进行操作

Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。

Schedulers.computation():计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。

Schedulers.trampoline():会将任务按添加顺序依次放入当前线程中等待执行。线程一次只执行一个任务,其余任务排队等待,一个任务都执行完成后再开始下一个任务的执行。

此外RxJava还提供了用于测试的调度器Schedulers.test() 及 可自定义Scheduler—-Schedulers.form() 。

RxAndroid并且其为我们提供了AndroidSchedulers.mainThread()进行主线程的回调

2、线程控制

调用Observable对象的subscribeOn()、observeOn()方法即可完成线程控制。

subscribeOn(): 指定 subscribe() 所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程。或者叫做事件产生的线程。

observeOn(): 指定 Subscriber 所运行在的线程。或者叫做事件消费的线程。

Observable.just("xxxx","xxxx")     

     .subscribeOn(Schedulers.io())

//指定 subscribe() 所发生的线程.unsubscribeOn(Schedulers.io()

)//事件发送完毕后,及时取消发送.observeOn(AndroidSchedulers.mainThread())

//指定 Subscriber 所运行在的线程.subscribe(newAction1() {@Overridepublicvoidcall(String s){  

                  Log.e(TAG, s);      

          }     

       });

五、总结

本文主要介绍了RxJava的由来、使用步骤、部分内容的原理解析。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,185评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,445评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,684评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,564评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,681评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,874评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,025评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,761评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,217评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,545评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,694评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,351评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,988评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,778评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,007评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,427评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,580评论 2 349

推荐阅读更多精彩内容

  • 在正文开始之前的最后,放上 GitHub 链接和引入依赖的 gradle 代码: Github:https://g...
    Yagami3zZ阅读 430评论 0 0
  • 在正文开始之前的最后,放上GitHub链接和引入依赖的gradle代码: Github: https://gith...
    苏苏说zz阅读 677评论 0 2
  • 一、ReactiveX简介 在学习RxJava前首先需要了解ReactiveX,因为RxJava是Reactive...
    测天测地测空气阅读 325评论 0 1
  • 我从去年开始使用 RxJava ,到现在一年多了。今年加入了 Flipboard 后,看到 Flipboard 的...
    Jason_andy阅读 5,460评论 7 62
  • 手机刷新闻的时候,偶然刷出一则9月6号的一则新闻,一名研究生跳楼了,生前说有抑郁倾向。 上个月的一天,急诊抢救室门...
    真爱521阅读 782评论 8 6