史上超详细的RxJava2使用和RxLifecycle生命周期管理

这篇文章基于RxJava2.0

RxJava是什么?
官网说RxJava是一个可观测的序列来组成异步的额,基于事件的库,简单来说,它就是一个实现异步的库,可以代替Android的API如AsyncTask ,Handler等等。

RxJava为什么好?
RxJava其实就是提供一套异步编程的API,这套API是基于观察者模式的,而且是链式调用的,所以使用RxJava编写的代码逻辑会非常简洁。

观察者模式:
定义:对象间一种一对多的关系,使得每当一个对象改变,则所有依赖于它的对象都会得到通知并被自动更新。
作用:解耦,UI层与具体的业务逻辑解耦。

官方支持时间?
官方支持更新到2020年12月31日,没关系的,还有Rxjava3。

使用场景?
可以进行数据库的写入,大图片的载入,文件压缩和解压等各种需要放在后台工作的耗时操作,都可以使用RxJava来实现,可以使用RxJava来实现响应式编程。

如何使用?

添加依赖:

//RxJava2
implementation 'io.reactivex.rxjava2:rxjava:2.2.0'
implementation 'io.reactivex.rxjava2:rxandroid:2.0.2'

通过RxJava最简单的一个例子引出它的三个基本元素:观察者,被观察者和订阅。


三个基本元素

被观察者

Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("Hello");//发送事件时,观察者会回调onNext方法
                emitter.onComplete();//这个顺序不能颠倒,如果onNext方法放在最后,onNext就不会执行了
            }
}).subscribeOn(Schedulers.io())//实际项目中网络请求在io线程
  .observeOn(AndroidSchedulers.mainThread());//这里意为观察者在主线程更新UI

create是RXJava中最基本的操作符,

观察者

Observer<String> observer = new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {//这个方法在订阅前就会被调用
        Log.i("log", "onSubscribe->" + d);
        //d.dispose();//取消发射事件
    }

    @Override
    public void onNext(String s) {
         Log.i("log", "onNext->" + s);
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
};

订阅观察者:

observable.subscribe(observer);

被观察者订阅观察者后,observable中的方法会被立刻回调 。

除了Observable,还有其它4个被观察者可以操作,一共5种


5种被观察者

接下来看看其它4种被观察者是如何使用的

被观察者(背压)Flowable

Flowable flowable = Flowable.create(new FlowableOnSubscribe<Integer>() {
     @Override
     public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
         for (int i = 0; i < 1000000000; i++) {
              emitter.onNext(i);
              //这里还是发射了20条数据
              Log.d("log", "subscribe: " + i);
         }
        emitter.onComplete();
     }
}, BackpressureStrategy.BUFFER).subscribeOn(Schedulers.io());         

观察者(背压)

Subscriber<Integer> subscriber = new Subscriber<Integer>() {
    @Override
    public void onSubscribe(Subscription s) {
         Log.d("log", "onSubscribe");
         //这里体现的是响应式拉取
         //s.request(Long.MAX_VALUE);//指定下游(观察者)接收数据的最大值
         s.request(10);
    }

    @Override
    public void onNext(Integer integer) {
         //拉取10条数据
         Log.d("log", "onNext: " + integer);
    }

    @Override
    public void onError(Throwable t) {
    }

    @Override
    public void onComplete() {
       Log.d("log", "onComplete");
    }
};

被观察者(Single)

Single.create(new SingleOnSubscribe<String>() {
     @Override
     public void subscribe(SingleEmitter<String> emitter) throws Exception {
           emitter.onSuccess("消息");
     }
}).subscribe(new SingleObserver<String>() {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onSuccess(String s) {

    }

    @Override
    public void onError(Throwable e) {

    }
});

被观察者(Completable)

Completable.create(new CompletableOnSubscribe() {
    @Override
    public void subscribe(CompletableEmitter emitter) throws Exception {
        emitter.onComplete();
        emitter.onError(new Exception());
    }
});

被观察者(Maybe)

Maybe.create(new MaybeOnSubscribe<String>() {
     @Override
     public void subscribe(MaybeEmitter<String> emitter) throws Exception {
         emitter.onSuccess("消息");
         emitter.onComplete();
         emitter.onError(new Exception());
     }
});

操作符:操作符包括创建操作符,转换操作符,组合操作符,功能操作符,过滤操作符,条件操作符。

创建操作符

举个just的例子
just和creat一样,也是创建,只是最多不能超过10个参数

   //可以传入多个参数
   //Observable.just("a",1,2,"b").subscribe(new Observer<Object>() {
   //也可以传入一个方法
   Observable.just(getNumber()).subscribe(new Observer<Object>() {
       @Override
       public void onSubscribe(Disposable d) {

       }

       @Override
       public void onNext(Object o) {
            Log.i("log", "" + o.toString());
       }

       @Override
       public void onError(Throwable e) {

       }

       @Override
       public void onComplete() {

       }
   });
public int getNumber() {
    return 1;
}
转换操作符
组合操作符
//组合操作符
Observable.concat(Observable.just(1, 2),
                Observable.just(5, 6),
                Observable.just(3, 4),
                Observable.just(7, 8)).subscribe(new Observer<Integer>() {
      @Override
      public void onSubscribe(Disposable d) {

      }

      @Override
      public void onNext(Integer integer) {
           Log.i("log", "" + integer);
      }

      @Override
      public void onError(Throwable e) {

      }

      @Override
      public void onComplete() {

      }
});

输出结果为:1,2,5,6,3,4,7,8

功能操作符
过滤操作符
Observable.just(1, 2)
                .subscribeOn(Schedulers.io())
                .filter(new Predicate<Integer>() {
                    @Override
                    public boolean test(Integer integer) throws Exception {
                        return integer < 2;//发送数字小于2的消息
                    }
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.i("log", "" + integer);
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });
条件操作符

RxJava的线程切换是如何实现的?
来了解下线程切换的代码实现

Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("Hello");
                emitter.onComplete();
            }
        }).subscribeOn(Schedulers.io())//网络请求在io线程
                .observeOn(AndroidSchedulers.mainThread());//这里意为观察者在主线程更新UI

RxJava的线程切换是通过Scheduler(线程调度器)来实现的,Scheduler的作用是简化了异步操作。

subscribeOn:通过接收一个Schedule参数,来指定对数据的处理运行在特定的调度器Schedule上,若多次设定,则只有一次起作用。

observeOn:接收一个Schedule参数,来指定下游(RxJava官网把观察者称为下游)操作运行在特定的线程调度器Schedule上,若多次设定,每次均起作用。

Schedule的种类如下:


Schedule的种类

1)Schedules.io()
用于IO密集型的操作,例如读写SD卡文件,查询数据库,访问网络等,具有线程缓存机制,在此调度器接收到任务后,先检查线程缓存池中,是否有空闲的线程,如果有,则复用,如果没有则创建新的线程,并加入到线程池中,如果每次都没有空闲线程使用,可以无上限的创建新线程。
2)Schedulers.newThread( )
在每执行一个任务时创建一个新的线程,不具有线程缓存机制,因为创建一个新的线程比复用一个线程更耗时耗力,虽然使用Schedulers.io( )的地方,都可以使用Schedulers.newThread( ),但是,Schedulers.newThread( )的效率没有Schedulers.io( )高。
3)Schedulers.computation()
用于CPU 密集型计算任务,即不会被 I/O 等操作限制性能的耗时操作,例如xml,json文件的解析,Bitmap图片的压缩取样等,具有固定的线程池,大小为CPU的核数。不可以用于I/O操作,因为I/O操作的等待时间会浪费CPU。
4)Schedulers.trampoline()
在当前线程立即执行任务,如果当前线程有任务在执行,则会将其暂停,等插入进来的任务执行完之后,再将未完成的任务接着执行。
5)Schedulers.single()
拥有一个线程单例,所有的任务都在这一个线程中执行,当此线程中有任务执行时,其他任务将会按照先进先出的顺序依次执行。
6)Scheduler.from(@NonNull Executor executor)
指定一个线程调度器,由此调度器来控制任务的执行策略。
7)AndroidSchedulers.mainThread()
在Android UI线程中执行任务,为Android开发定制。

RxJava中的背压

背压出现的原因:
当上下游在不同的线程中,通过Observable发射,处理,响应数据流时,如果上游发射数据的速度快于下游接收处理数据的速度,这样对于那些没来得及处理的数据就会造成积压,这些数据既不会丢失,也不会被垃圾回收机制回收,而是存放在一个异步缓存池中,如果缓存池中的数据一直得不到处理,越积越多,最后就会造成内存溢出,这便是响应式编程中的背压(backpressure)问题

背压策略的解决思路:
利用响应式拉取,响应式拉取是观察者主动去被观察者那里拉取事件,而被观察者则是被动等待通知再发射事件。

BackpressureStrategy背压策略:
1)MISSING
MissingEmitter:在此策略下,通过Create方法创建的Flowable相当于没有指定背压策略,不会对通过onNext发射的数据做缓存或丢弃处理,需要下游通过背压操作符。

2)ERROR
ErrorAsyncEmitter:在此策略下,如果放入Flowable的异步缓存池中的数据超限了,则会抛出MissingBackpressureException异常。

3)BUFFER
BufferAsyncEmitter:部维护了一个缓存池SpscLinkedArrayQueue,其大小不限,此策略下,如果Flowable默认的异步缓存池满了,会通过此缓存池暂存数据,它与Observable的异步缓存池一样,可以无限制向里添加数据,不会抛出MissingBackpressureException异常,但会导致OOM

4)DROP
DropAsyncEmitter:在此策略下,如果Flowable的异步缓存池满了,会丢掉上游发送的数据。

5)LATEST
LatestAsyncEmitter:与Drop策略一样,如果缓存池满了,会丢掉将要放入缓存池中的数据,不同的是,不管缓存池的状态如何,LATEST都会将最后一条数据强行放入缓存池中,来保证观察者在接收到完成通知之前,能够接收到Flowable最新发射的一条数据

RxJava的生命周期

为什么要关注RxJava的生命周期?
来看一个小例子
MainActivity.java

Flowable flowable = Flowable.create(new FlowableOnSubscribe<Integer>() {
    @Override
    public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
        for (int i = 0; i < 1000000000; i++) {
            emitter.onNext(i);
            Log.d("log", "subscribe: " + i);
        }
        emitter.onComplete();
    }
}, BackpressureStrategy.BUFFER)
     .subscribeOn(Schedulers.io());

比如我们这里有一个按钮,从MainActivity跳转到MainActivity2中,当跳到MainActivity2的时候,可以通过log看到日志中还是输出打印,这样是很不友好的,更重要的是会造成内存溢出,所以这就是为什么要对RxJava的生命周期进行管理了。

RxLifecycle的github地址:

https://github.com/trello/RxLifecycle

我这里用的是3.0的Rxlifecycle,需要依赖依androidx,Rxlifecycle和androidx的配置如下:

dependencies {
    implementation fileTree(dir: 'libs', include: ['*.jar'])
    implementation 'androidx.appcompat:appcompat:1.0.2'
    implementation 'androidx.constraintlayout:constraintlayout:2.0.0-alpha2'
    testImplementation 'junit:junit:4.12'
    androidTestImplementation 'androidx.test:runner:1.1.0'
    androidTestImplementation 'androidx.test.espresso:espresso-core:3.1.0'

    //RxJava2
    implementation 'io.reactivex.rxjava2:rxjava:2.2.0'
    implementation 'io.reactivex.rxjava2:rxandroid:2.0.2'

    //3.0这个是基于androidx的
    implementation 'com.trello.rxlifecycle3:rxlifecycle:3.0.0'
    implementation 'com.trello.rxlifecycle3:rxlifecycle-android:3.0.0'
    implementation 'com.trello.rxlifecycle3:rxlifecycle-components:3.0.0'
}

然后让我们的Activity继承RxAppCompatActivity即可。

Observable.interval(1, TimeUnit.SECONDS).doOnDispose(new Action() {
            @Override
            public void run() throws Exception {

            }
        })
                .compose(this.<Long>bindUntilEvent(ActivityEvent.PAUSE));

这里我设置的是在pause的时候取消订阅,需要结合rxlifecycle使用,当然也可以设置别的状态来管理,比如onPause,onDestroy等。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,723评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,003评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,512评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,825评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,874评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,841评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,812评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,582评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,033评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,309评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,450评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,158评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,789评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,409评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,609评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,440评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,357评论 2 352

推荐阅读更多精彩内容