RxJava学习系列(一)--使用

注:本系列文章主要用于博主个人学习记录,本文末尾附上了一些较好的文章提供学习。
转载请附 原文链接
RxJava学习系列(一)--使用
RxJava学习系列(二)--原理

RxJava利用响应式编程思想,专注于异步任务的处理,通过操作符进行流式操作,可以极大的去除多层嵌套,达到逻辑的简洁明了。

举个栗子🌰


模拟用户注册请求

RxJava的观察者模式与普通观察者模式有一个区别是分为“冷”启动和“热”启动,“热”启动即不管有没有观察者,观察者会按照自己的逻辑发送数据,而“冷”启动则是只有当观察者开启订阅时才开始发送数据。

1.基本概念及用法

  • 三个重要的对象

    • Observable-数据发送者
    • Subscriber-订阅者
    • OnSubscribe-事件

    一次事件订阅的流程:Observable持有一个 OnSubscribe对象,事件在OnSubscribe对象中被执行,当有Subscriber订阅了这个 Observable时,OnSubscribe中的事件开始执行,并由Observable发射数据给Subscriber

一次完整订阅
  • Subscriber对象

    Subscriber是一个抽象类,需要实现三个方法

    • onCompleted(): 事件队列完结。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。RxJava 规定,当不会再有新的onNext() 发出时,需要触发 onCompleted() 方法作为标志。
    • onError(): 事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。
    • 在一个正确运行的事件序列中, onCompleted()onError() 有且只有一个,并且是事件序列中的最后一个。需要注意的是,onCompleted()onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。

    引用自扔物线

    在代码中调用了onComplete后再调用onNext,依然可以发送数据,onComplete会在发送完所有的数据后才被调用

    当onError被调用了,即使在出现错误之前调用onNext依然不会成功,只会触发onError

    Subscriber<String> subscriber = new Subscriber<String>() {
                @Override
                public void onCompleted() {
                  //数据发送完毕
                }
    
                @Override
                public void onError(Throwable e) {
                  //数据发送出错
                }
    
                @Override
                public void onNext(String data) {
                  //数据发送成功
                }
            };
    

    通过泛型指定Subscriber所能接收的数据,在onNext 中处理相应的逻辑,此处需要注意的是: onNext方法调用的次数取决于OnSubscribe中被调用的次数

    • Action1<T>

      在某些情况下我们不需要在每一个回调中都处理逻辑,可能只需要订阅onNext,就可以实现Action1<T>

    new Action1<Object>() {
                @Override
                public void call(Object obj) {
                  //有参数的回调onError和onNext
                }
    };
    new Action0(){
          @override
          public void call(){
              //onComplete
      }
    };
    
  • Observable对象

    • Observable.create(OnSubscribe<T> onSubscribe);

    create方法传入一个OnSubscribe对象,在call方法中发送数据,这是最基础的创建方法

    Observable.create(new Observable.OnSubscribe<String>() {
                @Override
                public void call(Subscriber<? super String> subscriber) {
                    subscriber.onNext("create success");
                }
            });
    
    • Observable.just( T1 t1, T2 t2, T3 t3,…)

    just方法允许快速创建队列,每一个参数会调用onNext方法传递一次(最多10个),且按顺序发送,just在发送完数据后,会调用onComplete

    Observable.just("one","two","three");
    
    • from

    from可以将数组,Iterable,Future对象转换为 Observable对象,发送数据

    String [] array = new String[]{"one-from","two-from","three-from"};
    Observable.from(array);
    

    创建一个Observable的方法有很多,不一一列举


    创建Observable
  • 订阅事件

    • Observable.subcribe()--return Subscription;

    通过subscribe来开启订阅,此时Observable开始发送数据,并且返回一个Subscription对象

    • Subscription

    Subscription是一个接口,有两个方法unsubscribe()和isUnsubscribe(),在订阅事件时返回这个对象,可以在需要的时候取消掉订阅,在android开发中能简单有效的避免内存溢出。

2.线程控制

上面所提到的订阅会默认在当前线程中执行,然并卵,既然是专注于异步操作,就一定有线程控制的方法

  • Schedulers—线程调度


    Sckedulers的各种线程
  • subscribeOn—被观察事件执行线程(事实上,在该方法调用之前,以及调用后,observeOn之前的代码都会在subscribeOn所在的线程中执行)

  • observeOn— 观察线程(可以多次转换,observeOn指定在它之后的代码线程)

  • 实践

    该方法只应该被调用一次,如果调用多次,只有第一个会生效 !


    多次调用subscribeOn

如上图,首先指定了subscribeOn的线程为io线程,然后又指定了计算线程,打印日志


logcat 1

通过日志打印可以发现,只有第一个subscribeOn生效,并且在observeOn之前的代码也都在io线程中执行,而在observeOn之后的代码,在每一次调用该方法后都改变了线程

有好事的同学说了,那如果我先调用observeOn再调用subscribeOn呢?虽然没有人这么做,但严谨的我还是要试试


observeOn

先调用observeOn指定为主线程,然后subscribeOn指定为ui线程


E1EB67D4-11D4-42D2-A26F-5AB4282263A1.jpg

可以看到第一条日志在io线程中执行,而第二条日志在主线程中,似乎可以得到一个结论

observeOn指定在它之后的代码的执行线程,而其余代码均在第一个subscribeOn指定的线程中执行

3.操作符

A.转换操作

  • map()— 将发射的数据转化为subscriber所需要的数据

    Observable.create(new Observable.OnSubscribe<Integer>() {
                @Override
                public void call(Subscriber<? super Integer> subscriber) {
                    subscriber.onNext(random());
                    subscriber.onCompleted();
                }
            }).map(new Func1<Integer, String>() {
                @Override
                public String call(Integer integer){
                    if(integer == 0){
                        return "the number is 1";
                    }else{
                        return "the number is not 1";
                    }
                }
            });
    

    栗子中的代码是在原始的Observable中发射类型为Integer的数据,通过map操作后,subscriber所接收到的数据为 String 类型,map中需要传入Fun1<T,E>,参数1表示上一个操作符操作后所发送的数据,一个Observable可以进行多次转化操作,subscriber接收到的数据为最后一次转化发射的数据。

    注意:map操作转换的是发射的数据,Observable本身并不会被转换

  • flatmap()— 将一个Observable转换为一个新的Observable,并且由这个新的Observable发射数据

    Observable.create(new Observable.OnSubscribe<Integer>() {
                @Override
                public void call(Subscriber<? super Integer> subscriber) {
                    subscriber.onNext(random());
                    subscriber.onCompleted();
                }
            }).flatMap(new Func1<Integer, Observable<String>>() {
                @Override
                public Observable<String> call(Integer integer) {
                    return Observable.just("new Observable"+integer);
                }
            });
    

    flatMap 同样需要传入Func1<T,E> 与map不同的是,返回的是一个Observable对象,而subscriber所订阅的应该是这个新的Observable,flatmap也可以多次调用多次转换,问题来了~subscriber只关心接收到的数据,并不关心订阅的具体是哪一个Observable,那flatMap和map的应用场景是什么呢?

  • 应用场景

    • map比较简单,一个Observable可能被多个subscriber订阅,而不同的订阅所需要的最终数据不同,但事件的操作逻辑是相同的,就可以利用map来满足不同的数据需求
    • flatmap的用处就比较多了,文章最开头举的栗子,一次复杂的注册逻辑,首先要请求服务器获取token,获取token后注册请求,注册请求完成后,登录请求,每一次请求利用Retrofit封装返回一个 Observable对象.我们只关心最后登录成功后告知用户,并刷新UI。这样原本用回调至少嵌套两次的逻辑,变得清晰明了(这样的注册逻辑本身是有问题的~)


      模拟一次注册请求

      ![Uploading 48862518-1B43-457E-B734-36AE6669893C_441752.jpg . . .]

注意:只有每一个Observable都成功发射数据后,才会调用onNext方法,如果出现异常会直接调用onError

这样看来,好像很鸡肋,后面会讲到错误操作,你会发现RxJava确实是很牛逼的啊~

  • 其它转换操作


    转换操作符

B.合并操作

  • merge— 将多个Observable合并为一个Observable发射数据

    Observable.merge(observable1,observable2,observable3,observable4);
    

    官方文档说明:merge可能会导致交错发射数据,即不是按照合并顺序来发射数据

    同样,一旦有一个Observable发射异常,会立即触发onError,RxJava的实现中有一个mergeDelayError— 只有当所有的数据都发射完毕后才会调用onError

  • concat— 将多个Observable合并为一个Observable并且按顺序发射数据

    Observable.concat(observable1,observable2,observable3,observable4)
    
  • zip— 将多个Observables的发射物结合到一起,基于这个函数的结果为每个结合体发射单个数据项。

    Observable.zip(
                    Observable.just("1"), 
                    Observable.just("2"), 
                    Observable.just("3"), 
                    Observable.just("4"), 
                    new Func4<String, String, String, String, String>() {
                        @Override
                        public String call(String s, String s2, String s3, String s4) {
                            return s+s2+s3+s4;
                        }
                    });
    

    zip传入需要合并的Observable对象,以及 Func4<T,...,Object>,与merge不同的是,zip是将所有发射的数据拿到后,进行整合,最后发射这个整合后的数据。call中的参数是严格按照合并顺序所发射的数据,return的即为最终发射的数据

    zip的不仅可以合并发射源,并且可以根据需要转换最终发射的数据类型

C.过滤操作

假设这样一种场景,加载数据的时候先向服务器请求,如果成功就显示,如果失败就查找缓存数据。很容易想到可以利用合并操作符来处理,但是合并操作会依次发射数据,这不是我们所希望的。这里就需要用的过滤操作了

  • filter — 只发射通过了谓词测试的数据项

    filter根据规则去检验数据,只要通过了检验的数据都会被发射,直到检验完最后一个 Observable

    .filter(new Func1<Integer, Boolean>() {
                        @Override
                        public Boolean call(Integer i) {
                          //只发射小于等于5的数据
                          return i<=5;
                        }
                    })
    
  • first — 只发射第一项(或者满足某个条件的第一项)数据

    first类似于filter,不同的是,只发射第一个通过检验的数据

    first()//只发射第一个数据
    first(Func1)//满足某个条件的第一个发射成功的数据
    .first(new Func1<String, Boolean>() {
                        @Override
                        public Boolean call(String s) {
                            return false;
                        }
                    })
    

    所有发射的数据,会在call中按照规则进行检验,比如当第一个传过来的字符串不为空时,就认为发射数据成功,那么应该return true,当return为true的时候会调用onNext方法,而还没有发射数据的Observable将不再发射数据,如果return为false,那么会依次检验后面的Observable是否发射数据成功,直到return true或者全部不符合调onError

  • last — 只发射最后一条(或者满足某个条件的最后一项)数据

    last的用法跟frist一毛一样。

  • 其它过滤操作


    过滤操作符

D.异常处理

异常处理

onErrorReturn可以在异常发生时发射一个默认的数据,结合过滤操作,可以发射一个不符合规则的数据,避免中断数据发射

E.doOn

有一种场景,比如说请求到数据后写入缓存,但是不希望订阅者去处理,因为如果多处订阅必然会产生重复代码并且可能阻塞主线程,doOn的系列操作就派上了用场

  • doOnNext — 当数据发射成功时调用

    Observable.just("data to shoot")
                    .doOnNext(new Action1<String>() {
                        @Override
                        public void call(String s) {
                            //发射成功后需要的操作
                              writeToCache(s);
                        }
                    })
    

    在使用的时候注意判断doOnNext当前在哪个线程执行

  • doOnError — 当发生异常时调用
  • doOnSubscribe — 当被订阅时调用
  • doOnTerminate — 发射数据完毕后调用

F.封装-compose

当我们真正开始使用RxJava来替换之前的逻辑代码时,我们发现仅用现有的操作符无法做到完全的简洁,依然会出现一些不必要的重复代码和逻辑。适度的封装也是必要的,RxJava早就想到了这点,提供了一个操作符来封装一些通用代码

  • compose(@NotNull Transformer<T,R>)

    compose方法需要传入一个变形金刚对象,其中泛型T为原始的Observable所发射的数据类型,R为变形后所发射的数据类型,举个栗子,比如封装一个方法在io线程中发射数据,在ui线程中观察接收数据。

    public <T> Observable.Transformer<T,T> io_main(){
            return new Observable.Transformer<T, T>() {
                @Override
                public Observable<T> call(Observable<T> tObservable) {
                    return tObservable.subscribeOn(Schedulers.io())
                            .observeOn(AndroidSchedulers.mainThread());
                }
            };
        }
    

    创建一个Transformer对象,需要实现call方法,return一个新的Observable,然后传入到compose()中:

    Observable.just("123")
                    .compose(this.<String>io_main())
                    .subscribe();
    

    查看一下compose的源码发现,它其实就做了一件事情,调用 Transformer的call方法,并把当前的Observable对象作为参数传进去,返回call方法得到的新的Observable

     public <R> Observable<R> compose(Transformer<? super T, ? extends R> transformer) {
            return ((Transformer<T, R>) transformer).call(this);
        }
    

4.RxBus

EventBus让模块间的耦合更低,利用 RxJava实现EventBus简直是容易,且便于管理

在实现RxBus之前介绍两个很重要的类

  • CompositeSubscription— Subscription that represents a group of Subscriptions that are unsubscribed together.

    是Subscription的一个实习类,用于管理一组订阅,当取消订阅时,会将这一组订阅全部取消,在Android中可以利用该类管理一个Activity中所有的异步任务,当Activity被销毁时,取消订阅,避免内存泄漏

    • add — 将一个订阅加入到一个订阅组中
    • remove — 将一个订阅从该组中移除
    • clear — 清空订阅组
    • unsubscribe — 取消改组中正在进行的所有订阅
  • Subject — 一个神奇的类,在 RxJava中它同时充当了Observer和Observable的角色。

    文章一开头提到了“冷”启动和“热”启动,而一个Subject可以将一个“冷”Observer变成一个“热”的,因为它是一个Observer,它可以订阅一个或多个Observable;又因为它是一个Observable,它可以转发它收到(Observe)的数据,也可以发射新的数据。

    subject.subscribe(subscriber) — 订阅事件

    subject.onNext(obj) — 发射数据

    Subject 在RxJava中总共有7个子类,这里不一一介绍(因为我也没用过…)

    • PublishSubject — 一个“热”的Observable,这个对象会在onNext被调用的时候就开始发射数据,无论有没有订阅者,当一个Observer订阅了这个对象时,只会收到订阅时间点之后所发射的数据。官方给出的栗子:

      两个observer分别订阅了同一个subject,observer1会收到所有的数据,而observer2只能收到最后一条数据

    PublishSubject<Object> subject = PublishSubject.create();
      // observer1 will receive all onNext and onCompleted events
      subject.subscribe(observer1);
      subject.onNext("one");
      subject.onNext("two");
      // observer2 will only receive "three" and onCompleted
      subject.subscribe(observer2);
      subject.onNext("three");
      subject.onCompleted();
    

    再想想EventBus的原理,我们所需要的正是这样一个“热”Observable。这里是较为复杂的一种实现,先上原理图


    RxBus原理

再贴代码

private ConcurrentHashMap<Object, List<Subject>> subjectMapper 
  = new ConcurrentHashMap<Object, List<Subject>>();
//订阅事件
public <T> Observable<T> subscribe(@NonNull Object tag) {
        List<Subject> subjectList = subjectMapper.get(tag);
        if (null == subjectList) {
            subjectList = new ArrayList<Subject>();
            subjectMapper.put(tag, subjectList);
        }
        Subject<T, T> subject;
        subjectList.add(subject = PublishSubject.create());
        return subject;
    }
//发布事件--发射数据
public void post(@NonNull Object tag, @NonNull Object content) {
        List<Subject> subjectList = subjectMapper.get(tag);
        if (!isEmpty(subjectList)) {
            for (Subject subject : subjectList) {
                subject.onNext(content);
            }
        }
    }

RxBus的核心逻辑就完成了,当然还需要加上取消订阅,清空事件等代码,比较简单不再赘述,在我实际的项目开发中,我将 RxBus交由 RxJavaManager进行管理,所有的订阅事件全部经过 RxJavaManager来操作,在需要取消订阅的地方统一unsubscribe

5.资料

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,185评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,445评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,684评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,564评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,681评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,874评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,025评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,761评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,217评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,545评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,694评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,351评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,988评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,778评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,007评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,427评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,580评论 2 349

推荐阅读更多精彩内容