RxJava 3.x系列(三)RxJava操作符使用场景举例

一.Rxjava架构场景

rxjava_mvp

RxJava通过Observable传递数据流,以MVP为例,我们可以在Model层创建Observable并封装基础数据,Presenter层订阅后开始数据流动,P层、M层之间可以采用各种操作符做数据转换,P层Observer收到数据后回调View层显示数据。

一些基于RxJava的基础库通过将数据获取相关逻辑封装在上游Observable中,依赖基础库后即可直接通过订阅相关上游Observable获取数据,这样也利于开发上的分工。

RxJava仅能单向获取数据,图中显示从Model层获取,若View层需要更新Model数据怎么办呢?上图RxXX.set()方法相当于给Model层一个通知做一些操作,并不能传数据。此时更新Model数据得自己实现通知机制也可利用EventBus等开源库。

二、具体使用

1.封装Rx函数库

将上游数据处理封装为静态方法并提供给调用方

RxXXXLib.applyChanges().subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(xxx)
2.注册成功后立即登陆-结合Retrofit & Gson
MyRetrofit.getsInstance().getLoginService().  // 封装Retrofit & 网络请求接口
.register(new RegisterRequest())            // 发起注册请求
.subscribeOn(Schedulers.io())               // 在IO线程发起网络请求
.observeOn(AndroidSchedulers.mainThread())  // 回到主线程处理请求注册结果
.doOnNext(new Consumer<RegisterResponse>() {
    @Override
    public void accept(RegisterResponse registerResponse) {
        // Register success & do something
    }
})
.doOnError(new Consumer<Throwable>() {
    @Override
    public void accept(Throwable throwable) throws Throwable {
        // Register failed
    }
})
.map(new Function<RegisterResponse, MyResponse>() {
    @Override
    public String apply(RegisterResponse response) throws Throwable {
        // do something
        MyResponse myResponse = XXX
        return myResponse;
    }
})
.observeOn(Schedulers.io())    // 回到IO线程发起登录请求
.flatMap(new Function<MyResponse, ObservableSource<LoginResponse>>() {
    @Override
    public ObservableSource<LoginResponse> apply(MyResponse myResponse) {
        return MyRetrofit.XXX.login(new LoginRequest(myResponse));
    }
})
.observeOn(AndroidSchedulers.mainThread())  // 回到主线程处理请求登录的结果
.subscribe(new Observer<LoginResponse>() {
    @Override
    public void accept(LoginResponse loginResponse) {
        // Login success
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable throwable) {
        // Login failed
    }
});
3.从两个后端接口获取数据并组合

组合严格按照两个Observable的发射顺序,且下游收到的事件数量和上游发送事件最少的那个Observable的事件数量相同

Observable<MainResponse> observableMain =                                            
        retrofitApi.getMain(new MainRequest()).subscribeOn(Schedulers.io());                                                                  
Observable<ExtraResponse> observableExtra =                                           
        retrofitApi.getExtra(new ExtraRequest()).subscribeOn(Schedulers.io());                                                                              
Observable.zip(observableMain, observableExtra,                                                  
        new BiFunction<MainResponse, ExtraResponse, NewResponse>() {         
            @Override                                                                     
            public NewResponse apply(MainResponse main,                          
                                  ExtraResponse extra) {     
                return new NewResponse(main, extra);                                 
            }                                                                             
        }).observeOn(AndroidSchedulers.mainThread())                                      
        .subscribe(new Consumer<UserInfo>() {                                             
            @Override                                                                     
            public void accept(UserInfo userInfo) {                      
                // do something;                                                           
            }                                                                             
        });     
4.大数据流用Flowable,小数据流用Observable

上游数据流过大而下游来不及处理可以进行数据过滤,否则将导致内存溢出(极端情况),较好的方式是根据业务场景选择合适的策略:BackpressureStrategy.DROPBackpressureStrategy.LATEST。采用Flowable下游可控制取数据的时机。

5.避免内存泄漏

适当时机调用dispose(),或使用CompositeDisposable统一切断,一般会在生命周期onDestroy()回调时进行数据流切断处理。

三、问题整理:

1.链式调用整体流程:
层层包装 - 逐级订阅(构造回调链) - onNext逐级回调
操作符内部方法被穿插在onNext之间

2.为什么subscribeOn()在后面执行,而前面代码的线程被切换?
由于RxJava的层层包装,subscribeOn()执行后生成的Observable对象ObservableSubscribeOn包含前面生成的Observable对象,
下游逐级订阅上游时,将subscribeOn()的上游全部放入runnable中,故上游线程被切换,若subscribeOn()下游不切换线程,下游将运行在subscribeOn()指定的线程

3.订阅好的关系为什么可以通过disposable.dispose切断呢

  • Disposable对象获取:subscribe()、connect()执行后会获得Disposable,参数为Observer的subscribe()方法在onSubscribe(Disposable d)中接收Disposable
public final Disposable subscribe() {}
public final Disposable subscribe(Consumer<? super T> onNext) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {} 
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
public final void subscribe(Observer<? super T> observer) {}

public final Disposable connect()
  • Disposable是什么:一个接口,由Observer实现用以切断数据流
    以observeOn()操作符对应的ObservableObserveOn为例,其内部类ObserveOnObserver实现了Disposable接口
    并在其内部onSubscribe(Disposable d)中接收上游Disposable,自己内部做处理后调用下游Observer的onSubscribe(this)并传入自身对象。它的dispose()方法将回调上游dispose()方法同时调用worker.dispose()queue.clear();又如create()操作符对应的ObservableCreate,内部CreateEmitter实现了Disposable接口,调用下游onNext()方法时会进行是否dispose判断
    若已经dispose则不会调用下游onNext() if (!isDisposed()) { observer.onNext(t); }
  • dispose()将从下游到上游逐级回调,最上游仍能发射数据,下游不回调
  • CompositeDisposable:一个Disposable集合(由RxJava内部提供的OpenHashSet维护, 线程安全), CompositeDisposable.dispose()会遍历内部的所有Disposable执行dispose操作.如Activity销毁的时候需统一对这些订阅进行取消,那么就可以新增CompositeDisposable对象作为成员,订阅后将Disposable对象都添加到CompositeDisposable对象中,等到Activity执行onDestroy生命周期时调用CompositeDisposable.dispose(),进行统一取消订阅操作
dispose_DisposableHelper.png
dispose_ObservableCreate_1.png
dispose_ObservableCreate_2.png
dispose_ObservableObserveOn_1.png
dispose_ObservableObserveOn_2.png

4.内存泄漏问题:ondestroy中调用dispose
在rxjava中主要注意的就是内存泄漏问题,现有比较有名的管理rxjava内存的库有RxLifecycle和AutoDispose 这里使用AutoDispose管理,在0.8.0版本之后针对Androidx,如果不是androidx 要用之前版本。在activity和fragment中可以直接使用,在Androidx中activity和fragment本身是实现了lifecycle的
https://juejin.im/post/6873448411727986701

5.多次设置subscribeOn()的问题

.subscribeOn(Schedulers.io()) //第一次
.subscribeOn(Schedulers.newThread()) //第二次
.subscribeOn(AndroidSchedulers.mainThread()) //第三次

如果我们多次设置subscribeOn()只有第一次的subscribeOn()起作用,如果此时下游不切线程,下游将在第一次subscribeOn指定的线程执行subscribeOn()将上游整个订阅链封装到Runnable中,丢入指定线程执行,连续多次调用subscribeOn()由于订阅链从下至上,故最上层subscribeOn()的线程切换才会将更上游的操作放入其指定的线程,而后者的线程切换则不起作用

6.如果我们多次设置ObserveOn()下游将在最后一次ObserveOn()指定的线程执行,因为ObserveOn()将下游onNext()丢入指定的线程
若为.observeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()),上下游都将在主线程执行,而上游发射的数据将会入队,造成上游数据发射完下游才取数据在主线程执行

7.上游onNext()与下游onNext()同步与异步情况下调用执行顺序
上下游同步,上游onNext执行1次后下游立即回调onNext1此,上下游异步,上游发射的数据将入队,下游从队列中取数据

8.RxPlugins


RxJavaPlugins.png

RxJava操作符执行在返回新Observable时可Hook一层自己的封装,在真实操作符执行前完成自己预设的逻辑。

RxJavaPlugins.setOnObservableAssembly(new Function<Observable, Observable>() {
    @Override
    public Observable apply(Observable observable) throws Throwable {
        return new MyObservable(observable);  // 实现自己的MyObservable类
    }
});

RxJavaPlugins.setOnObservableSubscribe(new BiFunction<Observable, Observer, Observer>() {
    @Override
    public Observer apply(Observable observable, Observer observer) throws Throwable {
        return new MyObserver(observer);  // 实现自己的MyObserver类
    }
});

参考:RxJavaPlugins

9.自定义操作符

自定义处理上游Observable发射的单独数据项implements ObservableOperator,并通过lift()连接操作符;若为变换Observable发射的整个数据序列则implements ObservableTransformer,重写apply()方法,通过compose()连接操作符。参考RxJava自定义操作符

P.S. 本文源码基于RxJava 3.0.4
By the way: 非常感谢能提出批评和修改意见
contact: tu.wentai@nexuslink.cn

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,922评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,591评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,546评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,467评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,553评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,580评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,588评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,334评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,780评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,092评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,270评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,925评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,573评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,194评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,437评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,154评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,127评论 2 352

推荐阅读更多精彩内容