RxJava 使用篇

一、什么是RxJava

Rx = Reactive Extension

  • Reactive 响应

    • 响应式编程
    • 基于观察者模式
      • 注入回调
      • 调用回调
    • 事件序列
      • 不可预知、动态离散
      • 如用户点击、异步请求、Model更新
  • Extension 扩展

    • 函数式编程
      • 单一职责原则
      • 依赖倒置原则
    • 数据流
      • 现成的,静态的,连续的
      • 如处理字符串数据,文件数据等
    • 线程调度
      • 时间控制:延时、周期任务
      • 线程控制:前后台调度
    • 异常处理
      • 重试
      • onError/onErrorResumeNext/onErrorReturn
    • 轮子
      • 转换、过滤、防抖、组合、重复、重试等操作符
  • 三种编程思想的对比 举例把大象放进冰箱

    • 面向对象编程:
      1. 构建一个冰箱,具有开门,关门的方法
      2. 构建一个大象,具有走进冰箱的方法
      3. 实例化一个冰箱对象,实例化一个大象对象,冰箱对象调用开门方法,大象对象调用走进冰箱方法,冰箱对象调用关门方法,大象被成功装入冰箱。
    • 响应式编程:
      1. 构建一个发射源,发送大象
      2. 构建一个响应器,接受到大象后关进冰箱。
      3. 用这个响应器监听发射源。
    • 函数式编程:
      1. 构建一个函数,接收大象和冰箱两个参数
      2. 在此函数内部做实现,返回冰箱已经装入大象
      3. 调用此函数,将大象和冰箱作为参数传入

二、基本知识

观察源

观察源
  • Observable / Flowable

    • 基本数据流观察源
    • 可发射多个onNext事件
    • 可发射onError或onComplete事件来终止结束整个事件流
  • Single

    • 单独发射一个onSuccess或者onError事件
    • onSuccess相当于onNext
  • Completable

    • 单独发射一个onComplete或onError事件,一般用于单纯的调用,而没有数据处理的逻辑
    • 比Observable少了很多处理元素的操作符
    • 经常使用andThen来转换流到其他观察源
  • Maybe

    • Single和Completable的结合。有onSuccess、onComlete、onError三种事件,但只会发射其中一个。

事件源 Subject

  • PublishSubject
    • 观察者只能收到订阅之后的事件
  • BehaviorSubject
    • 粘性,订阅时会立即收到订阅前最后一个事件或默认事件
  • ReplaySubject
    • 无论什么时候订阅,都可以收到所有事件
    • 当然,可以指定Replay的初始容量(默认16),上限(默认无上限),或Replay的时间上限
  • AsyncSubject
    • subject的onComplete被调用时,才会把事件发射给观察者
  • SerialedSubject
    • 串行Subject,保证发射一个事件,消费完才会发射下一个事件

背压

  • Observable / Flowable 无背压处理
    • Observable数据流处理元素数量不要过多,否则容易OOM
    • Flowable专门用于处理大量数据流,如解析各种流等。
    • Flowable可以控制数据发出速度
  • Subject / Process 有背压处理
    • Subject消费速度不要低于生产速度,否则可能出现OOM
    • Process可以选择背压策略来处理消费速度低于生产速度的情况
  • 背压策略
    • 上游背压策略。通过 create 或 toFlowable 创建的时候可以选择5种策略
      • MISSING: 背压交由下游处理(通过onBackpressureXXX)
      • ERROR: 下游无法处理时,抛出MissingBackpressureException
      • BUFFER: 缓存起来,直到下游可以消费掉
      • DROP: 抛弃掉,如果下游无法处理
      • LATEST: 只保留最新的
    • 下游背压处理
      • onBackpressureDrop 方法处理背压,下游实现onDrop方法
      • onBackpressureLatest 方法处理背压,相当于上游选择了LATEST策略
      • onBackpressureBuffer 方法处理背压,此时可以选择ERROR DROP_OLDEST DROP_LATEST三种应对策略

事件流

  • 创建:创建事件流或数据流
  • 组合:使用链式操作符来变换所创建的事件流
  • 监听:订阅事件流并实现业务响应事件

操作符

  • 创建
    • create 用函数式创建观察源
    • just 用常量或变量创建观察源
    • formArray 用数组创建观察源,元素逐个发送
    • fromIterable 用可迭代对象创建观察源,元素逐个发送
    • range 用整数数列创建观察源,元素逐个发射
    • timer interval 时间类观察源,此类观察源默认使用computation调度器
    • merge concat 等组合类,可以合并多个观察源来创建一个观察源
  • 转换
    • map 变换元素
    • flatMap 从元素切换到新的观察源
  • 过滤
    • filter 符合条件的发射到下游
    • distinct 非重复的元素才发射到下游
    • take 指定允许发射到下游的个数
    • skip 忽略发射到下游的个数
    • ofType 只允许指令类型的元素发射到下游
  • 防抖
    • debounce throttle 防抖或取样
  • 组合
    • merge concat 合并多个流,并按规则分别发射这些流的元素
    • 各个流不会相互影响发射到下游的结果
  • 聚合
    • zip amb combineLast scan 合并多个流,并对这些流的元素合并处理后再发射到下游
    • 各个流会相互影响发射到下游的结果
  • 重复
    • repeat onComplete后自动重新订阅
    • retry onError后自动重新订阅
  • 异步阻塞转同步
    • blockingFirst等

线程调度

  • 分类与调度规则
    • Schedulers.trampoline
      • 默认。当前线程
    • Schedulers.single
      • 一个单例的后台线程
    • Schedulers.newThread
      • 总是启用新线程,并在新线程执行操作。
    • Schedulers.io
      • I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的调度器。和newThread() 类似,区别在于 io() 实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 性能消耗更低。
      • 不要把计算工作放在 io() 中,可以避免创建不必要的线程。
    • Schedulers.computation
      • 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 限制性能的操作,例如图形的计算,延时计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
      • timer、interval等很多时间相关操作符将其作为默认调度器
    • Schedulers.from()
      • 通过一个指定一个Executor来担当调度器
    • AndroidSchedulers.mainThread
      • RxAndroid提供的,它指定的操作将在 Android 主线程运行。

三、其他

扩展 Rx相关库

  • Retrofit
    • 网络请求响应
  • RxBinding
    • View响应
  • RxPermissions
    • Permission状态改变响应
  • RxLifeCycle
    • 作者并不推崇这个库
  • RxBus
    • 事件总线
// RxBinding
RxView.clicks(view)
    .throttleFirst(ms, TimeUnit.MILLISECONDS)
    .compose(RxLifecycleAndroid.bindView(view))
    .subscribe(x -> listener.onClick(view));
    
// RxPermissions
new RxPermissions(getActivity())
    .request(Manifest.permission.CAMERA)
    .subscribe(granded -> {
        if(granted) {
            // ...
        }
    });
    
// RxLifecycle
public class OneFragment extends RxFragment {
    @Override
    public void onCreate(Bundle savedInstanceState) {
        super.onCreate(saveInstanceState);
        PublishSubject.create()
            .compose(bindUntilEvent(FragmentEvent.DESTROY))
            .subscribe();
    }
}

// RxPreferences
RxPreferences.INSTANCE
    .<String>onPreferenceChanged()
    .filter(p -> TextUtils.equels(p.getKey(), SOME_PREFERENCE_KEY))
    .subscribe(p -> ...);
    
// RxNetwork
RxNetwork.INSTANCE
    .onConnectionChanged()
    .filter(info -> info.getNetworkType() == ConnectivityManager.TYPE_WIFI)
    .filter(info -> !info.isConnected())
    .observeOn(AndroidSchedulers.mainThread())
    .subcribe(info -> ...);

常见坑

  • 生命周期
    • dispose!
      • 由于很多事件流都不在主线程,避免线程泄露必须注意dispose
    • dispose?
      • dispose只是结束订阅事件流,首先不能立即停止最后一个异步事件,更不能停止操作符引入的线程
    • RxLifeCycle?
      • 其作者对这个库持怀疑态度
      • 如果订阅的地方发生在没有生命周期的类中,就需要组件去获取Activity的生命周期,然而这种行为是没有保障的,当订阅失败时也是模糊的,如果不是人为去执行,往往具有不确定性
      • 有些事件队列的生命周期和Activity等的生命周期不等价,依然需要手动处理,如果手动处理和自动处理并存则让人困惑
      • 你的Activity和Fragment需要继承库里面的相关基类
      • 建议使用AutoDispose,其比RxLifeCycle更优秀
  • CompositeDisposable
    • clear 对所有add进来的Dispsable执行dispose
    • dispose 在clear的基础上,让这个CompositeDisposable无法再使用,甚至add就会dispose你的事件流
  • 线程调度
    • subscribeOn的坑
// a()运行在computation b()运行在io c() d()运行在主线程
Observable.create(emitter -> a())
    .observeOn(Schedulers.io())
    .flatMap(x -> {
        b();
        return Observable
            .create(emitter -> c())
            .subscribeOn(AndroidSchedulers.mainThread());            
    })
    .map(p -> d())
    .subscribeOn(Schedulers.computation())
    .subscribe();
// 当把flatMap里面的subscribeOn移到主流程上,事情就变了
// a()运行在主线程上 b() c() d()运行在io。第二个subscribeOn不生效
Observable.create(emitter -> a())
    .observeOn(Schedulers.io())
    .flatMap(x -> {
        b();
        return Observable.create(emitter -> c())
    })
    .subscribeOn(AndroidSchedulers.mainThread());         
    .map(p -> d())
    .subscribeOn(Schedulers.computation())
    .subscribe();
  • 背压
    • 如果不注意处理背压就可能导致OOM
  • UnDeliverableException
    • 未复写subscribe()的onError,当上游抛出错误,整个流会直接抛异常
      • 记得写onError或处理异常的操作符,如retry、onErrorReturn等
    • 流已经被dispose了,此时上游抛异常
      • dispose与Thread.interrupt()类似,只起到通知的作用,不起到立即结束的作用
      • 判断流是否断开再抛异常
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,923评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,154评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,775评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,960评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,976评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,972评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,893评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,709评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,159评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,400评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,552评论 1 346
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,265评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,876评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,528评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,701评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,552评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,451评论 2 352

推荐阅读更多精彩内容